Browse Source

Merge remote-tracking branch 'origin/candidate-5.0.0' into closedown-5.0.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
21ec8124c8
3 changed files with 37 additions and 10 deletions
  1. 1 1
      roxie/ccd/ccdfile.cpp
  2. 35 8
      roxie/ccd/ccdstate.cpp
  3. 1 1
      roxie/ccd/ccdstate.hpp

+ 1 - 1
roxie/ccd/ccdfile.cpp

@@ -1687,7 +1687,7 @@ protected:
                 cached = NULL;
             }
         }
-        globalPackageSetManager->requestReload();
+        globalPackageSetManager->requestReload(false, false);
     }
 
     // We cache all the file maps/arrays etc here. 

+ 35 - 8
roxie/ccd/ccdstate.cpp

@@ -1664,6 +1664,7 @@ private:
                             {
                                 if (traceLevel)
                                     DBGLOG("Package map %s, active %s already loaded", packageMapId, isActive ? "true" : "false");
+                                stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, oldPackageManager->getHash());
                                 allQueryPackages.append(*oldPackageManager.getClear());
                             }
                             else
@@ -1715,7 +1716,8 @@ public:
         else
             daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
         atomic_set(&autoPending, 0);
-        autoReloadThread.start();
+        atomic_set(&autoSignalsPending, 0);
+        forcePending = false;
         pSetsNotifier.setown(daliHelper->getPackageSetsSubscription(this));
         pMapsNotifier.setown(daliHelper->getPackageMapsSubscription(this));
     }
@@ -1726,10 +1728,16 @@ public:
         autoReloadThread.join();
     }
 
-    virtual void requestReload()
+    void requestReload(bool signal, bool force)
     {
+        if (force)
+            forcePending = true;    
+        if (signal)
+            atomic_inc(&autoSignalsPending);
         atomic_inc(&autoPending);
         autoReloadTrigger.signal();
+        if (signal)
+            autoReloadComplete.wait();
     }
 
     virtual void load()
@@ -1739,6 +1747,7 @@ public:
             reload(false);
             daliHelper->commitCache();
             controlSem.signal();
+            autoReloadThread.start();   // Don't want to overlap auto-reloads with the initial load
         }
         catch(IException *E)
         {
@@ -1791,7 +1800,7 @@ public:
 
     virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
-        requestReload();
+        requestReload(false, false);
     }
 
 private:
@@ -1803,7 +1812,10 @@ private:
     Owned<CRoxiePackageSetWatcher> allQueryPackages;
 
     Semaphore autoReloadTrigger;
+    Semaphore autoReloadComplete;
+    atomic_t autoSignalsPending;
     atomic_t autoPending;
+    bool forcePending;
 
     class AutoReloadThread : public Thread
     {
@@ -1825,13 +1837,15 @@ private:
                 owner.autoReloadTrigger.wait();
                 if (closing)
                     break;
-                Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
+                unsigned signalsPending = atomic_read(&owner.autoSignalsPending);
+                if (!signalsPending)
+                    Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
                 if (atomic_read(&owner.autoPending))
                 {
                     atomic_set(&owner.autoPending, 0);
                     try
                     {
-                        owner.reload(false); // Arguably true should be better...
+                        owner.reload(owner.forcePending);
                     }
                     catch (IException *E)
                     {
@@ -1843,6 +1857,12 @@ private:
                     {
                         DBGLOG("Unknown exception in AutoReloadThread");
                     }
+                    owner.forcePending = false;
+                }
+                if (signalsPending)
+                {
+                    atomic_dec(&owner.autoSignalsPending);
+                    owner.autoReloadComplete.signal();
                 }
             }
             if (traceLevel)
@@ -1866,14 +1886,21 @@ private:
         if (standAloneDll)
             newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, standAloneDll, numChannels, "roxie", forceRetry));
         else
-            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, allQueryPackages, forceRetry));
+        {
+            Owned<CRoxiePackageSetWatcher> currentPackages;
+            {
+                ReadLockBlock b(packageCrit);
+                currentPackages.setown(allQueryPackages.getLink());
+            }
+            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, currentPackages, forceRetry));
+        }
         // Hold the lock for as little time as we can
         // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
         // Hence the slightly convoluted code below
         Owned<CRoxiePackageSetWatcher> oldPackages;  // NB Destroyed outside the WriteLockBlock
         {
             WriteLockBlock b(packageCrit);
-            oldPackages.setown(allQueryPackages.getLink());  // To ensure that the setown just below does not delete it
+            oldPackages.setown(allQueryPackages.getLink());  // Ensure we don't delete the old packages until after we have loaded the new
             allQueryPackages.setown(newPackages.getClear());
         }
         daliHelper->commitCache();
@@ -2399,7 +2426,7 @@ private:
         case 'R':
             if (stricmp(queryName, "control:reload")==0)
             {
-                reload(control->getPropBool("@forceRetry", false));
+                requestReload(true, control->getPropBool("@forceRetry", false));
                 if (daliHelper && daliHelper->connected())
                     reply.appendf("<Dali connected='1'/>");
                 else

+ 1 - 1
roxie/ccd/ccdstate.hpp

@@ -121,7 +121,7 @@ interface IRoxieDebugSessionManager : extends IInterface
 
 interface IRoxieQueryPackageManagerSet : extends IInterface
 {
-    virtual void requestReload() = 0;
+    virtual void requestReload(bool wait, bool force) = 0;
     virtual void load() = 0;
     virtual void doControlMessage(IPropertyTree *xml, StringBuffer &reply, const IRoxieContextLogger &ctx) = 0;
     virtual IQueryFactory *getQuery(const char *id, StringBuffer *querySet, IArrayOf<IQueryFactory> *slaves, const IRoxieContextLogger &logctx) const = 0;