Procházet zdrojové kódy

HPCC-15401 Enth deadlock due to uninitialized members + lookahead

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith před 9 roky
rodič
revize
adc0048ad8

+ 13 - 15
thorlcr/activities/enth/thenthslave.cpp

@@ -23,12 +23,11 @@ class BaseEnthActivity : public CSlaveActivity, implements ILookAheadStopNotify
 {
     typedef CSlaveActivity PARENT;
 
-    ThorDataLinkMetaInfo intoMetaInfo;
 protected:
     StringBuffer actStr;
     Semaphore finishedSem;
-    rowcount_t counter, localRecCount;
-    rowcount_t denominator, numerator;
+    rowcount_t counter = 0, localRecCount = 0;
+    rowcount_t denominator = 0, numerator = 0;
 
     bool haveLocalCount() { return RCUNBOUND != localRecCount; }
     inline bool wanted()
@@ -69,16 +68,23 @@ protected:
     }
     void setLocalCountReq()
     {
+        ThorDataLinkMetaInfo info;
+        input->getMetaInfo(info);
         // Need lookahead _unless_ row count pre-known.
         if (0 == numerator)
             localRecCount = 0;
-        else if (intoMetaInfo.totalRowsMin == intoMetaInfo.totalRowsMax)
+        else if (info.totalRowsMin == info.totalRowsMax)
         {
-            localRecCount = (rowcount_t)intoMetaInfo.totalRowsMax;
+            localRecCount = (rowcount_t)info.totalRowsMax;
             ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
         }
         else
+        {
             localRecCount = RCUNBOUND;
+            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
+            setLookAhead(0, lookAhead); // NB: this is post base start()
+            lookAhead->start();
+        }
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
@@ -86,19 +92,11 @@ public:
     BaseEnthActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
     }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
     {
         appendOutputLinked(this);
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        input->getMetaInfo(intoMetaInfo);
-        // Need lookahead _unless_ row count pre-known.
-        if (numerator && (intoMetaInfo.totalRowsMin != intoMetaInfo.totalRowsMax))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
-    }
-    virtual void start()
+    virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();

+ 6 - 12
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -216,7 +216,6 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
     rowcount_t maxres, skipped, totallimit;
     bool firstget;
     ThorDataLinkMetaInfo inputMeta;
-    Owned<IStartableEngineRowStream> firstNLookAhead;
 
 protected:
     virtual void doStop()
@@ -247,15 +246,10 @@ public:
         totallimit = (rowcount_t)helper->getLimit();
         rowcount_t _skipCount = validRC(helper->numToSkip()); // max
         rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
-        firstNLookAhead.setown(createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
-                                          maxRead, this, &container.queryJob().queryIDiskUsage())); // if a very large limit don't bother truncating
-        firstNLookAhead->start();
-    }
-    virtual void stop() override
-    {
-        if (firstNLookAhead)
-            firstNLookAhead->stop(); // will call input stop()
-        dataLinkStop();
+        IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
+                                                                              maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
+        setLookAhead(0, lookAhead);
+        lookAhead->start();
     }
     virtual void abort()
     {
@@ -337,7 +331,7 @@ public:
                     return NULL;
                 while (skipped<skipCount)
                 {
-                    OwnedConstThorRow row = firstNLookAhead->ungroupedNextRow();
+                    OwnedConstThorRow row = inputStream->ungroupedNextRow();
                     if (!row)
                     {
                         stop();
@@ -348,7 +342,7 @@ public:
             }
             if (getDataLinkCount() < limit)
             {
-                OwnedConstThorRow row = firstNLookAhead->ungroupedNextRow();
+                OwnedConstThorRow row = inputStream->ungroupedNextRow();
                 if (row)
                 {
                     dataLinkIncrement();