Browse Source

HPCC-11253 Do not lock superfiles in roxie

Instead of locking superfiles, we subscribe to them so that, if they change,
any queries using them are automatically reloaded.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
d086d73e19

+ 23 - 0
roxie/ccd/ccddali.cpp

@@ -403,6 +403,19 @@ public:
         return buf.str();
     }
 
+    static const char *getSuperFilePath(StringBuffer &buf, const char *lfn)  // Builds the DXB_SuperFile query path for lfn into buf; returns buf.str(), or NULL if lfn is foreign
+    {
+        CDfsLogicalFileName lfnParser;
+        lfnParser.set(lfn);
+        if (!lfnParser.isForeign())
+        {
+            lfnParser.makeFullnameQuery(buf, DXB_SuperFile, true);  // NOTE(review): meaning of the trailing 'true' is not visible here - confirm against CDfsLogicalFileName
+            return buf.str();  // presumably points into buf, so only usable while the caller's buffer is alive - confirm
+        }
+        else
+            return NULL;  // foreign names get no path; getSuperFileSubscription() turns this into a NULL subscription
+    }
+
     virtual IPropertyTree *getPackageMap(const char *id)
     {
         Owned<IPropertyTree> ret = loadDaliTree("PackageMaps/PackageMap", id);
@@ -633,6 +646,16 @@ public:
         return getSubscription(id, getPackageMapPath(xpath, id), notifier);
     }
 
+    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier)  // Subscribes notifier to the path getSuperFilePath() builds for lfn; returns NULL when there is no such path
+    {
+        StringBuffer xpathBuf;
+        const char *xpath = getSuperFilePath(xpathBuf, lfn);  // NULL when lfn is a foreign name
+        if (xpath)
+            return getSubscription(lfn, xpath, notifier);  // lfn is passed in the position where getPackageMapSubscription() passes the package map id
+        else
+            return NULL;  // callers must be prepared for a NULL watcher
+    }
+
     virtual bool connected() const
     {
         return isConnected;

+ 1 - 0
roxie/ccd/ccddali.hpp

@@ -49,6 +49,7 @@ interface IRoxieDaliHelper : extends IInterface
     virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier) = 0;
     virtual IPropertyTree *getPackageMap(const char *id) = 0;
     virtual IDaliPackageWatcher *getPackageMapSubscription(const char *id, ISDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier) = 0;
     virtual void releaseSubscription(IDaliPackageWatcher *subscription) = 0;
     virtual bool connect(unsigned timeout) = 0;
     virtual void disconnect() = 0;

+ 45 - 8
roxie/ccd/ccdfile.cpp

@@ -1626,7 +1626,7 @@ public:
 
 CRoxieFileCache * fileCache;
 
-class CResolvedFile : public CInterface, implements IResolvedFileCreator
+class CResolvedFile : public CInterface, implements IResolvedFileCreator, implements ISDSSubscription
 {
 protected:
     IResolvedFileCache *cached;
@@ -1647,6 +1647,7 @@ protected:
     IArrayOf<IResolvedFile> subRFiles;  // To make sure subfiles get locked too
 
     Owned <IPropertyTree> properties;
+    Owned<IDaliPackageWatcher> notifier;
 
     void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
     {
@@ -1676,6 +1677,19 @@ protected:
         fileSize += base;
     }
 
+    virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)  // ISDSSubscription callback: drops this file from its cache (if any) and requests a package reload; the arguments are not used
+    {
+        if (traceLevel > 2)
+            DBGLOG("Superfile %s change detected", lfn.get());
+        CriticalBlock b(lock);  // 'lock' is the CriticalSection member declared just after this method
+        if (cached)
+        {
+            cached->removeCache(this);
+            cached = NULL;  // cleared so a later notification does not call removeCache again
+        }
+        globalPackageSetManager->requestReload();  // issued on every notification, and while 'lock' is still held
+    }
+
     // We cache all the file maps/arrays etc here. 
     mutable CriticalSection lock;
     mutable Owned<IFilePartMap> fileMap;
@@ -1695,6 +1709,9 @@ public:
         {
             if (traceLevel > 5)
                 DBGLOG("Roxie server adding information for file %s", lfn.get());
+            bool tsSet = dFile->getModificationTime(fileTimeStamp);
+            bool csSet = dFile->getFileCheckSum(fileCheckSum);
+            assertex(tsSet); // per Nigel, is always set
             IDistributedSuperFile *superFile = dFile->querySuperFile();
             if (superFile)
             {
@@ -1710,20 +1727,21 @@ public:
                     subDFiles.append(OLINK(sub));
                     addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
                 }
+                notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
+                // We have to clone the properties since we don't want to keep the superfile locked
+                properties.setown(createPTreeFromIPT(&dFile->queryAttributes()));
+                dFile.clear();  // We don't lock superfiles
             }
             else // normal file, not superkey
             {
                 isSuper = false;
+                properties.set(&dFile->queryAttributes());
                 Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
                 Owned<IFileDescriptor> remoteFDesc;
                 if (daliHelper)
                     remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
                 addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
             }
-            bool tsSet = dFile->getModificationTime(fileTimeStamp);
-            bool csSet = dFile->getFileCheckSum(fileCheckSum);
-            assertex(tsSet); // per Nigel, is always set
-            properties.set(&dFile->queryAttributes());
         }
     }
     virtual void beforeDispose()
@@ -2066,6 +2084,14 @@ public:
         return fileSize;
     }
 
+    virtual hash64_t addHash64(hash64_t hashValue) const  // Folds this file's timestamp (and checksum, when non-zero) into hashValue and returns the combined hash
+    {
+        hashValue = rtlHash64Data(sizeof(fileTimeStamp), &fileTimeStamp, hashValue);  // hashes the raw bytes of the fileTimeStamp member
+        if (fileCheckSum)  // a zero checksum contributes nothing to the hash
+            hashValue = rtlHash64Data(sizeof(fileCheckSum), &fileCheckSum, hashValue);
+        return hashValue;
+    }
+
     virtual void addSubFile(const IResolvedFile *_sub)
     {
         const CResolvedFile *sub = static_cast<const CResolvedFile *>(_sub);
@@ -2130,8 +2156,19 @@ public:
     virtual void remove()
     {
         subFiles.kill();
+        subDFiles.kill();
+        subRFiles.kill();
+        subNames.kill();
+        remoteSubFiles.kill();
+        diskMeta.kill();
         properties.clear();
-        if (dFile)
+        notifier.clear();
+        if (isSuper)
+        {
+            // Because we don't lock superfiles, we need to behave differently
+            UNIMPLEMENTED;
+        }
+        else if (dFile)
         {
             dFile->detach();
         }
@@ -2153,8 +2190,8 @@ public:
     {
         // MORE - this is a little bizarre. We sometimes create a resolvedFile for a file that we are intending to create.
         // This will make more sense if/when we start to lock earlier.
-        if (dFile)
-            return true; // MORE - may need some thought
+        if (dFile || isSuper)
+            return true; // MORE - may need some thought - especially the isSuper case
         else
             return checkFileExists(lfn.get());
     }

+ 1 - 0
roxie/ccd/ccdfile.hpp

@@ -99,6 +99,7 @@ interface IResolvedFile : extends ISimpleSuperFileEnquiry
 
     virtual const CDateTime &queryTimeStamp() const = 0;
     virtual unsigned queryCheckSum() const = 0;
+    virtual hash64_t addHash64(hash64_t hashValue) const = 0;
 
     virtual const char *queryPhysicalName() const = 0; // Returns NULL unless in local file mode.
     virtual const char *queryFileName() const = 0;

+ 53 - 5
roxie/ccd/ccdquery.cpp

@@ -898,11 +898,57 @@ public:
             return NULL;
     }
 
-    static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo)
+    static hash64_t getQueryHash(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, IArrayOf<IResolvedFile> &files)
     {
         hash64_t hashValue = package.queryHash();
         if (dll)
+        {
             hashValue = rtlHash64VStr(dll->queryDll()->queryName(), hashValue);
+            if (!allFilesDynamic)
+            {
+                IConstWorkUnit *wu = dll->queryWorkUnit();
+                if (wu) // wu may be null in some unit test cases
+                {
+                    SCMStringBuffer bStr;
+                    if (getClusterType(wu->getDebugValue("targetClusterType", bStr).str(), RoxieCluster) == RoxieCluster)
+                    {
+                        Owned<IConstWUGraphIterator> graphs = &wu->getGraphs(GraphTypeActivities);
+                        ForEach(*graphs)
+                        {
+                            Owned<IPropertyTree> graphXgmml = graphs->query().getXGMMLTree(false);
+                            Owned<IPropertyTreeIterator> nodes = graphXgmml->getElements(".//node");
+                            ForEach(*nodes)
+                            {
+                                IPropertyTree &node = nodes->query();
+                                const char *fileName = queryNodeFileName(node);
+                                const char *indexName = queryNodeIndexName(node);
+                                // MORE - what about write? What about packages that resolve everything without dali?
+                                if (indexName && (!fileName || !streq(indexName, fileName)))
+                                {
+                                    bool isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value") || pretendAllOpt;
+                                    const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu);
+                                    if (indexFile)
+                                    {
+                                        hashValue = indexFile->addHash64(hashValue);
+                                        files.append(*const_cast<IResolvedFile *>(indexFile));
+                                    }
+                                }
+                                if (fileName)
+                                {
+                                    bool isOpt = node.getPropBool("att[@name='_isOpt']/@value") || pretendAllOpt;
+                                    const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu);
+                                    if (dataFile)
+                                    {
+                                        hashValue = dataFile->addHash64(hashValue);
+                                        files.append(*const_cast<IResolvedFile *>(dataFile));
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
         if (id)
             hashValue = rtlHash64VStr(id, hashValue);
         if (stateInfo)
@@ -1408,10 +1454,11 @@ static void checkWorkunitVersionConsistency(const IQueryDll *dll)
         throw MakeStringException(ROXIE_MISMATCH, "Workunit did not export createProcess function");
 }
 
-extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
+extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
 {
     CriticalBlock b(CQueryFactory::queryCreateLock);
-    hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo);
+    IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
+    hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles);
     IQueryFactory *cached = getQueryFactory(hashValue, 0);
     if (cached && !(cached->loadFailed() && (reloadRetriesFailed || forceRetry)))
     {
@@ -1674,10 +1721,11 @@ public:
     }
 };
 
-IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IHpccPackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
+IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned channel, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry)
 {
     CriticalBlock b(CQueryFactory::queryCreateLock);
-    hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo);
+    IArrayOf<IResolvedFile> queryFiles; // Note - these should stay in scope long enough to ensure still cached when (if) query is loaded for real
+    hash64_t hashValue = CQueryFactory::getQueryHash(id, dll, package, stateInfo, queryFiles);
     IQueryFactory *cached = getQueryFactory(hashValue, channel);
     if (cached)
     {

+ 2 - 2
roxie/ccd/ccdquery.hpp

@@ -246,8 +246,8 @@ extern const IQueryDll *createExeQueryDll(const char *exeName);
 extern const IQueryDll *createWuQueryDll(IConstWorkUnit *wu);
 
 extern IRecordLayoutTranslator *createRecordLayoutTranslator(const char *logicalName, IDefRecordMeta const * diskMeta, IDefRecordMeta const * activityMeta);
-extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
-extern IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IHpccPackage &package, unsigned _channelNo, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
+extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
+extern IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned _channelNo, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
 extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel);
 extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu);
 extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo);

+ 83 - 14
roxie/ccd/ccdstate.cpp

@@ -74,16 +74,23 @@ SafePluginMap *plugins;
 
 const char *queryNodeFileName(const IPropertyTree &graphNode)
 {
-    const char *id = graphNode.queryProp("att[@name='_fileName']/@value");
-    return id;
+    if (graphNode.hasProp("att[@name='_fileName_dynamic']"))  // a node carrying a _fileName_dynamic att is reported as having no file name
+        return NULL;
+    else
+        return graphNode.queryProp("att[@name='_fileName']/@value");  // otherwise whatever queryProp yields for the _fileName att's value
 }
 
 const char *queryNodeIndexName(const IPropertyTree &graphNode)
 {
-    const char * id = graphNode.queryProp("att[@name='_indexFileName']/@value");
-    if (!id && !graphNode.hasProp("att[@name='_indexFileName_dynamic']"))   // can remove soon
-        id = graphNode.queryProp("att[@name='_fileName']/@value");
-    return id;
+    if (graphNode.hasProp("att[@name='_indexFileName_dynamic']"))  // a node carrying an _indexFileName_dynamic att is reported as having no index name
+        return NULL;
+    else
+    {
+        const char * id = graphNode.queryProp("att[@name='_indexFileName']/@value");
+        if (!id)  // no _indexFileName value: fall back to the node's _fileName att
+            id = graphNode.queryProp("att[@name='_fileName']/@value");  // NOTE(review): read directly, so _fileName_dynamic is not checked on this path (unlike queryNodeFileName)
+        return id;
+    }
 }
 
 class CSimpleSuperFileArray : public CInterface, implements ISimpleSuperFileEnquiry
@@ -707,7 +714,7 @@ protected:
             throw MakeStringException(ROXIE_INTERNAL_ERROR, "Invalid parameters to addAlias");
     }
 
-    virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo, bool forceRetry) = 0;
+    virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry) = 0;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -754,8 +761,8 @@ public:
                     if(!package) package = packages.matchPackage(id);
                     if (!package) package = &queryRootRoxiePackage();
                 }
-                assertex(package);
-                addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *package, &query, forceRetry), hash);
+                assertex(package && QUERYINTERFACE(package, const IRoxiePackage));
+                addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *QUERYINTERFACE(package, const IRoxiePackage), &query, forceRetry), hash);
             }
             catch (IException *E)
             {
@@ -883,7 +890,7 @@ public:
     {
     }
 
-    virtual IQueryFactory * loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo, bool forceRetry)
+    virtual IQueryFactory * loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)  // Server-side override: forwards to createServerQueryFactory with isDynamic=false
     {
         return createServerQueryFactory(id, dll, package, stateInfo, false, forceRetry);
     }
@@ -907,7 +914,7 @@ public:
         channelNo = _channelNo;
     }
 
-    virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IHpccPackage &package, const IPropertyTree *stateInfo, bool forceRetry)
+    virtual IQueryFactory *loadQueryFromDll(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool forceRetry)  // Slave-side override: forwards to createSlaveQueryFactory for this manager's channelNo with isDynamic=false
     {
         return createSlaveQueryFactory(id, dll, package, channelNo, stateInfo, false, forceRetry);
     }
@@ -1495,13 +1502,23 @@ class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackage
 public:
     IMPLEMENT_IINTERFACE;
     CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
-        standAloneDll(_standAloneDll)
+        autoReloadThread(*this), standAloneDll(_standAloneDll)  // the reload thread is handed a reference back to this manager
     {
         daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
+        atomic_set(&autoPending, 0);  // no reload requested yet
+        autoReloadThread.start();  // started from the constructor body, after daliHelper and autoPending are set up
     }
 
     ~CRoxiePackageSetManager()
     {
+        autoReloadThread.stop();  // sets the thread's closing flag and signals autoReloadTrigger to wake it
+        autoReloadThread.join();  // wait for the thread before the members it references are destroyed
+    }
+
+    virtual void requestReload()  // Flags that a reload is wanted and wakes AutoReloadThread; does not perform the reload itself
+    {
+        atomic_inc(&autoPending);  // AutoReloadThread::run() only tests this for non-zero and resets it to 0, so several requests can be served by one reload
+        autoReloadTrigger.signal();
     }
 
     virtual void load()
@@ -1563,8 +1580,7 @@ public:
 
     virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
-        reload(false);
-        daliHelper->commitCache();
+        requestReload();  // hands the work to AutoReloadThread instead of calling reload(false) inline as the removed lines did
     }
 
 private:
@@ -1575,6 +1591,58 @@ private:
     InterruptableSemaphore controlSem;
     Owned<CRoxiePackageSetWatcher> allQueryPackages;
 
+    Semaphore autoReloadTrigger;
+    atomic_t autoPending;
+
+    class AutoReloadThread : public Thread  // Background thread: calls owner.reload(false) each time autoReloadTrigger is signalled with autoPending non-zero
+    {
+        bool closing;  // set by stop(), polled by run(); NOTE(review): plain bool with no synchronisation visible here
+        CRoxiePackageSetManager &owner;
+    public:
+        AutoReloadThread(CRoxiePackageSetManager &_owner)
+        : owner(_owner), Thread("AutoReloadThread")  // NOTE(review): the Thread base is initialised before 'owner' regardless of the order written here
+        {
+            closing = false;
+        }
+
+        virtual int run()  // Loops until closing is set; always returns 0
+        {
+            if (traceLevel)
+                DBGLOG("AutoReloadThread %p starting", this);
+            while (!closing)
+            {
+                owner.autoReloadTrigger.wait();  // woken by requestReload() or stop()
+                if (atomic_read(&owner.autoPending))  // a wake-up from stop() with nothing pending skips the reload
+                {
+                    atomic_set(&owner.autoPending, 0);  // reset before reloading, so a request arriving during the reload is seen on the next pass
+                    try
+                    {
+                        owner.reload(false); // Arguably true should be better...
+                    }
+                    catch (IException *E)
+                    {
+                        if (!closing)  // exceptions raised while shutting down are not logged
+                            EXCLOG(MCoperatorError, E, "AutoReloadThread: ");
+                        E->Release();
+                    }
+                    catch (...)  // anything else is logged and swallowed so the thread keeps running
+                    {
+                        DBGLOG("Unknown exception in AutoReloadThread");
+                    }
+                }
+            }
+            if (traceLevel)
+                DBGLOG("AutoReloadThread %p exiting", this);
+            return 0;
+        }
+
+        void stop()  // Asks run() to exit; the caller still has to join() (see ~CRoxiePackageSetManager)
+        {
+            closing = true;
+            owner.autoReloadTrigger.signal();  // wake the wait() in run() so the loop re-tests closing
+        }
+    } autoReloadThread;
+
     void reload(bool forceRetry)
     {
         clearDaliMisses();
@@ -1594,6 +1662,7 @@ private:
             oldPackages.setown(allQueryPackages.getLink());  // To ensure that the setown just below does not delete it
             allQueryPackages.setown(newPackages.getClear());
         }
+        daliHelper->commitCache();
     }
 
     // Common code used by control:queries and control:getQueryXrefInfo

+ 1 - 3
roxie/ccd/ccdstate.hpp

@@ -118,6 +118,7 @@ interface IRoxieQuerySetManagerSet : extends IInterface
 
 interface IRoxieQueryPackageManagerSet : extends IInterface
 {
+    virtual void requestReload() = 0;
     virtual void load() = 0;
     virtual void doControlMessage(IPropertyTree *xml, StringBuffer &reply, const IRoxieContextLogger &ctx) = 0;
     virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, const IRoxieContextLogger &logctx) const = 0;
@@ -142,7 +143,4 @@ extern void mergeQueries(IPropertyTree *s1, IPropertyTree *s2);
 extern const char *queryNodeFileName(const IPropertyTree &graphNode);
 extern const char *queryNodeIndexName(const IPropertyTree &graphNode);
 
-extern IPropertyTreeIterator *getNodeSubFileNames(const IPropertyTree &graphNode);
-extern IPropertyTreeIterator *getNodeSubIndexNames(const IPropertyTree &graphNode);
-
 #endif