rmtfile.cpp 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <string>
  14. #include <unordered_map>
  15. #include "platform.h"
  16. #include "portlist.h"
  17. #include "jlib.hpp"
  18. #include "jflz.hpp"
  19. #include "jio.hpp"
  20. #include "jlog.hpp"
  21. #include "jregexp.hpp"
  22. #include "jmutex.hpp"
  23. #include "jfile.hpp"
  24. #include "jhtree.hpp"
  25. #include "rtldynfield.hpp"
  26. #include "rtlds_imp.hpp"
  27. #include "rtlread_imp.hpp"
  28. #include "rtlrecord.hpp"
  29. #include "eclhelper_dyn.hpp"
  30. #include "rtlcommon.hpp"
  31. #include "rtlformat.hpp"
  32. #include "digisign.hpp"
  33. #include "remoteerr.hpp"
  34. #include "rmtclient.hpp"
  35. #include "rmtclient_impl.hpp"
  36. #include "rmtfile.hpp"
  37. //----------------------------------------------------------------------------
  38. //#define TEST_DAFILESRV_FOR_UNIX_PATHS // probably not needed
  39. static class CSecuritySettings
  40. {
  41. unsigned short daliServixPort;
  42. public:
  43. CSecuritySettings()
  44. {
  45. querySecuritySettings(nullptr, &daliServixPort, nullptr, nullptr, nullptr);
  46. }
  47. unsigned short queryDaliServixPort() { return daliServixPort; }
  48. } securitySettings;
  49. unsigned short getDaliServixPort()
  50. {
  51. return securitySettings.queryDaliServixPort();
  52. }
  53. void setCanAccessDirectly(RemoteFilename & file,bool set)
  54. {
  55. if (set)
  56. file.setPort(0);
  57. else if (file.getPort()==0) // foreign daliservix may be passed in
  58. file.setPort(getDaliServixPort());
  59. }
  60. bool canAccessDirectly(const RemoteFilename & file) // not that well named but historical
  61. {
  62. return (file.getPort()==0);
  63. }
  64. void setLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  65. {
  66. setDafsLocalMountRedirect(ip,dir,mountdir);
  67. }
  68. class CDaliServixFilter : public CInterface
  69. {
  70. protected:
  71. StringAttr dir, sourceRangeText;
  72. SocketEndpointArray sourceRangeIps;
  73. bool sourceRangeHasPorts, trace;
  74. bool checkForPorts(SocketEndpointArray &ips)
  75. {
  76. ForEachItemIn(i, ips)
  77. {
  78. if (ips.item(i).port)
  79. return true;
  80. }
  81. return false;
  82. }
  83. public:
  84. CDaliServixFilter(const char *_dir, const char *sourceRange, bool _trace) : dir(_dir), trace(_trace)
  85. {
  86. if (sourceRange)
  87. {
  88. sourceRangeText.set(sourceRange);
  89. sourceRangeIps.fromText(sourceRange, 0);
  90. sourceRangeHasPorts = checkForPorts(sourceRangeIps);
  91. }
  92. else
  93. sourceRangeHasPorts = false;
  94. }
  95. bool queryTrace() const { return trace; }
  96. const char *queryDirectory() const { return dir; }
  97. bool testPath(const char *path) const
  98. {
  99. if (!dir) // if no dir in filter, match any
  100. return true;
  101. else
  102. return startsWith(path, dir.get());
  103. }
  104. bool applyFilter(const SocketEndpoint &ep) const
  105. {
  106. if (sourceRangeText.length())
  107. {
  108. SocketEndpoint _ep = ep;
  109. if (!sourceRangeHasPorts) // if source range doesn't have ports, only check ip
  110. _ep.port = 0;
  111. return NotFound != sourceRangeIps.find(_ep);
  112. }
  113. // NB: If no source range, use target range to decide if filter should apply
  114. return testEp(ep);
  115. }
  116. virtual bool testEp(const SocketEndpoint &ep) const = 0;
  117. virtual StringBuffer &getInfo(StringBuffer &info)
  118. {
  119. if (dir.length())
  120. info.append(", dir=").append(dir.get());
  121. if (sourceRangeText.get())
  122. info.append(", sourcerange=").append(sourceRangeText.get());
  123. info.append(", trace=(").append(trace ? "true" : "false").append(")");
  124. return info;
  125. }
  126. };
  127. class CDaliServixSubnetFilter : public CDaliServixFilter
  128. {
  129. IpSubNet ipSubNet;
  130. public:
  131. CDaliServixSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace) :
  132. CDaliServixFilter(dir, sourceRange, trace)
  133. {
  134. if (!ipSubNet.set(subnet, mask))
  135. throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
  136. }
  137. virtual bool testEp(const SocketEndpoint &ep) const
  138. {
  139. return ipSubNet.test(ep);
  140. }
  141. virtual StringBuffer &getInfo(StringBuffer &info)
  142. {
  143. info.append("subnet=");
  144. ipSubNet.getNetText(info);
  145. info.append(", mask=");
  146. ipSubNet.getMaskText(info);
  147. CDaliServixFilter::getInfo(info);
  148. return info;
  149. }
  150. };
  151. class CDaliServixRangeFilter : public CDaliServixFilter
  152. {
  153. StringAttr rangeText;
  154. SocketEndpointArray rangeIps;
  155. bool rangeIpsHavePorts;
  156. public:
  157. CDaliServixRangeFilter(const char *_range, const char *dir, const char *sourceRange, bool trace)
  158. : CDaliServixFilter(dir, sourceRange, trace)
  159. {
  160. rangeText.set(_range);
  161. rangeIps.fromText(_range, 0);
  162. rangeIpsHavePorts = checkForPorts(rangeIps);
  163. }
  164. virtual bool testEp(const SocketEndpoint &ep) const
  165. {
  166. SocketEndpoint _ep = ep;
  167. if (!rangeIpsHavePorts) // if range doesn't have ports, only check ip
  168. _ep.port = 0;
  169. return NotFound != rangeIps.find(_ep);
  170. }
  171. virtual StringBuffer &getInfo(StringBuffer &info)
  172. {
  173. info.append("range=").append(rangeText.get());
  174. CDaliServixFilter::getInfo(info);
  175. return info;
  176. }
  177. };
  178. CDaliServixFilter *createDaliServixFilter(IPropertyTree &filterProps)
  179. {
  180. CDaliServixFilter *filter = NULL;
  181. const char *dir = filterProps.queryProp("@directory");
  182. const char *sourceRange = filterProps.queryProp("@sourcerange");
  183. bool trace = filterProps.getPropBool("@trace");
  184. if (filterProps.hasProp("@subnet"))
  185. filter = new CDaliServixSubnetFilter(filterProps.queryProp("@subnet"), filterProps.queryProp("@mask"), dir, sourceRange, trace);
  186. else if (filterProps.hasProp("@range"))
  187. filter = new CDaliServixRangeFilter(filterProps.queryProp("@range"), dir, sourceRange, trace);
  188. else
  189. throw MakeStringException(0, "Unknown DaliServix filter definition");
  190. return filter;
  191. }
  192. class CDaliServixIntercept: public CInterface, implements IDaFileSrvHook
  193. {
  194. CIArrayOf<CDaliServixFilter> filters;
  195. StringAttr forceRemotePattern;
  196. void addFilter(CDaliServixFilter *filter)
  197. {
  198. filters.append(*filter);
  199. StringBuffer msg("DaFileSrvHook: adding translateToLocal [");
  200. filter->getInfo(msg);
  201. msg.append("]");
  202. PROGLOG("%s", msg.str());
  203. }
  204. public:
  205. IMPLEMENT_IINTERFACE;
  206. virtual void forceRemote(const char *pattern)
  207. {
  208. forceRemotePattern.set(pattern);
  209. }
  210. virtual IFile * createIFile(const RemoteFilename & filename)
  211. {
  212. SocketEndpoint ep = filename.queryEndpoint();
  213. bool noport = (ep.port==0);
  214. setDafsEndpointPort(ep);
  215. if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT && ep.port!=SECURE_DAFILESRV_PORT)) // assume standard port is running on local machine
  216. {
  217. #ifdef __linux__
  218. #ifndef USE_SAMBA
  219. if (noport && filters.ordinality())
  220. {
  221. ForEachItemIn(sn, filters)
  222. {
  223. CDaliServixFilter &filter = filters.item(sn);
  224. if (filter.testEp(ep))
  225. {
  226. StringBuffer lPath;
  227. filename.getLocalPath(lPath);
  228. if (filter.testPath(lPath.str()))
  229. {
  230. if (filter.queryTrace())
  231. {
  232. StringBuffer fromPath;
  233. filename.getRemotePath(fromPath);
  234. PROGLOG("Redirecting path: '%s' to '%s", fromPath.str(), lPath.str());
  235. }
  236. return ::createIFile(lPath.str());
  237. }
  238. }
  239. }
  240. }
  241. return createDaliServixFile(filename);
  242. #endif
  243. #endif
  244. if (!noport) // expect all filenames that specify port to be dafilesrc or daliservix
  245. return createDaliServixFile(filename);
  246. if (filename.isUnixPath()
  247. #ifdef TEST_DAFILESRV_FOR_UNIX_PATHS
  248. &&testDaliServixPresent(ep)
  249. #endif
  250. )
  251. return createDaliServixFile(filename);
  252. }
  253. else if (forceRemotePattern)
  254. {
  255. StringBuffer localPath;
  256. filename.getLocalPath(localPath);
  257. // must be local to be here, check if matches forceRemotePattern
  258. if (WildMatch(localPath, forceRemotePattern, false))
  259. return createDaliServixFile(filename);
  260. }
  261. return NULL;
  262. }
  263. virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace)
  264. {
  265. Owned<CDaliServixFilter> filter = new CDaliServixSubnetFilter(subnet, mask, dir, sourceRange, trace);
  266. addFilter(filter.getClear());
  267. }
  268. virtual void addRangeFilter(const char *range, const char *dir, const char *sourceRange, bool trace)
  269. {
  270. Owned<CDaliServixFilter> filter = new CDaliServixRangeFilter(range, dir, sourceRange, trace);
  271. addFilter(filter.getClear());
  272. }
  273. virtual IPropertyTree *addFilters(IPropertyTree *config, const SocketEndpoint *myEp)
  274. {
  275. if (!config)
  276. return NULL;
  277. Owned<IPropertyTree> result;
  278. Owned<IPropertyTreeIterator> iter = config->getElements("Filter");
  279. ForEach(*iter)
  280. {
  281. Owned<CDaliServixFilter> filter = createDaliServixFilter(iter->query());
  282. // Only add filters where myIP matches filter criteria
  283. if (!myEp || filter->applyFilter(*myEp))
  284. {
  285. addFilter(filter.getClear());
  286. if (!result)
  287. result.setown(createPTree());
  288. result->addPropTree("Filter", LINK(&iter->query()));
  289. }
  290. }
  291. return result.getClear();
  292. }
  293. virtual IPropertyTree *addMyFilters(IPropertyTree *config, SocketEndpoint *myEp)
  294. {
  295. if (myEp)
  296. return addFilters(config, myEp);
  297. else
  298. {
  299. SocketEndpoint ep;
  300. GetHostIp(ep);
  301. return addFilters(config, &ep);
  302. }
  303. }
  304. virtual void clearFilters()
  305. {
  306. filters.kill();
  307. }
  308. } *DaliServixIntercept = NULL;
  309. void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted)
  310. {
  311. SocketEndpoint ep(file.queryEndpoint());
  312. setDafsEndpointPort(ep);
  313. if (ep.isNull())
  314. return;
  315. StringBuffer filename;
  316. remoteExtractBlobElements(ep, prefix, file.getLocalPath(filename).str(), extracted);
  317. }
  318. //---------------------------------------------------------------------------
  319. class CRemoteDirectoryIterator : implements IDirectoryDifferenceIterator, public CInterface
  320. {
  321. Owned<IFile> cur;
  322. bool curvalid;
  323. bool curisdir;
  324. StringAttr curname;
  325. CDateTime curdt;
  326. __int64 cursize;
  327. StringAttr dir;
  328. SocketEndpoint ep;
  329. byte *flags;
  330. unsigned numflags;
  331. unsigned curidx;
  332. unsigned mask;
  333. MemoryBuffer buf;
  334. public:
  335. static CriticalSection crit;
  336. CRemoteDirectoryIterator(const SocketEndpoint &_ep,const char *_dir)
  337. : dir(_dir)
  338. {
  339. // an extended difference iterator starts with 2 (for bwd compatibility)
  340. ep = _ep;
  341. curisdir = false;
  342. curvalid = false;
  343. cursize = 0;
  344. curidx = (unsigned)-1;
  345. mask = 0;
  346. numflags = 0;
  347. flags = NULL;
  348. }
  349. bool appendBuf(MemoryBuffer &_buf)
  350. {
  351. buf.setSwapEndian(_buf.needSwapEndian());
  352. byte hdr;
  353. _buf.read(hdr);
  354. if (hdr==2) {
  355. _buf.read(numflags);
  356. flags = (byte *)malloc(numflags);
  357. _buf.read(numflags,flags);
  358. }
  359. else {
  360. buf.append(hdr);
  361. flags = NULL;
  362. numflags = 0;
  363. }
  364. size32_t rest = _buf.length()-_buf.getPos();
  365. const byte *rb = (const byte *)_buf.readDirect(rest);
  366. bool ret = true;
  367. // At the last byte of the rb (rb[rest-1]) is the stream live flag
  368. // True if the stream has more data
  369. // False at the end of stream
  370. // The previous byte (rb[rest-2]) is the flag to signal there are more
  371. // valid entries in this block
  372. // True if there are valid directory entry follows this flag
  373. // False if there are no more valid entry in this block aka end of block
  374. // If there is more data in the stream, the end of block flag should be removed
  375. if (rest&&(rb[rest-1]!=0))
  376. {
  377. rest--; // remove stream live flag
  378. if(rest && (0 == rb[rest-1]))
  379. rest--; //Remove end of block flag
  380. ret = false; // continuation
  381. }
  382. buf.append(rest,rb);
  383. return ret;
  384. }
  385. ~CRemoteDirectoryIterator()
  386. {
  387. free(flags);
  388. }
  389. IMPLEMENT_IINTERFACE
  390. bool first()
  391. {
  392. curidx = (unsigned)-1;
  393. buf.reset();
  394. return next();
  395. }
  396. bool next()
  397. {
  398. for (;;) {
  399. curidx++;
  400. cur.clear();
  401. curdt.clear();
  402. curname.clear();
  403. cursize = 0;
  404. curisdir = false;
  405. if (buf.getPos()>=buf.length())
  406. return false;
  407. byte isValidEntry;
  408. buf.read(isValidEntry);
  409. curvalid = isValidEntry!=0;
  410. if (!curvalid)
  411. return false;
  412. buf.read(curisdir);
  413. buf.read(cursize);
  414. curdt.deserialize(buf);
  415. buf.read(curname);
  416. // kludge for bug in old linux jlibs
  417. if (strchr(curname,'\\')&&(getPathSepChar(dir)=='/')) {
  418. StringBuffer temp(curname);
  419. temp.replace('\\','/');
  420. curname.set(temp.str());
  421. }
  422. if ((mask==0)||(getFlags()&mask))
  423. break;
  424. }
  425. return true;
  426. }
  427. bool isValid()
  428. {
  429. return curvalid;
  430. }
  431. IFile & query()
  432. {
  433. if (!cur) {
  434. StringBuffer full(dir);
  435. addPathSepChar(full).append(curname);
  436. if (ep.isNull())
  437. cur.setown(createIFile(full.str()));
  438. else {
  439. RemoteFilename rfn;
  440. rfn.setPath(ep,full.str());
  441. cur.setown(createIFile(rfn));
  442. }
  443. }
  444. return *cur;
  445. }
  446. StringBuffer &getName(StringBuffer &buf)
  447. {
  448. return buf.append(curname);
  449. }
  450. bool isDir()
  451. {
  452. return curisdir;
  453. }
  454. __int64 getFileSize()
  455. {
  456. if (curisdir)
  457. return -1;
  458. return cursize;
  459. }
  460. bool getModifiedTime(CDateTime &ret)
  461. {
  462. ret = curdt;
  463. return true;
  464. }
  465. void setMask(unsigned _mask)
  466. {
  467. mask = _mask;
  468. }
  469. virtual unsigned getFlags()
  470. {
  471. if (flags&&(curidx<numflags))
  472. return flags[curidx];
  473. return 0;
  474. }
  475. static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize, bool first)
  476. {
  477. bool ret = true;
  478. byte b=1;
  479. StringBuffer tmp;
  480. if (first ? iter->first() : iter->next()) {
  481. for (;;) {
  482. mb.append(b);
  483. bool isdir = iter->isDir();
  484. __int64 sz = isdir?0:iter->getFileSize();
  485. CDateTime dt;
  486. iter->getModifiedTime(dt);
  487. iter->getName(tmp.clear());
  488. mb.append(isdir).append(sz);
  489. dt.serialize(mb);
  490. mb.append(tmp.str());
  491. if (bufsize&&(mb.length()>=bufsize-1)) {
  492. ret = false;
  493. break;
  494. }
  495. if (!iter->next())
  496. break;
  497. }
  498. }
  499. b = 0;
  500. mb.append(b);
  501. return ret;
  502. }
  503. static void serializeDiff(MemoryBuffer &mb,IDirectoryDifferenceIterator *iter)
  504. {
  505. // bit slow
  506. MemoryBuffer flags;
  507. ForEach(*iter)
  508. flags.append((byte)iter->getFlags());
  509. if (flags.length()) {
  510. byte b = 2;
  511. mb.append(b).append((unsigned)flags.length()).append(flags);
  512. }
  513. serialize(mb,iter,0,true);
  514. }
  515. void serialize(MemoryBuffer &mb,bool isdiff)
  516. {
  517. byte b;
  518. if (isdiff&&numflags&&flags) {
  519. b = 2;
  520. mb.append(b).append(numflags).append(numflags,flags);
  521. }
  522. serialize(mb,this,0,true);
  523. }
  524. };
  525. IDirectoryIterator *createRemoteDirectorIterator(const SocketEndpoint &ep, const char *name, MemoryBuffer &state)
  526. {
  527. Owned<CRemoteDirectoryIterator> di = new CRemoteDirectoryIterator(ep, name);
  528. di->appendBuf(state);
  529. return di.getClear();
  530. }
  531. bool serializeRemoteDirectoryIterator(MemoryBuffer &tgt, IDirectoryIterator *iter, size32_t bufsize, bool first)
  532. {
  533. return CRemoteDirectoryIterator::serialize(tgt, iter, bufsize, first);
  534. }
  535. void serializeRemoteDirectoryDiff(MemoryBuffer &tgt, IDirectoryDifferenceIterator *iter)
  536. {
  537. CRemoteDirectoryIterator::serializeDiff(tgt, iter);
  538. }
  539. class CCritTable;
  540. class CEndpointCS : public CriticalSection, public CInterface
  541. {
  542. CCritTable &table;
  543. const SocketEndpoint ep;
  544. public:
  545. CEndpointCS(CCritTable &_table, const SocketEndpoint &_ep) : table(_table), ep(_ep) { }
  546. const void *queryFindParam() const { return &ep; }
  547. virtual void beforeDispose();
  548. };
  549. class CCritTable : private SimpleHashTableOf<CEndpointCS, const SocketEndpoint>
  550. {
  551. typedef SimpleHashTableOf<CEndpointCS, const SocketEndpoint> PARENT;
  552. CriticalSection crit;
  553. public:
  554. CEndpointCS *getCrit(const SocketEndpoint &ep)
  555. {
  556. CriticalBlock b(crit);
  557. CEndpointCS * clientCrit = find(ep);
  558. if (!clientCrit || !clientCrit->isAliveAndLink()) // if !isAliveAndLink(), then it is in the process of being destroyed/removed.
  559. {
  560. clientCrit = new CEndpointCS(*this, ep);
  561. replace(*clientCrit); // NB table doesn't own
  562. }
  563. return clientCrit;
  564. }
  565. unsigned getHashFromElement(const void *e) const
  566. {
  567. const CEndpointCS &elem=*(const CEndpointCS *)e;
  568. return getHashFromFindParam(elem.queryFindParam());
  569. }
  570. unsigned getHashFromFindParam(const void *fp) const
  571. {
  572. return ((const SocketEndpoint *)fp)->hash(0);
  573. }
  574. void removeExact(CEndpointCS *clientCrit)
  575. {
  576. CriticalBlock b(crit);
  577. PARENT::removeExact(clientCrit); // NB may not exist, could have been replaced if detected !isAlive() in getCrit()
  578. }
  579. } *dirCSTable;
  580. MODULE_INIT(INIT_PRIORITY_STANDARD)
  581. {
  582. dirCSTable = new CCritTable;
  583. return true;
  584. }
  585. MODULE_EXIT()
  586. {
  587. delete dirCSTable;
  588. }
  589. void CEndpointCS::beforeDispose()
  590. {
  591. table.removeExact(this);
  592. }
  593. //---------------------------------------------------------------------------
  594. class CRemoteFile : public CRemoteBase, implements IFile
  595. {
  596. StringAttr remotefilename;
  597. unsigned flags;
  598. bool isShareSet;
  599. public:
  600. IMPLEMENT_IINTERFACE_O_USING(CRemoteBase);
  601. CRemoteFile(const SocketEndpoint &_ep, const char * _filename)
  602. : CRemoteBase(_ep, _filename)
  603. {
  604. flags = ((unsigned)IFSHread)|((S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)<<16);
  605. isShareSet = false;
  606. if (filename.length()>2 && isPathSepChar(filename[0]) && isShareChar(filename[2]))
  607. {
  608. VStringBuffer winDriveFilename("%c:%s", filename[1], filename+3);
  609. filename.set(winDriveFilename);
  610. }
  611. }
  612. bool exists()
  613. {
  614. MemoryBuffer sendBuffer;
  615. initSendBuffer(sendBuffer);
  616. MemoryBuffer replyBuffer;
  617. sendBuffer.append((RemoteFileCommandType)RFCexists).append(filename);
  618. sendRemoteCommand(sendBuffer, replyBuffer);
  619. bool ok;
  620. replyBuffer.read(ok);
  621. return ok;
  622. }
  623. bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime)
  624. {
  625. CDateTime dummyTime;
  626. if (!createTime)
  627. createTime = &dummyTime;
  628. if (!modifiedTime)
  629. modifiedTime = &dummyTime;
  630. if (!accessedTime)
  631. accessedTime = &dummyTime;
  632. MemoryBuffer sendBuffer;
  633. initSendBuffer(sendBuffer);
  634. MemoryBuffer replyBuffer;
  635. sendBuffer.append((RemoteFileCommandType)RFCgettime).append(filename);
  636. sendRemoteCommand(sendBuffer, replyBuffer);
  637. bool ok;
  638. replyBuffer.read(ok);
  639. if (ok) {
  640. createTime->deserialize(replyBuffer);
  641. modifiedTime->deserialize(replyBuffer);
  642. accessedTime->deserialize(replyBuffer);
  643. }
  644. return ok;
  645. }
  646. bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime)
  647. {
  648. MemoryBuffer sendBuffer;
  649. initSendBuffer(sendBuffer);
  650. MemoryBuffer replyBuffer;
  651. sendBuffer.append((RemoteFileCommandType)RFCsettime).append(filename);
  652. if (createTime)
  653. {
  654. sendBuffer.append((bool)true);
  655. createTime->serialize(sendBuffer);
  656. }
  657. else
  658. sendBuffer.append((bool)false);
  659. if (modifiedTime)
  660. {
  661. sendBuffer.append((bool)true);
  662. modifiedTime->serialize(sendBuffer);
  663. }
  664. else
  665. sendBuffer.append((bool)false);
  666. if (accessedTime)
  667. {
  668. sendBuffer.append((bool)true);
  669. accessedTime->serialize(sendBuffer);
  670. }
  671. else
  672. sendBuffer.append((bool)false);
  673. sendRemoteCommand(sendBuffer, replyBuffer);
  674. bool ok;
  675. replyBuffer.read(ok);
  676. return ok;
  677. }
  678. fileBool isDirectory()
  679. {
  680. MemoryBuffer sendBuffer;
  681. initSendBuffer(sendBuffer);
  682. MemoryBuffer replyBuffer;
  683. sendBuffer.append((RemoteFileCommandType)RFCisdirectory).append(filename);
  684. sendRemoteCommand(sendBuffer, replyBuffer);
  685. unsigned ret;
  686. replyBuffer.read(ret);
  687. return (fileBool)ret;
  688. }
  689. fileBool isFile()
  690. {
  691. MemoryBuffer sendBuffer;
  692. initSendBuffer(sendBuffer);
  693. MemoryBuffer replyBuffer;
  694. sendBuffer.append((RemoteFileCommandType)RFCisfile).append(filename);
  695. sendRemoteCommand(sendBuffer, replyBuffer);
  696. unsigned ret;
  697. replyBuffer.read(ret);
  698. return (fileBool)ret;
  699. }
  700. fileBool isReadOnly()
  701. {
  702. MemoryBuffer sendBuffer;
  703. initSendBuffer(sendBuffer);
  704. MemoryBuffer replyBuffer;
  705. sendBuffer.append((RemoteFileCommandType)RFCisreadonly).append(filename);
  706. sendRemoteCommand(sendBuffer, replyBuffer);
  707. unsigned ret;
  708. replyBuffer.read(ret);
  709. return (fileBool)ret;
  710. }
  711. IFileIO * open(IFOmode mode,IFEflags extraFlags=IFEnone);
  712. IFileIO * openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags=IFEnone);
  713. IFileAsyncIO * openAsync(IFOmode mode) { return NULL; } // not supported
  714. const char * queryFilename()
  715. {
  716. if (remotefilename.isEmpty()) {
  717. RemoteFilename rfn;
  718. rfn.setPath(ep,filename);
  719. StringBuffer path;
  720. rfn.getRemotePath(path);
  721. remotefilename.set(path);
  722. }
  723. return remotefilename.get();
  724. }
  725. void resetLocalFilename(const char *name)
  726. {
  727. remotefilename.clear();
  728. filename.set(name);
  729. }
  730. bool remove()
  731. {
  732. MemoryBuffer sendBuffer;
  733. initSendBuffer(sendBuffer);
  734. MemoryBuffer replyBuffer;
  735. sendBuffer.append((RemoteFileCommandType)RFCremove).append(filename);
  736. sendRemoteCommand(sendBuffer, replyBuffer);
  737. bool ok;
  738. replyBuffer.read(ok);
  739. return ok;
  740. }
  741. void rename(const char *newname)
  742. {
  743. // currently ignores directory on newname (in future versions newname will be required to be tail only and not full path)
  744. StringBuffer path;
  745. splitDirTail(filename,path);
  746. StringBuffer newdir;
  747. path.append(splitDirTail(newname,newdir));
  748. if (newdir.length()&&(strcmp(newdir.str(),path.str())!=0))
  749. WARNLOG("CRemoteFile::rename passed full path '%s' that may not to match original directory '%s'",newname,path.str());
  750. MemoryBuffer sendBuffer;
  751. initSendBuffer(sendBuffer);
  752. MemoryBuffer replyBuffer;
  753. sendBuffer.append((RemoteFileCommandType)RFCrename).append(filename).append(path);
  754. sendRemoteCommand(sendBuffer, replyBuffer);
  755. filename.set(path);
  756. remotefilename.clear();
  757. }
  758. void move(const char *newname)
  759. {
  760. // like rename except between directories
  761. // first create replote path
  762. if (!newname||!*newname)
  763. return;
  764. RemoteFilename destrfn;
  765. if (isPathSepChar(newname[0])&&isPathSepChar(newname[1])) {
  766. destrfn.setRemotePath(newname);
  767. if (!destrfn.queryEndpoint().ipequals(ep)) {
  768. StringBuffer msg;
  769. msg.appendf("IFile::move %s to %s, destination node must match source node", queryFilename(), newname);
  770. throw createDafsException(RFSERR_MoveFailed,msg.str());
  771. }
  772. }
  773. else
  774. destrfn.setPath(ep,newname);
  775. StringBuffer dest;
  776. newname = destrfn.getLocalPath(dest).str();
  777. MemoryBuffer sendBuffer;
  778. initSendBuffer(sendBuffer);
  779. MemoryBuffer replyBuffer;
  780. StringBuffer path;
  781. splitDirTail(filename,path);
  782. StringBuffer newdir;
  783. const char *newtail = splitDirTail(newname,newdir);
  784. if (strcmp(newdir.str(),path.str())==0)
  785. {
  786. path.append(newtail);
  787. newname = path;
  788. sendBuffer.append((RemoteFileCommandType)RFCrename); // use rename if we can (supported on older dafilesrv)
  789. }
  790. else
  791. sendBuffer.append((RemoteFileCommandType)RFCmove);
  792. sendBuffer.append(filename).append(newname);
  793. sendRemoteCommand(sendBuffer, replyBuffer);
  794. filename.set(newname);
  795. remotefilename.clear();
  796. }
  797. void setReadOnly(bool set)
  798. {
  799. MemoryBuffer sendBuffer;
  800. initSendBuffer(sendBuffer);
  801. MemoryBuffer replyBuffer;
  802. sendBuffer.append((RemoteFileCommandType)RFCsetreadonly).append(filename).append(set);
  803. sendRemoteCommand(sendBuffer, replyBuffer);
  804. }
  805. void setFilePermissions(unsigned fPerms)
  806. {
  807. MemoryBuffer sendBuffer;
  808. initSendBuffer(sendBuffer);
  809. MemoryBuffer replyBuffer;
  810. sendBuffer.append((RemoteFileCommandType)RFCsetfileperms).append(filename).append(fPerms);
  811. try
  812. {
  813. sendRemoteCommand(sendBuffer, replyBuffer);
  814. }
  815. catch (IDAFS_Exception *e)
  816. {
  817. if (e->errorCode() == RFSERR_InvalidCommand)
  818. {
  819. WARNLOG("umask setFilePermissions (0%o) not supported on remote server", fPerms);
  820. e->Release();
  821. }
  822. else
  823. throw;
  824. }
  825. }
  826. offset_t size()
  827. {
  828. #if 1 // faster method (consistant with IFile)
  829. // do this by using dir call (could be improved with new function but this not *too* bad)
  830. if (isSpecialPath(filename))
  831. return 0; // queries deemed to always exist (though don't know size).
  832. // if needed to get size I guess could use IFileIO method and cache (bit of pain though)
  833. StringBuffer dir;
  834. const char *tail = splitDirTail(filename,dir);
  835. if (!dir.length())
  836. return false;
  837. MemoryBuffer sendBuffer;
  838. initSendBuffer(sendBuffer);
  839. MemoryBuffer replyBuffer;
  840. bool includedirs = true;
  841. bool sub=false;
  842. {
  843. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  844. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  845. CriticalBlock block(*crit);
  846. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  847. try
  848. {
  849. sendRemoteCommand(sendBuffer, replyBuffer);
  850. }
  851. catch (IDAFS_Exception * e)
  852. {
  853. if (e->errorCode() == RFSERR_GetDirFailed)
  854. {
  855. e->Release();
  856. return (offset_t)-1;
  857. }
  858. else
  859. throw e;
  860. }
  861. }
  862. // now should be 0 or 1 files returned
  863. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  864. iter->appendBuf(replyBuffer);
  865. if (!iter->first())
  866. return (offset_t)-1;
  867. return (offset_t) iter->getFileSize();
  868. #else
  869. IFileIO * io = open(IFOread);
  870. offset_t length = (offset_t)-1;
  871. if (io)
  872. {
  873. length = io->size();
  874. io->Release();
  875. }
  876. return length;
  877. #endif
  878. }
  879. bool createDirectory()
  880. {
  881. MemoryBuffer sendBuffer;
  882. initSendBuffer(sendBuffer);
  883. MemoryBuffer replyBuffer;
  884. sendBuffer.append((RemoteFileCommandType)RFCcreatedir).append(filename);
  885. sendRemoteCommand(sendBuffer, replyBuffer);
  886. bool ok;
  887. replyBuffer.read(ok);
  888. return ok;
  889. }
  890. virtual IDirectoryIterator *directoryFiles(const char *mask,bool sub,bool includedirs)
  891. {
  892. if (mask&&!*mask)
  893. return createDirectoryIterator("",""); // NULL iterator
  894. CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename);
  895. byte stream = (sub || !mask || containsFileWildcard(mask)) ? 1 : 0; // no point in streaming if mask without wildcards or sub, as will only be <= 1 match.
  896. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  897. CriticalBlock block(*crit);
  898. for (;;)
  899. {
  900. MemoryBuffer sendBuffer;
  901. initSendBuffer(sendBuffer);
  902. MemoryBuffer replyBuffer;
  903. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(filename).append(mask?mask:"").append(includedirs).append(sub).append(stream);
  904. sendRemoteCommand(sendBuffer, replyBuffer);
  905. if (ret->appendBuf(replyBuffer))
  906. break;
  907. stream = 2; // NB: will never get here if streaming was off (if stream==0 above)
  908. }
  909. return ret;
  910. }
  911. IDirectoryDifferenceIterator *monitorDirectory(
  912. IDirectoryIterator *prev=NULL, // in (NULL means use current as baseline)
  913. const char *mask=NULL,
  914. bool sub=false,
  915. bool includedirs=false,
  916. unsigned checkinterval=60*1000,
  917. unsigned timeout=(unsigned)-1,
  918. Semaphore *abortsem=NULL) // returns NULL if timed out
  919. {
  920. // abortsem not yet supported
  921. MemoryBuffer sendBuffer;
  922. initSendBuffer(sendBuffer);
  923. MemoryBuffer replyBuffer;
  924. sendBuffer.append((RemoteFileCommandType)RFCmonitordir).append(filename).append(mask?mask:"").append(includedirs).append(sub);
  925. sendBuffer.append(checkinterval).append(timeout);
  926. __int64 cancelid=0; // not yet used
  927. sendBuffer.append(cancelid);
  928. byte isprev=(prev!=NULL)?1:0;
  929. sendBuffer.append(isprev);
  930. if (prev)
  931. CRemoteDirectoryIterator::serialize(sendBuffer,prev,0,true);
  932. sendRemoteCommand(sendBuffer, replyBuffer);
  933. byte status;
  934. replyBuffer.read(status);
  935. if (status==1)
  936. {
  937. CRemoteDirectoryIterator *iter = new CRemoteDirectoryIterator(ep, filename);
  938. iter->appendBuf(replyBuffer);
  939. return iter;
  940. }
  941. return NULL;
  942. }
  943. bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
  944. {
  945. // do this by using dir call (could be improved with new function but this not *too* bad)
  946. StringBuffer dir;
  947. const char *tail = splitDirTail(filename,dir);
  948. if (!dir.length())
  949. return false;
  950. MemoryBuffer sendBuffer;
  951. initSendBuffer(sendBuffer);
  952. MemoryBuffer replyBuffer;
  953. bool includedirs = true;
  954. bool sub=false;
  955. {
  956. //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
  957. Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
  958. CriticalBlock block(*crit);
  959. sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
  960. sendRemoteCommand(sendBuffer, replyBuffer);
  961. }
  962. // now should be 0 or 1 files returned
  963. Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
  964. iter->appendBuf(replyBuffer);
  965. if (!iter->first())
  966. return false;
  967. isdir = iter->isDir();
  968. size = (offset_t) iter->getFileSize();
  969. iter->getModifiedTime(modtime);
  970. return true;
  971. }
  972. bool setCompression(bool set)
  973. {
  974. assertex(!"Need to implement compress()");
  975. return false;
  976. }
  977. offset_t compressedSize()
  978. {
  979. assertex(!"Need to implement actualSize()");
  980. return (offset_t)-1;
  981. }
  982. void serialize(MemoryBuffer &tgt)
  983. {
  984. throwUnexpected();
  985. }
  986. void deserialize(MemoryBuffer &src)
  987. {
  988. throwUnexpected();
  989. }
  990. unsigned getCRC()
  991. {
  992. MemoryBuffer sendBuffer;
  993. initSendBuffer(sendBuffer);
  994. MemoryBuffer replyBuffer;
  995. sendBuffer.append((RemoteFileCommandType)RFCgetcrc).append(filename);
  996. sendRemoteCommand(sendBuffer, replyBuffer, true, true);
  997. unsigned crc;
  998. replyBuffer.read(crc);
  999. return crc;
  1000. }
  1001. void setCreateFlags(unsigned short cflags)
  1002. {
  1003. flags &= 0xffff;
  1004. flags |= ((unsigned)cflags<<16);
  1005. }
  1006. unsigned short getCreateFlags()
  1007. {
  1008. return (unsigned short)(flags>>16);
  1009. }
  1010. void setShareMode(IFSHmode shmode)
  1011. {
  1012. flags &= ~(IFSHfull|IFSHread);
  1013. flags |= (unsigned)(shmode&(IFSHfull|IFSHread));
  1014. isShareSet = true;
  1015. }
  1016. unsigned short getShareMode()
  1017. {
  1018. return (unsigned short)(flags&0xffff);
  1019. }
  1020. bool getIsShareSet()
  1021. {
  1022. return isShareSet;
  1023. }
  1024. void remoteExtractBlobElements(const char * prefix, ExtractedBlobArray & extracted)
  1025. {
  1026. MemoryBuffer sendBuffer;
  1027. initSendBuffer(sendBuffer);
  1028. sendBuffer.append((RemoteFileCommandType)RFCextractblobelements).append(prefix).append(filename);
  1029. MemoryBuffer replyBuffer;
  1030. sendRemoteCommand(sendBuffer, replyBuffer, true, true); // handles error code
  1031. unsigned n;
  1032. replyBuffer.read(n);
  1033. for (unsigned i=0;i<n;i++) {
  1034. ExtractedBlobInfo *item = new ExtractedBlobInfo;
  1035. item->deserialize(replyBuffer);
  1036. extracted.append(*item);
  1037. }
  1038. }
  1039. bool copySectionAsync(const char *uuid,const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, unsigned timeout)
  1040. {
  1041. // now if we get here is it can be assumed the source file is local to where we send the command
  1042. StringBuffer tos;
  1043. dest.getRemotePath(tos);
  1044. MemoryBuffer sendBuffer;
  1045. initSendBuffer(sendBuffer);
  1046. MemoryBuffer replyBuffer;
  1047. sendBuffer.append((RemoteFileCommandType)RFCcopysection).append(uuid).append(queryLocalName()).append(tos).append(toOfs).append(fromOfs).append(size).append(timeout);
  1048. sendRemoteCommand(sendBuffer, replyBuffer);
  1049. unsigned status;
  1050. replyBuffer.read(status);
  1051. if (progress)
  1052. {
  1053. offset_t sizeDone;
  1054. offset_t totalSize;
  1055. replyBuffer.read(sizeDone).read(totalSize);
  1056. progress->onProgress(sizeDone,totalSize);
  1057. }
  1058. return (AsyncCommandStatus)status!=ACScontinue; // should only otherwise be done as errors raised by exception
  1059. }
  1060. void copySection(const RemoteFilename &dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress *progress, CFflags copyFlags=CFnone)
  1061. {
  1062. StringBuffer uuid;
  1063. genUUID(uuid,true);
  1064. unsigned timeout = 60*1000; // check every minute
  1065. while(!copySectionAsync(uuid.str(),dest,toOfs,fromOfs,size,progress,timeout));
  1066. }
  1067. void copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags=CFnone);
  1068. virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs, memsize_t len, bool write)
  1069. {
  1070. return NULL;
  1071. }
  1072. };
  1073. class CRemoteFileIO : implements IFileIO, public CInterface
  1074. {
  1075. protected:
  1076. Linked<CRemoteFile> parent;
  1077. RemoteFileIOHandle handle;
  1078. std::atomic<cycle_t> ioReadCycles;
  1079. std::atomic<cycle_t> ioWriteCycles;
  1080. std::atomic<__uint64> ioReadBytes;
  1081. std::atomic<__uint64> ioWriteBytes;
  1082. std::atomic<__uint64> ioReads;
  1083. std::atomic<__uint64> ioWrites;
  1084. std::atomic<unsigned> ioRetries;
  1085. IFOmode mode;
  1086. compatIFSHmode compatmode;
  1087. IFEflags extraFlags;
  1088. bool disconnectonexit;
  1089. public:
  1090. IMPLEMENT_IINTERFACE
  1091. CRemoteFileIO(CRemoteFile *_parent)
  1092. : parent(_parent), ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0), ioRetries(0)
  1093. {
  1094. handle = 0;
  1095. disconnectonexit = false;
  1096. }
  1097. ~CRemoteFileIO()
  1098. {
  1099. if (handle) {
  1100. try {
  1101. close();
  1102. }
  1103. catch (IException *e) {
  1104. StringBuffer s;
  1105. e->errorMessage(s);
  1106. WARNLOG("CRemoteFileIO close file: %s",s.str());
  1107. e->Release();
  1108. }
  1109. }
  1110. if (disconnectonexit)
  1111. parent->disconnect();
  1112. }
  1113. void close()
  1114. {
  1115. if (handle)
  1116. {
  1117. try
  1118. {
  1119. MemoryBuffer sendBuffer;
  1120. initSendBuffer(sendBuffer);
  1121. sendBuffer.append((RemoteFileCommandType)RFCcloseIO).append(handle);
  1122. parent->sendRemoteCommand(sendBuffer,false);
  1123. }
  1124. catch (IDAFS_Exception *e)
  1125. {
  1126. if ((e->errorCode()!=RFSERR_InvalidFileIOHandle)&&(e->errorCode()!=RFSERR_NullFileIOHandle))
  1127. throw;
  1128. e->Release();
  1129. }
  1130. handle = 0;
  1131. }
  1132. }
  1133. RemoteFileIOHandle getHandle() const { return handle; }
  1134. bool open(IFOmode _mode,compatIFSHmode _compatmode,IFEflags _extraFlags=IFEnone)
  1135. {
  1136. MemoryBuffer sendBuffer;
  1137. initSendBuffer(sendBuffer);
  1138. MemoryBuffer replyBuffer;
  1139. const char *localname = parent->queryLocalName();
  1140. localname = skipSpecialPath(localname);
  1141. // also send _extraFlags
  1142. // then also send sMode, cFlags
  1143. unsigned short sMode = parent->getShareMode();
  1144. unsigned short cFlags = parent->getCreateFlags();
  1145. if (!(parent->getIsShareSet()))
  1146. {
  1147. switch ((compatIFSHmode)_compatmode)
  1148. {
  1149. case compatIFSHnone:
  1150. sMode = IFSHnone;
  1151. break;
  1152. case compatIFSHread:
  1153. sMode = IFSHread;
  1154. break;
  1155. case compatIFSHwrite:
  1156. sMode = IFSHfull;
  1157. break;
  1158. case compatIFSHall:
  1159. sMode = IFSHfull;
  1160. break;
  1161. }
  1162. }
  1163. sendBuffer.append((RemoteFileCommandType)RFCopenIO).append(localname).append((byte)_mode).append((byte)_compatmode).append((byte)_extraFlags).append(sMode).append(cFlags);
  1164. parent->sendRemoteCommand(sendBuffer, replyBuffer);
  1165. replyBuffer.read(handle);
  1166. if (!handle)
  1167. return false;
  1168. switch (_mode) {
  1169. case IFOcreate:
  1170. mode = IFOwrite;
  1171. break;
  1172. case IFOcreaterw:
  1173. mode = IFOreadwrite;
  1174. break;
  1175. default:
  1176. mode = _mode;
  1177. break;
  1178. }
  1179. compatmode = _compatmode;
  1180. extraFlags = _extraFlags;
  1181. return true;
  1182. }
  1183. bool reopen()
  1184. {
  1185. StringBuffer s;
  1186. PROGLOG("Attempting reopen of %s on %s",parent->queryLocalName(),parent->queryEp().getUrlStr(s).str());
  1187. if (open(mode,compatmode,extraFlags))
  1188. return true;
  1189. return false;
  1190. }
  1191. offset_t size()
  1192. {
  1193. MemoryBuffer sendBuffer;
  1194. initSendBuffer(sendBuffer);
  1195. MemoryBuffer replyBuffer;
  1196. sendBuffer.append((RemoteFileCommandType)RFCsize).append(handle);
  1197. parent->sendRemoteCommand(sendBuffer, replyBuffer, false);
  1198. // Retry using reopen TBD
  1199. offset_t ret;
  1200. replyBuffer.read(ret);
  1201. return ret;
  1202. }
  1203. virtual unsigned __int64 getStatistic(StatisticKind kind)
  1204. {
  1205. switch (kind)
  1206. {
  1207. case StCycleDiskReadIOCycles:
  1208. return ioReadCycles.load(std::memory_order_relaxed);
  1209. case StCycleDiskWriteIOCycles:
  1210. return ioWriteCycles.load(std::memory_order_relaxed);
  1211. case StTimeDiskReadIO:
  1212. return cycle_to_nanosec(ioReadCycles.load(std::memory_order_relaxed));
  1213. case StTimeDiskWriteIO:
  1214. return cycle_to_nanosec(ioWriteCycles.load(std::memory_order_relaxed));
  1215. case StSizeDiskRead:
  1216. return ioReadBytes.load(std::memory_order_relaxed);
  1217. case StSizeDiskWrite:
  1218. return ioWriteBytes.load(std::memory_order_relaxed);
  1219. case StNumDiskReads:
  1220. return ioReads.load(std::memory_order_relaxed);
  1221. case StNumDiskWrites:
  1222. return ioWrites.load(std::memory_order_relaxed);
  1223. case StNumDiskRetries:
  1224. return ioRetries.load(std::memory_order_relaxed);
  1225. }
  1226. return 0;
  1227. }
  1228. size32_t read(offset_t pos, size32_t len, void * data)
  1229. {
  1230. size32_t got;
  1231. MemoryBuffer replyBuffer;
  1232. CCycleTimer timer;
  1233. const void *b;
  1234. try
  1235. {
  1236. b = doRead(pos,len,replyBuffer,got,data);
  1237. }
  1238. catch (...)
  1239. {
  1240. ioReadCycles.fetch_add(timer.elapsedCycles());
  1241. throw;
  1242. }
  1243. ioReadCycles.fetch_add(timer.elapsedCycles());
  1244. ioReadBytes.fetch_add(got);
  1245. ++ioReads;
  1246. if (b!=data)
  1247. memcpy(data,b,got);
  1248. return got;
  1249. }
  1250. virtual void flush()
  1251. {
  1252. }
  1253. const void *doRead(offset_t pos, size32_t len, MemoryBuffer &replyBuffer, size32_t &got, void *dstbuf)
  1254. {
  1255. unsigned tries=0;
  1256. for (;;)
  1257. {
  1258. try
  1259. {
  1260. MemoryBuffer sendBuffer;
  1261. initSendBuffer(sendBuffer);
  1262. replyBuffer.clear();
  1263. sendBuffer.append((RemoteFileCommandType)RFCread).append(handle).append(pos).append(len);
  1264. parent->sendRemoteCommand(sendBuffer, replyBuffer,false);
  1265. // kludge dafilesrv versions <= 1.5e don't return error correctly
  1266. if (replyBuffer.length()>len+sizeof(size32_t)+sizeof(unsigned))
  1267. {
  1268. size32_t save = replyBuffer.getPos();
  1269. replyBuffer.reset(len+sizeof(size32_t)+sizeof(unsigned));
  1270. unsigned errCode;
  1271. replyBuffer.read(errCode);
  1272. if (errCode)
  1273. {
  1274. StringBuffer msg;
  1275. parent->ep.getUrlStr(msg.append('[')).append("] ");
  1276. if (replyBuffer.getPos()<replyBuffer.length())
  1277. {
  1278. StringAttr s;
  1279. replyBuffer.read(s);
  1280. msg.append(s);
  1281. }
  1282. else
  1283. msg.append("ERROR #").append(errCode);
  1284. throw createDafsException(errCode, msg.str());
  1285. }
  1286. else
  1287. replyBuffer.reset(save);
  1288. }
  1289. replyBuffer.read(got);
  1290. if ((got>replyBuffer.remaining())||(got>len))
  1291. {
  1292. PROGLOG("Read beyond buffer %d,%d,%d",got,replyBuffer.remaining(),len);
  1293. throw createDafsException(RFSERR_ReadFailed, "Read beyond buffer");
  1294. }
  1295. return replyBuffer.readDirect(got);
  1296. }
  1297. catch (IJSOCK_Exception *e)
  1298. {
  1299. EXCLOG(e,"CRemoteFileIO::read");
  1300. if (++tries > 3)
  1301. {
  1302. ioRetries.fetch_add(tries);
  1303. throw;
  1304. }
  1305. WARNLOG("Retrying read of %s (%d)",parent->queryLocalName(),tries);
  1306. Owned<IException> exc = e;
  1307. if (!reopen())
  1308. {
  1309. ioRetries.fetch_add(tries);
  1310. throw exc.getClear();
  1311. }
  1312. }
  1313. }
  1314. if (tries)
  1315. ioRetries.fetch_add(tries);
  1316. got = 0;
  1317. return NULL;
  1318. }
  1319. size32_t write(offset_t pos, size32_t len, const void * data)
  1320. {
  1321. unsigned tries=0;
  1322. size32_t ret = 0;
  1323. CCycleTimer timer;
  1324. for (;;)
  1325. {
  1326. try
  1327. {
  1328. MemoryBuffer replyBuffer;
  1329. MemoryBuffer sendBuffer;
  1330. initSendBuffer(sendBuffer);
  1331. sendBuffer.append((RemoteFileCommandType)RFCwrite).append(handle).append(pos).append(len).append(len, data);
  1332. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  1333. replyBuffer.read(ret);
  1334. break;
  1335. }
  1336. catch (IJSOCK_Exception *e)
  1337. {
  1338. EXCLOG(e,"CRemoteFileIO::write");
  1339. if (++tries > 3)
  1340. {
  1341. ioRetries.fetch_add(tries);
  1342. ioWriteCycles.fetch_add(timer.elapsedCycles());
  1343. throw;
  1344. }
  1345. WARNLOG("Retrying write(%" I64F "d,%d) of %s (%d)",pos,len,parent->queryLocalName(),tries);
  1346. Owned<IException> exc = e;
  1347. if (!reopen())
  1348. {
  1349. ioRetries.fetch_add(tries);
  1350. ioWriteCycles.fetch_add(timer.elapsedCycles());
  1351. throw exc.getClear();
  1352. }
  1353. }
  1354. }
  1355. if (tries)
  1356. ioRetries.fetch_add(tries);
  1357. ioWriteCycles.fetch_add(timer.elapsedCycles());
  1358. ioWriteBytes.fetch_add(ret);
  1359. ++ioWrites;
  1360. if ((ret==(size32_t)-1) || (ret < len))
  1361. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?");
  1362. return ret;
  1363. }
  1364. offset_t appendFile(IFile *file,offset_t pos,offset_t len)
  1365. {
  1366. MemoryBuffer sendBuffer;
  1367. initSendBuffer(sendBuffer);
  1368. MemoryBuffer replyBuffer;
  1369. const char * fname = file->queryFilename();
  1370. sendBuffer.append((RemoteFileCommandType)RFCappend).append(handle).append(fname).append(pos).append(len);
  1371. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true); // retry not safe
  1372. offset_t ret;
  1373. replyBuffer.read(ret);
  1374. if ((ret==(offset_t)-1) || ((len != ((offset_t)-1)) && (ret < len)))
  1375. throw createDafsException(DISK_FULL_EXCEPTION_CODE,"append failed, disk full?"); // though could be file missing TBD
  1376. return ret;
  1377. }
  1378. void setSize(offset_t size)
  1379. {
  1380. MemoryBuffer sendBuffer;
  1381. initSendBuffer(sendBuffer);
  1382. MemoryBuffer replyBuffer;
  1383. sendBuffer.append((RemoteFileCommandType)RFCsetsize).append(handle).append(size);
  1384. parent->sendRemoteCommand(sendBuffer, replyBuffer, false, true);
  1385. // retry using reopen TBD
  1386. }
  1387. void setDisconnectOnExit(bool set) { disconnectonexit = set; }
  1388. void sendRemoteCommand(MemoryBuffer & sendBuffer, MemoryBuffer & replyBuffer, bool retry=true, bool lengthy=false, bool handleErrCode=true)
  1389. {
  1390. parent->sendRemoteCommand(sendBuffer, replyBuffer, retry, lengthy, handleErrCode);
  1391. }
  1392. };
  1393. IFileIO *CRemoteFile::openShared(IFOmode mode,IFSHmode shmode,IFEflags extraFlags)
  1394. {
  1395. // 0x0, 0x8, 0x10 and 0x20 are only share modes supported in this assert
  1396. // currently only 0x0 (IFSHnone), 0x8 (IFSHread) and 0x10 (IFSHfull) are used so this could be 0xffffffe7
  1397. // note: IFSHfull also includes read sharing (ie write|read)
  1398. assertex(((unsigned)shmode&0xffffffc7)==0);
  1399. compatIFSHmode compatmode;
  1400. unsigned fileflags = (flags>>16) & (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH);
  1401. if (fileflags&S_IXUSR) // this is bit hit and miss but backward compatible
  1402. compatmode = compatIFSHexec;
  1403. else if (fileflags&(S_IWGRP|S_IWOTH))
  1404. compatmode = compatIFSHall;
  1405. else if (shmode&IFSHfull)
  1406. compatmode = compatIFSHwrite;
  1407. else if (((shmode&(IFSHread|IFSHfull))==0) && ((fileflags&(S_IRGRP|S_IROTH))==0))
  1408. compatmode = compatIFSHnone;
  1409. else
  1410. compatmode = compatIFSHread;
  1411. Owned<CRemoteFileIO> res = new CRemoteFileIO(this);
  1412. if (res->open(mode,compatmode,extraFlags))
  1413. return res.getClear();
  1414. return NULL;
  1415. }
  1416. IFileIO * CRemoteFile::open(IFOmode mode,IFEflags extraFlags)
  1417. {
  1418. return openShared(mode,(IFSHmode)(flags&(IFSHread|IFSHfull)),extraFlags);
  1419. }
  1420. //---------------------------------------------------------------------------
  1421. void CRemoteFile::copyTo(IFile *dest, size32_t buffersize, ICopyFileProgress *progress, bool usetmp, CFflags copyFlags)
  1422. {
  1423. CRemoteFile *dstfile = QUERYINTERFACE(dest,CRemoteFile);
  1424. if (dstfile&&!dstfile->queryEp().isLocal()) {
  1425. StringBuffer tmpname;
  1426. Owned<IFile> destf;
  1427. RemoteFilename dest;
  1428. if (usetmp) {
  1429. makeTempCopyName(tmpname,dstfile->queryLocalName());
  1430. dest.setPath(dstfile->queryEp(),tmpname.str());
  1431. }
  1432. else
  1433. dest.setPath(dstfile->queryEp(),dstfile->queryLocalName());
  1434. destf.setown(createIFile(dest));
  1435. try {
  1436. // following may fail if new dafilesrv not deployed on src
  1437. copySection(dest,(offset_t)-1,0,(offset_t)-1,progress,copyFlags);
  1438. if (usetmp) {
  1439. StringAttr tail(pathTail(dstfile->queryLocalName()));
  1440. dstfile->remove();
  1441. destf->rename(tail);
  1442. }
  1443. return;
  1444. }
  1445. catch (IException *e)
  1446. {
  1447. StringBuffer s;
  1448. s.appendf("Remote File Copy (%d): ",e->errorCode());
  1449. e->errorMessage(s);
  1450. s.append(", retrying local");
  1451. WARNLOG("%s",s.str());
  1452. e->Release();
  1453. }
  1454. // delete dest
  1455. try {
  1456. destf->remove();
  1457. }
  1458. catch (IException *e)
  1459. {
  1460. EXCLOG(e,"Remote File Copy, Deleting temporary file");
  1461. e->Release();
  1462. }
  1463. }
  1464. // assumption if we get here that source remote, dest local (or equiv)
  1465. class cIntercept: implements ICopyFileIntercept
  1466. {
  1467. MemoryAttr ma;
  1468. MemoryBuffer mb;
  1469. virtual offset_t copy(IFileIO *from, IFileIO *to, offset_t ofs, size32_t sz)
  1470. {
  1471. if (ma.length()<sz)
  1472. ma.allocate(sz); // may be not used
  1473. void *buf = ma.bufferBase();
  1474. size32_t got;
  1475. CRemoteFileIO *srcio = QUERYINTERFACE(from,CRemoteFileIO);
  1476. const void *dst;
  1477. if (srcio)
  1478. dst = srcio->doRead(ofs,sz,mb.clear(),got,buf);
  1479. else {
  1480. // shouldn't ever get here if source remote
  1481. got = from->read(ofs, sz, buf);
  1482. dst = buf;
  1483. }
  1484. if (got != 0)
  1485. to->write(ofs, got, dst);
  1486. return got;
  1487. }
  1488. } intercept;
  1489. doCopyFile(dest,this,buffersize,progress,&intercept,usetmp,copyFlags);
  1490. }
  1491. ////////////////
  1492. void remoteExtractBlobElements(const SocketEndpoint &ep,const char * prefix, const char * filename, ExtractedBlobArray & extracted)
  1493. {
  1494. Owned<CRemoteFile> file = new CRemoteFile (ep,filename);
  1495. file->remoteExtractBlobElements(prefix, extracted);
  1496. }
  1497. //---------------------------------------------------------------------------
  1498. // Local mount redirect
  1499. struct CLocalMountRec: public CInterface
  1500. {
  1501. IpAddress ip;
  1502. StringAttr dir; // dir path on remote ip
  1503. StringAttr local; // local dir path
  1504. };
  1505. static CIArrayOf<CLocalMountRec> localMounts;
  1506. static CriticalSection localMountCrit;
  1507. void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  1508. {
  1509. CriticalBlock block(localMountCrit);
  1510. ForEachItemInRev(i,localMounts) {
  1511. CLocalMountRec &mount = localMounts.item(i);
  1512. if (dir==NULL) { // remove all matching mount
  1513. if (!mountdir)
  1514. return;
  1515. if (strcmp(mount.local,mountdir)==0)
  1516. localMounts.remove(i);
  1517. }
  1518. else if (mount.ip.ipequals(ip)&&(strcmp(mount.dir,dir)==0)) {
  1519. if (mountdir) {
  1520. mount.local.set(mountdir);
  1521. return;
  1522. }
  1523. else
  1524. localMounts.remove(i);
  1525. }
  1526. }
  1527. if (dir&&mountdir) {
  1528. CLocalMountRec &mount = *new CLocalMountRec;
  1529. mount.ip.ipset(ip);
  1530. mount.dir.set(dir);
  1531. mount.local.set(mountdir);
  1532. localMounts.append(mount);
  1533. }
  1534. }
  1535. IFile *createFileLocalMount(const IpAddress &ip, const char * filename)
  1536. {
  1537. CriticalBlock block(localMountCrit);
  1538. ForEachItemInRev(i,localMounts) {
  1539. CLocalMountRec &mount = localMounts.item(i);
  1540. if (mount.ip.ipequals(ip)) {
  1541. size32_t bl = mount.dir.length();
  1542. if (isPathSepChar(mount.dir[bl-1]))
  1543. bl--;
  1544. if ((memcmp((void *)filename,(void *)mount.dir.get(),bl)==0)&&(isPathSepChar(filename[bl])||!filename[bl])) { // match
  1545. StringBuffer locpath(mount.local);
  1546. if (filename[bl])
  1547. addPathSepChar(locpath).append(filename+bl+1);
  1548. locpath.replace((PATHSEPCHAR=='\\')?'/':'\\',PATHSEPCHAR);
  1549. return createIFile(locpath.str());
  1550. }
  1551. }
  1552. }
  1553. return NULL;
  1554. }
  1555. IFile * createRemoteFile(SocketEndpoint &ep, const char * filename)
  1556. {
  1557. IFile *ret = createFileLocalMount(ep,filename);
  1558. if (ret)
  1559. return ret;
  1560. return new CRemoteFile(ep, filename);
  1561. }
  1562. void clientDisconnectRemoteFile(IFile *file)
  1563. {
  1564. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  1565. if (cfile)
  1566. cfile->disconnect();
  1567. }
  1568. bool clientResetFilename(IFile *file, const char *newname) // returns false if not remote
  1569. {
  1570. CRemoteFile *cfile = QUERYINTERFACE(file,CRemoteFile);
  1571. if (!cfile)
  1572. return false;
  1573. cfile->resetLocalFilename(newname);
  1574. return true;
  1575. }
  1576. IFile *createDaliServixFile(const RemoteFilename & file)
  1577. {
  1578. SocketEndpoint ep(file.queryEndpoint());
  1579. setDafsEndpointPort(ep);
  1580. if (ep.isNull())
  1581. return NULL;
  1582. StringBuffer path;
  1583. file.getLocalPath(path);
  1584. return createRemoteFile(ep, path.str());
  1585. }
  1586. void clientDisconnectRemoteIoOnExit(IFileIO *fileio, bool set)
  1587. {
  1588. CRemoteFileIO *cfileio = QUERYINTERFACE(fileio,CRemoteFileIO);
  1589. if (cfileio)
  1590. cfileio->setDisconnectOnExit(set);
  1591. }
  1592. void setDaliServixSocketCaching(bool set)
  1593. {
  1594. clientSetDaliServixSocketCaching(set);
  1595. }
  1596. void disconnectRemoteFile(IFile *file)
  1597. {
  1598. clientDisconnectRemoteFile(file);
  1599. }
  1600. void disconnectRemoteIoOnExit(IFileIO *fileio,bool set)
  1601. {
  1602. clientDisconnectRemoteIoOnExit(fileio,set);
  1603. }
  1604. bool resetRemoteFilename(IFile *file, const char *newname)
  1605. {
  1606. return clientResetFilename(file,newname);
  1607. }
  1608. extern bool clientAsyncCopyFileSection(const char *uuid,
  1609. IFile *from, // expected to be remote
  1610. RemoteFilename &to,
  1611. offset_t toOfs, // -1 created file and copies to start
  1612. offset_t fromOfs,
  1613. offset_t size,
  1614. ICopyFileProgress *progress,
  1615. unsigned timeout) // returns true when done
  1616. {
  1617. CRemoteFile *cfile = QUERYINTERFACE(from,CRemoteFile);
  1618. if (!cfile) {
  1619. // local - do sync
  1620. from->copySection(to,toOfs,fromOfs,size,progress);
  1621. return true;
  1622. }
  1623. return cfile->copySectionAsync(uuid,to,toOfs,fromOfs, size, progress, timeout);
  1624. }
  1625. bool asyncCopyFileSection(const char *uuid, // from genUUID - must be same for subsequent calls
  1626. IFile *from, // expected to be remote
  1627. RemoteFilename &to,
  1628. offset_t toofs, // (offset_t)-1 created file and copies to start
  1629. offset_t fromofs,
  1630. offset_t size, // (offset_t)-1 for all file
  1631. ICopyFileProgress *progress,
  1632. unsigned timeout // 0 to start, non-zero to wait
  1633. )
  1634. {
  1635. return clientAsyncCopyFileSection(uuid,from,to,toofs,fromofs,size,progress,timeout);
  1636. }
  1637. void setRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  1638. {
  1639. clientSetRemoteFileTimeouts(maxconnecttime,maxreadtime);
  1640. }
  1641. unsigned validateNodes(const SocketEndpointArray &epso,const char *dataDir, const char *mirrorDir, bool chkver, SocketEndpointArray &failures, UnsignedArray &failedcodes, StringArray &failedmessages, const char *filename)
  1642. {
  1643. // used for detecting duff nodes
  1644. IPointerArrayOf<ISocket> sockets;
  1645. // dedup nodes
  1646. SocketEndpointArray eps;
  1647. ForEachItemIn(i1,epso)
  1648. eps.appendUniq(epso.element(i1));
  1649. unsigned to=30*1000;
  1650. unsigned n=eps.ordinality(); // use approx log scale (timeout is long but only for failure situation)
  1651. while (n>1) {
  1652. n/=2;
  1653. to+=30*1000;
  1654. }
  1655. multiConnect(eps,sockets,to);
  1656. ForEachItemIn(i,eps) {
  1657. if (sockets.item(i)==NULL) {
  1658. failures.append(eps.item(i));
  1659. failedcodes.append(DAFS_VALIDATE_CONNECT_FAIL);
  1660. failedmessages.append("Connect failure");
  1661. }
  1662. }
  1663. CriticalSection sect;
  1664. class casyncfor: public CAsyncFor
  1665. {
  1666. const SocketEndpointArray &eps;
  1667. const IPointerArrayOf<ISocket> &sockets;
  1668. SocketEndpointArray &failures;
  1669. StringArray &failedmessages;
  1670. UnsignedArray &failedcodes;
  1671. CriticalSection &sect;
  1672. StringAttr dataDir, mirrorDir;
  1673. bool chkv;
  1674. const char *filename;
  1675. public:
  1676. casyncfor(const SocketEndpointArray &_eps,const IPointerArrayOf<ISocket> &_sockets,const char *_dataDir,const char *_mirrorDir,bool _chkv, const char *_filename,SocketEndpointArray &_failures, StringArray &_failedmessages,UnsignedArray &_failedcodes,CriticalSection &_sect)
  1677. : eps(_eps), sockets(_sockets), failures(_failures),
  1678. failedmessages(_failedmessages), failedcodes(_failedcodes), sect(_sect),
  1679. dataDir(_dataDir), mirrorDir(_mirrorDir)
  1680. {
  1681. chkv = _chkv;
  1682. filename = _filename;
  1683. }
  1684. void Do(unsigned i)
  1685. {
  1686. ISocket *sock = sockets.item(i);
  1687. if (!sock)
  1688. return;
  1689. SocketEndpoint ep = eps.item(i);
  1690. bool iswin;
  1691. unsigned code = 0;
  1692. StringBuffer errstr;
  1693. StringBuffer ver;
  1694. try {
  1695. getRemoteVersion(sock,ver);
  1696. iswin = (strstr(ver.str(),"Windows")!=NULL);
  1697. }
  1698. catch (IException *e)
  1699. {
  1700. code = DAFS_VALIDATE_CONNECT_FAIL;
  1701. e->errorMessage(errstr);
  1702. e->Release();
  1703. }
  1704. if (!code&&chkv) {
  1705. const char *rv = ver.str();
  1706. const char *v = DAFILESRV_VERSIONSTRING;
  1707. while (*v&&(*v!='-')&&(*v==*rv)) {
  1708. v++;
  1709. rv++;
  1710. }
  1711. if (*rv!=*v) {
  1712. if (*rv) {
  1713. while (*rv&&(*rv!='-'))
  1714. rv++;
  1715. while (*v&&(*v!='-'))
  1716. v++;
  1717. StringBuffer wanted(v-DAFILESRV_VERSIONSTRING,DAFILESRV_VERSIONSTRING);
  1718. ver.setLength(rv-ver.str());
  1719. if (strcmp(ver.str(),wanted.str())<0) { // allow >
  1720. code = DAFS_VALIDATE_BAD_VERSION;
  1721. errstr.appendf("Mismatch dafilesrv version ");
  1722. errstr.append(rv-ver.str(),ver.str());
  1723. errstr.append(", wanted ");
  1724. errstr.append(v-DAFILESRV_VERSIONSTRING,DAFILESRV_VERSIONSTRING);
  1725. }
  1726. }
  1727. else {
  1728. code = DAFS_VALIDATE_CONNECT_FAIL;
  1729. errstr.appendf("could not contact dafilesrv");
  1730. }
  1731. }
  1732. }
  1733. if (!code&&(dataDir.get()||mirrorDir.get())) {
  1734. clientAddSocketToCache(ep,sock);
  1735. const char *drivePath = NULL;
  1736. const char *drivePaths[2];
  1737. unsigned drives=0;
  1738. if (mirrorDir.get()) drivePaths[drives++] = mirrorDir.get();
  1739. if (dataDir.get()) drivePaths[drives++] = dataDir.get();
  1740. do
  1741. {
  1742. StringBuffer path(drivePaths[--drives]);
  1743. addPathSepChar(path);
  1744. if (filename)
  1745. path.append(filename);
  1746. else {
  1747. path.append("dafs_");
  1748. genUUID(path);
  1749. path.append(".tmp");
  1750. }
  1751. RemoteFilename rfn;
  1752. rfn.setPath(ep,path);
  1753. Owned<IFile> file = createIFile(rfn);
  1754. size32_t sz;
  1755. StringBuffer ds;
  1756. try {
  1757. StringBuffer fullPath;
  1758. rfn.getRemotePath(fullPath);
  1759. recursiveCreateDirectoryForFile(fullPath);
  1760. Owned<IFileIO> fileio = file->open(IFOcreate);
  1761. CDateTime dt;
  1762. dt.setNow();
  1763. dt.getString(ds);
  1764. sz = ds.length()+1;
  1765. assertex(sz<64);
  1766. fileio->write(0,sz,ds.str());
  1767. }
  1768. catch (IException *e) {
  1769. if (e->errorCode()==DISK_FULL_EXCEPTION_CODE)
  1770. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_DISK_FULL_DATA:DAFS_VALIDATE_DISK_FULL_MIRROR);
  1771. else
  1772. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_WRITE_FAIL_DATA:DAFS_VALIDATE_WRITE_FAIL_MIRROR);
  1773. if (errstr.length())
  1774. errstr.append(',');
  1775. e->errorMessage(errstr);
  1776. e->Release();
  1777. continue; // no use trying read
  1778. }
  1779. try {
  1780. Owned<IFileIO> fileio = file->open(IFOread);
  1781. char buf[64];
  1782. size32_t rd = fileio->read(0,sizeof(buf)-1,buf);
  1783. if ((rd!=sz)||(memcmp(buf,ds.str(),sz)!=0)) {
  1784. StringBuffer s;
  1785. ep.getIpText(s);
  1786. throw MakeStringException(-1,"Data discrepancy on disk read of %s of %s",path.str(),s.str());
  1787. }
  1788. }
  1789. catch (IException *e) {
  1790. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_READ_FAIL_DATA:DAFS_VALIDATE_READ_FAIL_MIRROR);
  1791. if (errstr.length())
  1792. errstr.append(',');
  1793. e->errorMessage(errstr);
  1794. e->Release();
  1795. }
  1796. if (!filename||!*filename) {
  1797. // delete file created
  1798. try {
  1799. file->remove();
  1800. }
  1801. catch (IException *e) {
  1802. e->Release(); // supress error
  1803. }
  1804. }
  1805. }
  1806. while (0 != drives);
  1807. }
  1808. if (code) {
  1809. CriticalBlock block(sect);
  1810. failures.append(ep);
  1811. failedcodes.append(code);
  1812. failedmessages.append(errstr.str());
  1813. }
  1814. }
  1815. } afor(eps,sockets,dataDir,mirrorDir,chkver,filename,failures,failedmessages,failedcodes,sect);
  1816. afor.For(eps.ordinality(), 10, false, true);
  1817. return failures.ordinality();
  1818. }
  1819. static PointerArrayOf<SharedObject> *hookDlls;
  1820. static void installFileHook(const char *hookFileSpec);
  1821. extern DAFSCLIENT_API void installFileHooks(const char *hookFileSpec)
  1822. {
  1823. if (!hookDlls)
  1824. hookDlls = new PointerArrayOf<SharedObject>;
  1825. const char * cursor = hookFileSpec;
  1826. for (;*cursor;)
  1827. {
  1828. StringBuffer file;
  1829. while (*cursor && *cursor != ENVSEPCHAR)
  1830. file.append(*cursor++);
  1831. if(*cursor)
  1832. cursor++;
  1833. if(!file.length())
  1834. continue;
  1835. installFileHook(file);
  1836. }
  1837. }
  1838. typedef void (*HookInstallFunction)();
  1839. static void installFileHook(const char *hookFile)
  1840. {
  1841. StringBuffer dirPath, dirTail, absolutePath;
  1842. splitFilename(hookFile, &dirPath, &dirPath, &dirTail, &dirTail);
  1843. makeAbsolutePath(dirPath.str(), absolutePath);
  1844. if (!containsFileWildcard(dirTail))
  1845. {
  1846. addPathSepChar(absolutePath).append(dirTail);
  1847. Owned<IFile> file = createIFile(absolutePath);
  1848. if (file->isDirectory() == foundYes)
  1849. {
  1850. installFileHooks(addPathSepChar(absolutePath).append('*'));
  1851. }
  1852. else if (file->isFile() == foundYes)
  1853. {
  1854. HookInstallFunction hookInstall;
  1855. SharedObject *so = new SharedObject(); // MORE - this leaks! Kind-of deliberate right now...
  1856. if (so->load(file->queryFilename(), false) &&
  1857. (hookInstall = (HookInstallFunction) GetSharedProcedure(so->getInstanceHandle(), "installFileHook")) != NULL)
  1858. {
  1859. hookInstall();
  1860. hookDlls->append(so);
  1861. }
  1862. else
  1863. {
  1864. so->unload();
  1865. delete so;
  1866. DBGLOG("File hook library %s could not be loaded", hookFile);
  1867. }
  1868. }
  1869. else
  1870. {
  1871. DBGLOG("File hook library %s not found", hookFile);
  1872. }
  1873. }
  1874. else
  1875. {
  1876. Owned<IDirectoryIterator> dir = createDirectoryIterator(absolutePath, dirTail);
  1877. ForEach(*dir)
  1878. {
  1879. const char *name = dir->query().queryFilename();
  1880. if (name && *name && *name != '.')
  1881. installFileHook(name);
  1882. }
  1883. }
  1884. }
  1885. // Should be called before closedown, ideally. MODEXIT tries to mop up but may be too late to do so cleanly
  1886. extern DAFSCLIENT_API void removeFileHooks()
  1887. {
  1888. if (hookDlls)
  1889. {
  1890. ForEachItemIn(idx, *hookDlls)
  1891. {
  1892. SharedObject *so = hookDlls->item(idx);
  1893. HookInstallFunction hookInstall = (HookInstallFunction) GetSharedProcedure(so->getInstanceHandle(), "removeFileHook");
  1894. if (hookInstall)
  1895. hookInstall();
  1896. delete so;
  1897. }
  1898. delete hookDlls;
  1899. hookDlls = NULL;
  1900. }
  1901. }
  1902. MODULE_INIT(INIT_PRIORITY_DAFSCLIENT)
  1903. {
  1904. if(!DaliServixIntercept)
  1905. {
  1906. DaliServixIntercept = new CDaliServixIntercept;
  1907. addIFileCreateHook(DaliServixIntercept);
  1908. }
  1909. return true;
  1910. }
  1911. MODULE_EXIT()
  1912. {
  1913. if(DaliServixIntercept)
  1914. {
  1915. // delete ConnectionTable; // too late to delete (jsocket closed down)
  1916. removeIFileCreateHook(DaliServixIntercept);
  1917. ::Release(DaliServixIntercept);
  1918. DaliServixIntercept = NULL;
  1919. }
  1920. removeFileHooks();
  1921. }
  1922. IDaFileSrvHook *queryDaFileSrvHook()
  1923. {
  1924. return DaliServixIntercept;
  1925. }
  1926. void enableForceRemoteReads()
  1927. {
  1928. const char *forceRemotePattern = queryEnvironmentConf().queryProp("forceRemotePattern");
  1929. if (!isEmptyString(forceRemotePattern))
  1930. queryDaFileSrvHook()->forceRemote(forceRemotePattern);
  1931. }
  1932. bool testForceRemote(const char *path)
  1933. {
  1934. const char *forceRemotePattern = queryEnvironmentConf().queryProp("forceRemotePattern");
  1935. return !isEmptyString(forceRemotePattern) && WildMatch(path, forceRemotePattern, false);
  1936. }
  1937. //// legacy implementation of streaming implementations, to be replaced by dafsstream.*
  1938. ///
  1939. //
  1940. class CRemoteFilteredFileIOBase : public CRemoteBase, implements IRemoteFileIO
  1941. {
  1942. typedef CRemoteBase PARENT;
  1943. public:
  1944. IMPLEMENT_IINTERFACE_O_USING(CRemoteBase);
  1945. // Really a stream, but life (maybe) easier elsewhere if looks like a file
  1946. // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
  1947. CRemoteFilteredFileIOBase(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
  1948. : CRemoteBase(ep, filename)
  1949. {
  1950. // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
  1951. openRequest();
  1952. if (queryOutputCompressionDefault())
  1953. {
  1954. expander.setown(getExpander(queryOutputCompressionDefault()));
  1955. if (expander)
  1956. {
  1957. expandMb.setEndian(__BIG_ENDIAN);
  1958. request.appendf("\"outputCompression\" : \"%s\",\n", queryOutputCompressionDefault());
  1959. }
  1960. else
  1961. WARNLOG("Failed to created compression decompressor for: %s", queryOutputCompressionDefault());
  1962. }
  1963. request.appendf("\"format\" : \"binary\",\n"
  1964. "\"node\" : {\n"
  1965. " \"fileName\" : \"%s\"", filename);
  1966. if (chooseN)
  1967. request.appendf(",\n \"chooseN\" : \"%" I64F "u\"", chooseN);
  1968. if (fieldFilters.numFilterFields())
  1969. {
  1970. request.append(",\n \"keyFilter\" : [\n ");
  1971. for (unsigned idx=0; idx < fieldFilters.numFilterFields(); idx++)
  1972. {
  1973. auto &filter = fieldFilters.queryFilter(idx);
  1974. StringBuffer filterString;
  1975. filter.serialize(filterString);
  1976. if (idx)
  1977. request.append(",\n ");
  1978. request.append("\"");
  1979. encodeJSON(request, filterString.length(), filterString.str());
  1980. request.append("\"");
  1981. }
  1982. request.append("\n ]");
  1983. }
  1984. MemoryBuffer actualTypeInfo;
  1985. if (!dumpTypeInfo(actualTypeInfo, actual->querySerializedDiskMeta()->queryTypeInfo()))
  1986. throw createDafsException(DAFSERR_cmdstream_unsupported_recfmt, "Format not supported by remote read");
  1987. request.append(",\n \"inputBin\" : \"");
  1988. JBASE64_Encode(actualTypeInfo.toByteArray(), actualTypeInfo.length(), request, false);
  1989. request.append("\"");
  1990. if (actual != projected)
  1991. {
  1992. MemoryBuffer projectedTypeInfo;
  1993. dumpTypeInfo(projectedTypeInfo, projected->querySerializedDiskMeta()->queryTypeInfo());
  1994. if (actualTypeInfo.length() != projectedTypeInfo.length() ||
  1995. memcmp(actualTypeInfo.toByteArray(), projectedTypeInfo.toByteArray(), actualTypeInfo.length()))
  1996. {
  1997. request.append(",\n \"outputBin\": \"");
  1998. JBASE64_Encode(projectedTypeInfo.toByteArray(), projectedTypeInfo.length(), request, false);
  1999. request.append("\"");
  2000. }
  2001. }
  2002. bufPos = 0;
  2003. }
  2004. virtual size32_t read(offset_t pos, size32_t len, void * data) override
  2005. {
  2006. assertex(pos == bufPos); // Must read sequentially
  2007. if (!bufRemaining && !eof)
  2008. refill();
  2009. if (eof)
  2010. return 0;
  2011. if (len > bufRemaining)
  2012. len = bufRemaining;
  2013. bufPos += len;
  2014. bufRemaining -= len;
  2015. memcpy(data, reply.readDirect(len), len);
  2016. return len;
  2017. }
  2018. virtual offset_t size() override { return -1; }
  2019. virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); }
  2020. virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override { throwUnexpected(); }
  2021. virtual void setSize(offset_t size) override { throwUnexpected(); }
  2022. virtual void flush() override { throwUnexpected(); }
  2023. virtual void close() override
  2024. {
  2025. PARENT::close(handle);
  2026. handle = 0;
  2027. }
  2028. virtual unsigned __int64 getStatistic(StatisticKind kind) override
  2029. {
  2030. /* NB: Would need new stat. categories added for this to make sense,
  2031. * but this class is implemented as a IFileIO for convenience for now,
  2032. * it may be refactored into another form later.
  2033. */
  2034. return 0;
  2035. }
  2036. // IRemoteFileIO
  2037. virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) override
  2038. {
  2039. virtualFields[fieldName] = fieldValue;
  2040. }
  2041. virtual void ensureAvailable() override
  2042. {
  2043. if (firstRequest)
  2044. handleFirstRequest();
  2045. }
  2046. protected:
  2047. StringBuffer &openRequest()
  2048. {
  2049. return request.append("{\n");
  2050. }
  2051. StringBuffer &closeRequest()
  2052. {
  2053. return request.append("\n }\n");
  2054. }
  2055. void addVirtualFields()
  2056. {
  2057. request.append(", \n \"virtualFields\" : {\n");
  2058. bool first=true;
  2059. for (auto &e : virtualFields)
  2060. {
  2061. if (!first)
  2062. request.append(",\n");
  2063. request.appendf(" \"%s\" : \"%s\"", e.first.c_str(), e.second.c_str());
  2064. first = false;
  2065. }
  2066. request.append(" }");
  2067. }
  2068. void handleFirstRequest()
  2069. {
  2070. firstRequest = false;
  2071. addVirtualFields();
  2072. closeRequest();
  2073. sendRequest(0, nullptr);
  2074. }
  2075. void refill()
  2076. {
  2077. if (firstRequest)
  2078. {
  2079. handleFirstRequest();
  2080. return;
  2081. }
  2082. size32_t cursorLength;
  2083. reply.read(cursorLength);
  2084. if (!cursorLength)
  2085. {
  2086. eof = true;
  2087. return;
  2088. }
  2089. MemoryBuffer mrequest;
  2090. MemoryBuffer newReply;
  2091. initSendBuffer(mrequest);
  2092. mrequest.append((RemoteFileCommandType)RFCStreamRead);
  2093. VStringBuffer json("{ \"handle\" : %u, \"format\" : \"binary\" }", handle);
  2094. mrequest.append(json.length(), json.str());
  2095. sendRemoteCommand(mrequest, newReply);
  2096. unsigned newHandle;
  2097. newReply.read(newHandle);
  2098. if (newHandle == handle)
  2099. {
  2100. reply.swapWith(newReply);
  2101. reply.read(bufRemaining);
  2102. eof = (bufRemaining == 0);
  2103. if (expander)
  2104. {
  2105. size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
  2106. expandMb.clear().reserve(expandedSz);
  2107. expander->expand(expandMb.bufferBase());
  2108. expandMb.swapWith(reply);
  2109. }
  2110. }
  2111. else
  2112. {
  2113. assertex(newHandle == 0);
  2114. sendRequest(cursorLength, reply.readDirect(cursorLength));
  2115. }
  2116. }
  2117. void sendRequest(unsigned cursorLen, const void *cursorData)
  2118. {
  2119. MemoryBuffer mrequest;
  2120. initSendBuffer(mrequest);
  2121. mrequest.append((RemoteFileCommandType)RFCStreamRead);
  2122. mrequest.append(request.length(), request.str());
  2123. if (cursorLen)
  2124. {
  2125. StringBuffer cursorInfo;
  2126. cursorInfo.append(",\"cursorBin\": \"");
  2127. JBASE64_Encode(cursorData, cursorLen, cursorInfo, false);
  2128. cursorInfo.append("\"\n");
  2129. mrequest.append(cursorInfo.length(), cursorInfo.str());
  2130. }
  2131. if (TF_TRACE_FULL)
  2132. PROGLOG("req = <%s}>", request.str());
  2133. mrequest.append(3, " \n}");
  2134. sendRemoteCommand(mrequest, reply);
  2135. reply.read(handle);
  2136. reply.read(bufRemaining);
  2137. eof = (bufRemaining == 0);
  2138. if (expander)
  2139. {
  2140. size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
  2141. expandMb.clear().reserve(expandedSz);
  2142. expander->expand(expandMb.bufferBase());
  2143. expandMb.swapWith(reply);
  2144. }
  2145. }
  2146. StringBuffer request;
  2147. MemoryBuffer reply;
  2148. unsigned handle = 0;
  2149. size32_t bufRemaining = 0;
  2150. offset_t bufPos = 0;
  2151. bool eof = false;
  2152. bool firstRequest = true;
  2153. std::unordered_map<std::string, std::string> virtualFields;
  2154. Owned<IExpander> expander;
  2155. MemoryBuffer expandMb;
  2156. };
  2157. class CRemoteFilteredFileIO : public CRemoteFilteredFileIOBase
  2158. {
  2159. public:
  2160. // Really a stream, but life (maybe) easier elsewhere if looks like a file
  2161. // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
  2162. CRemoteFilteredFileIO(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
  2163. : CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
  2164. {
  2165. // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
  2166. request.appendf(",\n \"kind\" : \"diskread\",\n"
  2167. " \"compressed\" : \"%s\",\n"
  2168. " \"inputGrouped\" : \"%s\",\n"
  2169. " \"outputGrouped\" : \"%s\"", boolToStr(compressed), boolToStr(grouped), boolToStr(grouped));
  2170. }
  2171. };
  2172. class CRemoteFilteredRowStream : public CRemoteFilteredFileIO, implements IRowStream
  2173. {
  2174. public:
  2175. CRemoteFilteredRowStream(const RtlRecord &_recInfo, SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped)
  2176. : CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, 0), recInfo(_recInfo)
  2177. {
  2178. }
  2179. virtual const byte *queryNextRow() // NOTE - rows returned must NOT be freed
  2180. {
  2181. if (!bufRemaining && !eof)
  2182. refill();
  2183. if (eof)
  2184. return nullptr;
  2185. unsigned len = recInfo.getRecordSize(reply.readDirect(0));
  2186. bufPos += len;
  2187. bufRemaining -= len;
  2188. return reply.readDirect(len);
  2189. }
  2190. virtual void stop() override
  2191. {
  2192. close();
  2193. eof = true;
  2194. }
  2195. protected:
  2196. const RtlRecord &recInfo;
  2197. };
  2198. static StringAttr remoteOutputCompressionDefault;
  2199. void setRemoteOutputCompressionDefault(const char *type)
  2200. {
  2201. if (!isEmptyString(type))
  2202. remoteOutputCompressionDefault.set(type);
  2203. }
  2204. const char *queryOutputCompressionDefault() { return remoteOutputCompressionDefault; }
  2205. extern IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
  2206. {
  2207. try
  2208. {
  2209. return new CRemoteFilteredFileIO(ep, filename, actual, projected, fieldFilters, compressed, grouped, chooseN);
  2210. }
  2211. catch (IException *e)
  2212. {
  2213. EXCLOG(e, nullptr);
  2214. e->Release();
  2215. }
  2216. return nullptr;
  2217. }
  2218. class CRemoteFilteredKeyIO : public CRemoteFilteredFileIOBase
  2219. {
  2220. public:
  2221. // Really a stream, but life (maybe) easier elsewhere if looks like a file
  2222. // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
  2223. CRemoteFilteredKeyIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
  2224. : CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
  2225. {
  2226. request.appendf(",\n \"kind\" : \"indexread\"");
  2227. request.appendf(",\n \"crc\" : \"%u\"", crc);
  2228. }
  2229. };
  2230. class CRemoteFilteredKeyCountIO : public CRemoteFilteredFileIOBase
  2231. {
  2232. public:
  2233. // Really a stream, but life (maybe) easier elsewhere if looks like a file
  2234. // Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
  2235. CRemoteFilteredKeyCountIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, const RowFilter &fieldFilters, unsigned __int64 rowLimit)
  2236. : CRemoteFilteredFileIOBase(ep, filename, actual, actual, fieldFilters, rowLimit)
  2237. {
  2238. request.appendf(",\n \"kind\" : \"indexcount\"");
  2239. request.appendf(",\n \"crc\" : \"%u\"", crc);
  2240. }
  2241. };
  2242. class CRemoteKey : public CSimpleInterfaceOf<IIndexLookup>
  2243. {
  2244. Owned<IRemoteFileIO> iRemoteFileIO;
  2245. offset_t pos = 0;
  2246. Owned<ISourceRowPrefetcher> prefetcher;
  2247. CThorContiguousRowBuffer prefetchBuffer;
  2248. Owned<ISerialStream> strm;
  2249. bool pending = false;
  2250. SocketEndpoint ep;
  2251. StringAttr filename;
  2252. unsigned crc;
  2253. Linked<IOutputMetaData> actual, projected;
  2254. RowFilter fieldFilters;
  2255. public:
  2256. CRemoteKey(SocketEndpoint &_ep, const char *_filename, unsigned _crc, IOutputMetaData *_actual, IOutputMetaData *_projected, const RowFilter &_fieldFilters, unsigned __int64 rowLimit)
  2257. : ep(_ep), filename(_filename), crc(_crc), actual(_actual), projected(_projected)
  2258. {
  2259. for (unsigned f=0; f<_fieldFilters.numFilterFields(); f++)
  2260. fieldFilters.addFilter(OLINK(_fieldFilters.queryFilter(f)));
  2261. iRemoteFileIO.setown(new CRemoteFilteredKeyIO(ep, filename, crc, actual, projected, fieldFilters, rowLimit));
  2262. if (!iRemoteFileIO)
  2263. throwStringExceptionV(DAFSERR_cmdstream_openfailure, "Unable to open remote key part: '%s'", filename.get());
  2264. strm.setown(createFileSerialStream(iRemoteFileIO));
  2265. prefetcher.setown(projected->createDiskPrefetcher());
  2266. assertex(prefetcher);
  2267. prefetchBuffer.setStream(strm);
  2268. }
  2269. // IIndexLookup
  2270. virtual void ensureAvailable() override
  2271. {
  2272. iRemoteFileIO->ensureAvailable(); // will throw an exception if fails
  2273. }
  2274. virtual unsigned __int64 getCount() override
  2275. {
  2276. return checkCount(0);
  2277. }
  2278. virtual unsigned __int64 checkCount(unsigned __int64 limit) override
  2279. {
  2280. Owned<IFileIO> iFileIO = new CRemoteFilteredKeyCountIO(ep, filename, crc, actual, fieldFilters, limit);
  2281. unsigned __int64 result;
  2282. iFileIO->read(0, sizeof(result), &result);
  2283. return result;
  2284. }
  2285. virtual const void *nextKey() override
  2286. {
  2287. if (pending)
  2288. prefetchBuffer.finishedRow();
  2289. if (prefetchBuffer.eos())
  2290. return nullptr;
  2291. prefetcher->readAhead(prefetchBuffer);
  2292. pending = true;
  2293. return prefetchBuffer.queryRow();
  2294. }
  2295. virtual unsigned querySeeks() const override { return 0; } // not sure how best to handle these, perhaps should log/record somewhere on server-side
  2296. virtual unsigned queryScans() const override { return 0; }
  2297. virtual unsigned querySkips() const override { return 0; }
  2298. };
  2299. extern IIndexLookup *createRemoteFilteredKey(SocketEndpoint &ep, const char * filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
  2300. {
  2301. try
  2302. {
  2303. return new CRemoteKey(ep, filename, crc, actual, projected, fieldFilters, chooseN);
  2304. }
  2305. catch (IException *e)
  2306. {
  2307. EXCLOG(e, nullptr);
  2308. e->Release();
  2309. }
  2310. return nullptr;
  2311. }