ccddali.cpp 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "workunit.hpp"
  15. #include "dalienv.hpp"
  16. #include "daclient.hpp"
  17. #include "dautils.hpp"
  18. #include "dllserver.hpp"
  19. #include "ccddali.hpp"
  20. #include "ccdfile.hpp"
  21. #include "ccdlistener.hpp"
  22. #include "ccd.hpp"
  23. #include "jencrypt.hpp"
  24. #include "jisem.hpp"
  25. #include "dllserver.hpp"
  26. #include "thorplugin.hpp"
  27. #include "workflow.hpp"
  28. #include "mpcomm.hpp"
  // Leaf name of the XML file used to persist the locally cached Dali state;
  // it is appended to queryDirectory wherever the cache is loaded or saved.
  char const *roxieStateName = "RoxieLocalState.xml";
  30. class CDaliPackageWatcher : public CInterface, implements ISDSSubscription, implements IDaliPackageWatcher
  31. {
  32. SubscriptionId change;
  33. ISDSSubscription *notifier;
  34. StringAttr id;
  35. StringAttr xpath;
  36. mutable CriticalSection crit;
  37. public:
  38. IMPLEMENT_IINTERFACE;
  39. CDaliPackageWatcher(const char *_id, const char *_xpath, ISDSSubscription *_notifier)
  40. : id(_id), xpath(_xpath), change(0)
  41. {
  42. notifier = _notifier;
  43. }
  44. ~CDaliPackageWatcher()
  45. {
  46. }
  47. virtual void subscribe()
  48. {
  49. CriticalBlock b(crit);
  50. try
  51. {
  52. change = querySDS().subscribe(xpath, *this, true);
  53. }
  54. catch (IException *E)
  55. {
  56. // failure to subscribe implies dali is down... that's ok, we will resubscribe when we notice it come back up.
  57. E->Release();
  58. }
  59. }
  60. virtual void unsubscribe()
  61. {
  62. CriticalBlock b(crit);
  63. notifier = NULL;
  64. try
  65. {
  66. if (change)
  67. querySDS().unsubscribe(change);
  68. }
  69. catch (IException *E)
  70. {
  71. E->Release();
  72. }
  73. change = 0;
  74. }
  75. virtual const char *queryName() const
  76. {
  77. return id.get();
  78. }
  79. virtual void onReconnect()
  80. {
  81. Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  82. // It's tempting to think you can avoid holding the critsec during the notify call, and that you only need to hold it while looking up notifier
  83. // Despite the danger of deadlocks (that requires careful code in the notifier to avoid), I think it is neccessary to hold the lock during the call,
  84. // as otherwise notifier may point to a deleted object.
  85. CriticalBlock b(crit);
  86. change = querySDS().subscribe(xpath, *this, true);
  87. if (notifier)
  88. notifier->notify(0, NULL, SDSNotify_None);
  89. }
  90. virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  91. {
  92. Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
  93. CriticalBlock b(crit);
  94. if (notifier)
  95. notifier->notify(subid, xpath, flags, valueLen, valueData);
  96. }
  97. };
  /*===============================================================================
  * Roxie is not a typical Dali client - if dali is available it will use it, but
  * it needs to be able to continue to serve any queries that have already been deployed
  * should Dali no longer be available, including the ability to restart a roxie while
  * the dali that originally provided its data is unavailable.
  *
  * This single-instance class is designed to help Roxie manage its dali connections.
  * It connects to dali on demand, and disconnects when all links to this object are
  * released - in practice that won't happen until termination, as we keep an active dali
  * connection as long as any QuerySets are subscribed. This class also handles the
  * local caching of sufficient information retrieved from dali so that Roxie can restart
  * even if dali is not available.
  *
  * If dali is down, data is delivered from the cache. If dali is up, the cache is populated
  * from the information retrieved from dali. We snapshot the cache at the end of each
  * successful reload.
  *
  *
  =================================================================================*/
  116. class CRoxieDaliHelper : public CInterface, implements IRoxieDaliHelper
  117. {
  118. private:
  119. static bool isConnected;
  120. static CRoxieDaliHelper *daliHelper; // Note - this does not own the helper
  121. static CriticalSection daliHelperCrit;
  122. CriticalSection daliConnectionCrit;
  123. Owned<IUserDescriptor> userdesc;
  124. InterruptableSemaphore disconnectSem;
  125. IArrayOf<IDaliPackageWatcher> watchers;
  126. CSDSServerStatus *serverStatus;
  127. class CRoxieDaliConnectWatcher : public Thread
  128. {
  129. private:
  130. CRoxieDaliHelper *owner;
  131. bool aborted;
  132. public:
  133. CRoxieDaliConnectWatcher(CRoxieDaliHelper *_owner) : owner(_owner)
  134. {
  135. aborted = false;
  136. }
  137. virtual int run()
  138. {
  139. while (!aborted)
  140. {
  141. if (topology && topology->getPropBool("@lockDali", false))
  142. {
  143. Sleep(ROXIE_DALI_CONNECT_TIMEOUT);
  144. }
  145. else if (owner->connect(ROXIE_DALI_CONNECT_TIMEOUT))
  146. {
  147. DBGLOG("roxie: CRoxieDaliConnectWatcher reconnected");
  148. try
  149. {
  150. owner->disconnectSem.wait();
  151. }
  152. catch (IException *E)
  153. {
  154. if (!aborted)
  155. EXCLOG(E, "roxie: Unexpected exception in CRoxieDaliConnectWatcher");
  156. E->Release();
  157. }
  158. }
  159. }
  160. return 0;
  161. }
  162. void stop()
  163. {
  164. aborted = true;
  165. }
  166. virtual void start()
  167. {
  168. Thread::start();
  169. }
  170. virtual void join()
  171. {
  172. if (isAlive())
  173. Thread::join();
  174. }
  175. } connectWatcher;
  176. virtual void beforeDispose()
  177. {
  178. CriticalBlock b(daliHelperCrit);
  179. disconnectSem.interrupt();
  180. connectWatcher.stop();
  181. if (daliHelper==this) // there is a tiny window where new dalihelper created immediately after final release
  182. {
  183. disconnect();
  184. daliHelper = NULL;
  185. }
  186. connectWatcher.join();
  187. }
  188. // The cache is static since it outlives the dali connections
  189. static CriticalSection cacheCrit;
  190. static Owned<IPropertyTree> cache;
  191. static void initCache()
  192. {
  193. IPropertyTree *tree = createPTree("Roxie");
  194. tree->addPropTree("QuerySets", createPTree("QuerySets"));
  195. tree->addPropTree("PackageSets", createPTree("PackageSets"));
  196. tree->addPropTree("PackageMaps", createPTree("PackageMaps"));
  197. tree->addPropTree("Files", createPTree("Files"));
  198. cache.setown(tree);
  199. }
  200. static void loadCache()
  201. {
  202. if (!cache)
  203. {
  204. StringBuffer cacheFileName(queryDirectory);
  205. cacheFileName.append(roxieStateName);
  206. if (checkFileExists(cacheFileName))
  207. cache.setown(createPTreeFromXMLFile(cacheFileName));
  208. else
  209. initCache();
  210. }
  211. }
  212. static IPropertyTree *readCache(const char *xpath)
  213. {
  214. CriticalBlock b(cacheCrit);
  215. loadCache();
  216. return cache->getPropTree(xpath);
  217. }
  218. static void writeCache(const char *foundLoc, const char *newLoc, IPropertyTree *val)
  219. {
  220. CriticalBlock b(cacheCrit);
  221. cache->removeProp(foundLoc);
  222. if (val)
  223. cache->addPropTree(newLoc, LINK(val));
  224. }
  225. IPropertyTree *loadDaliTree(const char *path, const char *id)
  226. {
  227. StringBuffer xpath(path);
  228. if (id)
  229. xpath.appendf("[@id='%s']", id);
  230. Owned <IPropertyTree> localTree;
  231. if (isConnected)
  232. {
  233. try
  234. {
  235. Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), 0, 30*1000);
  236. if (conn)
  237. {
  238. Owned <IPropertyTree> daliTree = conn->getRoot();
  239. localTree.setown(createPTreeFromIPT(daliTree));
  240. }
  241. writeCache(xpath, path, localTree);
  242. return localTree.getClear();
  243. }
  244. catch (IDaliClient_Exception *E)
  245. {
  246. if (traceLevel)
  247. EXCLOG(E, "roxie: Dali connection lost");
  248. E->Release();
  249. daliHelper->disconnect();
  250. }
  251. }
  252. DBGLOG("LoadDaliTree(%s) - not connected - read from cache", xpath.str());
  253. localTree.setown(readCache(xpath));
  254. return localTree.getClear();
  255. }
  256. IFileDescriptor *recreateCloneSource(IFileDescriptor *srcfdesc, const char *destfilename)
  257. {
  258. Owned<IFileDescriptor> dstfdesc = createFileDescriptor(srcfdesc->getProperties());
  259. // calculate dest dir
  260. CDfsLogicalFileName dstlfn;
  261. if (!dstlfn.setValidate(destfilename,true))
  262. throw MakeStringException(-1,"Logical name %s invalid",destfilename);
  263. StringBuffer dstpartmask;
  264. getPartMask(dstpartmask,destfilename,srcfdesc->numParts());
  265. dstfdesc->setPartMask(dstpartmask.str());
  266. unsigned np = srcfdesc->numParts();
  267. dstfdesc->setNumParts(srcfdesc->numParts());
  268. dstfdesc->setDefaultDir(srcfdesc->queryProperties().queryProp("@cloneFromDir"));
  269. for (unsigned pn=0;pn<srcfdesc->numParts();pn++) {
  270. offset_t sz = srcfdesc->queryPart(pn)->queryProperties().getPropInt64("@size",-1);
  271. if (sz!=(offset_t)-1)
  272. dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz);
  273. StringBuffer dates;
  274. if (srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates))
  275. dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str());
  276. }
  277. const char *cloneFrom = srcfdesc->queryProperties().queryProp("@cloneFrom");
  278. Owned<IPropertyTreeIterator> groups = srcfdesc->queryProperties().getElements("cloneFromGroup");
  279. ForEach(*groups)
  280. {
  281. IPropertyTree &elem = groups->query();
  282. const char *groupName = elem.queryProp("@groupName");
  283. StringBuffer dir;
  284. StringBuffer foreignGroup("foreign::");
  285. foreignGroup.append(cloneFrom).append("::").append(groupName);
  286. GroupType groupType;
  287. Owned<IGroup> group = queryNamedGroupStore().lookup(foreignGroup, dir, groupType);
  288. ClusterPartDiskMapSpec dmSpec;
  289. dmSpec.fromProp(&elem);
  290. if (!dmSpec.defaultBaseDir.length())
  291. {
  292. if (dir.length())
  293. {
  294. dmSpec.setDefaultBaseDir(dir);
  295. }
  296. else
  297. {
  298. // Due to the really weird code in dadfs, this MUST be set to match the leading portion of cloneFromDir
  299. // in order to properly handle remote systems with different default directory locations
  300. StringBuffer tail;
  301. DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
  302. makePhysicalPartName(dstlfn.get(),0,0,tail,0,os,PATHSEPSTR); // if lfn is a::b::c, tail will be /a/b/
  303. assertex(tail.length() > 1);
  304. tail.setLength(tail.length()-1); // strip off the trailing /
  305. StringBuffer head(srcfdesc->queryProperties().queryProp("@cloneFromDir")); // Will end with /a/b
  306. assertex(streq(head.str() + head.length() - tail.length(), tail.str()));
  307. head.setLength(head.length() - tail.length()); // So strip off the end...
  308. dmSpec.setDefaultBaseDir(head.str());
  309. }
  310. }
  311. dstfdesc->addCluster(groupName, group, dmSpec);
  312. }
  313. return dstfdesc.getClear();
  314. }
  315. public:
  316. IMPLEMENT_IINTERFACE;
  317. CRoxieDaliHelper() : connectWatcher(this), serverStatus(NULL)
  318. {
  319. userdesc.setown(createUserDescriptor());
  320. const char *roxieUser;
  321. const char *roxiePassword;
  322. if (topology)
  323. {
  324. roxieUser = topology->queryProp("@ldapUser");
  325. roxiePassword = topology->queryProp("@ldapPassword");
  326. }
  327. if (!roxieUser)
  328. roxieUser = "roxie";
  329. if (!roxiePassword)
  330. roxiePassword = "";
  331. StringBuffer password;
  332. decrypt(password, roxiePassword);
  333. userdesc->set(roxieUser, password.str());
  334. if (fileNameServiceDali.length())
  335. connectWatcher.start();
  336. else
  337. initMyNode(1); // Hack
  338. }
  339. static const char *getQuerySetPath(StringBuffer &buf, const char *id)
  340. {
  341. buf.appendf("QuerySets/QuerySet[@id='%s']", id);
  342. return buf.str();
  343. }
  344. virtual IPropertyTree *getQuerySet(const char *id)
  345. {
  346. Owned<IPropertyTree> ret = loadDaliTree("QuerySets/QuerySet", id);
  347. if (!ret)
  348. {
  349. ret.setown(createPTree("QuerySet"));
  350. ret->setProp("@id", id);
  351. }
  352. return ret.getClear();
  353. }
  354. virtual IPropertyTree *getPackageSets()
  355. {
  356. Owned<IPropertyTree> ret = loadDaliTree("PackageSets", NULL);
  357. if (!ret)
  358. {
  359. ret.setown(createPTree("PackageSets"));
  360. }
  361. return ret.getClear();
  362. }
  363. static const char *getPackageMapPath(StringBuffer &buf, const char *id)
  364. {
  365. buf.appendf("PackageMaps/PackageMap[@id='%s']", id);
  366. return buf.str();
  367. }
  368. virtual IPropertyTree *getPackageMap(const char *id)
  369. {
  370. Owned<IPropertyTree> ret = loadDaliTree("PackageMaps/PackageMap", id);
  371. if (!ret)
  372. {
  373. ret.setown(createPTree("PackageMap"));
  374. ret->setProp("@id", id);
  375. }
  376. return ret.getClear();
  377. }
  378. IFileDescriptor *checkClonedFromRemote(const char *_lfn, IFileDescriptor *fdesc, bool cacheIt)
  379. {
  380. // NOTE - we rely on the fact that queryNamedGroupStore().lookup caches results,to avoid excessive load on remote dali
  381. if (_lfn && !strnicmp(_lfn, "foreign", 7)) //if need to support dali hopping should add each remote location
  382. return NULL;
  383. if (!fdesc || !fdesc->queryProperties().hasProp("@cloneFrom"))
  384. return NULL;
  385. if (fdesc->queryProperties().hasProp("cloneFromGroup"))
  386. {
  387. return recreateCloneSource(fdesc, _lfn);
  388. }
  389. else // Legacy mode - recently cloned files should have the extra info
  390. {
  391. SocketEndpoint cloneFrom;
  392. cloneFrom.set(fdesc->queryProperties().queryProp("@cloneFrom"));
  393. if (cloneFrom.isNull())
  394. return NULL;
  395. CDfsLogicalFileName lfn;
  396. lfn.set(_lfn);
  397. lfn.setForeign(cloneFrom, false);
  398. if (!connected())
  399. return resolveCachedLFN(lfn.get());
  400. Owned<IDistributedFile> cloneFile = resolveLFN(lfn.get(), cacheIt, false);
  401. if (cloneFile)
  402. {
  403. Owned<IFileDescriptor> cloneFDesc = cloneFile->getFileDescriptor();
  404. if (cloneFDesc->numParts()==fdesc->numParts())
  405. return cloneFDesc.getClear();
  406. StringBuffer s;
  407. DBGLOG(ROXIE_MISMATCH, "File %s cloneFrom(%s) mismatch", _lfn, cloneFrom.getIpText(s).str());
  408. }
  409. }
  410. return NULL;
  411. }
  412. virtual IDistributedFile *resolveLFN(const char *logicalName, bool cacheIt, bool writeAccess)
  413. {
  414. if (isConnected)
  415. {
  416. if (traceLevel > 1)
  417. DBGLOG("Dali lookup %s", logicalName);
  418. CDfsLogicalFileName lfn;
  419. lfn.set(logicalName);
  420. Owned<IDistributedFile> dfsFile = queryDistributedFileDirectory().lookup(lfn, userdesc.get(), writeAccess, cacheIt);
  421. if (dfsFile)
  422. {
  423. IDistributedSuperFile *super = dfsFile->querySuperFile();
  424. if (super && (0 == super->numSubFiles(true)))
  425. dfsFile.clear();
  426. }
  427. if (cacheIt)
  428. {
  429. Owned<IFileDescriptor> fd;
  430. Owned<IPropertyTree> pt;
  431. if (dfsFile)
  432. {
  433. fd.setown(dfsFile->getFileDescriptor());
  434. if (fd)
  435. pt.setown(fd->getFileTree());
  436. }
  437. StringBuffer xpath("Files/");
  438. StringBuffer lcname;
  439. xpath.append(lcname.append(logicalName).toLowerCase());
  440. writeCache(xpath.str(), xpath.str(), pt);
  441. }
  442. return dfsFile.getClear();
  443. }
  444. else
  445. return NULL;
  446. }
  447. virtual IFileDescriptor *resolveCachedLFN(const char *logicalName)
  448. {
  449. StringBuffer xpath("Files/");
  450. StringBuffer lcname;
  451. xpath.append(lcname.append(logicalName).toLowerCase());
  452. Owned<IPropertyTree> pt = readCache(xpath.str());
  453. if (pt)
  454. {
  455. return deserializeFileDescriptorTree(pt);
  456. }
  457. else
  458. return NULL;
  459. }
  460. virtual void commitCache()
  461. {
  462. if (isConnected)
  463. {
  464. CriticalBlock b(cacheCrit);
  465. if (!recursiveCreateDirectory(queryDirectory))
  466. throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
  467. VStringBuffer newCacheFileName("%s%s.new", queryDirectory.str(), roxieStateName);
  468. VStringBuffer oldCacheFileName("%s%s.bak", queryDirectory.str(), roxieStateName);
  469. VStringBuffer cacheFileName("%s%s", queryDirectory.str(), roxieStateName);
  470. saveXML(newCacheFileName, cache);
  471. Owned<IFile> oldFile = createIFile(oldCacheFileName);
  472. Owned<IFile> newFile = createIFile(newCacheFileName);
  473. Owned<IFile> cacheFile = createIFile(cacheFileName);
  474. if (oldFile->exists())
  475. oldFile->remove();
  476. if (cacheFile->exists())
  477. cacheFile->rename(oldCacheFileName);
  478. newFile->rename(cacheFileName);
  479. }
  480. }
  481. virtual IConstWorkUnit *attachWorkunit(const char *wuid, ILoadedDllEntry *source)
  482. {
  483. assertex(isConnected);
  484. Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
  485. Owned<IWorkUnit> w = wuFactory->updateWorkUnit(wuid);
  486. if (!w)
  487. return NULL;
  488. w->setAgentSession(myProcessSession());
  489. if (source)
  490. {
  491. StringBuffer wuXML;
  492. if (getEmbeddedWorkUnitXML(source, wuXML))
  493. {
  494. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
  495. localWU->loadXML(wuXML);
  496. queryExtendedWU(w)->copyWorkUnit(localWU, true);
  497. }
  498. else
  499. throw MakeStringException(ROXIE_DALI_ERROR, "Failed to locate dll workunit info");
  500. }
  501. w->commit();
  502. w.clear();
  503. return wuFactory->openWorkUnit(wuid, false);
  504. }
  505. virtual void noteWorkunitRunning(const char *wuid, bool running)
  506. {
  507. CriticalBlock b(daliConnectionCrit);
  508. if (isConnected)
  509. {
  510. assertex(serverStatus);
  511. if (running)
  512. serverStatus->queryProperties()->addProp("WorkUnit",wuid);
  513. else
  514. {
  515. VStringBuffer xpath("WorkUnit[.='%s']",wuid);
  516. serverStatus->queryProperties()->removeProp(xpath.str());
  517. }
  518. serverStatus->commitProperties();
  519. }
  520. }
  521. virtual void noteQueuesRunning(const char *queueNames)
  522. {
  523. CriticalBlock b(daliConnectionCrit);
  524. if (isConnected)
  525. {
  526. assertex(serverStatus);
  527. serverStatus->queryProperties()->setProp("@queue", queueNames);
  528. serverStatus->commitProperties();
  529. }
  530. }
  531. static IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
  532. {
  533. CriticalBlock b(daliHelperCrit);
  534. LINK(daliHelper);
  535. if (!daliHelper || !daliHelper->isAlive())
  536. daliHelper = new CRoxieDaliHelper();
  537. if (waitToConnect && fileNameServiceDali.length() && (!topology || !topology->getPropBool("@lockDali", false)))
  538. {
  539. while (waitToConnect && !daliHelper->connected())
  540. {
  541. unsigned delay = 1000;
  542. if (delay > waitToConnect)
  543. delay = waitToConnect;
  544. Sleep(delay);
  545. waitToConnect -= delay;
  546. }
  547. }
  548. return daliHelper;
  549. }
  550. static void releaseCache()
  551. {
  552. CriticalBlock b(cacheCrit);
  553. cache.clear();
  554. }
  555. virtual void releaseSubscription(IDaliPackageWatcher *subscription)
  556. {
  557. watchers.zap(*subscription);
  558. subscription->unsubscribe();
  559. }
  560. IDaliPackageWatcher *getSubscription(const char *id, const char *xpath, ISDSSubscription *notifier)
  561. {
  562. IDaliPackageWatcher *watcher = new CDaliPackageWatcher(id, xpath, notifier);
  563. watchers.append(*LINK(watcher));
  564. if (isConnected)
  565. watcher->subscribe();
  566. return watcher;
  567. }
  568. virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier)
  569. {
  570. StringBuffer xpath;
  571. return getSubscription(id, getQuerySetPath(xpath, id), notifier);
  572. }
  573. virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier)
  574. {
  575. StringBuffer xpath;
  576. return getSubscription("PackageSets", "PackageSets", notifier);
  577. }
  578. virtual IDaliPackageWatcher *getPackageMapSubscription(const char *id, ISDSSubscription *notifier)
  579. {
  580. StringBuffer xpath;
  581. return getSubscription(id, getPackageMapPath(xpath, id), notifier);
  582. }
  583. virtual bool connected() const
  584. {
  585. return isConnected;
  586. }
  587. // connect handles the operations generally performed by Dali clients at startup.
  588. virtual bool connect(unsigned timeout)
  589. {
  590. if (!isConnected)
  591. {
  592. CriticalBlock b(daliConnectionCrit);
  593. if (!isConnected)
  594. {
  595. try
  596. {
  597. // Create server group
  598. Owned<IGroup> serverGroup = createIGroup(fileNameServiceDali, DALI_SERVER_PORT);
  599. if (!serverGroup)
  600. throw MakeStringException(ROXIE_DALI_ERROR, "Could not instantiate dali IGroup");
  601. // Initialize client process
  602. if (!initClientProcess(serverGroup, DCR_RoxyMaster, 0, NULL, NULL, timeout))
  603. throw MakeStringException(ROXIE_DALI_ERROR, "Could not initialize dali client");
  604. setPasswordsFromSDS();
  605. serverStatus = new CSDSServerStatus("RoxieServer");
  606. serverStatus->queryProperties()->setProp("@cluster", roxieName.str());
  607. serverStatus->commitProperties();
  608. initCache();
  609. isConnected = true;
  610. ForEachItemIn(idx, watchers)
  611. {
  612. watchers.item(idx).onReconnect();
  613. }
  614. }
  615. catch(IException *e)
  616. {
  617. ::closedownClientProcess(); // undo any partial initialization
  618. StringBuffer text;
  619. e->errorMessage(text);
  620. DBGLOG(ROXIE_DALI_ERROR, "Error trying to connect to dali %s: %s", fileNameServiceDali.str(), text.str());
  621. e->Release();
  622. }
  623. }
  624. }
  625. return isConnected;
  626. }
  627. virtual void disconnect()
  628. {
  629. if (isConnected)
  630. {
  631. CriticalBlock b(daliConnectionCrit);
  632. if (isConnected)
  633. {
  634. delete serverStatus;
  635. serverStatus = NULL;
  636. closeDllServer();
  637. closeEnvironment();
  638. clientShutdownWorkUnit();
  639. ::closedownClientProcess(); // dali client closedown
  640. isConnected = false;
  641. disconnectSem.signal();
  642. disconnectRoxieQueues();
  643. }
  644. }
  645. }
  646. };
  647. class CRoxieDllServer : public CInterface, implements IDllServer
  648. {
  649. static CriticalSection crit;
  650. bool started;
  651. public:
  652. IMPLEMENT_IINTERFACE;
  653. CRoxieDllServer()
  654. {
  655. started = false;
  656. }
  657. virtual IIterator * createDllIterator() { throwUnexpected(); }
  658. virtual void ensureAvailable(const char * name, DllLocationType location) { throwUnexpected(); }
  659. virtual void getDll(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
  660. virtual IDllEntry * getEntry(const char * name) { throwUnexpected(); }
  661. virtual void getLibrary(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
  662. virtual void getLocalLibraryName(const char * name, StringBuffer & libraryName) { throwUnexpected(); }
  663. virtual DllLocationType isAvailable(const char * name) { throwUnexpected(); }
  664. virtual void removeDll(const char * name, bool removeDlls, bool removeDirectory) { throwUnexpected(); }
  665. virtual void registerDll(const char * name, const char * kind, const char * dllPath) { throwUnexpected(); }
  666. virtual IDllEntry * createEntry(IPropertyTree *owner, IPropertyTree *entry) { throwUnexpected(); }
  667. virtual ILoadedDllEntry * loadDll(const char * name, DllLocationType location)
  668. {
  669. if (location == DllLocationDirectory)
  670. {
  671. StringBuffer localName(queryDirectory);
  672. localName.append(name);
  673. if (checkFileExists(localName.str()))
  674. return createDllEntry(localName.str(), false, NULL);
  675. }
  676. CriticalBlock b(crit);
  677. Owned<IRoxieDaliHelper> daliHelper = connectToDali();
  678. if (!started)
  679. {
  680. if (!recursiveCreateDirectory(queryDirectory))
  681. throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
  682. initDllServer(queryDirectory);
  683. started = true;
  684. }
  685. return queryDllServer().loadDll(name, location);
  686. }
  687. } roxieDllServer;
  688. bool CRoxieDaliHelper::isConnected = false;
  689. CRoxieDaliHelper * CRoxieDaliHelper::daliHelper;
  690. CriticalSection CRoxieDaliHelper::daliHelperCrit;
  691. CriticalSection CRoxieDaliHelper::cacheCrit;
  692. Owned<IPropertyTree> CRoxieDaliHelper::cache;
  693. CriticalSection CRoxieDllServer::crit;
  694. IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
  695. {
  696. return CRoxieDaliHelper::connectToDali(waitToConnect);
  697. }
  698. extern void releaseRoxieStateCache()
  699. {
  700. CRoxieDaliHelper::releaseCache();
  701. }
  702. extern IDllServer &queryRoxieDllServer()
  703. {
  704. return roxieDllServer;
  705. }
  706. extern void addWuException(IConstWorkUnit *workUnit, IException *E)
  707. {
  708. StringBuffer message;
  709. E->errorMessage(message);
  710. unsigned code = E->errorCode();
  711. OERRLOG("%u - %s", code, message.str());
  712. WorkunitUpdate w(&workUnit->lock());
  713. addExceptionToWorkunit(w, ExceptionSeverityError, "Roxie", code, message.str(), NULL, 0, 0);
  714. }