Browse Source

HPCC-19074 Prevent key+transform filters causing early exit

The keyed filter handling held and returns a pending eog.
If the transform is also skipping, it could cause a double
null to be seen and early exit from the grouped disk read
handling.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 years ago
parent
commit
efd9c0fa75
2 changed files with 26 additions and 15 deletions
  1. 10 2
      roxie/ccd/ccdactivities.cpp
  2. 16 13
      roxie/ccd/ccdserver.cpp

+ 10 - 2
roxie/ccd/ccdactivities.cpp

@@ -1278,12 +1278,14 @@ public:
         unsigned totalSizeSent = 0;
         helper->setCallback(reader);
         unsigned lastGroupProcessed = processed;
+        bool someInGroup = false;
         while (!aborted)
         {
             // This loop is the inner loop for memory diskreads - so keep it efficient!
             const byte *nextRec = reader->nextRow();
             if (nextRec)
             {
+                someInGroup = true;
                 size32_t transformedSize = owner.doTransform(output, nextRec);
                 reader->finishedRow();
                 if (transformedSize)
@@ -1301,11 +1303,17 @@ public:
                         break;
                 }
             }
-            else if (isGrouped && processed>lastGroupProcessed)
-                break; // We return grouped data one whole group at a time
+            else if (isGrouped)
+            {
+                if (processed>lastGroupProcessed)
+                    break; // We return grouped data one whole group at a time
+                else if (!someInGroup) // eof
+                    return;
+            }
             else
                 return;
         }
+        // Send continuation message indicating end of group or too much data
         if (!aborted)
         {
             MemoryBuffer si;

+ 16 - 13
roxie/ccd/ccdserver.cpp

@@ -21731,9 +21731,9 @@ class CRoxieServerDiskReadActivity : public CRoxieServerDiskReadBaseActivity
     IHThorDiskReadArg * readHelper;
     ConstPointerArray readrows;
     bool readAheadDone;
-    bool eogPending;
     unsigned readIndex;
     unsigned lastGroupProcessed;
+    bool someInGroup = false;
 
 public:
     CRoxieServerDiskReadActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager, ITranslatorSet *_translators)
@@ -21744,7 +21744,6 @@ public:
         limitTransformExtra = readHelper;
         readAheadDone = false;
         readIndex = 0;
-        eogPending = false;
         lastGroupProcessed = processed;
     }
 
@@ -21755,8 +21754,8 @@ public:
         CRoxieServerDiskReadBaseActivity::start(parentExtractSize, parentExtract, paused);
         readAheadDone = false;
         readIndex = 0;
-        eogPending = false;
         lastGroupProcessed = processed;
+        someInGroup = false;
     }
 
     virtual void reset()
@@ -21765,8 +21764,8 @@ public:
         readrows.kill();
         readAheadDone = false;
         readIndex = 0;
-        eogPending = false;
         lastGroupProcessed = processed;
+        someInGroup = false;
         CRoxieServerDiskReadBaseActivity::reset();
     }
 
@@ -21853,12 +21852,6 @@ public:
 
     const void *_nextRow()
     {
-        if (eogPending && (lastGroupProcessed != processed))
-        {
-            eogPending = false;
-            lastGroupProcessed = processed;
-            return NULL;
-        }
         RtlDynamicRowBuilder rowBuilder(rowAllocator);
         unsigned transformedSize = 0;
         for (;;)
@@ -21866,15 +21859,25 @@ public:
             const byte *nextRec = reader->nextRow();
             if (nextRec)
             {
+                someInGroup = true;
                 transformedSize = readHelper->transform(rowBuilder, nextRec);
                 reader->finishedRow();
                 if (transformedSize)
                     break;
             }
-            else if (isGrouped && lastGroupProcessed != processed)
+            else if (isGrouped)
             {
-                lastGroupProcessed = processed;
-                return NULL;
+                if (lastGroupProcessed != processed)
+                {
+                    someInGroup = false;
+                    lastGroupProcessed = processed;
+                    return nullptr;
+                }
+                else if (!someInGroup)
+                {
+                    eof = true;
+                    return nullptr;
+                }
             }
             else
             {