123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jlib.hpp"
- #include "workunit.hpp"
- #include "dalienv.hpp"
- #include "daclient.hpp"
- #include "dautils.hpp"
- #include "dllserver.hpp"
- #include "ccddali.hpp"
- #include "ccdfile.hpp"
- #include "ccdlistener.hpp"
- #include "ccd.hpp"
- #include "jencrypt.hpp"
- #include "jisem.hpp"
- #include "dllserver.hpp"
- #include "thorplugin.hpp"
- #include "workflow.hpp"
- #include "mpcomm.hpp"
- const char *roxieStateName = "RoxieLocalState.xml";
- class CDaliPackageWatcher : public CInterface, implements ISDSSubscription, implements ISDSNodeSubscription, implements IDaliPackageWatcher
- {
- SubscriptionId change;
- ISDSSubscription *notifier;
- StringAttr id;
- StringAttr xpath;
- mutable CriticalSection crit;
- bool isExact;
- public:
- IMPLEMENT_IINTERFACE;
- CDaliPackageWatcher(const char *_id, const char *_xpath, ISDSSubscription *_notifier)
- : id(_id), xpath(_xpath), change(0), isExact(false)
- {
- notifier = _notifier;
- }
- ~CDaliPackageWatcher()
- {
- if (change)
- unsubscribe();
- }
- virtual void subscribe(bool exact)
- {
- CriticalBlock b(crit);
- try
- {
- if (traceLevel > 5)
- DBGLOG("Subscribing to %s, %p", xpath.get(), this);
- if (exact && queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) >= 0)
- {
- isExact = true;
- change = querySDS().subscribeExact(xpath, *this, true);
- }
- else
- {
- isExact = false;
- change = querySDS().subscribe(xpath, *this, true);
- }
- }
- catch (IException *E)
- {
- // failure to subscribe implies dali is down... that's ok, we will resubscribe when we notice it come back up.
- E->Release();
- }
- }
- virtual void unsubscribe()
- {
- CriticalBlock b(crit);
- notifier = NULL;
- try
- {
- if (traceLevel > 5)
- DBGLOG("unsubscribing from %s, %p", xpath.get(), this);
- if (change)
- {
- if (isExact)
- querySDS().unsubscribeExact(change);
- else
- querySDS().unsubscribe(change);
- }
- }
- catch (IException *E)
- {
- E->Release();
- }
- change = 0;
- }
- virtual const char *queryName() const
- {
- return id.get();
- }
- virtual void onReconnect()
- {
- Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
- // It's tempting to think you can avoid holding the critsec during the notify call, and that you only need to hold it while looking up notifier
- // Despite the danger of deadlocks (that requires careful code in the notifier to avoid), I think it is neccessary to hold the lock during the call,
- // as otherwise notifier may point to a deleted object.
- CriticalBlock b(crit);
- if (traceLevel > 5)
- DBGLOG("resubscribing to %s, %p", xpath.get(), this);
- change = querySDS().subscribe(xpath, *this, true);
- if (notifier)
- notifier->notify(0, NULL, SDSNotify_None);
- }
- virtual void notify(SubscriptionId subid, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- return notify(subid, NULL, flags, valueLen, valueData);
- }
- virtual void notify(SubscriptionId subid, const char *daliXpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- Linked<CDaliPackageWatcher> me = this; // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
- Linked<ISDSSubscription> myNotifier;
- {
- CriticalBlock b(crit);
- if (traceLevel > 5)
- DBGLOG("Notification on %s (%s), %p", xpath.get(), daliXpath ? daliXpath : "", this);
- myNotifier.set(notifier);
- // allow crit to be released, allowing this to be unsubscribed, to avoid deadlocking when other threads via notify call unsubscribe
- }
- if (myNotifier)
- myNotifier->notify(subid, daliXpath, flags, valueLen, valueData);
- }
- };
- /*===============================================================================
- * Roxie is not a typical Dali client - if dali is available it will use it, but
- * it needs to be able to continue to serve any queries that have already been deployed
- * should Dali no longer be available, including the ability to restart a roxie while
- * the dali that originally provided its data is unavailable.
- *
- * This single-instance class is designed to help Roxie manage its dali connections.
- * it connects to dali on demand, and disconnects when all links to this object are
- * released - in practice that won't happen until termination as we keep an active dali
- * connection as long as any QuerySets are subscribed. This class also handles the
- * local caching of sufficient information retrieved from dali so that Roxie can restart
- * even if dali is not available.
- *
- * If dali is down. Data is delivered from the cache. If dali is up, the cache is populated from
- * the information retrieved from dali. We snapshot the cache at the end of each successful reload.
- *
- *
- =================================================================================*/
- class CRoxieDaliHelper : public CInterface, implements IRoxieDaliHelper
- {
- private:
- static bool isConnected;
- static CRoxieDaliHelper *daliHelper; // Note - this does not own the helper
- static CriticalSection daliHelperCrit;
- CriticalSection daliConnectionCrit;
- Owned<IUserDescriptor> userdesc;
- InterruptableSemaphore disconnectSem;
- IArrayOf<IDaliPackageWatcher> watchers;
- CSDSServerStatus *serverStatus;
- class CRoxieDaliConnectWatcher : public Thread
- {
- private:
- CRoxieDaliHelper *owner;
- bool aborted;
- public:
- CRoxieDaliConnectWatcher(CRoxieDaliHelper *_owner) : owner(_owner)
- {
- aborted = false;
- }
- virtual int run()
- {
- while (!aborted)
- {
- if (topology && topology->getPropBool("@lockDali", false))
- {
- Sleep(ROXIE_DALI_CONNECT_TIMEOUT);
- }
- else if (owner->connect(ROXIE_DALI_CONNECT_TIMEOUT))
- {
- DBGLOG("roxie: CRoxieDaliConnectWatcher reconnected");
- try
- {
- owner->disconnectSem.wait();
- Sleep(5000); // Don't retry immediately, give Dali a chance to recover.
- }
- catch (IException *E)
- {
- if (!aborted)
- EXCLOG(E, "roxie: Unexpected exception in CRoxieDaliConnectWatcher");
- E->Release();
- }
- }
- }
- return 0;
- }
- void stop()
- {
- aborted = true;
- }
- virtual void start()
- {
- Thread::start();
- }
- virtual void join()
- {
- if (isAlive())
- Thread::join();
- }
- } connectWatcher;
- virtual void beforeDispose()
- {
- CriticalBlock b(daliHelperCrit);
- disconnectSem.interrupt();
- connectWatcher.stop();
- if (daliHelper==this) // there is a tiny window where new dalihelper created immediately after final release
- {
- disconnect();
- daliHelper = NULL;
- }
- connectWatcher.join();
- }
- // The cache is static since it outlives the dali connections
- static CriticalSection cacheCrit;
- static Owned<IPropertyTree> cache;
- static void initCache()
- {
- IPropertyTree *tree = createPTree("Roxie");
- tree->addPropTree("QuerySets", createPTree("QuerySets"));
- tree->addPropTree("PackageSets", createPTree("PackageSets"));
- tree->addPropTree("PackageMaps", createPTree("PackageMaps"));
- tree->addPropTree("Files", createPTree("Files"));
- cache.setown(tree);
- }
- static void loadCache()
- {
- if (!cache)
- {
- StringBuffer cacheFileName(queryDirectory);
- cacheFileName.append(roxieStateName);
- if (checkFileExists(cacheFileName))
- cache.setown(createPTreeFromXMLFile(cacheFileName));
- else
- initCache();
- }
- }
- static IPropertyTree *readCache(const char *xpath)
- {
- CriticalBlock b(cacheCrit);
- loadCache();
- return cache->getPropTree(xpath);
- }
- static void writeCache(const char *foundLoc, const char *newLoc, IPropertyTree *val)
- {
- CriticalBlock b(cacheCrit);
- if (!cache)
- initCache();
- cache->removeProp(foundLoc);
- if (val)
- cache->addPropTree(newLoc, LINK(val));
- }
- IPropertyTree *loadDaliTree(const char *path, const char *id)
- {
- StringBuffer xpath(path);
- if (id)
- xpath.appendf("[@id='%s']", id);
- Owned <IPropertyTree> localTree;
- if (isConnected)
- {
- try
- {
- Owned<IRemoteConnection> conn = querySDS().connect(xpath, myProcessSession(), 0, 30*1000);
- if (conn)
- {
- Owned <IPropertyTree> daliTree = conn->getRoot();
- localTree.setown(createPTreeFromIPT(daliTree));
- }
- writeCache(xpath, path, localTree);
- return localTree.getClear();
- }
- catch (IDaliClient_Exception *E)
- {
- if (traceLevel)
- EXCLOG(E, "roxie: Dali connection lost");
- E->Release();
- daliHelper->disconnect();
- }
- }
- DBGLOG("LoadDaliTree(%s) - not connected - read from cache", xpath.str());
- localTree.setown(readCache(xpath));
- return localTree.getClear();
- }
- IFileDescriptor *recreateCloneSource(IFileDescriptor *srcfdesc, const char *destfilename)
- {
- Owned<IFileDescriptor> dstfdesc = createFileDescriptor(srcfdesc->getProperties());
- // calculate dest dir
- CDfsLogicalFileName dstlfn;
- if (!dstlfn.setValidate(destfilename,true))
- throw MakeStringException(-1,"Logical name %s invalid",destfilename);
- StringBuffer dstpartmask;
- getPartMask(dstpartmask,destfilename,srcfdesc->numParts());
- dstfdesc->setPartMask(dstpartmask.str());
- unsigned np = srcfdesc->numParts();
- dstfdesc->setNumParts(srcfdesc->numParts());
- dstfdesc->setDefaultDir(srcfdesc->queryProperties().queryProp("@cloneFromDir"));
- for (unsigned pn=0;pn<srcfdesc->numParts();pn++) {
- offset_t sz = srcfdesc->queryPart(pn)->queryProperties().getPropInt64("@size",-1);
- if (sz!=(offset_t)-1)
- dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz);
- StringBuffer dates;
- if (srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates))
- dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str());
- }
- const char *cloneFrom = srcfdesc->queryProperties().queryProp("@cloneFrom");
- Owned<IPropertyTreeIterator> groups = srcfdesc->queryProperties().getElements("cloneFromGroup");
- ForEach(*groups)
- {
- IPropertyTree &elem = groups->query();
- const char *groupName = elem.queryProp("@groupName");
- StringBuffer dir;
- StringBuffer foreignGroup("foreign::");
- foreignGroup.append(cloneFrom).append("::").append(groupName);
- GroupType groupType;
- queryNamedGroupStore().setRemoteTimeout(2000);
- Owned<IGroup> group = queryNamedGroupStore().lookup(foreignGroup, dir, groupType);
- ClusterPartDiskMapSpec dmSpec;
- dmSpec.fromProp(&elem);
- if (!dmSpec.defaultBaseDir.length())
- {
- if (dir.length())
- {
- dmSpec.setDefaultBaseDir(dir);
- }
- else
- {
- // Due to the really weird code in dadfs, this MUST be set to match the leading portion of cloneFromDir
- // in order to properly handle remote systems with different default directory locations
- StringBuffer tail;
- DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
- makePhysicalPartName(dstlfn.get(),0,0,tail,0,os,PATHSEPSTR); // if lfn is a::b::c, tail will be /a/b/
- assertex(tail.length() > 1);
- tail.setLength(tail.length()-1); // strip off the trailing /
- StringBuffer head(srcfdesc->queryProperties().queryProp("@cloneFromDir")); // Will end with /a/b
- assertex(streq(head.str() + head.length() - tail.length(), tail.str()));
- head.setLength(head.length() - tail.length()); // So strip off the end...
- dmSpec.setDefaultBaseDir(head.str());
- }
- }
- dstfdesc->addCluster(groupName, group, dmSpec);
- }
- return dstfdesc.getClear();
- }
- static StringBuffer &normalizeName(const char *name, StringBuffer &ret)
- {
- // Ensure only chars that are accepted by jptree in an xpath element are used
- loop
- {
- char c = *name++;
- if (!c)
- break;
- switch (c)
- {
- case '.':
- ret.append(".."); // Double . as we use it to escape illegal chars
- break;
- case ':':
- case '_':
- case '-':
- ret.append(c);
- break;
- default:
- if (isalnum(c))
- ret.append(c); // Note - we COULD make the cache case-insensitive and we would be right to 99.9% if the time. But there is a weird syntax using H to force uppercase filenames...
- else
- ret.append('.').append((unsigned) (unsigned char) c);
- break;
- }
- }
- return ret;
- }
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieDaliHelper() : connectWatcher(this), serverStatus(NULL)
- {
- userdesc.setown(createUserDescriptor());
- const char *roxieUser = NULL;
- const char *roxiePassword = NULL;
- if (topology)
- {
- roxieUser = topology->queryProp("@ldapUser");
- roxiePassword = topology->queryProp("@ldapPassword");
- }
- if (!roxieUser)
- roxieUser = "roxie";
- if (!roxiePassword)
- roxiePassword = "";
- StringBuffer password;
- decrypt(password, roxiePassword);
- userdesc->set(roxieUser, password.str());
- if (fileNameServiceDali.length())
- connectWatcher.start();
- else
- initMyNode(1); // Hack
- }
- static const char *getQuerySetPath(StringBuffer &buf, const char *id)
- {
- buf.appendf("QuerySets/QuerySet[@id='%s']", id);
- return buf.str();
- }
- virtual IPropertyTree *getQuerySet(const char *id)
- {
- Owned<IPropertyTree> ret = loadDaliTree("QuerySets/QuerySet", id);
- if (!ret)
- {
- ret.setown(createPTree("QuerySet"));
- ret->setProp("@id", id);
- }
- return ret.getClear();
- }
- virtual IPropertyTree *getPackageSets()
- {
- Owned<IPropertyTree> ret = loadDaliTree("PackageSets", NULL);
- if (!ret)
- {
- ret.setown(createPTree("PackageSets"));
- }
- return ret.getClear();
- }
- static const char *getSuperFilePath(StringBuffer &buf, const char *lfn)
- {
- CDfsLogicalFileName lfnParser;
- lfnParser.set(lfn);
- if (!lfnParser.isForeign())
- {
- lfnParser.makeFullnameQuery(buf, DXB_SuperFile, true);
- return buf.str();
- }
- else
- return NULL;
- }
- virtual IPropertyTree *getPackageMap(const char *id)
- {
- Owned<IPropertyTree> ret = loadDaliTree("PackageMaps/PackageMap", id);
- if (!ret)
- {
- ret.setown(createPTree("PackageMap"));
- ret->setProp("@id", id);
- }
- return ret.getClear();
- }
- IFileDescriptor *checkClonedFromRemote(const char *_lfn, IFileDescriptor *fdesc, bool cacheIt)
- {
- // NOTE - we rely on the fact that queryNamedGroupStore().lookup caches results,to avoid excessive load on remote dali
- if (_lfn && !strnicmp(_lfn, "foreign", 7)) //if need to support dali hopping should add each remote location
- return NULL;
- if (!fdesc)
- return NULL;
- const char *cloneFrom = fdesc->queryProperties().queryProp("@cloneFrom");
- if (!cloneFrom)
- return NULL;
- StringBuffer foreignLfn("foreign::");
- foreignLfn.append(cloneFrom);
- if (!connected())
- return resolveCachedLFN(foreignLfn); // Note - cache only used when no dali connection available
- try
- {
- if (fdesc->queryProperties().hasProp("cloneFromGroup") && fdesc->queryProperties().hasProp("@cloneFromDir"))
- {
- Owned<IFileDescriptor> ret = recreateCloneSource(fdesc, _lfn);
- if (cacheIt)
- cacheFileDescriptor(foreignLfn, ret);
- return ret.getClear();
- }
- else // Legacy mode - recently cloned files should have the extra info
- {
- if (traceLevel > 1)
- DBGLOG("checkClonedFromRemote: Resolving %s in legacy mode", _lfn);
- Owned<IDistributedFile> cloneFile = resolveLFN(foreignLfn, cacheIt, false);
- if (cloneFile)
- {
- Owned<IFileDescriptor> cloneFDesc = cloneFile->getFileDescriptor();
- if (cloneFDesc->numParts()==fdesc->numParts())
- return cloneFDesc.getClear();
- DBGLOG(ROXIE_MISMATCH, "File %s cloneFrom(%s) mismatch", _lfn, cloneFrom);
- }
- }
- }
- catch (IException *E)
- {
- if (traceLevel > 3)
- EXCLOG(E);
- E->Release(); // Any failure means act as if no remote info
- }
- return NULL;
- }
- virtual IDistributedFile *resolveLFN(const char *logicalName, bool cacheIt, bool writeAccess)
- {
- if (isConnected)
- {
- unsigned start = msTick();
- CDfsLogicalFileName lfn;
- lfn.set(logicalName);
- Owned<IDistributedFile> dfsFile = queryDistributedFileDirectory().lookup(lfn, userdesc.get(), writeAccess, cacheIt);
- if (dfsFile)
- {
- IDistributedSuperFile *super = dfsFile->querySuperFile();
- if (super && (0 == super->numSubFiles(true)))
- dfsFile.clear();
- }
- if (cacheIt)
- cacheDistributedFile(logicalName, dfsFile);
- if (traceLevel > 1)
- DBGLOG("Dali lookup %s returned %s in %u ms", logicalName, dfsFile != NULL ? "match" : "NO match", msTick()-start);
- return dfsFile.getClear();
- }
- else
- return NULL;
- }
- virtual IFileDescriptor *resolveCachedLFN(const char *logicalName)
- {
- StringBuffer xpath("Files/F.");
- normalizeName(logicalName, xpath);
- Owned<IPropertyTree> pt = readCache(xpath.str());
- if (pt)
- {
- return deserializeFileDescriptorTree(pt);
- }
- else
- return NULL;
- }
- virtual void commitCache()
- {
- if (isConnected)
- {
- CriticalBlock b(cacheCrit);
- if (!recursiveCreateDirectory(queryDirectory))
- throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
- VStringBuffer newCacheFileName("%s%s.new", queryDirectory.str(), roxieStateName);
- VStringBuffer oldCacheFileName("%s%s.bak", queryDirectory.str(), roxieStateName);
- VStringBuffer cacheFileName("%s%s", queryDirectory.str(), roxieStateName);
- saveXML(newCacheFileName, cache);
- Owned<IFile> oldFile = createIFile(oldCacheFileName);
- Owned<IFile> newFile = createIFile(newCacheFileName);
- Owned<IFile> cacheFile = createIFile(cacheFileName);
- if (oldFile->exists())
- oldFile->remove();
- if (cacheFile->exists())
- cacheFile->rename(oldCacheFileName);
- newFile->rename(cacheFileName);
- }
- }
- virtual IConstWorkUnit *attachWorkunit(const char *wuid, ILoadedDllEntry *source)
- {
- assertex(isConnected);
- Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
- Owned<IWorkUnit> w = wuFactory->updateWorkUnit(wuid);
- if (!w)
- return NULL;
- w->setAgentSession(myProcessSession());
- if (source)
- {
- StringBuffer wuXML;
- if (getEmbeddedWorkUnitXML(source, wuXML))
- {
- Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
- localWU->loadXML(wuXML);
- queryExtendedWU(w)->copyWorkUnit(localWU, true);
- }
- else
- throw MakeStringException(ROXIE_DALI_ERROR, "Failed to locate dll workunit info");
- }
- w->commit();
- w.clear();
- return wuFactory->openWorkUnit(wuid, false);
- }
- virtual void noteWorkunitRunning(const char *wuid, bool running)
- {
- CriticalBlock b(daliConnectionCrit);
- if (isConnected)
- {
- assertex(serverStatus);
- if (running)
- serverStatus->queryProperties()->addProp("WorkUnit",wuid);
- else
- {
- VStringBuffer xpath("WorkUnit[.='%s']",wuid);
- serverStatus->queryProperties()->removeProp(xpath.str());
- }
- serverStatus->commitProperties();
- }
- }
- virtual void noteQueuesRunning(const char *queueNames)
- {
- CriticalBlock b(daliConnectionCrit);
- if (isConnected)
- {
- assertex(serverStatus);
- serverStatus->queryProperties()->setProp("@queue", queueNames);
- serverStatus->commitProperties();
- }
- }
- static IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
- {
- CriticalBlock b(daliHelperCrit);
- LINK(daliHelper);
- if (!daliHelper || !daliHelper->isAlive())
- daliHelper = new CRoxieDaliHelper();
- if (waitToConnect && fileNameServiceDali.length() && (!topology || !topology->getPropBool("@lockDali", false)))
- {
- while (waitToConnect && !daliHelper->connected())
- {
- unsigned delay = 1000;
- if (delay > waitToConnect)
- delay = waitToConnect;
- Sleep(delay);
- waitToConnect -= delay;
- }
- }
- return daliHelper;
- }
- static void releaseCache()
- {
- CriticalBlock b(cacheCrit);
- cache.clear();
- }
- virtual void releaseSubscription(IDaliPackageWatcher *subscription)
- {
- watchers.zap(*subscription);
- subscription->unsubscribe();
- }
- IDaliPackageWatcher *getSubscription(const char *id, const char *xpath, ISDSSubscription *notifier, bool exact)
- {
- IDaliPackageWatcher *watcher = new CDaliPackageWatcher(id, xpath, notifier);
- watchers.append(*LINK(watcher));
- if (isConnected)
- watcher->subscribe(exact);
- return watcher;
- }
- virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier)
- {
- StringBuffer xpath;
- return getSubscription(id, getQuerySetPath(xpath, id), notifier, false);
- }
- virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier)
- {
- StringBuffer xpath;
- return getSubscription("PackageSets", "PackageSets", notifier, false);
- }
- virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier)
- {
- StringBuffer xpath;
- return getSubscription("PackageMaps", "PackageMaps", notifier, false);
- }
- virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier)
- {
- StringBuffer xpathBuf;
- const char *xpath = getSuperFilePath(xpathBuf, lfn);
- if (xpath)
- return getSubscription(lfn, xpath, notifier, true);
- else
- return NULL;
- }
- virtual bool connected() const
- {
- return isConnected;
- }
- // connect handles the operations generally performed by Dali clients at startup.
- virtual bool connect(unsigned timeout)
- {
- if (!isConnected)
- {
- CriticalBlock b(daliConnectionCrit);
- if (!isConnected)
- {
- try
- {
- // Create server group
- Owned<IGroup> serverGroup = createIGroup(fileNameServiceDali, DALI_SERVER_PORT);
- if (!serverGroup)
- throw MakeStringException(ROXIE_DALI_ERROR, "Could not instantiate dali IGroup");
- // Initialize client process
- if (!initClientProcess(serverGroup, DCR_RoxyMaster, 0, NULL, NULL, timeout))
- throw MakeStringException(ROXIE_DALI_ERROR, "Could not initialize dali client");
- setPasswordsFromSDS();
- serverStatus = new CSDSServerStatus("RoxieServer");
- serverStatus->queryProperties()->setProp("@cluster", roxieName.str());
- serverStatus->commitProperties();
- isConnected = true; // Make sure this is set before the onReconnect calls, so that they refresh with info from Dali rather than from cache
- ForEachItemIn(idx, watchers)
- {
- watchers.item(idx).onReconnect();
- }
- }
- catch(IException *e)
- {
- delete serverStatus;
- serverStatus = NULL;
- ::closedownClientProcess(); // undo any partial initialization
- StringBuffer text;
- e->errorMessage(text);
- DBGLOG(ROXIE_DALI_ERROR, "Error trying to connect to dali %s: %s", fileNameServiceDali.str(), text.str());
- e->Release();
- }
- }
- }
- return isConnected;
- }
- virtual void disconnect()
- {
- if (isConnected)
- {
- CriticalBlock b(daliConnectionCrit);
- if (isConnected)
- {
- isConnected = false;
- delete serverStatus;
- serverStatus = NULL;
- closeDllServer();
- closeEnvironment();
- clientShutdownWorkUnit();
- disconnectRoxieQueues();
- ::closedownClientProcess(); // dali client closedown
- disconnectSem.signal();
- }
- }
- }
- protected:
- void cacheDistributedFile(const char *logicalName, IDistributedFile *dfsFile)
- {
- assertex(isConnected);
- Owned<IFileDescriptor> fd;
- if (dfsFile)
- fd.setown(dfsFile->getFileDescriptor());
- cacheFileDescriptor(logicalName, fd);
- }
- void cacheFileDescriptor(const char *logicalName, IFileDescriptor *fd)
- {
- assertex(isConnected);
- Owned<IPropertyTree> pt;
- if (fd)
- pt.setown(fd->getFileTree());
- StringBuffer xpath("Files/F.");
- normalizeName(logicalName, xpath);
- writeCache(xpath.str(), xpath.str(), pt);
- }
- };
- class CRoxieDllServer : public CInterface, implements IDllServer
- {
- static CriticalSection crit;
- bool started;
- public:
- IMPLEMENT_IINTERFACE;
- CRoxieDllServer()
- {
- started = false;
- }
- virtual IIterator * createDllIterator() { throwUnexpected(); }
- virtual void ensureAvailable(const char * name, DllLocationType location) { throwUnexpected(); }
- virtual void getDll(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
- virtual IDllEntry * getEntry(const char * name) { throwUnexpected(); }
- virtual void getLibrary(const char * name, MemoryBuffer & dllText) { throwUnexpected(); }
- virtual void getLocalLibraryName(const char * name, StringBuffer & libraryName) { throwUnexpected(); }
- virtual DllLocationType isAvailable(const char * name) { throwUnexpected(); }
- virtual void removeDll(const char * name, bool removeDlls, bool removeDirectory) { throwUnexpected(); }
- virtual void registerDll(const char * name, const char * kind, const char * dllPath) { throwUnexpected(); }
- virtual IDllEntry * createEntry(IPropertyTree *owner, IPropertyTree *entry) { throwUnexpected(); }
- virtual ILoadedDllEntry * loadDll(const char * name, DllLocationType location)
- {
- if (location == DllLocationDirectory)
- {
- StringBuffer localName(queryDirectory);
- localName.append(name);
- if (checkFileExists(localName.str()))
- {
- try
- {
- return createDllEntry(localName.str(), false, NULL);
- }
- catch (ICorruptDllException *E)
- {
- remove(localName.str());
- E->Release();
- }
- catch (...)
- {
- throw;
- }
- }
- }
- CriticalBlock b(crit);
- Owned<IRoxieDaliHelper> daliHelper = connectToDali();
- if (!started)
- {
- if (!recursiveCreateDirectory(queryDirectory))
- throw MakeStringException(ROXIE_FILE_ERROR, "Unable to create directory %s", queryDirectory.str());
- initDllServer(queryDirectory);
- started = true;
- }
- return queryDllServer().loadDll(name, location);
- }
- } roxieDllServer;
- bool CRoxieDaliHelper::isConnected = false;
- CRoxieDaliHelper * CRoxieDaliHelper::daliHelper;
- CriticalSection CRoxieDaliHelper::daliHelperCrit;
- CriticalSection CRoxieDaliHelper::cacheCrit;
- Owned<IPropertyTree> CRoxieDaliHelper::cache;
- CriticalSection CRoxieDllServer::crit;
- IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
- {
- return CRoxieDaliHelper::connectToDali(waitToConnect);
- }
- extern void releaseRoxieStateCache()
- {
- CRoxieDaliHelper::releaseCache();
- }
- extern IDllServer &queryRoxieDllServer()
- {
- return roxieDllServer;
- }
- extern void addWuException(IConstWorkUnit *workUnit, IException *E)
- {
- StringBuffer message;
- E->errorMessage(message);
- unsigned code = E->errorCode();
- OERRLOG("%u - %s", code, message.str());
- WorkunitUpdate w(&workUnit->lock());
- addExceptionToWorkunit(w, ExceptionSeverityError, "Roxie", code, message.str(), NULL, 0, 0);
- }
|