rmtfile.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "portlist.h"
  15. #include "jlib.hpp"
  16. #include "jio.hpp"
  17. #include "jlog.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "sockfile.hpp"
  21. #include "rmtfile.hpp"
  22. #include "remoteerr.hpp"
  23. //----------------------------------------------------------------------------
  24. //#define TEST_DAFILESRV_FOR_UNIX_PATHS // probably not needed
  25. unsigned short getDaliServixPort()
  26. {
  27. return DAFILESRV_PORT;
  28. }
  29. void setCanAccessDirectly(RemoteFilename & file,bool set)
  30. {
  31. if (set)
  32. file.setPort(0);
  33. else if (file.getPort()==0) // foreign daliservix may be passed in
  34. file.setPort(getDaliServixPort());
  35. }
  36. bool canAccessDirectly(const RemoteFilename & file) // not that well named but historical
  37. {
  38. return (file.getPort()==0);
  39. }
  40. void setLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  41. {
  42. setDafsLocalMountRedirect(ip,dir,mountdir);
  43. }
  44. class CDaliServixFilter : public CInterface
  45. {
  46. protected:
  47. StringAttr dir, sourceRangeText;
  48. SocketEndpointArray sourceRangeIps;
  49. bool sourceRangeHasPorts, trace;
  50. bool checkForPorts(SocketEndpointArray &ips)
  51. {
  52. ForEachItemIn(i, ips)
  53. {
  54. if (ips.item(i).port)
  55. return true;
  56. }
  57. return false;
  58. }
  59. public:
  60. CDaliServixFilter(const char *_dir, const char *sourceRange, bool _trace) : dir(_dir), trace(_trace)
  61. {
  62. if (sourceRange)
  63. {
  64. sourceRangeText.set(sourceRange);
  65. sourceRangeIps.fromText(sourceRange, 0);
  66. sourceRangeHasPorts = checkForPorts(sourceRangeIps);
  67. }
  68. else
  69. sourceRangeHasPorts = false;
  70. }
  71. bool queryTrace() const { return trace; }
  72. const char *queryDirectory() const { return dir; }
  73. bool testPath(const char *path) const
  74. {
  75. if (!dir) // if no dir in filter, match any
  76. return true;
  77. else
  78. return startsWith(path, dir.get());
  79. }
  80. bool applyFilter(const SocketEndpoint &ep) const
  81. {
  82. if (sourceRangeText.length())
  83. {
  84. SocketEndpoint _ep = ep;
  85. if (!sourceRangeHasPorts) // if source range doesn't have ports, only check ip
  86. _ep.port = 0;
  87. return NotFound != sourceRangeIps.find(_ep);
  88. }
  89. // NB: If no source range, use target range to decide if filter should apply
  90. return testEp(ep);
  91. }
  92. virtual bool testEp(const SocketEndpoint &ep) const = 0;
  93. virtual StringBuffer &getInfo(StringBuffer &info)
  94. {
  95. if (dir.length())
  96. info.append(", dir=").append(dir.get());
  97. if (sourceRangeText.get())
  98. info.append(", sourcerange=").append(sourceRangeText.get());
  99. info.append(", trace=(").append(trace ? "true" : "false").append(")");
  100. return info;
  101. }
  102. };
  103. class CDaliServixSubnetFilter : public CDaliServixFilter
  104. {
  105. IpSubNet ipSubNet;
  106. public:
  107. CDaliServixSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace) :
  108. CDaliServixFilter(dir, sourceRange, trace)
  109. {
  110. if (!ipSubNet.set(subnet, mask))
  111. throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
  112. }
  113. virtual bool testEp(const SocketEndpoint &ep) const
  114. {
  115. return ipSubNet.test(ep);
  116. }
  117. virtual StringBuffer &getInfo(StringBuffer &info)
  118. {
  119. info.append("subnet=");
  120. ipSubNet.getNetText(info);
  121. info.append(", mask=");
  122. ipSubNet.getMaskText(info);
  123. CDaliServixFilter::getInfo(info);
  124. return info;
  125. }
  126. };
  127. class CDaliServixRangeFilter : public CDaliServixFilter
  128. {
  129. StringAttr rangeText;
  130. SocketEndpointArray rangeIps;
  131. bool rangeIpsHavePorts;
  132. public:
  133. CDaliServixRangeFilter(const char *_range, const char *dir, const char *sourceRange, bool trace)
  134. : CDaliServixFilter(dir, sourceRange, trace)
  135. {
  136. rangeText.set(_range);
  137. rangeIps.fromText(_range, 0);
  138. rangeIpsHavePorts = checkForPorts(rangeIps);
  139. }
  140. virtual bool testEp(const SocketEndpoint &ep) const
  141. {
  142. SocketEndpoint _ep = ep;
  143. if (!rangeIpsHavePorts) // if range doesn't have ports, only check ip
  144. _ep.port = 0;
  145. return NotFound != rangeIps.find(_ep);
  146. }
  147. virtual StringBuffer &getInfo(StringBuffer &info)
  148. {
  149. info.append("range=").append(rangeText.get());
  150. CDaliServixFilter::getInfo(info);
  151. return info;
  152. }
  153. };
  154. CDaliServixFilter *createDaliServixFilter(IPropertyTree &filterProps)
  155. {
  156. CDaliServixFilter *filter = NULL;
  157. const char *dir = filterProps.queryProp("@directory");
  158. const char *sourceRange = filterProps.queryProp("@sourcerange");
  159. bool trace = filterProps.getPropBool("@trace");
  160. if (filterProps.hasProp("@subnet"))
  161. filter = new CDaliServixSubnetFilter(filterProps.queryProp("@subnet"), filterProps.queryProp("@mask"), dir, sourceRange, trace);
  162. else if (filterProps.hasProp("@range"))
  163. filter = new CDaliServixRangeFilter(filterProps.queryProp("@range"), dir, sourceRange, trace);
  164. else
  165. throw MakeStringException(0, "Unknown DaliServix filter definition");
  166. return filter;
  167. }
  168. class CDaliServixIntercept: public CInterface, implements IDaFileSrvHook
  169. {
  170. CIArrayOf<CDaliServixFilter> filters;
  171. void addFilter(CDaliServixFilter *filter)
  172. {
  173. filters.append(*filter);
  174. StringBuffer msg("DaFileSrvHook: adding translateToLocal [");
  175. filter->getInfo(msg);
  176. msg.append("]");
  177. PROGLOG("%s", msg.str());
  178. }
  179. public:
  180. IMPLEMENT_IINTERFACE;
  181. virtual IFile * createIFile(const RemoteFilename & filename)
  182. {
  183. SocketEndpoint ep = filename.queryEndpoint();
  184. bool noport = (ep.port==0);
  185. setDafsEndpointPort(ep);
  186. if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT)) // assume standard port is running on local machine
  187. {
  188. #ifdef __linux__
  189. #ifndef USE_SAMBA
  190. if (noport && filters.ordinality())
  191. {
  192. ForEachItemIn(sn, filters)
  193. {
  194. CDaliServixFilter &filter = filters.item(sn);
  195. if (filter.testEp(ep))
  196. {
  197. StringBuffer lPath;
  198. filename.getLocalPath(lPath);
  199. if (filter.testPath(lPath.str()))
  200. {
  201. if (filter.queryTrace())
  202. {
  203. StringBuffer fromPath;
  204. filename.getRemotePath(fromPath);
  205. PROGLOG("Redirecting path: '%s' to '%s", fromPath.str(), lPath.str());
  206. }
  207. return ::createIFile(lPath.str());
  208. }
  209. }
  210. }
  211. }
  212. return createDaliServixFile(filename);
  213. #endif
  214. #endif
  215. if (!noport) // expect all filenames that specify port to be dafilesrc or daliservix
  216. return createDaliServixFile(filename);
  217. if (filename.isUnixPath()
  218. #ifdef TEST_DAFILESRV_FOR_UNIX_PATHS
  219. &&testDaliServixPresent(ep)
  220. #endif
  221. )
  222. return createDaliServixFile(filename);
  223. }
  224. return NULL;
  225. }
  226. virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace)
  227. {
  228. Owned<CDaliServixFilter> filter = new CDaliServixSubnetFilter(subnet, mask, dir, sourceRange, trace);
  229. addFilter(filter.getClear());
  230. }
  231. virtual void addRangeFilter(const char *range, const char *dir, const char *sourceRange, bool trace)
  232. {
  233. Owned<CDaliServixFilter> filter = new CDaliServixRangeFilter(range, dir, sourceRange, trace);
  234. addFilter(filter.getClear());
  235. }
  236. virtual IPropertyTree *addFilters(IPropertyTree *config, const SocketEndpoint *myEp)
  237. {
  238. if (!config)
  239. return NULL;
  240. Owned<IPropertyTree> result;
  241. Owned<IPropertyTreeIterator> iter = config->getElements("Filter");
  242. ForEach(*iter)
  243. {
  244. Owned<CDaliServixFilter> filter = createDaliServixFilter(iter->query());
  245. // Only add filters where myIP matches filter criteria
  246. if (!myEp || filter->applyFilter(*myEp))
  247. {
  248. addFilter(filter.getClear());
  249. if (!result)
  250. result.setown(createPTree());
  251. result->addPropTree("Filter", LINK(&iter->query()));
  252. }
  253. }
  254. return result.getClear();
  255. }
  256. virtual IPropertyTree *addMyFilters(IPropertyTree *config, SocketEndpoint *myEp)
  257. {
  258. if (myEp)
  259. return addFilters(config, myEp);
  260. else
  261. {
  262. SocketEndpoint ep;
  263. GetHostIp(ep);
  264. return addFilters(config, &ep);
  265. }
  266. }
  267. virtual void clearFilters()
  268. {
  269. filters.kill();
  270. }
  271. } *DaliServixIntercept = NULL;
  272. bool testDaliServixPresent(const SocketEndpoint &_ep)
  273. {
  274. SocketEndpoint ep(_ep);
  275. setDafsEndpointPort(ep);
  276. if (ep.isNull())
  277. return false;
  278. try {
  279. Owned<ISocket> socket = ISocket::connect_timeout(ep,10000);
  280. return true;
  281. }
  282. catch (IException *e)
  283. {
  284. e->Release();
  285. }
  286. return false;
  287. }
  288. bool testDaliServixPresent(const IpAddress &ip)
  289. {
  290. SocketEndpoint ep(0,ip);
  291. return testDaliServixPresent(ep);
  292. }
  293. unsigned getDaliServixVersion(const SocketEndpoint &_ep,StringBuffer &ver)
  294. {
  295. SocketEndpoint ep(_ep);
  296. setDafsEndpointPort(ep);
  297. if (ep.isNull())
  298. return false;
  299. try {
  300. Owned<ISocket> socket = ISocket::connect_timeout(ep,10000);
  301. return getRemoteVersion(socket,ver);
  302. }
  303. catch (IException *e)
  304. {
  305. EXCLOG(e,"getDaliServixVersion");
  306. e->Release();
  307. }
  308. return 0;
  309. }
  310. struct CDafsOsCacheEntry
  311. {
  312. SocketEndpoint ep;
  313. DAFS_OS os;
  314. time_t at;
  315. };
  316. class CDafsOsCache: public SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>
  317. {
  318. void onAdd(void *) {}
  319. void onRemove(void *et)
  320. {
  321. CDafsOsCacheEntry *e = (CDafsOsCacheEntry *)et;
  322. delete e;
  323. }
  324. unsigned getHashFromElement(const void *e) const
  325. {
  326. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)e;
  327. return elem.ep.hash(0);
  328. }
  329. unsigned getHashFromFindParam(const void *fp) const
  330. {
  331. return ((const SocketEndpoint *)fp)->hash(0);
  332. }
  333. const void * getFindParam(const void *p) const
  334. {
  335. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)p;
  336. return (void *)&elem.ep;
  337. }
  338. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  339. {
  340. return ((CDafsOsCacheEntry *)et)->ep.equals(*(SocketEndpoint *)fp);
  341. }
  342. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CDafsOsCacheEntry,SocketEndpoint);
  343. public:
  344. static CriticalSection crit;
  345. CDafsOsCache()
  346. {
  347. }
  348. ~CDafsOsCache()
  349. {
  350. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::releaseAll();
  351. }
  352. DAFS_OS lookup(const SocketEndpoint &ep,ISocket *sock)
  353. {
  354. CriticalBlock block(crit);
  355. CDafsOsCacheEntry *r = SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::find(&ep);
  356. bool needupdate=false;
  357. unsigned t = (unsigned)time(NULL);
  358. if (!r) {
  359. r = new CDafsOsCacheEntry;
  360. r->ep = ep;
  361. needupdate = true;
  362. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::add(*r);
  363. }
  364. else
  365. needupdate = (t-r->at>60*5); // update every 5 mins
  366. if (needupdate) {
  367. r->os = DAFS_OSunknown;
  368. StringBuffer ver;
  369. unsigned ret;
  370. if (sock)
  371. ret = getRemoteVersion(sock,ver);
  372. else
  373. ret = getDaliServixVersion(ep,ver);
  374. if (ret!=0) { // if cross-os needs dafilesrv
  375. if (strstr(ver.str(),"Linux")!=NULL)
  376. r->os = DAFS_OSlinux;
  377. else if (strstr(ver.str(),"Windows")!=NULL)
  378. r->os = DAFS_OSwindows;
  379. else if (strstr(ver.str(),"Solaris")!=NULL)
  380. r->os = DAFS_OSsolaris;
  381. }
  382. r->at = t;
  383. }
  384. return r->os;
  385. }
  386. };
  387. CriticalSection CDafsOsCache::crit;
  388. DAFS_OS getDaliServixOs(const SocketEndpoint &ep,ISocket *socket)
  389. {
  390. #ifdef _DEBUG
  391. if (ep.isLocal())
  392. #ifdef _WIN32
  393. return DAFS_OSwindows;
  394. #else
  395. return DAFS_OSlinux;
  396. #endif
  397. #endif
  398. static CDafsOsCache cache;
  399. return cache.lookup(ep,socket);
  400. }
  401. DAFS_OS getDaliServixOs(const SocketEndpoint &ep)
  402. {
  403. return getDaliServixOs(ep,NULL);
  404. }
  405. unsigned getDaliServixVersion(const IpAddress &ip,StringBuffer &ver)
  406. {
  407. SocketEndpoint ep(0,ip);
  408. return getDaliServixVersion(ep,ver);
  409. }
  410. int remoteExec(const SocketEndpoint &_ep,const char *cmdline, const char *workdir,bool sync,
  411. size32_t insize, void *inbuf, MemoryBuffer *outbuf)
  412. {
  413. SocketEndpoint ep(_ep);
  414. setDafsEndpointPort(ep);
  415. if (ep.isNull())
  416. return false;
  417. try {
  418. Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
  419. return remoteExec(socket, cmdline, workdir, sync, insize, inbuf, outbuf);
  420. }
  421. catch (IException *e)
  422. {
  423. EXCLOG(e,"remoteExec");
  424. e->Release();
  425. }
  426. return -2;
  427. }
  428. extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flags)
  429. {
  430. SocketEndpoint ep(_ep);
  431. setDafsEndpointPort(ep);
  432. if (ep.isNull())
  433. return -3;
  434. try {
  435. Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
  436. return setDafsTrace(socket, flags);
  437. }
  438. catch (IException *e)
  439. {
  440. EXCLOG(e,"setDafileSvrTraceFlags");
  441. e->Release();
  442. }
  443. return -2;
  444. }
  445. extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep,StringBuffer &retstr)
  446. {
  447. SocketEndpoint ep(_ep);
  448. setDafsEndpointPort(ep);
  449. if (ep.isNull())
  450. return false;
  451. try {
  452. Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
  453. return getDafsInfo(socket, retstr);
  454. }
  455. catch (IException *e)
  456. {
  457. EXCLOG(e,"getDafileSvrInfo");
  458. e->Release();
  459. }
  460. return -2;
  461. }
  462. void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted)
  463. {
  464. SocketEndpoint ep(file.queryEndpoint());
  465. setDafsEndpointPort(ep);
  466. if (ep.isNull())
  467. return;
  468. StringBuffer filename;
  469. remoteExtractBlobElements(ep, prefix, file.getLocalPath(filename).str(), extracted);
  470. }
  471. IFile *createDaliServixFile(const RemoteFilename & file)
  472. {
  473. SocketEndpoint ep(file.queryEndpoint());
  474. setDafsEndpointPort(ep);
  475. if (ep.isNull())
  476. return NULL;
  477. StringBuffer path;
  478. file.getLocalPath(path);
  479. return createRemoteFile(ep, path.str());
  480. }
  481. void setDaliServixSocketCaching(bool set)
  482. {
  483. clientSetDaliServixSocketCaching(set);
  484. }
  485. void cacheFileConnect(IFile *file,unsigned timeout)
  486. {
  487. RemoteFilename rfn;
  488. rfn.setRemotePath(file->queryFilename());
  489. if (!rfn.isLocal()&&!rfn.isNull()) {
  490. SocketEndpoint ep = rfn.queryEndpoint();
  491. if (ep.port)
  492. clientCacheFileConnect(ep,timeout);
  493. }
  494. }
  495. void disconnectRemoteFile(IFile *file)
  496. {
  497. clientDisconnectRemoteFile(file);
  498. }
  499. void disconnectRemoteIoOnExit(IFileIO *fileio,bool set)
  500. {
  501. clientDisconnectRemoteIoOnExit(fileio,set);
  502. }
  503. bool resetRemoteFilename(IFile *file, const char *newname)
  504. {
  505. return clientResetFilename(file,newname);
  506. }
  507. void enableAuthentication(bool set)
  508. {
  509. enableDafsAuthentication(set);
  510. }
  511. bool asyncCopyFileSection(const char *uuid, // from genUUID - must be same for subsequent calls
  512. IFile *from, // expected to be remote
  513. RemoteFilename &to,
  514. offset_t toofs, // (offset_t)-1 created file and copies to start
  515. offset_t fromofs,
  516. offset_t size, // (offset_t)-1 for all file
  517. ICopyFileProgress *progress,
  518. unsigned timeout // 0 to start, non-zero to wait
  519. )
  520. {
  521. return clientAsyncCopyFileSection(uuid,from,to,toofs,fromofs,size,progress,timeout);
  522. }
  523. void setRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  524. {
  525. clientSetRemoteFileTimeouts(maxconnecttime,maxreadtime);
  526. }
  527. class CScriptThread : public Thread
  528. {
  529. StringAttr script;
  530. SocketEndpoint ep;
  531. Semaphore done;
  532. bool ok;
  533. public:
  534. IMPLEMENT_IINTERFACE;
  535. CScriptThread(SocketEndpoint &_ep,const char *_script)
  536. : ep(_ep), script(_script)
  537. {
  538. ok = false;
  539. }
  540. int run()
  541. {
  542. try {
  543. int ret = remoteExec(ep,script.get(),"/c$",true,0,NULL,NULL);
  544. if (ret==0)
  545. ok = true;
  546. }
  547. catch (IException *e) {
  548. EXCLOG(e,"validateNodes CScriptThread");
  549. e->Release();
  550. }
  551. done.signal();
  552. return 0;
  553. }
  554. bool waitok(unsigned timeout)
  555. {
  556. done.wait(timeout);
  557. return ok;
  558. }
  559. };
  560. unsigned validateNodes(const SocketEndpointArray &eps,const char *dataDir, const char *mirrorDir, bool chkver, const char *script, unsigned scripttimeout, SocketEndpointArray &failures, UnsignedArray &failedcodes, StringArray &failedmessages, const char *filename)
  561. {
  562. // used for detecting duff nodes
  563. IPointerArrayOf<ISocket> sockets;
  564. unsigned to=30*1000;
  565. unsigned n=eps.ordinality(); // use approx log scale (timeout is long but only for failure situation)
  566. while (n>1) {
  567. n/=2;
  568. to+=30*1000;
  569. }
  570. multiConnect(eps,sockets,to);
  571. ForEachItemIn(i,eps) {
  572. if (sockets.item(i)==NULL) {
  573. failures.append(eps.item(i));
  574. failedcodes.append(DAFS_VALIDATE_CONNECT_FAIL);
  575. failedmessages.append("Connect failure");
  576. }
  577. }
  578. CriticalSection sect;
  579. class casyncfor: public CAsyncFor
  580. {
  581. const SocketEndpointArray &eps;
  582. const IPointerArrayOf<ISocket> &sockets;
  583. SocketEndpointArray &failures;
  584. StringArray &failedmessages;
  585. UnsignedArray &failedcodes;
  586. CriticalSection &sect;
  587. StringAttr dataDir, mirrorDir;
  588. bool chkv;
  589. const char *filename;
  590. const char *script;
  591. unsigned scripttimeout;
  592. public:
  593. casyncfor(const SocketEndpointArray &_eps,const IPointerArrayOf<ISocket> &_sockets,const char *_dataDir,const char *_mirrorDir,bool _chkv, const char *_script, unsigned _scripttimeout, const char *_filename,SocketEndpointArray &_failures, StringArray &_failedmessages,UnsignedArray &_failedcodes,CriticalSection &_sect)
  594. : eps(_eps), sockets(_sockets), dataDir(_dataDir), mirrorDir(_mirrorDir),
  595. failures(_failures), failedmessages(_failedmessages), failedcodes(_failedcodes), sect(_sect)
  596. {
  597. chkv = _chkv;
  598. filename = _filename;
  599. script = _script;
  600. scripttimeout = (script&&*script)?_scripttimeout:0;
  601. }
  602. void Do(unsigned i)
  603. {
  604. ISocket *sock = sockets.item(i);
  605. if (!sock)
  606. return;
  607. SocketEndpoint ep = eps.item(i);
  608. bool iswin;
  609. unsigned code = 0;
  610. StringBuffer errstr;
  611. StringBuffer ver;
  612. try {
  613. getRemoteVersion(sock,ver);
  614. iswin = (strstr(ver.str(),"Windows")!=NULL);
  615. }
  616. catch (IException *e)
  617. {
  618. code = DAFS_VALIDATE_CONNECT_FAIL;
  619. e->errorMessage(errstr);
  620. e->Release();
  621. }
  622. if (!code&&chkv) {
  623. const char *rv = ver.str();
  624. const char *v = remoteServerVersionString();
  625. while (*v&&(*v!='-')&&(*v==*rv)) {
  626. v++;
  627. rv++;
  628. }
  629. if (*rv!=*v) {
  630. if (*rv) {
  631. while (*rv&&(*rv!='-'))
  632. rv++;
  633. while (*v&&(*v!='-'))
  634. v++;
  635. StringBuffer wanted(v-remoteServerVersionString(),remoteServerVersionString());
  636. ver.setLength(rv-ver.str());
  637. if (strcmp(ver.str(),wanted.str())<0) { // allow >
  638. code = DAFS_VALIDATE_BAD_VERSION;
  639. errstr.appendf("Mismatch dafilesrv version ");
  640. errstr.append(rv-ver.str(),ver.str());
  641. errstr.append(", wanted ");
  642. errstr.append(v-remoteServerVersionString(),remoteServerVersionString());
  643. }
  644. }
  645. else {
  646. code = DAFS_VALIDATE_CONNECT_FAIL;
  647. errstr.appendf("could not contact dafilesrv");
  648. }
  649. }
  650. }
  651. if (!code&&(dataDir.get()||mirrorDir.get())) {
  652. clientAddSocketToCache(ep,sock);
  653. const char *drivePath = NULL;
  654. const char *drivePaths[2];
  655. unsigned drives=0;
  656. if (mirrorDir.get()) drivePaths[drives++] = mirrorDir.get();
  657. if (dataDir.get()) drivePaths[drives++] = dataDir.get();
  658. do
  659. {
  660. StringBuffer path(drivePaths[--drives]);
  661. addPathSepChar(path);
  662. if (filename)
  663. path.append(filename);
  664. else {
  665. path.append("dafs_");
  666. genUUID(path);
  667. path.append(".tmp");
  668. }
  669. RemoteFilename rfn;
  670. rfn.setPath(ep,path);
  671. Owned<IFile> file = createIFile(rfn);
  672. size32_t sz;
  673. StringBuffer ds;
  674. try {
  675. Owned<IFileIO> fileio = file->open(IFOcreate);
  676. CDateTime dt;
  677. dt.setNow();
  678. dt.getString(ds);
  679. sz = ds.length()+1;
  680. assertex(sz<64);
  681. fileio->write(0,sz,ds.str());
  682. }
  683. catch (IException *e) {
  684. if (e->errorCode()==DISK_FULL_EXCEPTION_CODE)
  685. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_DISK_FULL_DATA:DAFS_VALIDATE_DISK_FULL_MIRROR);
  686. else
  687. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_WRITE_FAIL_DATA:DAFS_VALIDATE_WRITE_FAIL_MIRROR);
  688. if (errstr.length())
  689. errstr.append(',');
  690. e->errorMessage(errstr);
  691. e->Release();
  692. continue; // no use trying read
  693. }
  694. try {
  695. Owned<IFileIO> fileio = file->open(IFOread);
  696. char buf[64];
  697. size32_t rd = fileio->read(0,sizeof(buf)-1,buf);
  698. if ((rd!=sz)||(memcmp(buf,ds.str(),sz)!=0)) {
  699. StringBuffer s;
  700. ep.getIpText(s);
  701. throw MakeStringException(-1,"Data discrepancy on disk read of %s of %s",path.str(),s.str());
  702. }
  703. }
  704. catch (IException *e) {
  705. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_READ_FAIL_DATA:DAFS_VALIDATE_READ_FAIL_MIRROR);
  706. if (errstr.length())
  707. errstr.append(',');
  708. e->errorMessage(errstr);
  709. e->Release();
  710. }
  711. if (!filename||!*filename) {
  712. // delete file created
  713. try {
  714. file->remove();
  715. }
  716. catch (IException *e) {
  717. e->Release(); // supress error
  718. }
  719. }
  720. }
  721. while (0 != drives);
  722. }
  723. if (!code&&scripttimeout) { // use a second thread to implement script timeout
  724. Owned<CScriptThread> thread = new CScriptThread(ep,script);
  725. thread->start();
  726. if (!thread->waitok(scripttimeout)) {
  727. code |= DAFS_SCRIPT_FAIL;
  728. if (errstr.length())
  729. errstr.append(',');
  730. errstr.append("FAILED: ").append(script);
  731. }
  732. }
  733. if (code) {
  734. CriticalBlock block(sect);
  735. failures.append(ep);
  736. failedcodes.append(code);
  737. failedmessages.append(errstr.str());
  738. }
  739. }
  740. } afor(eps,sockets,dataDir,mirrorDir,chkver,script,scripttimeout,filename,failures,failedmessages,failedcodes,sect);
  741. afor.For(eps.ordinality(), 10, false, true);
  742. return failures.ordinality();
  743. }
  744. static PointerArrayOf<SharedObject> *hookDlls;
  745. static void installFileHook(const char *hookFileSpec);
  746. extern REMOTE_API void installFileHooks(const char *hookFileSpec)
  747. {
  748. if (!hookDlls)
  749. hookDlls = new PointerArrayOf<SharedObject>;
  750. const char * cursor = hookFileSpec;
  751. for (;*cursor;)
  752. {
  753. StringBuffer file;
  754. while (*cursor && *cursor != ENVSEPCHAR)
  755. file.append(*cursor++);
  756. if(*cursor)
  757. cursor++;
  758. if(!file.length())
  759. continue;
  760. installFileHook(file);
  761. }
  762. }
  763. typedef void *(HookInstallFunction)();
  764. static void installFileHook(const char *hookFile)
  765. {
  766. StringBuffer dirPath, dirTail, absolutePath;
  767. splitFilename(hookFile, &dirPath, &dirPath, &dirTail, &dirTail);
  768. makeAbsolutePath(dirPath.str(), absolutePath);
  769. if (!containsFileWildcard(dirTail))
  770. {
  771. addPathSepChar(absolutePath).append(dirTail);
  772. Owned<IFile> file = createIFile(absolutePath);
  773. if (file->isDirectory() == foundYes)
  774. {
  775. installFileHooks(addPathSepChar(absolutePath).append('*'));
  776. }
  777. else if (file->isFile() == foundYes)
  778. {
  779. HookInstallFunction *hookInstall;
  780. SharedObject *so = new SharedObject(); // MORE - this leaks! Kind-of deliberate right now...
  781. if (so->load(file->queryFilename(), false) &&
  782. (hookInstall = (HookInstallFunction *) GetSharedProcedure(so->getInstanceHandle(), "installFileHook")) != NULL)
  783. {
  784. hookInstall();
  785. hookDlls->append(so);
  786. }
  787. else
  788. {
  789. so->unload();
  790. delete so;
  791. DBGLOG("File hook library %s could not be loaded", hookFile);
  792. }
  793. }
  794. else
  795. {
  796. DBGLOG("File hook library %s not found", hookFile);
  797. }
  798. }
  799. else
  800. {
  801. Owned<IDirectoryIterator> dir = createDirectoryIterator(absolutePath, dirTail);
  802. ForEach(*dir)
  803. {
  804. const char *name = dir->query().queryFilename();
  805. if (name && *name && *name != '.')
  806. installFileHook(name);
  807. }
  808. }
  809. }
  810. // Should be called before closedown, ideally. MODEXIT tries to mop up but may be too late to do so cleanly
  811. extern REMOTE_API void removeFileHooks()
  812. {
  813. if (hookDlls)
  814. {
  815. ForEachItemIn(idx, *hookDlls)
  816. {
  817. SharedObject *so = hookDlls->item(idx);
  818. HookInstallFunction *hookInstall = (HookInstallFunction *) GetSharedProcedure(so->getInstanceHandle(), "removeFileHook");
  819. if (hookInstall)
  820. hookInstall();
  821. delete so;
  822. }
  823. delete hookDlls;
  824. hookDlls = NULL;
  825. }
  826. }
  827. MODULE_INIT(INIT_PRIORITY_REMOTE_RMTFILE)
  828. {
  829. if(!DaliServixIntercept)
  830. {
  831. DaliServixIntercept = new CDaliServixIntercept;
  832. addIFileCreateHook(DaliServixIntercept);
  833. }
  834. return true;
  835. }
  836. MODULE_EXIT()
  837. {
  838. if(DaliServixIntercept)
  839. {
  840. // delete ConnectionTable; // too late to delete (jsocket closed down)
  841. removeIFileCreateHook(DaliServixIntercept);
  842. ::Release(DaliServixIntercept);
  843. DaliServixIntercept = NULL;
  844. }
  845. removeFileHooks();
  846. }
  847. IDaFileSrvHook *queryDaFileSrvHook()
  848. {
  849. return DaliServixIntercept;
  850. }