Przeglądaj źródła

Merge pull request #9473 from richardkchapman/failOnLeaks

HPCC-16833 Regression suite mode to fail on row leaks and missing stops

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 lat temu
rodzic
commit
11395cc10a

+ 1 - 3
common/thorhelper/roxiehelper.cpp

@@ -81,11 +81,9 @@ void CRHRollingCacheElem::set(const void *_row)
 //CRHRollingCache copied/modified from THOR CRollingCache
 CRHRollingCache::~CRHRollingCache()
 {
-    loop 
+    while (cache.ordinality())
     {  
         CRHRollingCacheElem *e = cache.dequeue();  
-        if (!e)  
-            break;  
         delete e;  
     }  
 }

+ 16 - 0
roxie/ccd/ccdcontext.cpp

@@ -1235,6 +1235,7 @@ public:
     }
 
     // interface IRoxieServerContext
+
     virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
     {
         logctx.noteStatistic(kind, value);
@@ -2973,6 +2974,21 @@ public:
             debugContext->debugTerminate();
         if (workUnit)
         {
+            if (options.failOnLeaks && !failed)
+            {
+                graph.clear();
+                childGraphs.kill();
+                probeManager.clear();
+                ::Release(deserializedResultStore);
+                deserializedResultStore = nullptr;
+                if (rowManager && rowManager->allocated())
+                {
+                    rowManager->reportLeaks();
+                    failed = true;
+                    Owned <IException> E = makeStringException(ROXIE_INTERNAL_ERROR, "Row leaks detected");
+                    ::addWuException(workUnit, E);
+                }
+            }
             WorkunitUpdate w(&workUnit->lock());
             if (aborted)
                 w->setState(WUStateAborted);

+ 4 - 0
roxie/ccd/ccdquery.cpp

@@ -302,6 +302,7 @@ QueryOptions::QueryOptions()
     traceEnabled = defaultTraceEnabled;
     traceLimit = defaultTraceLimit;
     allSortsMaySpill = false; // No global default for this
+    failOnLeaks = false;
 }
 
 QueryOptions::QueryOptions(const QueryOptions &other)
@@ -332,6 +333,7 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     traceEnabled = other.traceEnabled;
     traceLimit = other.traceLimit;
     allSortsMaySpill = other.allSortsMaySpill;
+    failOnLeaks = other.failOnLeaks;
 }
 
 void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
@@ -372,6 +374,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(traceEnabled, wu, "traceEnabled");
     updateFromWorkUnit(traceLimit, wu, "traceLimit");
     updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
+    updateFromWorkUnit(failOnLeaks, wu, "failOnLeaks");
 }
 
 void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
@@ -422,6 +425,7 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(traceEnabled, ctx, "@traceEnabled", "_TraceEnabled");
         updateFromContext(traceLimit, ctx, "@traceLimit", "_TraceLimit");
         // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
+        updateFromContext(failOnLeaks, ctx, "@failOnLeaks", "_FailOnLeaks");
     }
 }
 

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -123,6 +123,7 @@ public:
     bool timeActivities;
     bool allSortsMaySpill;
     bool traceEnabled;
+    bool failOnLeaks;
 
 private:
     static const char *findProp(const IPropertyTree *ctx, const char *name1, const char *name2);

+ 8 - 14
roxie/ccd/ccdserver.cpp

@@ -1365,6 +1365,8 @@ public:
             {
                 if (state==STATEstarted || state==STATEstarting)
                 {
+                    if (ctx->queryOptions().failOnLeaks)
+                        throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "STATE: activity %d reset without stop", activityId);
                     if (traceStartStop || traceLevel > 2)
                         CTXLOG("STATE: activity %d reset without stop", activityId);
                     stop();
@@ -8295,6 +8297,7 @@ public:
         calculated = false;
         processedAny = false;
         anyThisGroup = false;
+        ReleaseRoxieRows(sorted);
         CRoxieServerActivity::reset();
     }
 
@@ -8425,7 +8428,7 @@ public:
             curQuantile++;
             if (curQuantile > numDivisions)
             {
-                sorted.kill();
+                ReleaseRoxieRows(sorted);
                 sorter->reset();
                 calculated = false; // ready for next group
             }
@@ -16162,14 +16165,6 @@ public:
     {
     }
 
-    virtual void stop()
-    {
-        ForEachItemIn(i2, selectedStreams)
-            selectedStreams.item(i2)->stop();
-
-        CRoxieServerMultiInputBaseActivity::stop();
-    }
-
     virtual unsigned __int64 queryLocalCycles() const
     {
         __int64 localCycles = totalCycles.totalCycles;
@@ -16283,14 +16278,11 @@ public:
                 selectedJunctions.append(junctionArray[nextIndex-1]);
             }
         }
-
         // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs and selectedJunctions
+        // But the N-Way input itself is now done with processing, and can stop itself/dependencies.
+        stop();
     }
 
-    virtual void stop()
-    {
-        // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs
-    }
 };
 
 class CRoxieServerNWayInputActivityFactory : public CRoxieServerMultiInputFactory
@@ -16362,6 +16354,8 @@ public:
                 selectedJunctions.append(resultJunctions[i]);
             }
         }
+        // I have done with my processing - note that this won't stop my inputs
+        stop();
     }
 
     virtual void reset()