Переглянути джерело

HPCC-26384 Fix some valgrind errors and minor leaks

Make sure Roxie closes cleanly in response to kill, so that memory leaks can
be detected.

Fix minor memory leaks detected using valgrind.

Fix a couple of cases of uninitialized data (structure packing) being reported
by valgrind.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 роки тому
батько
коміт
fa8ecc94ff

+ 24 - 2
helm/hpcc/templates/roxie.yaml

@@ -230,8 +230,19 @@ spec:
       containers:
       containers:
       - name: {{ $servername | quote }}
       - name: {{ $servername | quote }}
         workingDir: /var/lib/HPCCSystems
         workingDir: /var/lib/HPCCSystems
+{{- if $roxie.valgrind }}
+        command: [ valgrind ] 
+        args: [
+                "--leak-check=full",
+                "--show-leak-kinds=all",
+                "--track-origins=yes",
+                "--num-callers=8",
+                "--log-fd=1",
+                "roxie",
+{{- else }}
         command: [ roxie ] 
         command: [ roxie ] 
         args: [
         args: [
+{{- end }}
                 {{ include "hpcc.configArg" . }},
                 {{ include "hpcc.configArg" . }},
                 {{ include "hpcc.daliArg" $ }},
                 {{ include "hpcc.daliArg" $ }},
                 "--server=true" 
                 "--server=true" 
@@ -321,8 +332,19 @@ spec:
       containers:
       containers:
       - name: {{ $name | quote}}
       - name: {{ $name | quote}}
         workingDir: /var/lib/HPCCSystems
         workingDir: /var/lib/HPCCSystems
-        command: [ roxie ]
-        args: [ 
+{{- if $roxie.valgrind }}
+        command: [ valgrind ] 
+        args: [
+                "--leak-check=full",
+                "--show-leak-kinds=all",
+                "--track-origins=yes",
+                "--num-callers=8",
+                "--log-fd=1",
+                "roxie",
+{{- else }}
+        command: [ roxie ] 
+        args: [
+{{- end }}
                 {{ include "hpcc.configArg" $roxie }},
                 {{ include "hpcc.configArg" $roxie }},
                 {{ include "hpcc.daliArg" $ }},
                 {{ include "hpcc.daliArg" $ }},
                 "--channels={{ $channel }}", 
                 "--channels={{ $channel }}", 

+ 3 - 0
roxie/ccd/ccd.hpp

@@ -169,7 +169,10 @@ public:
 #endif
 #endif
 #ifdef TIME_PACKETS
 #ifdef TIME_PACKETS
     unsigned tick = 0;
     unsigned tick = 0;
+#else
+    unsigned filler = 0; // keeps valgrind happy
 #endif
 #endif
+
     RoxiePacketHeader() = default;
     RoxiePacketHeader() = default;
 
 
     RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
     RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);

+ 2 - 2
roxie/ccd/ccdmain.cpp

@@ -1397,7 +1397,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
                         }
                         }
                         const char *soname =  roxieFarm.queryProp("@so");
                         const char *soname =  roxieFarm.queryProp("@so");
                         const char *config  = roxieFarm.queryProp("@config");
                         const char *config  = roxieFarm.queryProp("@config");
-                        IHpccProtocolPlugin *protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
+                        Owned<IHpccProtocolPlugin> protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
                         roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config, certFileName.str(), keyFileName.str(), passPhraseStr.str()));
                         roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config, certFileName.str(), keyFileName.str(), passPhraseStr.str()));
                     }
                     }
                     else
                     else
@@ -1465,6 +1465,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         }
         }
         packetDiscarder->stop();
         packetDiscarder->stop();
         packetDiscarder.clear();
         packetDiscarder.clear();
+        stopTopoThread();
         ROQ->stop();
         ROQ->stop();
         ROQ->join();
         ROQ->join();
         ROQ->Release();
         ROQ->Release();
@@ -1497,7 +1498,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
     perfMonHook.clear();
     perfMonHook.clear();
 #endif
 #endif
     stopAeronDriver();
     stopAeronDriver();
-    stopTopoThread();
 
 
     strdup("Make sure leak checking is working");
     strdup("Make sure leak checking is working");
     roxiemem::releaseRoxieHeap();
     roxiemem::releaseRoxieHeap();

+ 19 - 2
roxie/ccd/ccdqueue.cpp

@@ -133,6 +133,11 @@ void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, uns
 #ifdef SUBCHANNELS_IN_HEADER
 #ifdef SUBCHANNELS_IN_HEADER
     clearSubChannels();
     clearSubChannels();
 #endif
 #endif
+#ifdef TIME_PACKETS
+    tick = 0;
+#else
+    filler = 0; // keeps valgrind happy
+#endif
 }
 }
 
 
 #ifdef SUBCHANNELS_IN_HEADER
 #ifdef SUBCHANNELS_IN_HEADER
@@ -1045,6 +1050,10 @@ public:
         orphans = new RoxiePacketHeader[numOrphans];
         orphans = new RoxiePacketHeader[numOrphans];
         tail = 0;
         tail = 0;
     }
     }
+    ~IBYTIbuffer()
+    {
+        delete [] orphans;
+    }
     void noteOrphan(const RoxiePacketHeader &hdr)
     void noteOrphan(const RoxiePacketHeader &hdr)
     {
     {
         assert(GetCurrentThreadId()==roxiePacketReaderThread);
         assert(GetCurrentThreadId()==roxiePacketReaderThread);
@@ -2417,8 +2426,16 @@ public:
         {
         {
             StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
             StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
         }
         }
-        if (!channelWrite(abortHeader, true))
-            logctx.CTXLOG("sendAbort wrote too little");
+        try
+        {
+            if (!channelWrite(abortHeader, true))
+                logctx.CTXLOG("sendAbort wrote too little");
+        }
+        catch (IException *E)
+        {
+            EXCLOG(E);
+            E->Release();
+        }
         abortsSent++;
         abortsSent++;
     }
     }
 
 

+ 9 - 1
roxie/ccd/ccdserver.cpp

@@ -1490,7 +1490,15 @@ public:
     {
     {
         aborted = true;
         aborted = true;
         // MORE - should abort dependencies here?
         // MORE - should abort dependencies here?
-        stop();
+        try
+        {
+            stop();
+        }
+        catch (IException *E)
+        {
+            EXCLOG(E);
+            ::Release(E);
+        }
     }
     }
 
 
     inline void reset()
     inline void reset()

+ 1 - 0
roxie/roxiemem/roxiemem.hpp

@@ -221,6 +221,7 @@ public:
     void noteLinked(const void *ptr);
     void noteLinked(const void *ptr);
 public:
 public:
     std::atomic_uint count;
     std::atomic_uint count;
+    unsigned filler = 0; // keeps valgrind happy
     IRowManager *mgr = nullptr;
     IRowManager *mgr = nullptr;
     DataBuffer *next = nullptr;   // Used when chaining them together in rowMgr
     DataBuffer *next = nullptr;   // Used when chaining them together in rowMgr
     DataBuffer *msgNext = nullptr;    // Next databuffer in same slave message
     DataBuffer *msgNext = nullptr;    // Next databuffer in same slave message

+ 13 - 0
roxie/udplib/udpipmap.hpp

@@ -89,6 +89,19 @@ public:
     IpMapOf<T>(std::function<T *(const ServerIdentifier)> _tfunc) : tfunc(_tfunc)
     IpMapOf<T>(std::function<T *(const ServerIdentifier)> _tfunc) : tfunc(_tfunc)
     {
     {
     }
     }
+    ~IpMapOf<T>()
+    {
+        for (unsigned idx = 0; idx < 256; idx++)
+        {
+            const list *head = table[idx].load(std::memory_order_acquire);
+            while (head)
+            {
+                const list *next = head->next;
+                delete head;
+                head = next;
+            }
+        }
+    }
     T &lookup(const ServerIdentifier &) const;
     T &lookup(const ServerIdentifier &) const;
     inline T &operator[](const ServerIdentifier &ip) const { return lookup(ip); }
     inline T &operator[](const ServerIdentifier &ip) const { return lookup(ip); }
     myIterator begin()
     myIterator begin()

+ 1 - 0
system/jhtree/jhtree.cpp

@@ -83,6 +83,7 @@ MODULE_EXIT()
     delete initCrit;
     delete initCrit;
     delete keyStore.load(std::memory_order_relaxed);
     delete keyStore.load(std::memory_order_relaxed);
     ::Release((CInterface*)nodeCache);
     ::Release((CInterface*)nodeCache);
+    nodeCache = nullptr;
 }
 }
 
 
 //#define DUMP_NODES
 //#define DUMP_NODES

+ 11 - 6
system/jlib/jlib.cpp

@@ -102,13 +102,18 @@ void InitTable::exit(SoContext ctx)
         InitializerType &iT = initializers.element(i);
         InitializerType &iT = initializers.element(i);
         if (iT.soCtx == ctx)
         if (iT.soCtx == ctx)
         {
         {
-            assertex(iT.state == ITS_Initialized);
-
-            if (iT.modExit && iT.modExit->func)
+            if (iT.state == ITS_Initialized)
+            {
+                if (iT.modExit && iT.modExit->func)
+                {
+                    //printf("::exit - destroying prior = %d\n", iT.priority);
+                    iT.modExit->func();
+                    iT.modExit->func = NULL;
+                }
+            }
+            else
             {
             {
-                //printf("::exit - destroying prior = %d\n", iT.priority);
-                iT.modExit->func();
-                iT.modExit->func = NULL;
+                fprintf(stderr, "InitTable::exit %p state is %d\n", ctx, (int) iT.state);
             }
             }
             initializers.remove(i);
             initializers.remove(i);
         }
         }