Browse source

HPCC-11253 Do not lock superfiles in roxie

Address further issues spotted in code review and testing.

Make sure that dynamic lookups DO lock the superfile (to ensure consistency
across the cluster).

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman, 11 years ago
parent
commit
97be7f74ba
5 files changed, with 36 additions and 16 deletions
  1. roxie/ccd/ccd.hpp (+1 -1)
  2. roxie/ccd/ccdfile.cpp (+10 -7)
  3. roxie/ccd/ccdfile.hpp (+1 -1)
  4. roxie/ccd/ccdmain.cpp (+4 -3)
  5. roxie/ccd/ccdstate.cpp (+20 -4)

+ 1 - 1
roxie/ccd/ccd.hpp

@@ -429,7 +429,7 @@ extern unsigned defaultKeyedJoinPreload;
 extern unsigned defaultPrefetchProjectPreload;
 extern bool defaultCheckingHeap;
 
-extern unsigned delayedSlaveQueryRelease;
+extern unsigned slaveQueryReleaseDelaySeconds;
 
 extern StringBuffer logDirectory;
 extern StringBuffer pluginDirectory;

+ 10 - 7
roxie/ccd/ccdfile.cpp

@@ -1702,7 +1702,7 @@ protected:
 
 public:
     IMPLEMENT_IINTERFACE;
-    CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* daliHelper, bool cacheIt, bool writeAccess, bool _isSuperFile)
+    CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
     : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
     {
         cached = NULL;
@@ -1730,10 +1730,13 @@ public:
                     subDFiles.append(OLINK(sub));
                     addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
                 }
-                notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
                 // We have to clone the properties since we don't want to keep the superfile locked
                 properties.setown(createPTreeFromIPT(&dFile->queryAttributes()));
-                dFile.clear();  // We don't lock superfiles
+                if (!isDynamic)
+                {
+                    notifier.setown(daliHelper->getSuperFileSubscription(lfn, this));
+                    dFile.clear();  // We don't lock superfiles, except dynamic ones
+                }
             }
             else // normal file, not superkey
             {
@@ -2217,7 +2220,7 @@ public:
 
 public:
     CSlaveDynamicFile(const IRoxieContextLogger &logctx, const char *_lfn, RoxiePacketHeader *header, bool _isOpt, bool _isLocal)
-        : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
+        : CResolvedFile(_lfn, NULL, NULL, ROXIE_FILE, NULL, true, false, false, false), channel(header->channel), serverIdx(header->serverIdx), isOpt(_isOpt), isLocal(_isLocal)
     {
         // call back to the server to get the info
         IPendingCallback *callback = ROQ->notePendingCallback(*header, lfn); // note that we register before the send to avoid a race.
@@ -2317,13 +2320,13 @@ private:
 
 extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile)
 {
-    return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, false, false, isSuperFile);
+    return new CResolvedFile(lfn, physical, NULL, ROXIE_FILE, NULL, true, false, false, isSuperFile);
 }
 
-extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool cacheIt, bool writeAccess)
+extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess)
 {
     const char *kind = dFile ? dFile->queryAttributes().queryProp("@kind") : NULL;
-    return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, cacheIt, writeAccess, false);
+    return new CResolvedFile(lfn, physical, dFile, kind && stricmp(kind, "key")==0 ? ROXIE_KEY : ROXIE_FILE, daliHelper, isDynamic, cacheIt, writeAccess, false);
 }
 
 class CSlaveDynamicFileCache : public CInterface, implements ISlaveDynamicFileCache

+ 1 - 1
roxie/ccd/ccdfile.hpp

@@ -120,7 +120,7 @@ interface IResolvedFileCreator : extends IResolvedFile
 };
 
 extern IResolvedFileCreator *createResolvedFile(const char *lfn, const char *physical, bool isSuperFile);
-extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool cacheIt, bool writeAccess);
+extern IResolvedFile *createResolvedFile(const char *lfn, const char *physical, IDistributedFile *dFile, IRoxieDaliHelper *daliHelper, bool isDynamic, bool cacheIt, bool writeAccess);
 
 interface IRoxiePublishCallback
 {

+ 4 - 3
roxie/ccd/ccdmain.cpp

@@ -129,7 +129,7 @@ unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
 
-unsigned delayedSlaveQueryRelease = 60;
+unsigned slaveQueryReleaseDelaySeconds = 60;
 
 unsigned logQueueLen;
 unsigned logQueueDrop;
@@ -720,8 +720,9 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultFullKeyedJoinPreload = topology->getPropInt("@defaultFullKeyedJoinPreload", 0);
         defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
         defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
-        defaultCheckingHeap = topology->getPropInt("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
-        delayedSlaveQueryRelease = topology->getPropInt("@delayedSlaveQueryRelease", false);
+        defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
+
+        slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);
 
         diskReadBufferSize = topology->getPropInt("@diskReadBufferSize", 0x10000);
         fieldTranslationEnabled = topology->getPropBool("@fieldTranslationEnabled", false);

+ 20 - 4
roxie/ccd/ccdstate.cpp

@@ -118,12 +118,19 @@ class DelayedReleaserThread : public Thread
 {
 private:
     bool closing;
+    bool started;
     CriticalSection lock;
     IArrayOf<DelayedReleaseQueueItem> queue;
 public:
     DelayedReleaserThread() : Thread("DelayedReleaserThread")
     {
         closing = false;
+        started = false;
+    }
+
+    ~DelayedReleaserThread()
+    {
+        stop();
     }
 
     virtual int run()
@@ -148,12 +155,21 @@ public:
 
     void stop()
     {
-        closing = true;
+        if (started)
+        {
+            closing = true;
+            join();
+        }
     }
 
     void delayedRelease(IInterface *_goer, unsigned delaySeconds)
     {
         CriticalBlock b(lock);
+        if (!started)
+        {
+            start();
+            started = true;
+        }
         queue.append(*new DelayedReleaseQueueItem(_goer, delaySeconds));
     }
 } delayedReleaser;
@@ -395,7 +411,7 @@ protected:
                 {
                     Owned<IDistributedFile> dFile = daliHelper->resolveLFN(fileName, cacheResult, writeAccess);
                     if (dFile)
-                        result = createResolvedFile(fileName, NULL, dFile.getClear(), daliHelper, cacheResult, writeAccess);
+                        result = createResolvedFile(fileName, NULL, dFile.getClear(), daliHelper, !useCache, cacheResult, writeAccess);
                 }
                 else if (!writeAccess)  // If we need write access and expect a dali, but don't have one, we should probably fail
                 {
@@ -1220,8 +1236,8 @@ protected:
             serverManager.setown(newServerManager);
             queryHash = newHash;
         }
-        if (delayedSlaveQueryRelease)
-            delayedReleaser.delayedRelease(oldSlaveManagers, delayedSlaveQueryRelease);
+        if (slaveQueryReleaseDelaySeconds)
+            delayedReleaser.delayedRelease(oldSlaveManagers.getClear(), slaveQueryReleaseDelaySeconds);
     }
 
     mutable CriticalSection updateCrit;  // protects updates of slaveManagers and serverManager