Bladeren bron

Merge pull request #6004 from richardkchapman/roxie-reload-core

HPCC-11607 Roxie reload core

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 jaren geleden
bovenliggende
commit
b08ba9c2b8
1 gewijzigde bestanden met toevoegingen van 28 en 7 verwijderingen
  1. 28 7
      roxie/ccd/ccdstate.cpp

+ 28 - 7
roxie/ccd/ccdstate.cpp

@@ -1676,7 +1676,7 @@ public:
         else
             daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
         atomic_set(&autoPending, 0);
-        autoReloadThread.start();
+        atomic_set(&autoSignalsPending, 0);
         pSetsNotifier.setown(daliHelper->getPackageSetsSubscription(this));
         pMapsNotifier.setown(daliHelper->getPackageMapsSubscription(this));
     }
@@ -1687,10 +1687,14 @@ public:
         autoReloadThread.join();
     }
 
-    virtual void requestReload()
+    void requestReload(bool signal)
     {
+        if (signal)
+            atomic_inc(&autoSignalsPending);
         atomic_inc(&autoPending);
         autoReloadTrigger.signal();
+        if (signal)
+            autoReloadComplete.wait();
     }
 
     virtual void load()
@@ -1700,6 +1704,7 @@ public:
             reload();
             daliHelper->commitCache();
             controlSem.signal();
+            autoReloadThread.start();   // Don't want to overlap auto-reloads with the initial load
         }
         catch(IException *E)
         {
@@ -1752,7 +1757,7 @@ public:
 
     virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
-        requestReload();
+        requestReload(false);
     }
 
 private:
@@ -1764,6 +1769,8 @@ private:
     Owned<CRoxiePackageSetWatcher> allQueryPackages;
 
     Semaphore autoReloadTrigger;
+    Semaphore autoReloadComplete;
+    atomic_t autoSignalsPending;
     atomic_t autoPending;
 
     class AutoReloadThread : public Thread
@@ -1786,7 +1793,9 @@ private:
                 owner.autoReloadTrigger.wait();
                 if (closing)
                     break;
-                Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
+                unsigned signalsPending = atomic_read(&owner.autoSignalsPending);
+                if (!signalsPending)
+                    Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
                 if (atomic_read(&owner.autoPending))
                 {
                     atomic_set(&owner.autoPending, 0);
@@ -1805,6 +1814,11 @@ private:
                         DBGLOG("Unknown exception in AutoReloadThread");
                     }
                 }
+                if (signalsPending)
+                {
+                    atomic_dec(&owner.autoSignalsPending);
+                    owner.autoReloadComplete.signal();
+                }
             }
             if (traceLevel)
                 DBGLOG("AutoReloadThread %p exiting", this);
@@ -1827,14 +1841,21 @@ private:
         if (standAloneDll)
             newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, standAloneDll, numChannels, "roxie"));
         else
-            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, allQueryPackages));
+        {
+            Owned<CRoxiePackageSetWatcher> currentPackages;
+            {
+                ReadLockBlock b(packageCrit);
+                currentPackages.setown(allQueryPackages.getLink());
+            }
+            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, currentPackages));
+        }
         // Hold the lock for as little time as we can
         // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
         // Hence the slightly convoluted code below
         Owned<CRoxiePackageSetWatcher> oldPackages;  // NB Destroyed outside the WriteLockBlock
         {
             WriteLockBlock b(packageCrit);
-            oldPackages.setown(allQueryPackages.getLink());  // To ensure that the setown just below does not delete it
+            oldPackages.setown(allQueryPackages.getLink());  // Ensure we don't delete the old packages until after we have loaded the new
             allQueryPackages.setown(newPackages.getClear());
         }
         daliHelper->commitCache();
@@ -2360,7 +2381,7 @@ private:
         case 'R':
             if (stricmp(queryName, "control:reload")==0)
             {
-                reload();
+                requestReload(true);
                 if (daliHelper && daliHelper->connected())
                     reply.appendf("<Dali connected='1'/>");
                 else