rmtfile.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "portlist.h"
  15. #include "jlib.hpp"
  16. #include "jio.hpp"
  17. #include "jlog.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "sockfile.hpp"
  21. #include "rmtfile.hpp"
  22. #include "remoteerr.hpp"
  23. //----------------------------------------------------------------------------
  24. //#define TEST_DAFILESRV_FOR_UNIX_PATHS // probably not needed
  25. static class CSecuritySettings
  26. {
  27. unsigned short daliServixPort;
  28. public:
  29. CSecuritySettings()
  30. {
  31. querySecuritySettings(nullptr, &daliServixPort, nullptr, nullptr, nullptr);
  32. }
  33. unsigned short queryDaliServixPort() { return daliServixPort; }
  34. } securitySettings;
  35. unsigned short getDaliServixPort()
  36. {
  37. return securitySettings.queryDaliServixPort();
  38. }
  39. void setCanAccessDirectly(RemoteFilename & file,bool set)
  40. {
  41. if (set)
  42. file.setPort(0);
  43. else if (file.getPort()==0) // foreign daliservix may be passed in
  44. file.setPort(getDaliServixPort());
  45. }
  46. bool canAccessDirectly(const RemoteFilename & file) // not that well named but historical
  47. {
  48. return (file.getPort()==0);
  49. }
  50. void setLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir)
  51. {
  52. setDafsLocalMountRedirect(ip,dir,mountdir);
  53. }
  54. class CDaliServixFilter : public CInterface
  55. {
  56. protected:
  57. StringAttr dir, sourceRangeText;
  58. SocketEndpointArray sourceRangeIps;
  59. bool sourceRangeHasPorts, trace;
  60. bool checkForPorts(SocketEndpointArray &ips)
  61. {
  62. ForEachItemIn(i, ips)
  63. {
  64. if (ips.item(i).port)
  65. return true;
  66. }
  67. return false;
  68. }
  69. public:
  70. CDaliServixFilter(const char *_dir, const char *sourceRange, bool _trace) : dir(_dir), trace(_trace)
  71. {
  72. if (sourceRange)
  73. {
  74. sourceRangeText.set(sourceRange);
  75. sourceRangeIps.fromText(sourceRange, 0);
  76. sourceRangeHasPorts = checkForPorts(sourceRangeIps);
  77. }
  78. else
  79. sourceRangeHasPorts = false;
  80. }
  81. bool queryTrace() const { return trace; }
  82. const char *queryDirectory() const { return dir; }
  83. bool testPath(const char *path) const
  84. {
  85. if (!dir) // if no dir in filter, match any
  86. return true;
  87. else
  88. return startsWith(path, dir.get());
  89. }
  90. bool applyFilter(const SocketEndpoint &ep) const
  91. {
  92. if (sourceRangeText.length())
  93. {
  94. SocketEndpoint _ep = ep;
  95. if (!sourceRangeHasPorts) // if source range doesn't have ports, only check ip
  96. _ep.port = 0;
  97. return NotFound != sourceRangeIps.find(_ep);
  98. }
  99. // NB: If no source range, use target range to decide if filter should apply
  100. return testEp(ep);
  101. }
  102. virtual bool testEp(const SocketEndpoint &ep) const = 0;
  103. virtual StringBuffer &getInfo(StringBuffer &info)
  104. {
  105. if (dir.length())
  106. info.append(", dir=").append(dir.get());
  107. if (sourceRangeText.get())
  108. info.append(", sourcerange=").append(sourceRangeText.get());
  109. info.append(", trace=(").append(trace ? "true" : "false").append(")");
  110. return info;
  111. }
  112. };
  113. class CDaliServixSubnetFilter : public CDaliServixFilter
  114. {
  115. IpSubNet ipSubNet;
  116. public:
  117. CDaliServixSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace) :
  118. CDaliServixFilter(dir, sourceRange, trace)
  119. {
  120. if (!ipSubNet.set(subnet, mask))
  121. throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
  122. }
  123. virtual bool testEp(const SocketEndpoint &ep) const
  124. {
  125. return ipSubNet.test(ep);
  126. }
  127. virtual StringBuffer &getInfo(StringBuffer &info)
  128. {
  129. info.append("subnet=");
  130. ipSubNet.getNetText(info);
  131. info.append(", mask=");
  132. ipSubNet.getMaskText(info);
  133. CDaliServixFilter::getInfo(info);
  134. return info;
  135. }
  136. };
  137. class CDaliServixRangeFilter : public CDaliServixFilter
  138. {
  139. StringAttr rangeText;
  140. SocketEndpointArray rangeIps;
  141. bool rangeIpsHavePorts;
  142. public:
  143. CDaliServixRangeFilter(const char *_range, const char *dir, const char *sourceRange, bool trace)
  144. : CDaliServixFilter(dir, sourceRange, trace)
  145. {
  146. rangeText.set(_range);
  147. rangeIps.fromText(_range, 0);
  148. rangeIpsHavePorts = checkForPorts(rangeIps);
  149. }
  150. virtual bool testEp(const SocketEndpoint &ep) const
  151. {
  152. SocketEndpoint _ep = ep;
  153. if (!rangeIpsHavePorts) // if range doesn't have ports, only check ip
  154. _ep.port = 0;
  155. return NotFound != rangeIps.find(_ep);
  156. }
  157. virtual StringBuffer &getInfo(StringBuffer &info)
  158. {
  159. info.append("range=").append(rangeText.get());
  160. CDaliServixFilter::getInfo(info);
  161. return info;
  162. }
  163. };
  164. CDaliServixFilter *createDaliServixFilter(IPropertyTree &filterProps)
  165. {
  166. CDaliServixFilter *filter = NULL;
  167. const char *dir = filterProps.queryProp("@directory");
  168. const char *sourceRange = filterProps.queryProp("@sourcerange");
  169. bool trace = filterProps.getPropBool("@trace");
  170. if (filterProps.hasProp("@subnet"))
  171. filter = new CDaliServixSubnetFilter(filterProps.queryProp("@subnet"), filterProps.queryProp("@mask"), dir, sourceRange, trace);
  172. else if (filterProps.hasProp("@range"))
  173. filter = new CDaliServixRangeFilter(filterProps.queryProp("@range"), dir, sourceRange, trace);
  174. else
  175. throw MakeStringException(0, "Unknown DaliServix filter definition");
  176. return filter;
  177. }
  178. class CDaliServixIntercept: public CInterface, implements IDaFileSrvHook
  179. {
  180. CIArrayOf<CDaliServixFilter> filters;
  181. void addFilter(CDaliServixFilter *filter)
  182. {
  183. filters.append(*filter);
  184. StringBuffer msg("DaFileSrvHook: adding translateToLocal [");
  185. filter->getInfo(msg);
  186. msg.append("]");
  187. PROGLOG("%s", msg.str());
  188. }
  189. public:
  190. IMPLEMENT_IINTERFACE;
  191. virtual IFile * createIFile(const RemoteFilename & filename)
  192. {
  193. SocketEndpoint ep = filename.queryEndpoint();
  194. bool noport = (ep.port==0);
  195. setDafsEndpointPort(ep);
  196. if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT && ep.port!=SECURE_DAFILESRV_PORT)) // assume standard port is running on local machine
  197. {
  198. #ifdef __linux__
  199. #ifndef USE_SAMBA
  200. if (noport && filters.ordinality())
  201. {
  202. ForEachItemIn(sn, filters)
  203. {
  204. CDaliServixFilter &filter = filters.item(sn);
  205. if (filter.testEp(ep))
  206. {
  207. StringBuffer lPath;
  208. filename.getLocalPath(lPath);
  209. if (filter.testPath(lPath.str()))
  210. {
  211. if (filter.queryTrace())
  212. {
  213. StringBuffer fromPath;
  214. filename.getRemotePath(fromPath);
  215. PROGLOG("Redirecting path: '%s' to '%s", fromPath.str(), lPath.str());
  216. }
  217. return ::createIFile(lPath.str());
  218. }
  219. }
  220. }
  221. }
  222. return createDaliServixFile(filename);
  223. #endif
  224. #endif
  225. if (!noport) // expect all filenames that specify port to be dafilesrc or daliservix
  226. return createDaliServixFile(filename);
  227. if (filename.isUnixPath()
  228. #ifdef TEST_DAFILESRV_FOR_UNIX_PATHS
  229. &&testDaliServixPresent(ep)
  230. #endif
  231. )
  232. return createDaliServixFile(filename);
  233. }
  234. return NULL;
  235. }
  236. virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, const char *sourceRange, bool trace)
  237. {
  238. Owned<CDaliServixFilter> filter = new CDaliServixSubnetFilter(subnet, mask, dir, sourceRange, trace);
  239. addFilter(filter.getClear());
  240. }
  241. virtual void addRangeFilter(const char *range, const char *dir, const char *sourceRange, bool trace)
  242. {
  243. Owned<CDaliServixFilter> filter = new CDaliServixRangeFilter(range, dir, sourceRange, trace);
  244. addFilter(filter.getClear());
  245. }
  246. virtual IPropertyTree *addFilters(IPropertyTree *config, const SocketEndpoint *myEp)
  247. {
  248. if (!config)
  249. return NULL;
  250. Owned<IPropertyTree> result;
  251. Owned<IPropertyTreeIterator> iter = config->getElements("Filter");
  252. ForEach(*iter)
  253. {
  254. Owned<CDaliServixFilter> filter = createDaliServixFilter(iter->query());
  255. // Only add filters where myIP matches filter criteria
  256. if (!myEp || filter->applyFilter(*myEp))
  257. {
  258. addFilter(filter.getClear());
  259. if (!result)
  260. result.setown(createPTree());
  261. result->addPropTree("Filter", LINK(&iter->query()));
  262. }
  263. }
  264. return result.getClear();
  265. }
  266. virtual IPropertyTree *addMyFilters(IPropertyTree *config, SocketEndpoint *myEp)
  267. {
  268. if (myEp)
  269. return addFilters(config, myEp);
  270. else
  271. {
  272. SocketEndpoint ep;
  273. GetHostIp(ep);
  274. return addFilters(config, &ep);
  275. }
  276. }
  277. virtual void clearFilters()
  278. {
  279. filters.kill();
  280. }
  281. } *DaliServixIntercept = NULL;
  282. unsigned short getActiveDaliServixPort(const IpAddress &ip)
  283. {
  284. if (ip.isNull())
  285. return 0;
  286. SocketEndpoint ep(0, ip);
  287. setDafsEndpointPort(ep);
  288. try {
  289. Owned<ISocket> socket = connectDafs(ep, 10000);
  290. return ep.port;
  291. }
  292. catch (IException *e)
  293. {
  294. e->Release();
  295. }
  296. return 0;
  297. }
  298. bool testDaliServixPresent(const IpAddress &ip)
  299. {
  300. return getActiveDaliServixPort(ip) != 0;
  301. }
  302. unsigned getDaliServixVersion(const SocketEndpoint &_ep,StringBuffer &ver)
  303. {
  304. SocketEndpoint ep(_ep);
  305. setDafsEndpointPort(ep);
  306. if (ep.isNull())
  307. return 0;
  308. try
  309. {
  310. Owned<ISocket> socket = connectDafs(ep, 10000);
  311. return getRemoteVersion(socket,ver);
  312. }
  313. catch (IException *e)
  314. {
  315. EXCLOG(e,"getDaliServixVersion");
  316. e->Release();
  317. }
  318. return 0;
  319. }
  320. struct CDafsOsCacheEntry
  321. {
  322. SocketEndpoint ep;
  323. DAFS_OS os;
  324. time_t at;
  325. };
  326. class CDafsOsCache: public SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>
  327. {
  328. void onAdd(void *) {}
  329. void onRemove(void *et)
  330. {
  331. CDafsOsCacheEntry *e = (CDafsOsCacheEntry *)et;
  332. delete e;
  333. }
  334. unsigned getHashFromElement(const void *e) const
  335. {
  336. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)e;
  337. return elem.ep.hash(0);
  338. }
  339. unsigned getHashFromFindParam(const void *fp) const
  340. {
  341. return ((const SocketEndpoint *)fp)->hash(0);
  342. }
  343. const void * getFindParam(const void *p) const
  344. {
  345. const CDafsOsCacheEntry &elem=*(const CDafsOsCacheEntry *)p;
  346. return (void *)&elem.ep;
  347. }
  348. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  349. {
  350. return ((CDafsOsCacheEntry *)et)->ep.equals(*(SocketEndpoint *)fp);
  351. }
  352. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CDafsOsCacheEntry,SocketEndpoint);
  353. public:
  354. static CriticalSection crit;
  355. CDafsOsCache()
  356. {
  357. }
  358. ~CDafsOsCache()
  359. {
  360. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::_releaseAll();
  361. }
  362. DAFS_OS lookup(const SocketEndpoint &ep,ISocket *sock)
  363. {
  364. CriticalBlock block(crit);
  365. CDafsOsCacheEntry *r = SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::find(&ep);
  366. bool needupdate=false;
  367. unsigned t = (unsigned)time(NULL);
  368. if (!r) {
  369. r = new CDafsOsCacheEntry;
  370. r->ep = ep;
  371. needupdate = true;
  372. SuperHashTableOf<CDafsOsCacheEntry,SocketEndpoint>::add(*r);
  373. }
  374. else
  375. needupdate = (t-r->at>60*5); // update every 5 mins
  376. if (needupdate) {
  377. r->os = DAFS_OSunknown;
  378. StringBuffer ver;
  379. unsigned ret;
  380. if (sock)
  381. ret = getRemoteVersion(sock,ver);
  382. else
  383. ret = getDaliServixVersion(ep,ver);
  384. if (ret!=0) { // if cross-os needs dafilesrv
  385. if (strstr(ver.str(),"Linux")!=NULL)
  386. r->os = DAFS_OSlinux;
  387. else if (strstr(ver.str(),"Windows")!=NULL)
  388. r->os = DAFS_OSwindows;
  389. else if (strstr(ver.str(),"Solaris")!=NULL)
  390. r->os = DAFS_OSsolaris;
  391. }
  392. r->at = t;
  393. }
  394. return r->os;
  395. }
  396. };
  397. CriticalSection CDafsOsCache::crit;
  398. DAFS_OS getDaliServixOs(const SocketEndpoint &ep,ISocket *socket)
  399. {
  400. #ifdef _DEBUG
  401. if (ep.isLocal())
  402. #ifdef _WIN32
  403. return DAFS_OSwindows;
  404. #else
  405. return DAFS_OSlinux;
  406. #endif
  407. #endif
  408. static CDafsOsCache cache;
  409. return cache.lookup(ep,socket);
  410. }
  411. DAFS_OS getDaliServixOs(const SocketEndpoint &ep)
  412. {
  413. return getDaliServixOs(ep,NULL);
  414. }
  415. unsigned getDaliServixVersion(const IpAddress &ip,StringBuffer &ver)
  416. {
  417. SocketEndpoint ep(0,ip);
  418. return getDaliServixVersion(ep,ver);
  419. }
  420. extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flags)
  421. {
  422. SocketEndpoint ep(_ep);
  423. setDafsEndpointPort(ep);
  424. if (ep.isNull())
  425. return -3;
  426. try {
  427. Owned<ISocket> socket = connectDafs(ep, 5000);
  428. return setDafsTrace(socket, flags);
  429. }
  430. catch (IException *e)
  431. {
  432. EXCLOG(e,"setDafileSvrTraceFlags");
  433. e->Release();
  434. }
  435. return -2;
  436. }
  437. extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg)
  438. {
  439. SocketEndpoint ep(_ep);
  440. setDafsEndpointPort(ep);
  441. if (ep.isNull())
  442. return -3;
  443. try {
  444. Owned<ISocket> socket = connectDafs(ep, 5000);
  445. return setDafsThrottleLimit(socket, throttleClass, throttleLimit, throttleDelayMs, throttleCPULimit, queueLimit, errMsg);
  446. }
  447. catch (IException *e)
  448. {
  449. EXCLOG(e,"setDafileSvrThrottleLimit");
  450. e->Release();
  451. }
  452. return -2;
  453. }
  454. extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep, unsigned level, StringBuffer &retstr)
  455. {
  456. SocketEndpoint ep(_ep);
  457. setDafsEndpointPort(ep);
  458. if (ep.isNull())
  459. return false;
  460. try {
  461. Owned<ISocket> socket = connectDafs(ep, 5000);
  462. return getDafsInfo(socket, level, retstr);
  463. }
  464. catch (IException *e)
  465. {
  466. EXCLOG(e,"getDafileSvrInfo");
  467. e->Release();
  468. }
  469. return -2;
  470. }
  471. void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted)
  472. {
  473. SocketEndpoint ep(file.queryEndpoint());
  474. setDafsEndpointPort(ep);
  475. if (ep.isNull())
  476. return;
  477. StringBuffer filename;
  478. remoteExtractBlobElements(ep, prefix, file.getLocalPath(filename).str(), extracted);
  479. }
  480. IFile *createDaliServixFile(const RemoteFilename & file)
  481. {
  482. SocketEndpoint ep(file.queryEndpoint());
  483. setDafsEndpointPort(ep);
  484. if (ep.isNull())
  485. return NULL;
  486. StringBuffer path;
  487. file.getLocalPath(path);
  488. return createRemoteFile(ep, path.str());
  489. }
  490. void setDaliServixSocketCaching(bool set)
  491. {
  492. clientSetDaliServixSocketCaching(set);
  493. }
  494. void disconnectRemoteFile(IFile *file)
  495. {
  496. clientDisconnectRemoteFile(file);
  497. }
  498. void disconnectRemoteIoOnExit(IFileIO *fileio,bool set)
  499. {
  500. clientDisconnectRemoteIoOnExit(fileio,set);
  501. }
  502. bool resetRemoteFilename(IFile *file, const char *newname)
  503. {
  504. return clientResetFilename(file,newname);
  505. }
  506. void enableAuthentication(bool set)
  507. {
  508. enableDafsAuthentication(set);
  509. }
  510. bool asyncCopyFileSection(const char *uuid, // from genUUID - must be same for subsequent calls
  511. IFile *from, // expected to be remote
  512. RemoteFilename &to,
  513. offset_t toofs, // (offset_t)-1 created file and copies to start
  514. offset_t fromofs,
  515. offset_t size, // (offset_t)-1 for all file
  516. ICopyFileProgress *progress,
  517. unsigned timeout // 0 to start, non-zero to wait
  518. )
  519. {
  520. return clientAsyncCopyFileSection(uuid,from,to,toofs,fromofs,size,progress,timeout);
  521. }
  522. void setRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
  523. {
  524. clientSetRemoteFileTimeouts(maxconnecttime,maxreadtime);
  525. }
  526. unsigned validateNodes(const SocketEndpointArray &epso,const char *dataDir, const char *mirrorDir, bool chkver, SocketEndpointArray &failures, UnsignedArray &failedcodes, StringArray &failedmessages, const char *filename)
  527. {
  528. // used for detecting duff nodes
  529. IPointerArrayOf<ISocket> sockets;
  530. // dedup nodes
  531. SocketEndpointArray eps;
  532. ForEachItemIn(i1,epso)
  533. eps.appendUniq(epso.element(i1));
  534. unsigned to=30*1000;
  535. unsigned n=eps.ordinality(); // use approx log scale (timeout is long but only for failure situation)
  536. while (n>1) {
  537. n/=2;
  538. to+=30*1000;
  539. }
  540. multiConnect(eps,sockets,to);
  541. ForEachItemIn(i,eps) {
  542. if (sockets.item(i)==NULL) {
  543. failures.append(eps.item(i));
  544. failedcodes.append(DAFS_VALIDATE_CONNECT_FAIL);
  545. failedmessages.append("Connect failure");
  546. }
  547. }
  548. CriticalSection sect;
  549. class casyncfor: public CAsyncFor
  550. {
  551. const SocketEndpointArray &eps;
  552. const IPointerArrayOf<ISocket> &sockets;
  553. SocketEndpointArray &failures;
  554. StringArray &failedmessages;
  555. UnsignedArray &failedcodes;
  556. CriticalSection &sect;
  557. StringAttr dataDir, mirrorDir;
  558. bool chkv;
  559. const char *filename;
  560. public:
  561. casyncfor(const SocketEndpointArray &_eps,const IPointerArrayOf<ISocket> &_sockets,const char *_dataDir,const char *_mirrorDir,bool _chkv, const char *_filename,SocketEndpointArray &_failures, StringArray &_failedmessages,UnsignedArray &_failedcodes,CriticalSection &_sect)
  562. : eps(_eps), sockets(_sockets), failures(_failures),
  563. failedmessages(_failedmessages), failedcodes(_failedcodes), sect(_sect),
  564. dataDir(_dataDir), mirrorDir(_mirrorDir)
  565. {
  566. chkv = _chkv;
  567. filename = _filename;
  568. }
  569. void Do(unsigned i)
  570. {
  571. ISocket *sock = sockets.item(i);
  572. if (!sock)
  573. return;
  574. SocketEndpoint ep = eps.item(i);
  575. bool iswin;
  576. unsigned code = 0;
  577. StringBuffer errstr;
  578. StringBuffer ver;
  579. try {
  580. getRemoteVersion(sock,ver);
  581. iswin = (strstr(ver.str(),"Windows")!=NULL);
  582. }
  583. catch (IException *e)
  584. {
  585. code = DAFS_VALIDATE_CONNECT_FAIL;
  586. e->errorMessage(errstr);
  587. e->Release();
  588. }
  589. if (!code&&chkv) {
  590. const char *rv = ver.str();
  591. const char *v = remoteServerVersionString();
  592. while (*v&&(*v!='-')&&(*v==*rv)) {
  593. v++;
  594. rv++;
  595. }
  596. if (*rv!=*v) {
  597. if (*rv) {
  598. while (*rv&&(*rv!='-'))
  599. rv++;
  600. while (*v&&(*v!='-'))
  601. v++;
  602. StringBuffer wanted(v-remoteServerVersionString(),remoteServerVersionString());
  603. ver.setLength(rv-ver.str());
  604. if (strcmp(ver.str(),wanted.str())<0) { // allow >
  605. code = DAFS_VALIDATE_BAD_VERSION;
  606. errstr.appendf("Mismatch dafilesrv version ");
  607. errstr.append(rv-ver.str(),ver.str());
  608. errstr.append(", wanted ");
  609. errstr.append(v-remoteServerVersionString(),remoteServerVersionString());
  610. }
  611. }
  612. else {
  613. code = DAFS_VALIDATE_CONNECT_FAIL;
  614. errstr.appendf("could not contact dafilesrv");
  615. }
  616. }
  617. }
  618. if (!code&&(dataDir.get()||mirrorDir.get())) {
  619. clientAddSocketToCache(ep,sock);
  620. const char *drivePath = NULL;
  621. const char *drivePaths[2];
  622. unsigned drives=0;
  623. if (mirrorDir.get()) drivePaths[drives++] = mirrorDir.get();
  624. if (dataDir.get()) drivePaths[drives++] = dataDir.get();
  625. do
  626. {
  627. StringBuffer path(drivePaths[--drives]);
  628. addPathSepChar(path);
  629. if (filename)
  630. path.append(filename);
  631. else {
  632. path.append("dafs_");
  633. genUUID(path);
  634. path.append(".tmp");
  635. }
  636. RemoteFilename rfn;
  637. rfn.setPath(ep,path);
  638. Owned<IFile> file = createIFile(rfn);
  639. size32_t sz;
  640. StringBuffer ds;
  641. try {
  642. Owned<IFileIO> fileio = file->open(IFOcreate);
  643. CDateTime dt;
  644. dt.setNow();
  645. dt.getString(ds);
  646. sz = ds.length()+1;
  647. assertex(sz<64);
  648. fileio->write(0,sz,ds.str());
  649. }
  650. catch (IException *e) {
  651. if (e->errorCode()==DISK_FULL_EXCEPTION_CODE)
  652. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_DISK_FULL_DATA:DAFS_VALIDATE_DISK_FULL_MIRROR);
  653. else
  654. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_WRITE_FAIL_DATA:DAFS_VALIDATE_WRITE_FAIL_MIRROR);
  655. if (errstr.length())
  656. errstr.append(',');
  657. e->errorMessage(errstr);
  658. e->Release();
  659. continue; // no use trying read
  660. }
  661. try {
  662. Owned<IFileIO> fileio = file->open(IFOread);
  663. char buf[64];
  664. size32_t rd = fileio->read(0,sizeof(buf)-1,buf);
  665. if ((rd!=sz)||(memcmp(buf,ds.str(),sz)!=0)) {
  666. StringBuffer s;
  667. ep.getIpText(s);
  668. throw MakeStringException(-1,"Data discrepancy on disk read of %s of %s",path.str(),s.str());
  669. }
  670. }
  671. catch (IException *e) {
  672. code |= (drivePath==dataDir.get()?DAFS_VALIDATE_READ_FAIL_DATA:DAFS_VALIDATE_READ_FAIL_MIRROR);
  673. if (errstr.length())
  674. errstr.append(',');
  675. e->errorMessage(errstr);
  676. e->Release();
  677. }
  678. if (!filename||!*filename) {
  679. // delete file created
  680. try {
  681. file->remove();
  682. }
  683. catch (IException *e) {
  684. e->Release(); // supress error
  685. }
  686. }
  687. }
  688. while (0 != drives);
  689. }
  690. if (code) {
  691. CriticalBlock block(sect);
  692. failures.append(ep);
  693. failedcodes.append(code);
  694. failedmessages.append(errstr.str());
  695. }
  696. }
  697. } afor(eps,sockets,dataDir,mirrorDir,chkver,filename,failures,failedmessages,failedcodes,sect);
  698. afor.For(eps.ordinality(), 10, false, true);
  699. return failures.ordinality();
  700. }
  701. static PointerArrayOf<SharedObject> *hookDlls;
  702. static void installFileHook(const char *hookFileSpec);
  703. extern REMOTE_API void installFileHooks(const char *hookFileSpec)
  704. {
  705. if (!hookDlls)
  706. hookDlls = new PointerArrayOf<SharedObject>;
  707. const char * cursor = hookFileSpec;
  708. for (;*cursor;)
  709. {
  710. StringBuffer file;
  711. while (*cursor && *cursor != ENVSEPCHAR)
  712. file.append(*cursor++);
  713. if(*cursor)
  714. cursor++;
  715. if(!file.length())
  716. continue;
  717. installFileHook(file);
  718. }
  719. }
  720. typedef void *(HookInstallFunction)();
  721. static void installFileHook(const char *hookFile)
  722. {
  723. StringBuffer dirPath, dirTail, absolutePath;
  724. splitFilename(hookFile, &dirPath, &dirPath, &dirTail, &dirTail);
  725. makeAbsolutePath(dirPath.str(), absolutePath);
  726. if (!containsFileWildcard(dirTail))
  727. {
  728. addPathSepChar(absolutePath).append(dirTail);
  729. Owned<IFile> file = createIFile(absolutePath);
  730. if (file->isDirectory() == foundYes)
  731. {
  732. installFileHooks(addPathSepChar(absolutePath).append('*'));
  733. }
  734. else if (file->isFile() == foundYes)
  735. {
  736. HookInstallFunction *hookInstall;
  737. SharedObject *so = new SharedObject(); // MORE - this leaks! Kind-of deliberate right now...
  738. if (so->load(file->queryFilename(), false) &&
  739. (hookInstall = (HookInstallFunction *) GetSharedProcedure(so->getInstanceHandle(), "installFileHook")) != NULL)
  740. {
  741. hookInstall();
  742. hookDlls->append(so);
  743. }
  744. else
  745. {
  746. so->unload();
  747. delete so;
  748. DBGLOG("File hook library %s could not be loaded", hookFile);
  749. }
  750. }
  751. else
  752. {
  753. DBGLOG("File hook library %s not found", hookFile);
  754. }
  755. }
  756. else
  757. {
  758. Owned<IDirectoryIterator> dir = createDirectoryIterator(absolutePath, dirTail);
  759. ForEach(*dir)
  760. {
  761. const char *name = dir->query().queryFilename();
  762. if (name && *name && *name != '.')
  763. installFileHook(name);
  764. }
  765. }
  766. }
  767. // Should be called before closedown, ideally. MODEXIT tries to mop up but may be too late to do so cleanly
  768. extern REMOTE_API void removeFileHooks()
  769. {
  770. if (hookDlls)
  771. {
  772. ForEachItemIn(idx, *hookDlls)
  773. {
  774. SharedObject *so = hookDlls->item(idx);
  775. HookInstallFunction *hookInstall = (HookInstallFunction *) GetSharedProcedure(so->getInstanceHandle(), "removeFileHook");
  776. if (hookInstall)
  777. hookInstall();
  778. delete so;
  779. }
  780. delete hookDlls;
  781. hookDlls = NULL;
  782. }
  783. }
  784. MODULE_INIT(INIT_PRIORITY_REMOTE_RMTFILE)
  785. {
  786. if(!DaliServixIntercept)
  787. {
  788. DaliServixIntercept = new CDaliServixIntercept;
  789. addIFileCreateHook(DaliServixIntercept);
  790. }
  791. return true;
  792. }
  793. MODULE_EXIT()
  794. {
  795. if(DaliServixIntercept)
  796. {
  797. // delete ConnectionTable; // too late to delete (jsocket closed down)
  798. removeIFileCreateHook(DaliServixIntercept);
  799. ::Release(DaliServixIntercept);
  800. DaliServixIntercept = NULL;
  801. }
  802. removeFileHooks();
  803. }
  804. IDaFileSrvHook *queryDaFileSrvHook()
  805. {
  806. return DaliServixIntercept;
  807. }