Explorar o código

Merge pull request #9916 from jakesmith/hpcc-17464

HPCC-17464 A sink with an UPDATE, could cause balanced splitter stall

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday hai 8 anos
pai
achega
fd6370d1b7

+ 6 - 0
testing/regress/ecl/key/splitter-partialstart.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+</Dataset>
+<Dataset name='Result 3'>
+</Dataset>

+ 53 - 0
testing/regress/ecl/splitter-partialstart.ecl

@@ -0,0 +1,53 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2017 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//noRoxie
+//nolocal
+
+import Std;
+
+#onwarning(10125, ignore); // ignore UPDATE 'up to date' messages, so that output is consistent across engines
+
+rec := RECORD
+  unsigned4 id;
+  string100 str;
+END;
+
+unsigned num := 1000000; // number of rows generated for ds
+ds := DATASET(num, TRANSFORM(rec, SELF.id := COUNTER; SELF.str := (string)HASH(COUNTER);), DISTRIBUTED); // id = row counter, str = HASH of the counter cast to string
+
+p1 := PROJECT(ds, TRANSFORM(rec, SELF.id := LEFT.id*2; SELF := LEFT)); // doubles id, remaining fields copied from LEFT
+p2 := PROJECT(ds, TRANSFORM(rec, SELF.id := LEFT.id*3; SELF := LEFT)); // triples id; reads the same ds as p1
+
+gc1 := 0 : STORED('gc1');
+ifp2 := IF(gc1=1, p2); // NOTE(review): ifp2 is not referenced by any action below - confirm whether it was meant to be used
+
+fname1 := '~regress::splitout1';
+fname2 := '~regress::splitout2';
+SEQUENTIAL( // the steps below are listed in their intended execution order
+ PARALLEL( // first pass: write p1 and p2 to their files
+  OUTPUT(p1, , fname1, OVERWRITE, UPDATE);
+  OUTPUT(p2, , fname2, OVERWRITE, UPDATE);
+ );
+ Std.File.DeleteLogicalFile(fname1); // remove only the first file before repeating the same outputs
+ PARALLEL( // second pass: identical UPDATE outputs; presumably only fname1 needs rewriting while fname2 is still up to date - TODO confirm
+  OUTPUT(p1, , fname1, OVERWRITE, UPDATE);
+  OUTPUT(p2, , fname2, OVERWRITE, UPDATE);
+ );
+ Std.File.DeleteLogicalFile(fname1); // clean up both output files
+ Std.File.DeleteLogicalFile(fname2);
+);

+ 33 - 16
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -86,13 +86,15 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
     PointerArrayOf<Semaphore> stalledWriters;
     UnsignedArray stalledWriterIdxs;
     unsigned stoppedOutputs = 0;
-    unsigned activeOutputs = 0;
+    Owned<IBitSet> connectedOutputSet;
+    unsigned activeOutputCount = 0;
+    unsigned connectedOutputCount = 0;
     rowcount_t recsReady = 0;
     Owned<IException> writeAheadException;
     Owned<ISharedSmartBuffer> smartBuf;
     bool inputPrepared = false;
     bool inputConnected = false;
-    unsigned remainingOutputs = 0;
+    unsigned numOutputs = 0;
 
     // NB: CWriter only used by 'balanced' splitter, which blocks write when too far ahead
     class CWriter : public CSimpleInterface, IThreaded
@@ -142,8 +144,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
 public:
     NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this)
     {
-        activeOutputs = container.getOutputs();
-        ActPrintLog("Number of connected outputs: %u", activeOutputs);
+        numOutputs = container.getOutputs();
+        connectedOutputSet.setown(createBitSet());
         setRequireInitData(false);
         IHThorSplitArg *helper = (IHThorSplitArg *)queryHelper();
         int dV = getOptInt(THOROPT_SPLITTER_SPILL, -1);
@@ -154,6 +156,20 @@ public:
         ForEachItemIn(o, container.outputs)
             appendOutput(new CSplitterOutput(*this, o));
     }
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
+    {
+        // Counts the outputs flagged in connectedOutputSet into connectedOutputCount and logs the total; 'data' and 'slaveData' are not read here. NB: init() is called post connect, some outputs may not have connected streams, e.g. if UPDATE has suppressed them.
+        unsigned cur = 0; // start of the current run of connected output indexes
+        while (true)
+        {
+             unsigned next = connectedOutputSet->scan(cur, false); // find next inactive, i.e. presumably the first index >= cur whose bit is clear - TODO confirm IBitSet::scan semantics
+             connectedOutputCount += next-cur; // indexes [cur, next) are treated as connected; NOTE(review): accumulates without being zeroed here - confirm init() runs only once per instance
+             if (next==container.getOutputs()) // can't exceed, presumably because only indexes below the output count are ever set - TODO confirm
+                 break;
+             cur = next+1; // step over the unconnected index and resume scanning after it
+        }
+        ActPrintLog("Number of connected outputs: %u", connectedOutputCount);
+    }
     virtual void reset() override
     {
         PARENT::reset();
@@ -167,8 +183,7 @@ public:
         ForEachItemIn(o, outputs)
         {
             CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
-            if (output)
-                output->reset();
+            output->reset();
         }
     }
     bool prepareInput()
@@ -178,16 +193,16 @@ public:
         if (!inputPrepared)
         {
             inputPrepared = true;
+            activeOutputCount = connectedOutputCount;
             PARENT::start();
-            remainingOutputs = activeOutputs;
             ForEachItemIn(o, outputs)
             {
                 CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
-                if (output && output->isStopped())
-                    --remainingOutputs;
+                if (output->isStopped())
+                    --activeOutputCount;
             }
-            assertex(remainingOutputs); // must be >=1, as this output (outIdx) has invoked prepareInput
-            if (1 == remainingOutputs)
+            assertex(activeOutputCount); // must be >=1, as this output (outIdx) has invoked prepareInput
+            if (1 == activeOutputCount)
                 return false;
             if (smartBuf)
                 smartBuf->reset();
@@ -197,19 +212,19 @@ public:
                 {
                     StringBuffer tempname;
                     GetTempName(tempname, "nsplit", true); // use alt temp dir
-                    smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), activeOutputs, queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
+                    smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
                     ActPrintLog("Using temp spill file: %s", tempname.str());
                 }
                 else
                 {
                     ActPrintLog("Spill is 'balanced'");
-                    smartBuf.setown(createSharedSmartMemBuffer(this, activeOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
+                    smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
                 }
                 // mark any outputs already stopped
                 ForEachItemIn(o, outputs)
                 {
                     CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
-                    if (output && output->isStopped())
+                    if (output->isStopped() || !connectedOutputSet->test(o))
                         smartBuf->queryOutput(o)->stop();
                 }
             }
@@ -220,7 +235,7 @@ public:
     }
     inline const void *nextRow(unsigned outIdx)
     {
-        if (1 == remainingOutputs) // will be true, if only 1 input connect, or only 1 input was active (others stopped) when it started reading
+        if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading
             return inputStream->nextRow();
         OwnedConstThorRow row = smartBuf->queryOutput(outIdx)->nextRow(); // will block until available
         if (writeAheadException)
@@ -308,7 +323,7 @@ public:
             smartBuf->queryOutput(outIdx)->stop();
         }
         ++stoppedOutputs;
-        if (stoppedOutputs == activeOutputs)
+        if (stoppedOutputs == connectedOutputCount)
         {
             writer.stop();
             PARENT::stop();
@@ -347,6 +362,7 @@ public:
     virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override
     {
         connectInput(consumerOrdered);
+        connectedOutputSet->set(idx); // flag output idx as connected; init() later counts the flagged outputs
         streams.append(this); // the activity itself is appended as the stream for this output
         return nullptr; // no strand junction is returned
     }
@@ -407,6 +423,7 @@ void CSplitterOutput::setOutputStream(unsigned index, IEngineRowStream *stream)
 IStrandJunction *CSplitterOutput::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
 {
     activity.connectInput(consumerOrdered);
+    activity.connectedOutputSet->set(idx); // flag output idx as connected on the owning activity; its init() later counts the flagged outputs
     streams.append(this); // this output object is appended as the stream handed back to the caller
     return nullptr; // no strand junction is returned
 }