Quellcode durchsuchen

Merge pull request #14295 from richardkchapman/roxie-wu-stacktrace

HPCC-24939 Spurious tracing output by one-shot roxie workunits

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday vor 4 Jahren
Ursprung
Commit
747a53d880
6 geänderte Dateien mit 68 neuen und 52 gelöschten Zeilen
  1. 14 12
      roxie/ccd/ccdfile.cpp
  2. 5 4
      roxie/ccd/ccdmain.cpp
  3. 41 32
      roxie/ccd/ccdqueue.cpp
  4. 6 2
      roxie/ccd/ccdstate.cpp
  5. 1 1
      roxie/udplib/udptrs.cpp
  6. 1 1
      system/jlib/jdebug.cpp

+ 14 - 12
roxie/ccd/ccdfile.cpp

@@ -783,6 +783,7 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
     bool closePending[2];
     StringAttrMapping fileErrorList;
 #ifdef _CONTAINERIZED
+    bool cidtActive = false;
     Semaphore cidtStarted;
 #endif
     Semaphore bctStarted;
@@ -1257,6 +1258,7 @@ public:
         {
             cidt.start();
             cidtStarted.wait();
+            cidtActive = true;
         }
 #endif
     }
@@ -1425,14 +1427,14 @@ public:
             toClose.interrupt();
             bct.join(timeout);
             hct.join(timeout);
+        }
 #ifdef _CONTAINERIZED
-            if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
-            {
-                cidtSleep.interrupt();
-                cidt.join(timeout);
-            }
-#endif
+        if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
+        {
+            cidtSleep.interrupt();
+            cidt.join(timeout);
         }
+#endif
     }
 
     virtual void wait()
@@ -1444,14 +1446,14 @@ public:
             toClose.signal();
             bct.join();
             hct.join();
+        }
 #ifdef _CONTAINERIZED
-            if (activeCacheReportingBuffer && cacheReportPeriodSeconds)
-            {
-                cidtSleep.signal();
-                cidt.join();
-            }
-#endif
+        if (cidtActive && activeCacheReportingBuffer && cacheReportPeriodSeconds)
+        {
+            cidtSleep.signal();
+            cidt.join();
         }
+#endif
     }
 
     virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)

+ 5 - 4
roxie/ccd/ccdmain.cpp

@@ -917,7 +917,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         flushJHtreeCacheOnOOM = topology->getPropBool("@flushJHtreeCacheOnOOM", true);
         fastLaneQueue = topology->getPropBool("@fastLaneQueue", true);
         udpOutQsPriority = topology->getPropInt("@udpOutQsPriority", 0);
-        udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", true);
         udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
 
         // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!
@@ -946,13 +945,13 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
 #ifndef _CONTAINERIZED
         roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true) && !useAeron;   // enable use of multicast for sending requests to agents
-#endif
+        udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", roxieMulticastEnabled);
         if (udpSnifferEnabled && !roxieMulticastEnabled)
         {
             DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
             udpSnifferEnabled = false;
         }
-
+#endif
         int ttlTmp = topology->getPropInt("@multicastTTL", 1);
         if (ttlTmp < 0)
         {
@@ -1123,7 +1122,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
                 queryDirectory.append(codeDirectory).append("queries");
         }
         addNonEmptyPathSepChar(queryDirectory);
-        queryFileCache().start();
         getTempFilePath(tempDirectory, "roxie", topology);
 
 #ifdef _WIN32
@@ -1188,7 +1186,10 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
 
         if (!oneShotRoxie)
+        {
+            queryFileCache().start();
             loadPlugins();
+        }
         unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
 #ifdef _CONTAINERIZED
         initializeTopology(topoValues, myRoles);

+ 41 - 32
roxie/ccd/ccdqueue.cpp

@@ -2615,50 +2615,59 @@ public:
         Owned<StringContextLogger> logctx = new StringContextLogger("PacketDiscarder");
         rowManager.setown(roxiemem::createRowManager(0, NULL, *logctx, NULL, false));
         mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
-        while (!aborted)
+        try
         {
-            bool anyActivity = false;
-            Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
-            if (mr)
+            while (!aborted)
             {
-                if (traceLevel > 4)
-                    DBGLOG("Discarding unwanted message");
-                unsigned headerLen;
-                const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
-                if (headerLen)
+                bool anyActivity = false;
+                Owned<IMessageResult> mr = mc->getNextResult(5000, anyActivity);
+                if (mr)
                 {
-                    switch (header.activityId)
+                    if (traceLevel > 4)
+                        DBGLOG("Discarding unwanted message");
+                    unsigned headerLen;
+                    const RoxiePacketHeader &header = *(const RoxiePacketHeader *) mr->getMessageHeader(headerLen);
+                    if (headerLen)
                     {
-                        case ROXIE_FILECALLBACK:
+                        switch (header.activityId)
                         {
-                            Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
-                            OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
-                            if (len)
+                            case ROXIE_FILECALLBACK:
                             {
-                                RecordLengthType *rowlen = (RecordLengthType *) len.get();
-                                OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
-                                const char *rowdata = (const char *) row.get();
-                                // bool isOpt = * (bool *) rowdata;
-                                // bool isLocal = * (bool *) (rowdata+1);
-                                ROQ->sendAbortCallback(header, rowdata+2, *logctx);
+                                Owned<IMessageUnpackCursor> callbackData = mr->getCursor(rowManager);
+                                OwnedConstRoxieRow len = callbackData->getNext(sizeof(RecordLengthType));
+                                if (len)
+                                {
+                                    RecordLengthType *rowlen = (RecordLengthType *) len.get();
+                                    OwnedConstRoxieRow row = callbackData->getNext(*rowlen);
+                                    const char *rowdata = (const char *) row.get();
+                                    // bool isOpt = * (bool *) rowdata;
+                                    // bool isLocal = * (bool *) (rowdata+1);
+                                    ROQ->sendAbortCallback(header, rowdata+2, *logctx);
+                                }
+                                else
+                                    DBGLOG("Unrecognized format in discarded file callback");
+                                break;
                             }
-                            else
-                                DBGLOG("Unrecognized format in discarded file callback");
-                            break;
+                            // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
                         }
-                        // MORE - ROXIE_ALIVE perhaps should go here too? debug callbacks? Actually any standard query results should too (though by the time I see them here it's too late (that may change once start streaming)
                     }
+                    else
+                        DBGLOG("Unwanted message had no header?!");
+                }
+                else if (!anyActivity)
+                {
+                    // to avoid leaking partial unwanted packets, we clear out mc periodically...
+                    ROQ->queryReceiveManager()->detachCollator(mc);
+                    mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
                 }
-                else
-                    DBGLOG("Unwanted message had no header?!");
-            }
-            else if (!anyActivity)
-            {
-                // to avoid leaking partial unwanted packets, we clear out mc periodically...
-                ROQ->queryReceiveManager()->detachCollator(mc);
-                mc.setown(ROQ->queryReceiveManager()->createMessageCollator(rowManager, RUID_DISCARD));
             }
         }
+        catch (IException * E)
+        {
+            if (!aborted || QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
+                EXCLOG(E);
+            ::Release(E);
+        }
         return 0;
     }
 

+ 6 - 2
roxie/ccd/ccdstate.cpp

@@ -1854,8 +1854,6 @@ private:
 
 class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, implements ISafeSDSSubscription, public CInterface
 {
-    Owned<IDaliPackageWatcher> pSetsNotifier;
-    Owned<IDaliPackageWatcher> pMapsNotifier;
 public:
     IMPLEMENT_IINTERFACE;
     CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
@@ -1876,6 +1874,10 @@ public:
     {
         autoReloadThread.stop();
         autoReloadThread.join();
+        if (pSetsNotifier)
+            daliHelper->releaseSubscription(pSetsNotifier);
+        if (pMapsNotifier)
+            daliHelper->releaseSubscription(pMapsNotifier);
     }
 
     virtual ISafeSDSSubscription *linkIfAlive() override { return isAliveAndLink() ? this : nullptr; }
@@ -1959,6 +1961,8 @@ private:
     Owned<const IQueryDll> standAloneDll;
     Owned<CRoxieDebugSessionManager> debugSessionManager;
     Owned<IRoxieDaliHelper> daliHelper;
+    Owned<IDaliPackageWatcher> pSetsNotifier;
+    Owned<IDaliPackageWatcher> pMapsNotifier;
     mutable ReadWriteLock packageCrit;
     InterruptableSemaphore controlSem;
     Owned<CRoxiePackageSetWatcher> allQueryPackages;

+ 1 - 1
roxie/udplib/udptrs.cpp

@@ -37,7 +37,7 @@ unsigned udpOutQsPriority = 0;
 unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
 unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
 unsigned udpRequestToSendAckTimeout = 10; // value in milliseconds
-bool udpSnifferEnabled = true;
+bool udpSnifferEnabled = false;
 
 using roxiemem::DataBuffer;
 // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.

+ 1 - 1
system/jlib/jdebug.cpp

@@ -1859,7 +1859,7 @@ void OsDiskInfo::initMajorMinor()
     // and match those with entries in /proc/diskstats
     StringBuffer cmd("lsblk -o TYPE,MAJ:MIN --pairs");
     Owned<IPipeProcess> pipe = createPipeProcess();
-    if (pipe->run("list disks", cmd, nullptr, false, true, true, 8192))
+    if (pipe->run(nullptr, cmd, nullptr, false, true, true, 8192))
     {
         StringBuffer output;
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();