Browse Source

Merge pull request #8526 from jakesmith/hpcc-15401

HPCC-15401 Enth deadlock due to uninitialized members + lookahead

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
79bf860a67
2 changed files with 27 additions and 23 deletions
  1. 15 11
      thorlcr/activities/enth/thenthslave.cpp
  2. 12 12
      thorlcr/activities/firstn/thfirstnslave.cpp

+ 15 - 11
thorlcr/activities/enth/thenthslave.cpp

@@ -23,12 +23,12 @@ class BaseEnthActivity : public CSlaveActivity, implements ILookAheadStopNotify
 {
     typedef CSlaveActivity PARENT;
 
-    ThorDataLinkMetaInfo intoMetaInfo;
 protected:
     StringBuffer actStr;
     Semaphore finishedSem;
-    rowcount_t counter, localRecCount;
-    rowcount_t denominator, numerator;
+    rowcount_t counter = 0, localRecCount = 0;
+    rowcount_t denominator = 0, numerator = 0;
+    IEngineRowStream *originalInputStream = nullptr;
 
     bool haveLocalCount() { return RCUNBOUND != localRecCount; }
     inline bool wanted()
@@ -69,16 +69,23 @@ protected:
     }
     void setLocalCountReq()
     {
+        ThorDataLinkMetaInfo info;
+        input->getMetaInfo(info);
         // Need lookahead _unless_ row count pre-known.
         if (0 == numerator)
             localRecCount = 0;
-        else if (intoMetaInfo.totalRowsMin == intoMetaInfo.totalRowsMax)
+        else if (info.totalRowsMin == info.totalRowsMax)
         {
-            localRecCount = (rowcount_t)intoMetaInfo.totalRowsMax;
+            localRecCount = (rowcount_t)info.totalRowsMax;
             ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
         }
         else
+        {
             localRecCount = RCUNBOUND;
+            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, originalInputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
+            setLookAhead(0, lookAhead); // NB: this is post base start()
+            lookAhead->start();
+        }
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
@@ -86,19 +93,16 @@ public:
     BaseEnthActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
     }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
     {
         appendOutputLinked(this);
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        input->getMetaInfo(intoMetaInfo);
-        // Need lookahead _unless_ row count pre-known.
-        if (numerator && (intoMetaInfo.totalRowsMin != intoMetaInfo.totalRowsMax))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
+        originalInputStream = inputStream;
     }
-    virtual void start()
+    virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();

+ 12 - 12
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -216,7 +216,7 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
     rowcount_t maxres, skipped, totallimit;
     bool firstget;
     ThorDataLinkMetaInfo inputMeta;
-    Owned<IStartableEngineRowStream> firstNLookAhead;
+    IEngineRowStream *originalInputStream = nullptr;
 
 protected:
     virtual void doStop()
@@ -235,6 +235,11 @@ public:
         PARENT::init(data, slaveData);
         mpTag = container.queryJobChannel().deserializeMPTag(data);
     }
+    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    {
+        PARENT::setInputStream(index, _input, consumerOrdered);
+        originalInputStream = inputStream;
+    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
@@ -247,15 +252,10 @@ public:
         totallimit = (rowcount_t)helper->getLimit();
         rowcount_t _skipCount = validRC(helper->numToSkip()); // max
         rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
-        firstNLookAhead.setown(createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
-                                          maxRead, this, &container.queryJob().queryIDiskUsage())); // if a very large limit don't bother truncating
-        firstNLookAhead->start();
-    }
-    virtual void stop() override
-    {
-        if (firstNLookAhead)
-            firstNLookAhead->stop(); // will call input stop()
-        dataLinkStop();
+        IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, originalInputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
+                                                                              maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
+        setLookAhead(0, lookAhead);
+        lookAhead->start();
     }
     virtual void abort()
     {
@@ -337,7 +337,7 @@ public:
                     return NULL;
                 while (skipped<skipCount)
                 {
-                    OwnedConstThorRow row = firstNLookAhead->ungroupedNextRow();
+                    OwnedConstThorRow row = inputStream->ungroupedNextRow();
                     if (!row)
                     {
                         stop();
@@ -348,7 +348,7 @@ public:
             }
             if (getDataLinkCount() < limit)
             {
-                OwnedConstThorRow row = firstNLookAhead->ungroupedNextRow();
+                OwnedConstThorRow row = inputStream->ungroupedNextRow();
                 if (row)
                 {
                     dataLinkIncrement();