瀏覽代碼

Merge pull request #15952 from jakesmith/HPCC-27437-index-blob-leaks

HPCC-27437 Fix index read activity blob leaks.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父節點
當前提交
ac017b0c45
共有 4 個文件被更改,包括 24 次插入11 次删除
  1. 6 1
      ecl/hthor/hthorkey.cpp
  2. 2 1
      roxie/ccd/ccdactivities.cpp
  3. 1 0
      roxie/ccd/ccdserver.cpp
  4. 15 9
      thorlcr/activities/indexread/thindexreadslave.cpp

+ 6 - 1
ecl/hthor/hthorkey.cpp

@@ -1226,7 +1226,10 @@ const void *CHThorIndexNormalizeActivity::nextRow()
                 {
                     expanding = helper.next();
                     if (!expanding)
+                    {
+                        callback.finishedRow(); // next could filter
                         break;
+                    }
 
                     const void * ret = createNextRow();
                     if (ret)
@@ -1234,7 +1237,6 @@ const void *CHThorIndexNormalizeActivity::nextRow()
                 }
             }
 
-            callback.finishedRow();
             while (!klManager->lookup(true))
             {
                 keyedProcessed++;
@@ -1253,6 +1255,8 @@ const void *CHThorIndexNormalizeActivity::nextRow()
                 if (ret)
                     return ret;
             }
+            else
+                callback.finishedRow(); // first could filter
         }
     }
 }
@@ -1263,6 +1267,7 @@ const void * CHThorIndexNormalizeActivity::createNextRow()
     {
         outBuilder.ensureRow();
         size32_t thisSize = helper.transform(outBuilder);
+        callback.finishedRow();
         if (thisSize == 0)
         {
             return NULL;

+ 2 - 1
roxie/ccd/ccdactivities.cpp

@@ -2976,9 +2976,9 @@ public:
                                 if (steppingOffset && !steppingRow && stepExtra.returnMismatches())
                                 {
                                     transformedSize = readHelper->unfilteredTransform(rowBuilder, keyRow);
+                                    callback.finishedRow();
                                     if (transformedSize) // will only be zero in odd situations where codegen can't work out how to transform (eg because of a skip)
                                     {
-                                        callback.finishedRow();
                                         rowBuilder.writeToOutput(transformedSize, true);
 
                                         totalSizeSent += transformedSize;
@@ -3209,6 +3209,7 @@ public:
                     }
                     else
                     {
+                        callback.finishedRow(); // first() could have filtered
                         postFiltered++;
                         skipped++;
                     }

+ 1 - 0
roxie/ccd/ccdserver.cpp

@@ -22499,6 +22499,7 @@ public:
             }
             transformedSize = normalizeHelper->transform(rowBuilder);
             firstPending = !normalizeHelper->next();
+            reader->finishedRow();
             if (transformedSize)
                 break;
         }

+ 15 - 9
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -104,6 +104,7 @@ protected:
     protected:
         CIndexReadSlaveBase &activity;
         IKeyManager *keyManager = nullptr;
+        bool needsBlobCleaning = false;
     public:
         TransformCallback(CIndexReadSlaveBase &_activity) : activity(_activity) { };
         IMPLEMENT_IINTERFACE_O_USING(CSimpleInterface)
@@ -114,6 +115,7 @@ protected:
             size32_t dummy;
             if (!keyManager)
                 throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
+            needsBlobCleaning = true;
             return (byte *) keyManager->loadBlob(id, dummy); 
         }
         void prepareManager(IKeyManager *_keyManager)
@@ -123,8 +125,11 @@ protected:
         }
         void finishedRow()
         {
-            if (keyManager)
+            if (needsBlobCleaning)
+            {
+                needsBlobCleaning = false;
                 keyManager->releaseBlobs(); 
+            }
         }
         void resetManager()
         {
@@ -766,11 +771,9 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
                 if (needTransform)
                 {
                     size32_t sz = helper->transform(ret, r);
+                    callback.finishedRow();
                     if (sz)
-                    {
-                        callback.finishedRow();
                         return ret.finalizeRowClear(sz);
-                    }
                     else
                         ++postFiltered;
                 }
@@ -849,11 +852,9 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
                 if (needTransform)
                 {
                     size32_t sz = helper->transform(ret, r);
+                    callback.finishedRow();
                     if (sz)
-                    {
-                        callback.finishedRow();
                         return ret.finalizeRowClear(sz);
-                    }
                     else
                     {
                         if (optimizeSteppedPostFilter && stepExtra.returnMismatches())
@@ -861,10 +862,10 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
                             if (memcmp(ret.getSelf() + seekGEOffset, seek, seekSize) != 0)
                             {
                                 size32_t sz = helper->unfilteredTransform(ret, r);
+                                callback.finishedRow();
                                 if (sz)
                                 {
                                     wasCompleteMatch = false;
-                                    callback.finishedRow();
                                     return ret.finalizeRowClear(sz);
                                 }
                                 else
@@ -1392,6 +1393,7 @@ class CIndexNormalizeSlaveActivity : public CIndexReadSlaveBase
     {
         RtlDynamicRowBuilder row(allocator);
         size32_t sz = helper->transform(row);
+        callback.finishedRow();
         if (sz==0)
             return NULL;
         if (getDataLinkCount() >= rowLimit)
@@ -1476,7 +1478,10 @@ public:
                 {
                     expanding = helper->next();
                     if (!expanding)
+                    {
+                        callback.finishedRow(); // next() could filter?
                         break;
+                    }
 
                     OwnedConstThorRow row = createNextRow();
                     if (row)
@@ -1486,7 +1491,6 @@ public:
 
             for (;;)
             {
-                callback.finishedRow();
                 const void *rec = nextKey();
                 if (rec)
                 {
@@ -1499,6 +1503,8 @@ public:
                             return row.getClear();
                         break;
                     }
+                    else
+                        callback.finishedRow(); // first() could filter?
                 }
                 else
                 {