ccddali.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "workunit.hpp"
  15. #include "dalienv.hpp"
  16. #include "daclient.hpp"
  17. #include "dautils.hpp"
  18. #include "dllserver.hpp"
  19. #include "ccddali.hpp"
  20. #include "ccdfile.hpp"
  21. #include "ccdlistener.hpp"
  22. #include "ccd.hpp"
  23. #include "jencrypt.hpp"
  24. #include "jisem.hpp"
  25. #include "dllserver.hpp"
  26. #include "thorplugin.hpp"
  27. #include "workflow.hpp"
  28. #include "mpcomm.hpp"
  29. const char *roxieStateName = "RoxieLocalState.xml";
  30. class CDaliPackageWatcher : public CInterface, implements ISDSSubscription, implements ISDSNodeSubscription, implements IDaliPackageWatcher
  31. {
  32. SubscriptionId change;
  33. ISDSSubscription *notifier;
  34. StringAttr id;
  35. StringAttr xpath;
  36. mutable CriticalSection crit;
  37. bool isExact;
  38. public:
  39. IMPLEMENT_IINTERFACE;
  40. CDaliPackageWatcher(const char *_id, const char *_xpath, ISDSSubscription *_notifier)
  41. : id(_id), xpath(_xpath), change(0), isExact(false)
  42. {
  43. notifier = _notifier;
  44. }
  45. ~CDaliPackageWatcher()
  46. {
  47. if (change)
  48. unsubscribe();
  49. }
  50. virtual void subscribe(bool exact)
  51. {
  52. CriticalBlock b(crit);
  53. try
  54. {
  55. if (traceLevel > 5)
  56. DBGLOG("Subscribing to %s, %p", xpath.get(), this);
  57. if (exact && queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) >= 0)
  58. {
  59. isExact = true;
  60. change = querySDS().subscribeExact(xpath, *this, true);
  61. }
  62. else
  63. {
  64. isExact = false;
  65. change = querySDS().subscribe(xpath, *this, true);
  66. }
  67. }
  68. catch (IException *E)
  69. {
  70. // failure to subscribe implies dali is down... that's ok, we will resubscribe when we notice it come back up.
  71. E->Release();
  72. }
  73. }
  74. virtual void unsubscribe()
  75. {
  76. CriticalBlock b(crit);
  77. notifier = NULL;
  78. try
  79. {
  80. if (traceLevel > 5)
  81. DBGLOG("unsubscribing from %s, %p", xpath.get(), this);
  82. if (change)
  83. {
  84. if (isExact)
  85. querySDS().unsubscribeExact(change);
  86. else
  87. querySDS().unsubscribe(change);
  88. }
  89. }
  90. catch (IException *E)
  91. {
  92. E->Release();
  93. }
  94. change = 0;
  95. }
  96. virtual const char *queryName() const
  97. {
  98. return id.get();
  99. }
  100. virtual void onReconnect()
  101. {
  102. Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  103. // It's tempting to think you can avoid holding the critsec during the notify call, and that you only need to hold it while looking up notifier
  104. // Despite the danger of deadlocks (that requires careful code in the notifier to avoid), I think it is neccessary to hold the lock during the call,
  105. // as otherwise notifier may point to a deleted object.
  106. CriticalBlock b(crit);
  107. if (traceLevel > 5)
  108. DBGLOG("resubscribing to %s, %p", xpath.get(), this);
  109. change = querySDS().subscribe(xpath, *this, true);
  110. if (notifier)
  111. notifier->notify(0, NULL, SDSNotify_None);
  112. }
  113. virtual void notify(SubscriptionId subid, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  114. {
  115. return notify(subid, NULL, flags, valueLen, valueData);
  116. }
  117. virtual void notify(SubscriptionId subid, const char *daliXpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  118. {
  119. Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  120. Linked<ISDSSubscription> myNotifier;
  121. {
  122. CriticalBlock b(crit);
  123. if (traceLevel > 5)
  124. DBGLOG("Notification on %s (%s), %p", xpath.get(), daliXpath ? daliXpath : "", this);
  125. myNotifier.set(notifier);
  126. // allow crit to be released, allowing this to be unsubscribed, to avoid deadlocking when other threads via notify call unsubscribe
  127. }
  128. if (myNotifier)
  129. myNotifier->notify(subid, daliXpath, flags, valueLen, valueData);
  130. }
  131. };
  132. /*===============================================================================
  133. * Roxie is not a typical Dali client - if dali is available it will use it, but
  134. * it needs to be able to continue to serve any queries that have already been deployed
  135. * should Dali no longer be available, including the ability to restart a roxie while
  136. * the dali that originally provided its data is unavailable.
  137. *
  138. * This single-instance class is designed to help Roxie manage its dali connections.
  139. * It connects to dali on demand, and disconnects when all links to this object are
  140. * released - in practice that won't happen until termination as we keep an active dali
  141. * connection as long as any QuerySets are subscribed. This class also handles the
  142. * local caching of sufficient information retrieved from dali so that Roxie can restart
  143. * even if dali is not available.
  144. *
  145. * If dali is down, data is delivered from the cache. If dali is up, the cache is populated from
  146. * the information retrieved from dali. We snapshot the cache at the end of each successful reload.
  147. *
  148. *
  149. =================================================================================*/
  150. class CRoxieDaliHelper : public CInterface, implements IRoxieDaliHelper
  151. {
  152. private:
  153. static bool isConnected;
  154. static CRoxieDaliHelper *daliHelper; // Note - this does not own the helper
  155. static CriticalSection daliHelperCrit;
  156. CriticalSection daliConnectionCrit;
  157. Owned<IUserDescriptor> userdesc;
  158. InterruptableSemaphore disconnectSem;
  159. IArrayOf<IDaliPackageWatcher> watchers;
  160. CSDSServerStatus *serverStatus;
  161. class CRoxieDaliConnectWatcher : public Thread
  162. {
  163. private:
  164. CRoxieDaliHelper *owner;
  165. bool aborted;
  166. public:
  167. CRoxieDaliConnectWatcher(CRoxieDaliHelper *_owner) : owner(_owner)
  168. {
  169. aborted = false;
  170. }
  171. virtual int run()
  172. {
  173. while (!aborted)
  174. {
  175. if (topology && topology->getPropBool("@lockDali", false))
  176. {
  177. Sleep(ROXIE_DALI_CONNECT_TIMEOUT);
  178. }
  179. else if (owner->connect(ROXIE_DALI_CONNECT_TIMEOUT))
  180. {
  181. DBGLOG("roxie: CRoxieDaliConnectWatcher reconnected");
  182. try
  183. {
  184. owner->disconnectSem.wait();
  185. Sleep(5000); // Don't retry immediately, give Dali a chance to recover.
  186. }
  187. catch (IException *E)
  188. {
  189. if (!aborted)
  190. EXCLOG(E, "roxie: Unexpected exception in CRoxieDaliConnectWatcher");
  191. E->Release();
  192. }
  193. }
  194. }
  195. return 0;
  196. }
  197. void stop()
  198. {
  199. aborted = true;
  200. }
  201. virtual void start()
  202. {
  203. Thread::start();
  204. }
  205. virtual void join()
  206. {
  207. if (isAlive())
  208. Thread::join();
  209. }
  210. } connectWatcher;
  211. virtual void beforeDispose()
  212. {
  213. CriticalBlock b(daliHelperCrit);
  214. disconnectSem.interrupt();
  215. connectWatcher.stop();
  216. if (daliHelper==this) // there is a tiny window where new dalihelper created immediately after final release
  217. {
  218. disconnect();
  219. daliHelper = NULL;
  220. }
  221. connectWatcher.join();
  222. }
  223. // The cache is static since it outlives the dali connections
  224. static CriticalSection cacheCrit;
  225. static Owned<IPropertyTree> cache;
  226. static void initCache()
  227. {
  228. IPropertyTree *tree = createPTree("Roxie");
  229. tree->addPropTree("QuerySets", createPTree("QuerySets"));
  230. tree->addPropTree("PackageSets", createPTree("PackageSets"));
  231. tree->addPropTree("PackageMaps", createPTree("PackageMaps"));
  232. tree->addPropTree("Files", createPTree("Files"));
  233. cache.setown(tree);
  234. }
  235. static void loadCache()
  236. {
  237. if (!cache)
  238. {
  239. StringBuffer cacheFileName(queryDirectory);
  240. cacheFileName.append(roxieStateName);
  241. if (checkFileExists(cacheFileName))
  242. cache.setown(createPTreeFromXMLFile(cacheFileName));
  243. else
  244. initCache();
  245. }
  246. }
  247. static IPropertyTree *readCache(const char *xpath)
  248. {
  249. CriticalBlock b(cacheCrit);
  250. loadCache();
  251. return cache->getPropTree(xpath);
  252. }
  253. static void writeCache(const char *foundLoc, const char *newLoc, IPropertyTree *val)
  254. {
  255. CriticalBlock b(cacheCrit);
  256. if (!cache)
  257. initCache();
  258. cache->removeProp(foundLoc);
  259. if (val)
  260. cache->addPropTree(newLoc, LINK(val));
  261. }
  262. IPropertyTree *loadDaliTree(const char *path, const char *id)
  263. {
  264. StringBuffer xpath(path);
  265. if (id)
  266. xpath.appendf("[@id='%s']", id);
  267. Owned <IPropertyTree> localTree;
  268. if (isConnected)
  269. {
  270. try
  271. {
  272. Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), 0, 30*1000);
  273. if (conn)
  274. {
  275. Owned <IPropertyTree> daliTree = conn->getRoot();
  276. localTree.setown(createPTreeFromIPT(daliTree));
  277. }
  278. writeCache(xpath, path, localTree);
  279. return localTree.getClear();
  280. }
  281. catch (IDaliClient_Exception *E)
  282. {
  283. if (traceLevel)
  284. EXCLOG(E, "roxie: Dali connection lost");
  285. E->Release();
  286. daliHelper->disconnect();
  287. }
  288. }
  289. DBGLOG("LoadDaliTree(%s) - not connected - read from cache", xpath.str());
  290. localTree.setown(readCache(xpath));
  291. return localTree.getClear();
  292. }
  293. IFileDescriptor *recreateCloneSource(IFileDescriptor *srcfdesc, const char *destfilename)
  294. {
  295. Owned<IFileDescriptor> dstfdesc = createFileDescriptor(srcfdesc->getProperties());
  296. // calculate dest dir
  297. CDfsLogicalFileName dstlfn;
  298. if (!dstlfn.setValidate(destfilename,true))
  299. throw MakeStringException(-1,"Logical name %s invalid",destfilename);
  300. StringBuffer dstpartmask;
  301. getPartMask(dstpartmask,destfilename,srcfdesc->numParts());
  302. dstfdesc->setPartMask(dstpartmask.str());
  303. unsigned np = srcfdesc->numParts();
  304. dstfdesc->setNumParts(srcfdesc->numParts());
  305. dstfdesc->setDefaultDir(srcfdesc->queryProperties().queryProp("@cloneFromDir"));
  306. for (unsigned pn=0;pn<srcfdesc->numParts();pn++) {
  307. offset_t sz = srcfdesc->queryPart(pn)->queryProperties().getPropInt64("@size",-1);
  308. if (sz!=(offset_t)-1)
  309. dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz);
  310. StringBuffer dates;
  311. if (srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates))
  312. dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str());
  313. }
  314. const char *cloneFrom = srcfdesc->queryProperties().queryProp("@cloneFrom");
  315. Owned<IPropertyTreeIterator> groups = srcfdesc->queryProperties().getElements("cloneFromGroup");
  316. ForEach(*groups)
  317. {
  318. IPropertyTree &elem = groups->query();
  319. const char *groupName = elem.queryProp("@groupName");
  320. StringBuffer dir;
  321. StringBuffer foreignGroup("foreign::");
  322. foreignGroup.append(cloneFrom).append("::").append(groupName);
  323. GroupType groupType;
  324. queryNamedGroupStore().setRemoteTimeout(2000);
  325. Owned<IGroup> group = queryNamedGroupStore().lookup(foreignGroup, dir, groupType);
  326. ClusterPartDiskMapSpec dmSpec;
  327. dmSpec.fromProp(&elem);
  328. if (!dmSpec.defaultBaseDir.length())
  329. {
  330. if (dir.length())
  331. {
  332. dmSpec.setDefaultBaseDir(dir);
  333. }
  334. else
  335. {
  336. // Due to the really weird code in dadfs, this MUST be set to match the leading portion of cloneFromDir
  337. // in order to properly handle remote systems with different default directory locations
  338. StringBuffer tail;
  339. DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
  340. makePhysicalPartName(dstlfn.get(),0,0,tail,0,os,PATHSEPSTR); // if lfn is a::b::c, tail will be /a/b/
  341. assertex(tail.length() > 1);
  342. tail.setLength(tail.length()-1); // strip off the trailing /
  343. StringBuffer head(srcfdesc->queryProperties().queryProp("@cloneFromDir")); // Will end with /a/b
  344. assertex(streq(head.str() + head.length() - tail.length(), tail.str()));
  345. head.setLength(head.length() - tail.length()); // So strip off the end...
  346. dmSpec.setDefaultBaseDir(head.str());
  347. }
  348. }
  349. dstfdesc->addCluster(groupName, group, dmSpec);
  350. }
  351. return dstfdesc.getClear();
  352. }
  353. static StringBuffer &normalizeName(const char *name, StringBuffer &ret)
  354. {
  355. // Ensure only chars that are accepted by jptree in an xpath element are used
  356. loop
  357. {
  358. char c = *name++;
  359. if (!c)
  360. break;
  361. switch (c)
  362. {
  363. case '.':
  364. ret.append(".."); // Double . as we use it to escape illegal chars
  365. break;
  366. case ':':
  367. case '_':
  368. case '-':
  369. ret.append(c);
  370. break;
  371. default:
  372. if (isalnum(c))
  373. ret.append(c); // Note - we COULD make the cache case-insensitive and we would be right to 99.9% if the time. But there is a weird syntax using H to force uppercase filenames...
  374. else
  375. ret.append('.').append((unsigned) (unsigned char) c);
  376. break;
  377. }
  378. }
  379. return ret;
  380. }
  381. public:
  382. IMPLEMENT_IINTERFACE;
  383. CRoxieDaliHelper() : connectWatcher(this), serverStatus(NULL)
  384. {
  385. userdesc.setown(createUserDescriptor());
  386. const char *roxieUser = NULL;
  387. const char *roxiePassword = NULL;
  388. if (topology)
  389. {
  390. roxieUser = topology->queryProp("@ldapUser");
  391. roxiePassword = topology->queryProp("@ldapPassword");
  392. }
  393. if (!roxieUser)
  394. roxieUser = "roxie";
  395. if (!roxiePassword)
  396. roxiePassword = "";
  397. StringBuffer password;
  398. decrypt(password, roxiePassword);
  399. userdesc->set(roxieUser, password.str());
  400. if (fileNameServiceDali.length())
  401. connectWatcher.start();
  402. else
  403. initMyNode(1); // Hack
  404. }
  405. static const char *getQuerySetPath(StringBuffer &buf, const char *id)
  406. {
  407. buf.appendf("QuerySets/QuerySet[@id='%s']", id);
  408. return buf.str();
  409. }
  410. virtual IPropertyTree *getQuerySet(const char *id)
  411. {
  412. Owned<IPropertyTree> ret = loadDaliTree("QuerySets/QuerySet", id);
  413. if (!ret)
  414. {
  415. ret.setown(createPTree("QuerySet"));
  416. ret->setProp("@id", id);
  417. }
  418. return ret.getClear();
  419. }
  420. virtual IPropertyTree *getPackageSets()
  421. {
  422. Owned<IPropertyTree> ret = loadDaliTree("PackageSets", NULL);
  423. if (!ret)
  424. {
  425. ret.setown(createPTree("PackageSets"));
  426. }
  427. return ret.getClear();
  428. }
  429. static const char *getSuperFilePath(StringBuffer &buf, const char *lfn)
  430. {
  431. CDfsLogicalFileName lfnParser;
  432. lfnParser.set(lfn);
  433. if (!lfnParser.isForeign())
  434. {
  435. lfnParser.makeFullnameQuery(buf, DXB_SuperFile, true);
  436. return buf.str();
  437. }
  438. else
  439. return NULL;
  440. }
  441. virtual IPropertyTree *getPackageMap(const char *id)
  442. {
  443. Owned<IPropertyTree> ret = loadDaliTree("PackageMaps/PackageMap", id);
  444. if (!ret)
  445. {
  446. ret.setown(createPTree("PackageMap"));
  447. ret->setProp("@id", id);
  448. }
  449. return ret.getClear();
  450. }
  451. IFileDescriptor *checkClonedFromRemote(const char *_lfn, IFileDescriptor *fdesc, bool cacheIt)
  452. {
  453. // NOTE - we rely on the fact that queryNamedGroupStore().lookup caches results,to avoid excessive load on remote dali
  454. if (_lfn && !strnicmp(_lfn, "foreign", 7)) //if need to support dali hopping should add each remote location
  455. return NULL;
  456. if (!fdesc)
  457. return NULL;
  458. const char *cloneFrom = fdesc->queryProperties().queryProp("@cloneFrom");
  459. if (!cloneFrom)
  460. return NULL;
  461. StringBuffer foreignLfn("foreign::");
  462. foreignLfn.append(cloneFrom);
  463. if (!connected())
  464. return resolveCachedLFN(foreignLfn); // Note - cache only used when no dali connection available
  465. try
  466. {
  467. if (fdesc->queryProperties().hasProp("cloneFromGroup") && fdesc->queryProperties().hasProp("@cloneFromDir"))
  468. {
  469. Owned<IFileDescriptor> ret = recreateCloneSource(fdesc, _lfn);
  470. if (cacheIt)
  471. cacheFileDescriptor(foreignLfn, ret);
  472. return ret.getClear();
  473. }
  474. else // Legacy mode - recently cloned files should have the extra info
  475. {
  476. if (traceLevel > 1)
  477. DBGLOG("checkClonedFromRemote: Resolving %s in legacy mode", _lfn);
  478. Owned<IDistributedFile> cloneFile = resolveLFN(foreignLfn, cacheIt, false);
  479. if (cloneFile)
  480. {
  481. Owned<IFileDescriptor> cloneFDesc = cloneFile->getFileDescriptor();
  482. if (cloneFDesc->numParts()==fdesc->numParts())
  483. return cloneFDesc.getClear();
  484. DBGLOG(ROXIE_MISMATCH, "File %s cloneFrom(%s) mismatch", _lfn, cloneFrom);
  485. }
  486. }
  487. }
  488. catch (IException *E)
  489. {
  490. if (traceLevel > 3)
  491. EXCLOG(E);
  492. E->Release(); // Any failure means act as if no remote info
  493. }
  494. return NULL;
  495. }
  496. virtual IDistributedFile *resolveLFN(const char *logicalName, bool cacheIt, bool writeAccess)
  497. {
  498. if (isConnected)
  499. {
  500. unsigned start = msTick();
  501. CDfsLogicalFileName lfn;
  502. lfn.set(logicalName);
  503. Owned<IDistributedFile> dfsFile = queryDistributedFileDirectory().lookup(lfn, userdesc.get(), writeAccess, cacheIt);
  504. if (dfsFile)
  505. {
  506. IDistributedSuperFile *super = dfsFile->querySuperFile();
  507. if (super && (0 == super->numSubFiles(true)))
  508. dfsFile.clear();
  509. }
  510. if (cacheIt)
  511. cacheDistributedFile(logicalName, dfsFile);
  512. if (traceLevel > 1)
  513. DBGLOG("Dali lookup %s returned %s in %u ms", logicalName, dfsFile != NULL ? "match" : "NO match", msTick()-start);
  514. return dfsFile.getClear();
  515. }
  516. else
  517. return NULL;
  518. }
  519. virtual IFileDescriptor *resolveCachedLFN(const char *logicalName)
  520. {
  521. StringBuffer xpath("Files/F.");
  522. normalizeName(logicalName, xpath);
  523. Owned<IPropertyTree> pt = readCache(xpath.str());
  524. if (pt)
  525. {
  526. return deserializeFileDescriptorTree(pt);
  527. }
  528. else
  529. return NULL;
  530. }
  531. virtual void commitCache()
  532. {
  533. if (isConnected)
  534. {
  535. CriticalBlock b(cacheCrit);
  536. if (!recursiveCreateDirectory(queryDirectory))
  537. throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
  538. VStringBuffer newCacheFileName("%s%s.new", queryDirectory.str(), roxieStateName);
  539. VStringBuffer oldCacheFileName("%s%s.bak", queryDirectory.str(), roxieStateName);
  540. VStringBuffer cacheFileName("%s%s", queryDirectory.str(), roxieStateName);
  541. saveXML(newCacheFileName, cache);
  542. Owned<IFile> oldFile = createIFile(oldCacheFileName);
  543. Owned<IFile> newFile = createIFile(newCacheFileName);
  544. Owned<IFile> cacheFile = createIFile(cacheFileName);
  545. if (oldFile->exists())
  546. oldFile->remove();
  547. if (cacheFile->exists())
  548. cacheFile->rename(oldCacheFileName);
  549. newFile->rename(cacheFileName);
  550. }
  551. }
  552. virtual IConstWorkUnit *attachWorkunit(const char *wuid, ILoadedDllEntry *source)
  553. {
  554. assertex(isConnected);
  555. Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
  556. Owned<IWorkUnit> w = wuFactory->updateWorkUnit(wuid);
  557. if (!w)
  558. return NULL;
  559. w->setAgentSession(myProcessSession());
  560. if (source)
  561. {
  562. StringBuffer wuXML;
  563. if (getEmbeddedWorkUnitXML(source, wuXML))
  564. {
  565. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
  566. localWU->loadXML(wuXML);
  567. queryExtendedWU(w)->copyWorkUnit(localWU, true);
  568. }
  569. else
  570. throw MakeStringException(ROXIE_DALI_ERROR, "Failed to locate dll workunit info");
  571. }
  572. w->commit();
  573. w.clear();
  574. return wuFactory->openWorkUnit(wuid, false);
  575. }
  576. virtual void noteWorkunitRunning(const char *wuid, bool running)
  577. {
  578. CriticalBlock b(daliConnectionCrit);
  579. if (isConnected)
  580. {
  581. assertex(serverStatus);
  582. if (running)
  583. serverStatus->queryProperties()->addProp("WorkUnit",wuid);
  584. else
  585. {
  586. VStringBuffer xpath("WorkUnit[.='%s']",wuid);
  587. serverStatus->queryProperties()->removeProp(xpath.str());
  588. }
  589. serverStatus->commitProperties();
  590. }
  591. }
  592. virtual void noteQueuesRunning(const char *queueNames)
  593. {
  594. CriticalBlock b(daliConnectionCrit);
  595. if (isConnected)
  596. {
  597. assertex(serverStatus);
  598. serverStatus->queryProperties()->setProp("@queue", queueNames);
  599. serverStatus->commitProperties();
  600. }
  601. }
  602. static IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
  603. {
  604. CriticalBlock b(daliHelperCrit);
  605. LINK(daliHelper);
  606. if (!daliHelper || !daliHelper->isAlive())
  607. daliHelper = new CRoxieDaliHelper();
  608. if (waitToConnect && fileNameServiceDali.length() && (!topology || !topology->getPropBool("@lockDali", false)))
  609. {
  610. while (waitToConnect && !daliHelper->connected())
  611. {
  612. unsigned delay = 1000;
  613. if (delay > waitToConnect)
  614. delay = waitToConnect;
  615. Sleep(delay);
  616. waitToConnect -= delay;
  617. }
  618. }
  619. return daliHelper;
  620. }
  621. static void releaseCache()
  622. {
  623. CriticalBlock b(cacheCrit);
  624. cache.clear();
  625. }
  626. virtual void releaseSubscription(IDaliPackageWatcher *subscription)
  627. {
  628. watchers.zap(*subscription);
  629. subscription->unsubscribe();
  630. }
  631. IDaliPackageWatcher *getSubscription(const char *id, const char *xpath, ISDSSubscription *notifier, bool exact)
  632. {
  633. IDaliPackageWatcher *watcher = new CDaliPackageWatcher(id, xpath, notifier);
  634. watchers.append(*LINK(watcher));
  635. if (isConnected)
  636. watcher->subscribe(exact);
  637. return watcher;
  638. }
  639. virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier)
  640. {
  641. StringBuffer xpath;
  642. return getSubscription(id, getQuerySetPath(xpath, id), notifier, false);
  643. }
  644. virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier)
  645. {
  646. StringBuffer xpath;
  647. return getSubscription("PackageSets", "PackageSets", notifier, false);
  648. }
  649. virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier)
  650. {
  651. StringBuffer xpath;
  652. return getSubscription("PackageMaps", "PackageMaps", notifier, false);
  653. }
  654. virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier)
  655. {
  656. StringBuffer xpathBuf;
  657. const char *xpath = getSuperFilePath(xpathBuf, lfn);
  658. if (xpath)
  659. return getSubscription(lfn, xpath, notifier, true);
  660. else
  661. return NULL;
  662. }
  663. virtual bool connected() const
  664. {
  665. return isConnected;
  666. }
  667. // connect handles the operations generally performed by Dali clients at startup.
  668. virtual bool connect(unsigned timeout)
  669. {
  670. if (!isConnected)
  671. {
  672. CriticalBlock b(daliConnectionCrit);
  673. if (!isConnected)
  674. {
  675. try
  676. {
  677. // Create server group
  678. Owned<IGroup> serverGroup = createIGroup(fileNameServiceDali, DALI_SERVER_PORT);
  679. if (!serverGroup)
  680. throw MakeStringException(ROXIE_DALI_ERROR, "Could not instantiate dali IGroup");
  681. // Initialize client process
  682. if (!initClientProcess(serverGroup, DCR_RoxyMaster, 0, NULL, NULL, timeout))
  683. throw MakeStringException(ROXIE_DALI_ERROR, "Could not initialize dali client");
  684. setPasswordsFromSDS();
  685. serverStatus = new CSDSServerStatus("RoxieServer");
  686. serverStatus->queryProperties()->setProp("@cluster", roxieName.str());
  687. serverStatus->commitProperties();
  688. isConnected = true; // Make sure this is set before the onReconnect calls, so that they refresh with info from Dali rather than from cache
  689. ForEachItemIn(idx, watchers)
  690. {
  691. watchers.item(idx).onReconnect();
  692. }
  693. }
  694. catch(IException *e)
  695. {
  696. delete serverStatus;
  697. serverStatus = NULL;
  698. ::closedownClientProcess(); // undo any partial initialization
  699. StringBuffer text;
  700. e->errorMessage(text);
  701. DBGLOG(ROXIE_DALI_ERROR, "Error trying to connect to dali %s: %s", fileNameServiceDali.str(), text.str());
  702. e->Release();
  703. }
  704. }
  705. }
  706. return isConnected;
  707. }
  708. virtual void disconnect()
  709. {
  710. if (isConnected)
  711. {
  712. CriticalBlock b(daliConnectionCrit);
  713. if (isConnected)
  714. {
  715. isConnected = false;
  716. delete serverStatus;
  717. serverStatus = NULL;
  718. closeDllServer();
  719. closeEnvironment();
  720. clientShutdownWorkUnit();
  721. disconnectRoxieQueues();
  722. ::closedownClientProcess(); // dali client closedown
  723. disconnectSem.signal();
  724. }
  725. }
  726. }
  727. protected:
  728. void cacheDistributedFile(const char *logicalName, IDistributedFile *dfsFile)
  729. {
  730. assertex(isConnected);
  731. Owned<IFileDescriptor> fd;
  732. if (dfsFile)
  733. fd.setown(dfsFile->getFileDescriptor());
  734. cacheFileDescriptor(logicalName, fd);
  735. }
  736. void cacheFileDescriptor(const char *logicalName, IFileDescriptor *fd)
  737. {
  738. assertex(isConnected);
  739. Owned<IPropertyTree> pt;
  740. if (fd)
  741. pt.setown(fd->getFileTree());
  742. StringBuffer xpath("Files/F.");
  743. normalizeName(logicalName, xpath);
  744. writeCache(xpath.str(), xpath.str(), pt);
  745. }
  746. };
  747. class CRoxieDllServer : public CInterface, implements IDllServer
  748. {
  749. static CriticalSection crit;
  750. bool started;
  751. public:
  752. IMPLEMENT_IINTERFACE;
  753. CRoxieDllServer()
  754. {
  755. started = false;
  756. }
  757. virtual IIterator * createDllIterator() { throwUnexpected(); }
  758. virtual void ensureAvailable(const char * name, DllLocationType location) { throwUnexpected(); }
  759. virtual void getDll(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
  760. virtual IDllEntry * getEntry(const char * name) { throwUnexpected(); }
  761. virtual void getLibrary(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
  762. virtual void getLocalLibraryName(const char * name, StringBuffer & libraryName) { throwUnexpected(); }
  763. virtual DllLocationType isAvailable(const char * name) { throwUnexpected(); }
  764. virtual void removeDll(const char * name, bool removeDlls, bool removeDirectory) { throwUnexpected(); }
  765. virtual void registerDll(const char * name, const char * kind, const char * dllPath) { throwUnexpected(); }
  766. virtual IDllEntry * createEntry(IPropertyTree *owner, IPropertyTree *entry) { throwUnexpected(); }
  767. virtual ILoadedDllEntry * loadDll(const char * name, DllLocationType location)
  768. {
  769. if (location == DllLocationDirectory)
  770. {
  771. StringBuffer localName(queryDirectory);
  772. localName.append(name);
  773. if (checkFileExists(localName.str()))
  774. {
  775. try
  776. {
  777. return createDllEntry(localName.str(), false, NULL);
  778. }
  779. catch (ICorruptDllException *E)
  780. {
  781. remove(localName.str());
  782. E->Release();
  783. }
  784. catch (...)
  785. {
  786. throw;
  787. }
  788. }
  789. }
  790. CriticalBlock b(crit);
  791. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  792. if (!started)
  793. {
  794. if (!recursiveCreateDirectory(queryDirectory))
  795. throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
  796. initDllServer(queryDirectory);
  797. started = true;
  798. }
  799. return queryDllServer().loadDll(name, location);
  800. }
  801. } roxieDllServer;
  802. bool CRoxieDaliHelper::isConnected = false;
  803. CRoxieDaliHelper * CRoxieDaliHelper::daliHelper;
  804. CriticalSection CRoxieDaliHelper::daliHelperCrit;
  805. CriticalSection CRoxieDaliHelper::cacheCrit;
  806. Owned<IPropertyTree> CRoxieDaliHelper::cache;
  807. CriticalSection CRoxieDllServer::crit;
  808. IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
  809. {
  810. return CRoxieDaliHelper::connectToDali(waitToConnect);
  811. }
  812. extern void releaseRoxieStateCache()
  813. {
  814. CRoxieDaliHelper::releaseCache();
  815. }
  816. extern IDllServer &queryRoxieDllServer()
  817. {
  818. return roxieDllServer;
  819. }
  820. extern void addWuException(IConstWorkUnit *workUnit, IException *E)
  821. {
  822. StringBuffer message;
  823. E->errorMessage(message);
  824. unsigned code = E->errorCode();
  825. OERRLOG("%u - %s", code, message.str());
  826. WorkunitUpdate w(&workUnit->lock());
  827. addExceptionToWorkunit(w, ExceptionSeverityError, "Roxie", code, message.str(), NULL, 0, 0);
  828. }