Browse Source

Merge pull request #13247 from richardkchapman/hpcc22833

HPCC-22833 Fix race condition when deploying a new query

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 years ago
parent
commit
10981334c7
4 changed files with 28 additions and 17 deletions
  1. 11 10
      roxie/ccd/ccddali.cpp
  2. 9 4
      roxie/ccd/ccddali.hpp
  3. 2 1
      roxie/ccd/ccdfile.cpp
  4. 6 2
      roxie/ccd/ccdstate.cpp

+ 11 - 10
roxie/ccd/ccddali.cpp

@@ -35,17 +35,17 @@
 
 const char *roxieStateName = "RoxieLocalState.xml";
 
-class CDaliPackageWatcher : public CInterface, implements ISDSSubscription, implements ISDSNodeSubscription, implements IDaliPackageWatcher
+class CDaliPackageWatcher : public CInterface, implements ISafeSDSSubscription, implements ISDSNodeSubscription, implements IDaliPackageWatcher
 {
     SubscriptionId change;
-    ISDSSubscription *notifier;
+    ISafeSDSSubscription *notifier;
     StringAttr id;
     StringAttr xpath;
     mutable CriticalSection crit;
     bool isExact;
 public:
     IMPLEMENT_IINTERFACE;
-    CDaliPackageWatcher(const char *_id, const char *_xpath, ISDSSubscription *_notifier)
+    CDaliPackageWatcher(const char *_id, const char *_xpath, ISafeSDSSubscription *_notifier)
       : change(0), id(_id), xpath(_xpath), isExact(false)
     {
         notifier = _notifier;
@@ -55,6 +55,7 @@ public:
         if (change)
             unsubscribe();
     }
+    virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
     virtual void subscribe(bool exact)
     {
         CriticalBlock b(crit);
@@ -125,12 +126,12 @@ public:
     virtual void notify(SubscriptionId subid, const char *daliXpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
         Linked<CDaliPackageWatcher> me = this;  // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
-        Linked<ISDSSubscription> myNotifier;
+        Linked<ISafeSDSSubscription> myNotifier;
         {
             CriticalBlock b(crit);
             if (traceLevel > 5)
                 DBGLOG("Notification on %s (%s), %p", xpath.get(), daliXpath ? daliXpath : "", this);
-            myNotifier.set(notifier);
+            myNotifier.setown(notifier ? notifier->linkIfAlive() : nullptr);
             // allow crit to be released, allowing this to be unsubscribed, to avoid deadlocking when other threads via notify call unsubscribe
         }
         if (myNotifier)
@@ -696,7 +697,7 @@ public:
         subscription->unsubscribe();
     }
 
-    IDaliPackageWatcher *getSubscription(const char *id, const char *xpath, ISDSSubscription *notifier, bool exact)
+    IDaliPackageWatcher *getSubscription(const char *id, const char *xpath, ISafeSDSSubscription *notifier, bool exact)
     {
         IDaliPackageWatcher *watcher = new CDaliPackageWatcher(id, xpath, notifier);
         watchers.append(*LINK(watcher));
@@ -705,25 +706,25 @@ public:
         return watcher;
     }
 
-    virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier)
+    virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISafeSDSSubscription *notifier)
     {
         StringBuffer xpath;
         return getSubscription(id, getQuerySetPath(xpath, id), notifier, false);
     }
 
-    virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier)
+    virtual IDaliPackageWatcher *getPackageSetsSubscription(ISafeSDSSubscription *notifier)
     {
         StringBuffer xpath;
         return getSubscription("PackageSets", "PackageSets", notifier, false);
     }
 
-    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier)
+    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISafeSDSSubscription *notifier)
     {
         StringBuffer xpath;
         return getSubscription("PackageMaps", "PackageMaps", notifier, false);
     }
 
-    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier)
+    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISafeSDSSubscription *notifier)
     {
         StringBuffer xpathBuf;
         const char *xpath = getSuperFilePath(xpathBuf, lfn);

+ 9 - 4
roxie/ccd/ccddali.hpp

@@ -27,6 +27,11 @@
 
 extern void addWuException(IConstWorkUnit *workUnit, IException *E);
 
+interface ISafeSDSSubscription : extends ISDSSubscription
+{
+    virtual ISafeSDSSubscription *linkIfAlive() = 0;
+};
+
 interface IDaliPackageWatcher : extends IInterface
 {
     virtual void subscribe(bool exact) = 0;
@@ -44,12 +49,12 @@ interface IRoxieDaliHelper : extends IInterface
     virtual IFileDescriptor *resolveCachedLFN(const char *filename) = 0;
     virtual IConstWorkUnit *attachWorkunit(const char *wuid, ILoadedDllEntry *source) = 0;
     virtual IPropertyTree *getQuerySet(const char *id) = 0;
-    virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISafeSDSSubscription *notifier) = 0;
     virtual IPropertyTree *getPackageSets() = 0;
-    virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier) = 0;
-    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getPackageSetsSubscription(ISafeSDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISafeSDSSubscription *notifier) = 0;
     virtual IPropertyTree *getPackageMap(const char *id) = 0;
-    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISafeSDSSubscription *notifier) = 0;
     virtual void releaseSubscription(IDaliPackageWatcher *subscription) = 0;
     virtual bool connect(unsigned timeout) = 0;
     virtual void disconnect() = 0;

+ 2 - 1
roxie/ccd/ccdfile.cpp

@@ -1808,7 +1808,7 @@ template <class X> class PerFormatCacheOf : public PerChannelCacheOf<X>
 
 CRoxieFileCache * fileCache;
 
-class CResolvedFile : implements IResolvedFileCreator, implements ISDSSubscription, public CInterface
+class CResolvedFile : implements IResolvedFileCreator, implements ISafeSDSSubscription, public CInterface
 {
 protected:
     IResolvedFileCache *cached;
@@ -1833,6 +1833,7 @@ protected:
     Linked<IRoxieDaliHelper> daliHelper;
     Owned<IDaliPackageWatcher> notifier;
 
+    virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
     void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
     {
         subNames.append(subName);

+ 6 - 2
roxie/ccd/ccdstate.cpp

@@ -1465,7 +1465,7 @@ protected:
  *    The query caching code should ensure that it is quick enough to do so
  *
  **/
-class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implements ISDSSubscription
+class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implements ISafeSDSSubscription
 {
     Owned<IRoxieDaliHelper> daliHelper;
     Owned<IDaliPackageWatcher> notifier;
@@ -1484,6 +1484,8 @@ public:
             daliHelper->releaseSubscription(notifier);
     }
 
+    virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
+
     virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
         reload(false);
@@ -1802,7 +1804,7 @@ private:
 
 };
 
-class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, implements ISDSSubscription, public CInterface
+class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, implements ISafeSDSSubscription, public CInterface
 {
     Owned<IDaliPackageWatcher> pSetsNotifier;
     Owned<IDaliPackageWatcher> pMapsNotifier;
@@ -1828,6 +1830,8 @@ public:
         autoReloadThread.join();
     }
 
+    virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
+
     void requestReload(bool signal, bool force)
     {
         if (force)