Просмотр исходного кода

HPCC-19941 isFastThrough must be checked after input started

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 лет назад
Родитель
Сommit
8f48c32e71
62 измененных файлов с 453 добавлено и 348 удалено
  1. 6 0
      testing/regress/ecl/key/loopft.xml
  2. 28 0
      testing/regress/ecl/loopft.ecl
  3. 2 2
      thorlcr/activities/aggregate/thaggregateslave.cpp
  4. 1 1
      thorlcr/activities/aggregate/thgroupaggregateslave.cpp
  5. 2 2
      thorlcr/activities/catch/thcatchslave.cpp
  6. 10 12
      thorlcr/activities/choosesets/thchoosesetsslave.cpp
  7. 3 3
      thorlcr/activities/countproject/thcountprojectslave.cpp
  8. 2 2
      thorlcr/activities/csvread/thcsvrslave.cpp
  9. 1 1
      thorlcr/activities/degroup/thdegroupslave.cpp
  10. 10 10
      thorlcr/activities/diskread/thdiskreadslave.cpp
  11. 16 21
      thorlcr/activities/enth/thenthslave.cpp
  12. 3 3
      thorlcr/activities/external/thexternalslave.cpp
  13. 1 1
      thorlcr/activities/fetch/thfetchslave.cpp
  14. 1 1
      thorlcr/activities/filter/thfilterslave.cpp
  15. 19 15
      thorlcr/activities/firstn/thfirstnslave.cpp
  16. 6 6
      thorlcr/activities/funnel/thfunnelslave.cpp
  17. 3 1
      thorlcr/activities/group/thgroupslave.cpp
  18. 7 7
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  19. 5 5
      thorlcr/activities/indexread/thindexreadslave.cpp
  20. 3 13
      thorlcr/activities/indexwrite/thindexwriteslave.cpp
  21. 2 2
      thorlcr/activities/iterate/thgroupiterateslave.cpp
  22. 11 14
      thorlcr/activities/iterate/thiterateslave.cpp
  23. 13 21
      thorlcr/activities/join/thjoinslave.cpp
  24. 1 1
      thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp
  25. 1 1
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  26. 1 1
      thorlcr/activities/limit/thlimitslave.cpp
  27. 5 11
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  28. 13 10
      thorlcr/activities/loop/thloopslave.cpp
  29. 3 3
      thorlcr/activities/merge/thmergeslave.cpp
  30. 2 2
      thorlcr/activities/msort/thgroupsortslave.cpp
  31. 1 1
      thorlcr/activities/msort/thmsortslave.cpp
  32. 3 3
      thorlcr/activities/normalize/thnormalizeslave.cpp
  33. 4 5
      thorlcr/activities/nsplitter/thnsplitterslave.cpp
  34. 2 2
      thorlcr/activities/null/thnullslave.cpp
  35. 1 1
      thorlcr/activities/nullaction/thnullactionslave.cpp
  36. 1 1
      thorlcr/activities/parse/thparseslave.cpp
  37. 2 2
      thorlcr/activities/piperead/thprslave.cpp
  38. 2 2
      thorlcr/activities/project/thprojectslave.cpp
  39. 3 7
      thorlcr/activities/pull/thpullslave.cpp
  40. 10 13
      thorlcr/activities/rollup/throllupslave.cpp
  41. 1 1
      thorlcr/activities/sample/thsampleslave.cpp
  42. 3 16
      thorlcr/activities/selectnth/thselectnthslave.cpp
  43. 1 1
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  44. 2 2
      thorlcr/activities/soapcall/thsoapcallslave.cpp
  45. 1 1
      thorlcr/activities/spill/thspillslave.cpp
  46. 1 1
      thorlcr/activities/temptable/thtmptableslave.cpp
  47. 6 51
      thorlcr/activities/thactivityutil.cpp
  48. 2 6
      thorlcr/activities/thactivityutil.ipp
  49. 5 7
      thorlcr/activities/thdiskbaseslave.cpp
  50. 3 3
      thorlcr/activities/thdiskbaseslave.ipp
  51. 1 1
      thorlcr/activities/topn/thtopnslave.cpp
  52. 1 1
      thorlcr/activities/trace/thtraceslave.cpp
  53. 1 1
      thorlcr/activities/when/thwhenslave.cpp
  54. 1 1
      thorlcr/activities/wuidread/thwuidreadslave.cpp
  55. 4 7
      thorlcr/activities/wuidwrite/thwuidwriteslave.cpp
  56. 1 1
      thorlcr/activities/xmlparse/thxmlparseslave.cpp
  57. 2 2
      thorlcr/activities/xmlread/thxmlreadslave.cpp
  58. 4 4
      thorlcr/graph/thgraph.cpp
  59. 6 6
      thorlcr/graph/thgraph.hpp
  60. 121 20
      thorlcr/graph/thgraphslave.cpp
  61. 75 5
      thorlcr/graph/thgraphslave.hpp
  62. 1 1
      thorlcr/slave/slave.hpp

+ 6 - 0
testing/regress/ecl/key/loopft.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><id>18516</id></Row>
+ <Row><id>18632</id></Row>
+ <Row><id>18748</id></Row>
+ <Row><id>18864</id></Row>
+</Dataset>

+ 28 - 0
testing/regress/ecl/loopft.ecl

@@ -0,0 +1,28 @@
+rec := RECORD
+ unsigned4 id;
+END;
+
+numRecs := 5;
+ds1 := DATASET(numRecs, TRANSFORM(rec, SELF.id := 100+COUNTER));
+ds2 := DATASET(numRecs, TRANSFORM(rec, SELF.id := 200+COUNTER));
+ds3 := DATASET(numRecs, TRANSFORM(rec, SELF.id := 300+COUNTER));
+ds4 := DATASET(numRecs, TRANSFORM(rec, SELF.id := 400+COUNTER));
+
+rec loopFunc(DATASET(rec) loopin, unsigned c) := FUNCTION
+  cs := CASE(c,
+            1 => loopin,
+            2 => ds2,
+            3 => loopin,
+            4 => loopin,
+            5 => ds3,
+            6 => ds4,
+            7 => PROJECT(loopin, TRANSFORM(rec, SELF.id := LEFT.id + COUNTER)),
+            8 => loopin,
+            9 => loopin);
+  cn := CHOOSEN(NOFOLD(cs), IF(c=9, 4, 5));
+  sloopin := IF(c=9, CHOOSEN(loopin, 4), loopin);
+  RETURN COMBINE(sloopin, cn, TRANSFORM(rec, SELF.id := LEFT.id + RIGHT.id), LOCAL);
+END;
+l := LOOP(ds1, 9, loopFunc(ROWS(LEFT), COUNTER));
+
+OUTPUT(l);

+ 2 - 2
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -193,7 +193,7 @@ public:
         sz = helper->clearAggregate(resultcr);  
         return resultcr.finalizeRowClear(sz);
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.singleRowOutput = true;
@@ -275,7 +275,7 @@ public:
         dataLinkIncrement();
         return row.getClear();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         queryInput(0)->getMetaInfo(info);
     }

+ 1 - 1
thorlcr/activities/aggregate/thgroupaggregateslave.cpp

@@ -73,7 +73,7 @@ public:
         dataLinkIncrement();
         return out.finalizeRowClear(sz);
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;

+ 2 - 2
thorlcr/activities/catch/thcatchslave.cpp

@@ -128,7 +128,7 @@ public:
     { 
         inputStream->resetEOF();
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true;
@@ -266,7 +266,7 @@ public:
             barrier.clear();
         }
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = false;

+ 10 - 12
thorlcr/activities/choosesets/thchoosesetsslave.cpp

@@ -94,7 +94,7 @@ public:
         }
         return NULL;        
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true;
@@ -108,8 +108,8 @@ class ChooseSetsActivity : public BaseChooseSetsActivity
 {
     typedef BaseChooseSetsActivity PARENT;
 
-    bool first;
-    bool done;
+    bool first = false;
+    bool done = false;
 
     void getTallies() // NB: not called on first node.
     {
@@ -149,17 +149,15 @@ public:
         SocketEndpoint server;
         server.serialize(slaveData);
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        if (!isFastThrough(input))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
-    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         ActPrintLog("CHOOSESETS: Is Global");
         PARENT::start();
+
+        if (ensureStartFTLookAhead(0))
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+
         first = true;
         done = false;
     }
@@ -206,7 +204,7 @@ public:
             sendTallies();
         return NULL;
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSequential = true;
@@ -281,7 +279,7 @@ public:
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
         inputCounter->setInputStream(inputStream);
-        setLookAhead(0, createRowStreamLookAhead(this, inputCounter.get(), queryRowInterfaces(input), CHOOSESETSPLUS_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage())); // read all input
+        setLookAhead(0, createRowStreamLookAhead(this, inputCounter.get(), queryRowInterfaces(input), CHOOSESETSPLUS_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), true); // read all input
     }
     virtual void start() override
     {
@@ -311,7 +309,7 @@ public:
             cancelReceiveMsg(RANK_ALL, mpTag);
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;

+ 3 - 3
thorlcr/activities/countproject/thcountprojectslave.cpp

@@ -87,7 +87,7 @@ public:
         return NULL;        
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true;
@@ -154,7 +154,7 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), COUNTPROJECT_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage())); // could spot disk write output here?
+        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), COUNTPROJECT_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), true); // could spot disk write output here?
     }
     virtual void start()
     {
@@ -233,7 +233,7 @@ public:
             }
         }
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;

+ 2 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -374,14 +374,14 @@ public:
     }
     
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         if (!gotMeta)
         {
             gotMeta = true;
             initMetaInfo(cachedMetaInfo);
             cachedMetaInfo.isSource = true;
-            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
+            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), ((IArrayOf<IPartDescriptor> &) partDescs).getArray(), partHandler);
             cachedMetaInfo.unknownRowsOutput = true; // at least I don't think we know
             cachedMetaInfo.fastThrough = (0 == headerLines);
         }

+ 1 - 1
thorlcr/activities/degroup/thdegroupslave.cpp

@@ -84,7 +84,7 @@ public:
     { 
         return false; 
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true;

+ 10 - 10
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -154,7 +154,7 @@ protected:
 public:
     CDiskRecordPartHandler(CDiskReadSlaveActivityRecord &activity);
     ~CDiskRecordPartHandler();
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc);
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) const override;
     virtual void open();
     virtual void close(CRC32 &fileCRC);
     offset_t getLocalOffset()
@@ -203,7 +203,7 @@ CDiskRecordPartHandler::~CDiskRecordPartHandler()
 
 
 
-void CDiskRecordPartHandler::getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc)
+void CDiskRecordPartHandler::getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) const
 {
     if (!partDesc->queryProperties().hasProp("@size"))
     {
@@ -462,7 +462,7 @@ public:
             return NULL;
         }
 
-        virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc)
+        virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) const override
         {
             CDiskRecordPartHandler::getMetaInfo(info, partDesc);
             if (activity.helper->transformMayFilter() || (TDRkeyed & activity.helper->getFlags()))
@@ -557,14 +557,14 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         if (!gotMeta)
         {
             gotMeta = true;
             initMetaInfo(cachedMetaInfo);
             cachedMetaInfo.isSource = true;
-            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
+            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), ((IArrayOf<IPartDescriptor> &) partDescs).getArray(), partHandler);
         }
         info = cachedMetaInfo;
         if (info.totalRowsMin==info.totalRowsMax)
@@ -721,14 +721,14 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         if (!gotMeta)
         {
             gotMeta = true;
             initMetaInfo(cachedMetaInfo);
             cachedMetaInfo.isSource = true;
-            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
+            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), ((IArrayOf<IPartDescriptor> &) partDescs).getArray(), partHandler);
             cachedMetaInfo.unknownRowsOutput = true; // JCSMORE
         }
         info = cachedMetaInfo;
@@ -858,7 +858,7 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -971,7 +971,7 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -1109,7 +1109,7 @@ public:
         localAggTable.setown(createRowAggregator(*this, *helper, *helper));
         localAggTable->init(queryRowAllocator());
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;

+ 16 - 21
thorlcr/activities/enth/thenthslave.cpp

@@ -28,7 +28,6 @@ protected:
     Semaphore finishedSem;
     rowcount_t counter = 0, localRecCount = 0;
     rowcount_t denominator = 0, numerator = 0;
-    Owned<IEngineRowStream> originalInputStream;
 
     bool haveLocalCount() { return RCUNBOUND != localRecCount; }
     inline bool wanted()
@@ -74,17 +73,23 @@ protected:
         // Need lookahead _unless_ row count pre-known.
         if (0 == numerator)
             localRecCount = 0;
-        else if (info.totalRowsMin == info.totalRowsMax)
-        {
-            localRecCount = (rowcount_t)info.totalRowsMax;
-            ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
-        }
         else
         {
-            localRecCount = RCUNBOUND;
-            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
-            originalInputStream.setown(replaceInputStream(0, lookAhead)); // NB: this is post base start()
-            lookAhead->start();
+            if (info.totalRowsMin == info.totalRowsMax)
+            {
+                localRecCount = (rowcount_t)info.totalRowsMax;
+                ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
+            }
+            else
+            {
+                localRecCount = RCUNBOUND;
+
+                // NB: this is post base start()
+                if (!hasLookAhead(0))
+                    setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+                else
+                    startLookAhead(0);
+            }
         }
     }
 public:
@@ -103,18 +108,8 @@ public:
         denominator = validRC(helper->getProportionDenominator());
         numerator = validRC(helper->getProportionNumerator());
     }
-    virtual void stop() override
-    {
-        PARENT::stop();
-
-        // restore original inputStream if lookAhead was installed, to avoid base start spuriously starting previously installed lookahead
-        if (originalInputStream)
-        {
-            Owned<IEngineRowStream> lookAhead = replaceInputStream(0, originalInputStream.getClear());
-        }
-    }
     virtual bool isGrouped() const override { return false; }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;

+ 3 - 3
thorlcr/activities/external/thexternalslave.cpp

@@ -75,7 +75,7 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         CSlaveActivity::setInputStream(index, _input, consumerOrdered);
-        helper->setInput(index, _input.stream);
+        helper->setInput(index, _input.queryStream());
     }
     virtual void start() override
     {
@@ -107,7 +107,7 @@ public:
     {
         return grouped;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -138,7 +138,7 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         CSlaveActivity::setInputStream(index, _input, consumerOrdered);
-        helper->setInput(index, _input.stream);
+        helper->setInput(index, _input.queryStream());
     }
     virtual void start() override
     {

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -510,7 +510,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;

+ 1 - 1
thorlcr/activities/filter/thfilterslave.cpp

@@ -39,7 +39,7 @@ public:
     virtual const void *nextRow() override =0;
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;

+ 19 - 15
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -61,7 +61,7 @@ public:
         }
         PARENT::stop();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;
@@ -215,7 +215,8 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
     rowcount_t maxres = RCUNBOUND, skipped = 0, totallimit = RCUNBOUND;
     bool firstget = true;
     ThorDataLinkMetaInfo inputMeta;
-    Owned<IEngineRowStream> originalInputStream;
+
+    rowcount_t lastTotalLimitState = 0, lastSkipCountState = RCMAX;
 
     void sendOnce(rowcount_t count)
     {
@@ -229,7 +230,7 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
     }
     void ensureSendCount()
     {
-        if (hasStarted() && isFastThrough(input)) // i.e. if fast through there is no readahead
+        if (hasStarted() && !isLookAheadActive(0))
             sendOnce(getDataLinkCount() + skipped);
     }
     void doStopInput()
@@ -250,30 +251,33 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start(); // adds to totalTime (common to local and global firstn)
+
         limit = maxres = RCUNBOUND;
         skipCount = 0;
         skipped = 0;
         firstget = true;
         input->getMetaInfo(inputMeta);
         totallimit = (rowcount_t)helper->getLimit();
-        if (!isFastThrough(input))
+
+        rowcount_t _skipCount = validRC(helper->numToSkip()); // max
+        if (!isInputFastThrough(0))
         {
-            rowcount_t _skipCount = validRC(helper->numToSkip()); // max
-            rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
-            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
-                                                                                  maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
-            originalInputStream.setown(replaceInputStream(0, lookAhead));
-            lookAhead->start();
+            if (hasLookAhead(0) && (lastTotalLimitState == totallimit) && (lastSkipCountState == _skipCount))
+                startLookAhead(0);
+            else
+            {
+                rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, ::canStall(input), false,
+                                                                                  maxRead, this, &container.queryJob().queryIDiskUsage()), false); // if a very large limit don't bother truncating
+                lastTotalLimitState = totallimit;
+                lastSkipCountState = _skipCount;
+            }
         }
     }
     virtual void stop() override
     {
         ensureSendCount();
         PARENT::stop();
-        if (originalInputStream)
-        {
-            Owned<IEngineRowStream> lookAhead = replaceInputStream(0, originalInputStream.getClear());
-        }
     }
     virtual void abort()
     {
@@ -373,7 +377,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; } // need to do different if is!
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         PARENT::getMetaInfo(info);
         info.canBufferInput = true;

+ 6 - 6
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -441,7 +441,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, inputs);
@@ -547,7 +547,7 @@ public:
     {
         return queryInput(0)->isGrouped();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         // TBD I think this should say max out = lhs set.
@@ -617,7 +617,7 @@ public:
         return NULL;
     }
 
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, inputs);
@@ -749,7 +749,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return container.queryGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;
@@ -861,7 +861,7 @@ public:
             return NULL;
         return selectedStream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (selectedStream)
@@ -960,7 +960,7 @@ public:
     {
         throwUnexpected();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, queryInput(0));

+ 3 - 1
thorlcr/activities/group/thgroupslave.cpp

@@ -174,7 +174,7 @@ public:
             numGroupMax = numThisGroup;
         numGroups++;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (rolloverEnabled)
@@ -182,6 +182,8 @@ public:
             info.isSequential = true;
             info.unknownRowsOutput = true; // don't know how many rolled over
         }
+        else
+            info.fastThrough = true;
         calcMetaInfoSize(info, queryInput(0));
     }
     virtual bool isGrouped() const override{ return true; }

+ 7 - 7
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2076,7 +2076,7 @@ public:
         return row.getClear();
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true; // currently
@@ -3139,7 +3139,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return grouped; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) = 0;
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override = 0;
 friend class CBucketHandler;
 friend class CHashTableRowTable;
 friend class CBucket;
@@ -3621,7 +3621,7 @@ public:
         : HashDedupSlaveActivityBase(container, true)
     {
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;
@@ -3689,7 +3689,7 @@ public:
         if (distributor)
             distributor->abort();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;
@@ -3877,7 +3877,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;
@@ -4359,7 +4359,7 @@ public:
         return nullptr;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;
@@ -4420,7 +4420,7 @@ public:
         return row.getClear();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (input)

+ 5 - 5
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -725,7 +725,7 @@ public:
         if (eoi && RCMAX != keyedLimitCount && !keyedLimitSkips && (container.queryLocalOrGrouped() || firstNode()))
             eoi = false; // because a non skipping limit needs to be triggered by checkLimit in nextRow(), which will either fire an exception or generate a row
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -862,7 +862,7 @@ public:
         localAggTable->addRow(next);
     }
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -977,7 +977,7 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -1143,7 +1143,7 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -1286,7 +1286,7 @@ public:
     }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;

+ 3 - 13
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -150,13 +150,6 @@ public:
             maxDiskRecordSize = diskSize->getFixedSize() - fileposSize;
         reportOverflow = false;
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        // JCSMORE - not sure why you ever want a look ahead on a sink like this?
-        if (!isFastThrough(input))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
-    }
     void open(IPartDescriptor &partDesc, bool isTopLevel, bool isVariable, bool isTlk)
     {
         StringBuffer partFname;
@@ -181,8 +174,6 @@ public:
         unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
         builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount, helper, !isTlk, isTlk));
     }
-
-
     void buildUserMetadata(Owned<IPropertyTree> & metadata)
     {
         size32_t nameLen;
@@ -202,7 +193,6 @@ public:
             metadata->setProp(name.str(), value.str());
         }
     }
-
     void buildLayoutMetadata(Owned<IPropertyTree> & metadata)
     {
         if(!metadata) metadata.setown(createPTree("metadata"));
@@ -210,7 +200,6 @@ public:
 
         setRtlFormat(*metadata, helper->queryDiskRecordSize());
     }
-
     void close(IPartDescriptor &partDesc, unsigned &crc)
     {
         StringBuffer partFname;
@@ -247,7 +236,6 @@ public:
         if (e)
             throw LINK(e);
     }
-
     void removeFiles(IPartDescriptor &partDesc)
     {
         StringBuffer partFname;
@@ -257,7 +245,6 @@ public:
         catch (IException *e) { ActPrintLog(e, "Failed to remove file: %s", partFname.str()); e->Release(); }
         catch (CATCHALL) { ActPrintLog("Failed to remove: %s", partFname.str()); }
     }
-
     virtual unsigned __int64 createBlob(size32_t size, const void * ptr)
     {
         return builder->createBlob(size, (const char *) ptr);
@@ -271,6 +258,9 @@ public:
         outRowAllocator.setown(getRowAllocator(helper->queryDiskRecordSize()));
         start();
 
+        if (ensureStartFTLookAhead(0))
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+
         if (refactor)
         {
             assertex(isLocal);

+ 2 - 2
thorlcr/activities/iterate/thgroupiterateslave.cpp

@@ -87,7 +87,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (helper->canFilter())
@@ -171,7 +171,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (helper->canFilter())

+ 11 - 14
thorlcr/activities/iterate/thiterateslave.cpp

@@ -32,6 +32,7 @@ protected:
     bool global;
     bool eof, nextPut;
     rowcount_t count;
+
 public:
     IterateSlaveActivityBase(CGraphElementBase *_container, bool _global) : CSlaveActivity(_container)
     {
@@ -45,15 +46,6 @@ public:
         if (global)
             mpTag = container.queryJobChannel().deserializeMPTag(data);
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        if (global) // only want lookahead if global (hence serial)
-        {
-            if (!isFastThrough(input))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
-        }
-    }
     const void *getFirst() // for global, not called on 1st slave
     {
         CMessageBuffer msg;
@@ -87,6 +79,11 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
+        if (global) // only want lookahead if global (hence serial)
+        {
+            if (ensureStartFTLookAhead(0))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+        }
         count = 0;
         eof = nextPut = false;
         inrowif.set(::queryRowInterfaces(queryInput(0)));
@@ -160,7 +157,7 @@ public:
         return NULL;
     }
 
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canBufferInput = true;
@@ -240,7 +237,7 @@ public:
         return NULL;
     }
 
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canBufferInput = true;
@@ -308,7 +305,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -353,7 +350,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return grouped; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -416,7 +413,7 @@ public:
         return next;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;

+ 13 - 21
thorlcr/activities/join/thjoinslave.cpp

@@ -44,7 +44,6 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
 
     unsigned secondaryInputIndex = 0;
     unsigned primaryInputIndex = 0;
-    IEngineRowStream *rightInputStream = nullptr;
     IEngineRowStream *primaryInputStream = nullptr;
     IEngineRowStream *secondaryInputStream = nullptr;
     Owned<IThorDataLink> leftInput, rightInput;
@@ -207,29 +206,12 @@ public:
         {
             secondaryInputIndex = index;
             secondaryInputStream = queryInputStream(secondaryInputIndex);
-            if (isFastThrough(_input.itdl))
-            {
-                if (queryInput(index)->isGrouped())
-                {
-                    secondaryInputStream = createUngroupStream(secondaryInputStream);
-                    Owned<IEngineRowStream> old = replaceInputStream(secondaryInputIndex, secondaryInputStream);
-                }
-            }
-            else
-            {
-                IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, secondaryInputStream, queryRowInterfaces(_input.itdl), JOIN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(_input.itdl->queryFromActivity()),
-                                                            false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
-                setLookAhead(secondaryInputIndex, lookAhead),
-                secondaryInputStream = lookAhead;
-            }
         }
         else
         {
             primaryInputIndex = index;
             primaryInputStream = queryInputStream(primaryInputIndex);
         }
-        if (1 == index)
-            rightInputStream = queryInputStream(1);
     }
     virtual void onInputFinished(rowcount_t count) override
     {
@@ -260,12 +242,20 @@ public:
         try
         {
             startInput(secondaryInputIndex);
+
+            if (ensureStartFTLookAhead(secondaryInputIndex))
+            {
+                IThorDataLink *secondaryInput = queryInput(secondaryInputIndex);
+                // NB: lookahead told not to preserveGroups
+                setLookAhead(secondaryInputIndex, createRowStreamLookAhead(this, secondaryInputStream, queryRowInterfaces(secondaryInput), JOIN_SMART_BUFFER_SIZE, ::canStall(secondaryInput),
+                             false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+            }
+            secondaryInputStream = queryInputStream(secondaryInputIndex); // either lookahead or underlying stream, depending on whether active or not
         }
         catch (IException *e)
         {
             secondaryStartException.setown(e);
         }
-
     }
     virtual void start() override
     {
@@ -398,6 +388,7 @@ public:
             dataLinkStop();
             leftInput.clear();
             rightInput.clear();
+            secondaryInputStream = queryInputStream(secondaryInputIndex);
         }
     }
     virtual void reset() override
@@ -429,7 +420,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;
@@ -457,6 +448,7 @@ public:
             stopLeftInput();
             mergeStats(spillStats, iLoaderL);
         }
+        IEngineRowStream *rightInputStream = queryInputStream(1);
         if (isemptylhs&&((helper->getJoinFlags()&JFrightouter)==0))
         {
             ActPrintLog("ignoring RHS as LHS empty");
@@ -696,7 +688,7 @@ public:
         return next.getClear();
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -2416,7 +2416,7 @@ public:
 
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
 
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -2601,7 +2601,7 @@ public:
         PARENT::stop();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;

+ 1 - 1
thorlcr/activities/limit/thlimitslave.cpp

@@ -71,7 +71,7 @@ public:
         PARENT::stop();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;

+ 5 - 11
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1381,6 +1381,9 @@ public:
         try
         {
             startInput(0);
+            if (ensureStartFTLookAhead(0))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+            left.set(inputStream); // can be replaced by loader stream
         }
         catch(IException *e)
         {
@@ -1442,16 +1445,8 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        if (0 == index)
-        {
-            if (!isFastThrough(input))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(input->queryFromActivity()), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
-        }
-        else
-        {
-            dbgassertex(1 == index);
+        if (1 == index)
             right = queryInputStream(1);
-        }
     }
     virtual void reset() override
     {
@@ -1469,7 +1464,6 @@ public:
         leftMatch = false;
         rhsNext = NULL;
 
-        left.set(inputStream); // can be replaced by loader stream
         if (isGlobal())
         {
             // It is not until here, that it is guaranteed all channel slave activities have been initialized.
@@ -1545,7 +1539,7 @@ public:
         left.clear();
         dataLinkStop();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;

+ 13 - 10
thorlcr/activities/loop/thloopslave.cpp

@@ -111,7 +111,7 @@ public:
     }
 // IThorDataLink
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -594,7 +594,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -680,7 +680,7 @@ public:
         PARENT::stop();
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -856,7 +856,7 @@ public:
             startInput(branch);
             CThorInput &selectedInput = inputs.item(branch);
             selectedItdl = selectedInput.itdl;
-            selectedInputStream = selectedInput.stream;
+            selectedInputStream = selectedInput.queryStream();
         }
         dataLinkStart();
     }
@@ -884,9 +884,12 @@ public:
         return ret.getClear();
     }
     virtual bool isGrouped() const override { return container.queryGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
+        IThorDataLink *branchInput = queryInput(branch);
+        if (branchInput)
+            branchInput->getMetaInfo(info);
     }
 };
 
@@ -1049,7 +1052,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -1095,7 +1098,7 @@ public:
         return ret.finalizeRowClear(sz);
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -1155,7 +1158,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -1243,7 +1246,7 @@ public:
         }
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }
@@ -1317,7 +1320,7 @@ public:
         resultStream.clear();
         PARENT::stop();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
     }

+ 3 - 3
thorlcr/activities/merge/thmergeslave.cpp

@@ -403,7 +403,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, inputs);
@@ -484,7 +484,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, inputs);
@@ -590,7 +590,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, inputs);

+ 2 - 2
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -109,7 +109,7 @@ public:
         return row.getClear();
     }
     virtual bool isGrouped() const override { return container.queryGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;
@@ -185,7 +185,7 @@ public:
         inputStream->resetEOF();
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, queryInput(0));

+ 1 - 1
thorlcr/activities/msort/thmsortslave.cpp

@@ -206,7 +206,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;

+ 3 - 3
thorlcr/activities/normalize/thnormalizeslave.cpp

@@ -86,7 +86,7 @@ public:
         }
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;
@@ -159,7 +159,7 @@ public:
             }
         }
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;
@@ -237,7 +237,7 @@ public:
             }
         }
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;

+ 4 - 5
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -48,7 +48,7 @@ public:
 
 // IThorDataLink impl.
     virtual CSlaveActivity *queryFromActivity() override;
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override;
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override;
     virtual void dataLinkSerialize(MemoryBuffer &mb) const override { CEdgeProgress::dataLinkSerialize(mb); }
     virtual rowcount_t getProgressCount() const override { return CEdgeProgress::getCount(); }
     virtual bool isGrouped() const override;
@@ -204,8 +204,6 @@ public:
 
                 PARENT::start();
                 initMetaInfo(cachedMetaInfo);
-                //GH->JCS I think the following line is correct and may remove some downstream spilling streams
-                //cachedMetaInfo.canBufferInput = spill;
                 calcMetaInfoSize(cachedMetaInfo, queryInput(0));
 
                 ForEachItemIn(o, outputs)
@@ -232,6 +230,7 @@ public:
                     {
                         ActPrintLog("Spill is 'balanced'");
                         smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
+                        cachedMetaInfo.canStall = true;
                     }
                     // mark any outputs already stopped
                     ForEachItemIn(o, outputs)
@@ -387,7 +386,7 @@ public:
         streams.append(this);
         return nullptr;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         info = cachedMetaInfo;
     }
@@ -406,7 +405,7 @@ CSlaveActivity *CSplitterOutput::queryFromActivity()
     return &activity;
 }
 
-void CSplitterOutput::getMetaInfo(ThorDataLinkMetaInfo &info)
+void CSplitterOutput::getMetaInfo(ThorDataLinkMetaInfo &info) const
 {
     activity.getMetaInfo(info);
 }

+ 2 - 2
thorlcr/activities/null/thnullslave.cpp

@@ -66,7 +66,7 @@ public:
     {
         return queryHelper()->queryOutputMeta()->isGrouped();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true; // to 0 in fact
@@ -100,7 +100,7 @@ public:
     {
         return queryInput(0)->isGrouped();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         queryInput(0)->getMetaInfo(info);
     }

+ 1 - 1
thorlcr/activities/nullaction/thnullactionslave.cpp

@@ -48,7 +48,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.totalRowsMin = info.totalRowsMax = 0;

+ 1 - 1
thorlcr/activities/parse/thparseslave.cpp

@@ -116,7 +116,7 @@ public:
     { 
         return queryInput(0)->isGrouped();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;

+ 2 - 2
thorlcr/activities/piperead/thprslave.cpp

@@ -261,7 +261,7 @@ public:
         abortPipe();
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;
@@ -483,7 +483,7 @@ public:
         abortPipe();
     }
     virtual bool isGrouped() const override { return grouped; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = false;

+ 2 - 2
thorlcr/activities/project/thprojectslave.cpp

@@ -95,7 +95,7 @@ public:
     virtual CThorStrandProcessor *createStrandSourceProcessor(bool inputOrdered) override { throwUnexpected(); }
 
 // IThorDataLink
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true; // ish
@@ -311,7 +311,7 @@ public:
         CSlaveActivity::abort();
         prefetcher.abort();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         if (helper->canFilter())

+ 3 - 7
thorlcr/activities/pull/thpullslave.cpp

@@ -33,16 +33,12 @@ public:
     }
 
 // IThorDataLink methods
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        if (!isFastThrough(input))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
-    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
+        if (ensureStartFTLookAhead(0))
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
     }
     const void * nextRow() override
     {
@@ -55,7 +51,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; } // or input->isGrouped?
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true;

+ 10 - 13
thorlcr/activities/rollup/throllupslave.cpp

@@ -178,26 +178,23 @@ public:
         if (!global)
             setRequireInitData(false);
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
+    virtual void start()
     {
-        PARENT::setInputStream(index, _input, consumerOrdered);
+        PARENT::start();
         if (global)
         {
-            if (!isFastThrough(input))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage())); // only allow spill if input can stall
+            if (ensureStartFTLookAhead(0))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
         }
-    }
-    virtual void start()
-    {
-        PARENT::start();
         needFirstRow = true;
         rowif.set(queryRowInterfaces(input));
         eogNext = eos = false;
         numKept = 0;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
-        if (global) {
+        if (global)
+        {
             info.canBufferInput = true;
             info.isSequential = true;
         }
@@ -311,7 +308,7 @@ public:
         assertex( (keepLeft || numToKeep == 1) && (!keepBest || numToKeep==1));
     }
     virtual bool isGrouped() const override { return groupOp; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         CDedupRollupBaseActivity::getMetaInfo(info);
@@ -534,7 +531,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return groupOp; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         CDedupRollupBaseActivity::getMetaInfo(info);
@@ -603,7 +600,7 @@ public:
         }
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, queryInput(0));

+ 1 - 1
thorlcr/activities/sample/thsampleslave.cpp

@@ -70,7 +70,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canReduceNumRows = true;

+ 3 - 16
thorlcr/activities/selectnth/thselectnthslave.cpp

@@ -28,7 +28,6 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
     bool createDefaultIfFail;
     IHThorSelectNArg *helper;
     SpinLock spin; // MORE: Remove this and use an atomic variable for lookaheadN
-    Owned<IEngineRowStream> originalInputStream;
 
     void initN()
     {
@@ -102,12 +101,8 @@ public:
         IStartableEngineRowStream *lookAhead = nullptr;
         if (!isLocal && rowN)
         {
-            if (!isFastThrough(input))
-            {
-                lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage());
-                originalInputStream.setown(replaceInputStream(0, lookAhead));
-                lookAhead->start();
-            }
+            if (ensureStartFTLookAhead(0))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, ::canStall(input), false, rowN, this, &container.queryJob().queryIDiskUsage()), false);
         }
 
         seenNth = false;
@@ -127,14 +122,6 @@ public:
         }
         first = true;
     }
-    virtual void stop() override
-    {
-        PARENT::stop();
-        if (originalInputStream)
-        {
-            Owned<IEngineRowStream> lookAhead = replaceInputStream(0, originalInputStream.getClear());
-        }
-    }
     virtual void abort() override
     {
         CSlaveActivity::abort();
@@ -204,7 +191,7 @@ public:
         return ret.getClear();
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSequential = true; 

+ 1 - 1
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -222,7 +222,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.buffersInput = true; 

+ 2 - 2
thorlcr/activities/soapcall/thsoapcallslave.cpp

@@ -103,7 +103,7 @@ public:
         if (wscHelper)
             wscHelper->abort();
     }
-    void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;
@@ -187,7 +187,7 @@ public:
         CSlaveActivity::abort();
         wscHelper->abort();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.unknownRowsOutput = true;

+ 1 - 1
thorlcr/activities/spill/thspillslave.cpp

@@ -193,7 +193,7 @@ public:
     }
 
     virtual bool isGrouped() const override { return grouped; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true; // ish

+ 1 - 1
thorlcr/activities/temptable/thtmptableslave.cpp

@@ -94,7 +94,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;

+ 6 - 51
thorlcr/activities/thactivityutil.cpp

@@ -312,7 +312,7 @@ void calcMetaInfoSize(ThorDataLinkMetaInfo &info, IThorDataLink *link)
 
 }
 
-void calcMetaInfoSize(ThorDataLinkMetaInfo &info, CThorInputArray &inputs)
+void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const CThorInputArray &inputs)
 {
     //IThorDataLink **link,unsigned ninputs;
 
@@ -380,6 +380,11 @@ void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *in
     {
         if (1 == num)
             info = infos[0];
+        else
+        {
+            info.fastThrough = true;
+            info.totalRowsMin = info.totalRowsMax = 0;
+        }
         return;
     }
     if (!info.unknownRowsOutput)
@@ -430,56 +435,6 @@ void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *in
         info.totalRowsMin = 0; // a good bet
 }
 
-bool isFastThrough(IThorDataLink *input)
-{
-    CSlaveActivity *act = (CSlaveActivity *)input->queryFromActivity();
-    if (act)
-    {
-        ThorDataLinkMetaInfo info;
-        act->getMetaInfo(info);
-        if (!info.fastThrough)
-            return false;
-        unsigned i=0;
-        while (true)
-        {
-            input = act->queryInput(i++);
-            if (!input)
-                break;
-            if (!isFastThrough(input))
-                return false;
-        }
-    }
-    return true;
-}
-
-static bool canStall(CActivityBase *act)
-{
-    if (!act)
-        return false;
-    unsigned i=0;
-    IThorDataLink *inp;
-    while ((inp=((CSlaveActivity *)act)->queryInput(i++))!=NULL) {
-        ThorDataLinkMetaInfo info;
-        inp->getMetaInfo(info);
-        if (info.canStall)
-            return true;
-        if (!info.isSource&&!info.buffersInput&&!info.canBufferInput)
-            if (canStall((CSlaveActivity *)inp->queryFromActivity()))
-                return true;
-    }
-    return false;
-}
-
-bool isSmartBufferSpillNeeded(CActivityBase *act)
-{
-    // two part - forward and reverse checking
-    // first reverse looking for stalling activities
-    if (!canStall((CSlaveActivity *)act))
-        return false;
-    // now check
-    return true;
-}
-
 bool checkSavedFileCRC(IFile * ifile, bool & timesDiffer, unsigned & storedCrc)
 {
     StringBuffer s(ifile->queryFilename());

+ 2 - 6
thorlcr/activities/thactivityutil.ipp

@@ -45,7 +45,7 @@ public:
 
     virtual ~CPartHandler() { }
     virtual void setPart(IPartDescriptor *partDesc) = 0;
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) { }
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) const { }
     virtual void stop() = 0;
 };
 
@@ -70,7 +70,7 @@ IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPar
 
 void initMetaInfo(ThorDataLinkMetaInfo &info);
 void calcMetaInfoSize(ThorDataLinkMetaInfo &info, IThorDataLink *link);
-void calcMetaInfoSize(ThorDataLinkMetaInfo &info, CThorInputArray &inputs);
+void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const CThorInputArray &inputs);
 void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *infos, unsigned num);
 
 interface ILookAheadStopNotify
@@ -81,10 +81,6 @@ interface IDiskUsage;
 IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool spillenabled, bool preserveGrouping=true, rowcount_t maxcount=RCUNBOUND, ILookAheadStopNotify *notify=NULL, IDiskUsage *_diskUsage=NULL); //maxcount is maximum rows to read set to RCUNBOUND for all
 
 
-
-bool isSmartBufferSpillNeeded(CActivityBase *act);
-bool isFastThrough(IThorDataLink *input);
-
 StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath);
 void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress=NULL);
 void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc);

+ 5 - 7
thorlcr/activities/thdiskbaseslave.cpp

@@ -239,6 +239,7 @@ void CDiskReadSlaveActivityBase::init(MemoryBuffer &data, MemoryBuffer &slaveDat
         if ((helper->getFlags() & TDXtemporary) && (!container.queryJob().queryUseCheckpoints()))
             partDescs.item(0).queryOwner().setDefaultDir(queryTempDir());
     }
+    gotMeta = false; // if variable filename and inside loop, need to invalidate cached meta
 }
 
 const char *CDiskReadSlaveActivityBase::queryLogicalFilename(unsigned index)
@@ -303,18 +304,15 @@ void CDiskReadSlaveActivityBase::serializeStats(MemoryBuffer &mb)
 
 /////////////////
 
-void CDiskWriteSlaveActivityBase::setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered)
-{
-    PARENT::setInputStream(index, _input, consumerOrdered);
-    if (dlfn.isExternal() && !firstNode())
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PROCESS_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
-}
-
 void CDiskWriteSlaveActivityBase::open()
 {
     start();
     if (dlfn.isExternal() && !firstNode())
     {
+        if (hasLookAhead(0))
+            startLookAhead(0);
+        else
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PROCESS_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
         if (!rfsQueryParallel)
         {
             ActPrintLog("Blocked, waiting for previous part to complete write");

+ 3 - 3
thorlcr/activities/thdiskbaseslave.ipp

@@ -86,8 +86,9 @@ protected:
     StringArray subfileLogicalFilenames;
     IArrayOf<IPartDescriptor> partDescs;
     IHThorDiskReadBaseArg *helper;
-    bool checkFileCrc, gotMeta, crcCheckCompressed, markStart;
-    ThorDataLinkMetaInfo cachedMetaInfo;
+    bool checkFileCrc, crcCheckCompressed, markStart;
+    mutable bool gotMeta;
+    mutable ThorDataLinkMetaInfo cachedMetaInfo;
     Owned<CDiskPartHandlerBase> partHandler;
     Owned<IExpander> eexp;
     rowcount_t diskProgress = 0;
@@ -143,7 +144,6 @@ public:
     virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize);
 
 // IThorSlaveProcess overloaded methods
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override;
     virtual void kill();
     virtual void process();
     virtual void endProcess();

+ 1 - 1
thorlcr/activities/topn/thtopnslave.cpp

@@ -272,7 +272,7 @@ public:
         }
         return NULL;
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.canStall = true;

+ 1 - 1
thorlcr/activities/trace/thtraceslave.cpp

@@ -123,7 +123,7 @@ public:
         inputStream->resetEOF();
     }
     virtual bool isGrouped() const override { return input->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         calcMetaInfoSize(info, queryInput(0));

+ 1 - 1
thorlcr/activities/when/thwhenslave.cpp

@@ -102,7 +102,7 @@ public:
         dataLinkIncrement();
         return row.getClear();
     }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = false;

+ 1 - 1
thorlcr/activities/wuidread/thwuidreadslave.cpp

@@ -97,7 +97,7 @@ public:
         return rowBuilder.finalizeRowClear(sz);
     }
     virtual bool isGrouped() const override { return grouped; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.isSource = true;

+ 4 - 7
thorlcr/activities/wuidwrite/thwuidwriteslave.cpp

@@ -86,18 +86,15 @@ public:
     CWorkUnitWriteGlobalSlaveBaseActivity(CGraphElementBase *container) : CWorkUnitWriteSlaveBase(container)
     {
     }
-    virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
-    {
-        PARENT::setInputStream(index, _input, consumerOrdered);
-        // JCSMORE - not sure why you ever want a look ahead on a sink like this?
-        if (!isFastThrough(input))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
-    }
     virtual void process() override
     {
         start();
+
         processed = THORDATALINK_STARTED;
 
+        if (ensureStartFTLookAhead(0))
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+
         ActPrintLog("WORKUNITWRITE: processing first block");
 
         CMessageBuffer replyMb;

+ 1 - 1
thorlcr/activities/xmlparse/thxmlparseslave.cpp

@@ -154,7 +154,7 @@ public:
         return NULL;
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         initMetaInfo(info);
         info.fastThrough = true; // ish

+ 2 - 2
thorlcr/activities/xmlread/thxmlreadslave.cpp

@@ -270,14 +270,14 @@ public:
     }
     
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
     {
         if (!gotMeta)
         {
             gotMeta = true;
             initMetaInfo(cachedMetaInfo);
             cachedMetaInfo.isSource = true;
-            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
+            getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), ((IArrayOf<IPartDescriptor> &) partDescs).getArray(), partHandler);
             cachedMetaInfo.unknownRowsOutput = true; // at least I don't think we know
             cachedMetaInfo.fastThrough = true;
         }

+ 4 - 4
thorlcr/graph/thgraph.cpp

@@ -3075,7 +3075,7 @@ bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta,
     }
 }
 
-void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row)
+void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row) const
 {
     bool blindLogging = false; // MORE: should check a workunit/global option
     if (meta.hasXML() && !blindLogging)
@@ -3086,7 +3086,7 @@ void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const vo
     }
 }
 
-void CActivityBase::ActPrintLog(const char *format, ...)
+void CActivityBase::ActPrintLog(const char *format, ...) const
 {
     va_list args;
     va_start(args, format);
@@ -3094,7 +3094,7 @@ void CActivityBase::ActPrintLog(const char *format, ...)
     va_end(args);
 }
 
-void CActivityBase::ActPrintLog(IException *e, const char *format, ...)
+void CActivityBase::ActPrintLog(IException *e, const char *format, ...) const
 {
     va_list args;
     va_start(args, format);
@@ -3102,7 +3102,7 @@ void CActivityBase::ActPrintLog(IException *e, const char *format, ...)
     va_end(args);
 }
 
-void CActivityBase::ActPrintLog(IException *e)
+void CActivityBase::ActPrintLog(IException *e) const
 {
     ActPrintLog(e, "%s", "");
 }

+ 6 - 6
thorlcr/graph/thgraph.hpp

@@ -1073,14 +1073,14 @@ public:
     bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     void cancelReceiveMsg(ICommunicator &comm, const rank_t rank, const mptag_t mpTag);
     void cancelReceiveMsg(const rank_t rank, const mptag_t mpTag);
-    bool firstNode() { return 1 == container.queryJobChannel().queryMyRank(); }
-    bool lastNode() { return container.queryJob().querySlaves() == container.queryJobChannel().queryMyRank(); }
+    bool firstNode() const { return 1 == container.queryJobChannel().queryMyRank(); }
+    bool lastNode() const { return container.queryJob().querySlaves() == container.queryJobChannel().queryMyRank(); }
     unsigned queryMaxCores() const { return container.queryMaxCores(); }
     IThorRowInterfaces *getRowInterfaces();
     IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone, byte seq=0) const;
 
     bool appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const;
-    void logRow(const char * prefix, IOutputMetaData & meta, const void * row);
+    void logRow(const char * prefix, IOutputMetaData & meta, const void * row) const;
 
     virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) { }
     virtual void clearConnections() { }
@@ -1096,9 +1096,9 @@ public:
     virtual MemoryBuffer &getInitializationData(unsigned slave, MemoryBuffer &mb) const = 0;
     virtual IThorGraphResults *queryResults() { return ownedResults; }
 
-    void ActPrintLog(const char *format, ...) __attribute__((format(printf, 2, 3)));
-    void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
-    void ActPrintLog(IException *e);
+    void ActPrintLog(const char *format, ...) const __attribute__((format(printf, 2, 3)));
+    void ActPrintLog(IException *e, const char *format, ...) const __attribute__((format(printf, 3, 4)));
+    void ActPrintLog(IException *e) const;
 
     IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0);
     IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0);

+ 121 - 20
thorlcr/graph/thgraphslave.cpp

@@ -111,6 +111,17 @@ public:
     virtual const mptag_t queryTag() const { return tag; }
 };
 
+
+bool canStall(IThorDataLink *input)
+{
+    return input->queryFromActivity()->canStall();
+}
+
+//
+bool CThorInput::isFastThrough() const
+{
+    return itdl->queryFromActivity()->isFastThrough();
+}
 // 
 
 CSlaveActivity::CSlaveActivity(CGraphElementBase *_container) : CActivityBase(_container), CEdgeProgress(this)
@@ -126,6 +137,11 @@ CSlaveActivity::~CSlaveActivity()
     ActPrintLog("DESTROYED");
 }
 
+bool CSlaveActivity::hasLookAhead(unsigned index) const
+{
+    return inputs.item(index).hasLookAhead();
+}
+
 void CSlaveActivity::setOutputStream(unsigned index, IEngineRowStream *stream)
 {
     while (outputStreams.ordinality()<=index)
@@ -182,7 +198,7 @@ void CSlaveActivity::setInputStream(unsigned index, CThorInput &_input, bool con
             _input.tracingStream.setown(tracingStream);
             _inputStream = tracingStream;
         }
-        _input.stream.set(_inputStream);
+        _input.setStream(LINK(_inputStream));
         _input.junction.setown(junction.getClear());
         if (0 == index)
             inputStream = _inputStream;
@@ -190,25 +206,111 @@ void CSlaveActivity::setInputStream(unsigned index, CThorInput &_input, bool con
     }
 }
 
-IEngineRowStream *CSlaveActivity::replaceInputStream(unsigned index, IEngineRowStream *_inputStream)
+void CSlaveActivity::setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead, bool persistent)
 {
     CThorInput &_input = inputs.item(index);
-    IEngineRowStream *prevInputStream = _input.stream.getClear();
-    _input.stream.setown(_inputStream);
+    _input.setLookAhead(lookAhead, persistent);
     if (0 == index)
-        inputStream = _inputStream;
-    return prevInputStream;
+        inputStream = lookAhead;
+    if (!persistent)
+        _input.startLookAhead();
 }
 
-void CSlaveActivity::setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead)
+void CSlaveActivity::startLookAhead(unsigned index)
 {
     CThorInput &_input = inputs.item(index);
-    _input.lookAhead.setown(lookAhead);
-    _input.stream.set(lookAhead);
+    _input.startLookAhead();
     if (0 == index)
-        inputStream = lookAhead;
+        inputStream = _input.queryStream();
+}
+
+bool CSlaveActivity::isLookAheadActive(unsigned index) const
+{
+    CThorInput &_input = inputs.item(index);
+    return _input.isLookAheadActive();
+}
+
+bool CSlaveActivity::isInputFastThrough(unsigned index) const
+{
+    CThorInput &input = inputs.item(index);
+    return input.isFastThrough();
+}
+
+/* If fastThrough, return false.
+ * If !fastThrough (indicating needs look ahead) and has existing lookahead, start it, return false.
+ * If !fastThrough (indicating needs look ahead) and no existing lookahead, return true, caller will install.
+ *
+ * NB: only return true if new lookahead needs installing.
+ */
+bool CSlaveActivity::ensureStartFTLookAhead(unsigned index)
+{
+    CThorInput &input = inputs.item(index);
+    if (input.isFastThrough())
+        return false; // no look ahead required
+    else
+    {
+        // look ahead required
+        if (input.hasLookAhead())
+        {
+            // no change, start existing look ahead
+            startLookAhead(index);
+            return false; // no [new] look ahead required
+        }
+        else
+            return true; // new look ahead required
+    }
+}
+
+// recurse through active inputs, if _all_ are fastThrough, return true
+bool CSlaveActivity::isFastThrough() const
+{
+    if (!hasStarted())
+        return true;
+    ThorDataLinkMetaInfo info;
+    getMetaInfo(info);
+    if (!info.fastThrough || info.canStall) // NB: JIC - but should never be marked fastThrough==true if canStall==true
+        return false;
+    for (unsigned i=0; i<queryNumInputs(); i++)
+    {
+        IThorDataLink *input = queryInput(i);
+        if (input && queryInputStarted(i))
+        {
+            CSlaveActivity *inputAct = input->queryFromActivity();
+            if (!inputAct->isFastThrough())
+                return false;
+        }
+    }
+    return true;
 }
 
+// NB: very similar to above, should possible be merged at some point
+// recurse through active inputs, if _any_ are canStall, return true
+bool CSlaveActivity::canStall() const
+{
+    if (!hasStarted())
+        return false;
+    ThorDataLinkMetaInfo info;
+    getMetaInfo(info);
+    if (info.canStall)
+        return true;
+    if (info.isSource || info.buffersInput || info.canBufferInput)
+        return false;
+
+    for (unsigned i=0; i<queryNumInputs(); i++)
+    {
+        IThorDataLink *input = queryInput(i);
+        if (input && queryInputStarted(i))
+        {
+            CSlaveActivity *inputAct = input->queryFromActivity();
+            if (inputAct->canStall())
+                return true;
+        }
+    }
+    return false;
+}
+
+
+
 IStrandJunction *CSlaveActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
 {
     // Default non-stranded implementation, expects activity to have 1 output.
@@ -248,7 +350,7 @@ IThorDataLink *CSlaveActivity::queryInput(unsigned index) const
 IEngineRowStream *CSlaveActivity::queryInputStream(unsigned index) const
 {
     if (index>=inputs.ordinality()) return nullptr;
-    return inputs.item(index).stream;
+    return inputs.item(index).queryStream();
 }
 
 IStrandJunction *CSlaveActivity::queryInputJunction(unsigned index) const
@@ -298,14 +400,12 @@ void CSlaveActivity::startInput(unsigned index, const char *extra)
     try
     {
 #endif
-        _input.itdl->start();
-        startJunction(_input.junction);
-        if (_input.lookAhead)
-            _input.lookAhead->start();
-        _input.stopped = false;
-        _input.started = true;
+        _input.start();
         if (0 == index)
+        {
             inputStopped = false;
+            inputStream = _input.queryStream();
+        }
 #ifdef TRACE_STARTSTOP_EXCEPTIONS
     }
     catch(IException *e)
@@ -337,11 +437,12 @@ void CSlaveActivity::stopInput(unsigned index, const char *extra)
     try
     {
 #endif
-        if (_input.stream)
-            _input.stream->stop();
-        _input.stopped = true;
+        _input.stop();
         if (0 == index)
+        {
             inputStopped = true;
+            inputStream = _input.queryStream();
+        }
 
 #ifdef TRACE_STARTSTOP_EXCEPTIONS
     }

+ 75 - 5
thorlcr/graph/thgraphslave.hpp

@@ -107,15 +107,25 @@ public:
 
 class CThorInput : public CSimpleInterfaceOf<IInterface>
 {
+    Linked<IEngineRowStream> stream;
+    Linked<IStartableEngineRowStream> lookAhead;
+
+
+    void _startLookAhead()
+    {
+        assertex(nullptr != lookAhead);
+        lookAhead->start();
+        lookAheadActive = true;
+    }
 public:
     unsigned sourceIdx = 0;
     Linked<IThorDataLink> itdl;
-    Linked<IStartableEngineRowStream> lookAhead;
     Linked<IThorDebug> tracingStream;
-    Linked<IEngineRowStream> stream;
     Linked<IStrandJunction> junction;
     bool stopped = false;
     bool started = false;
+    bool persistentLookAhead = false;
+    bool lookAheadActive = false;
 
     explicit CThorInput() { }
     void set(IThorDataLink *_itdl, unsigned idx) { itdl.set(_itdl); sourceIdx = idx; }
@@ -126,6 +136,56 @@ public:
     }
     bool isStopped() const { return stopped; }
     bool isStarted() const { return started; }
+    bool isLookAheadActive() const { return lookAheadActive; }
+    IEngineRowStream *queryStream() const
+    {
+        if (lookAhead && lookAheadActive)
+            return lookAhead;
+        else
+            return stream;
+    }
+    void setStream(IEngineRowStream *_stream) { stream.setown(_stream); }
+    bool hasLookAhead() const { return nullptr != lookAhead; }
+    void setLookAhead(IStartableEngineRowStream *_lookAhead, bool persistent)
+    {
+        dbgassertex(!persistentLookAhead); // If persistent, must only be called once
+
+        /* NB: if persistent, must be installed before starting input, e.g. during setInputStream wiring.
+         * if not persistent, must be installed after input started, e.g. in start() after startInput(x).
+         */
+        dbgassertex((persistent && !isStarted()) || (!persistent && isStarted()));
+
+        lookAhead.setown(_lookAhead); // if pre-existing lookAhead, this will replace.
+        persistentLookAhead = persistent;
+    }
+    void startLookAhead()
+    {
+        dbgassertex(!persistentLookAhead);
+        dbgassertex(isStarted());
+        _startLookAhead();
+    }
+    void start()
+    {
+        itdl->start();
+        startJunction(junction);
+        if (persistentLookAhead)
+            _startLookAhead();
+        stopped = false;
+        started = true;
+    }
+    void stop()
+    {
+        // NB: lookAhead can be installed but not used
+        if (lookAheadActive)
+        {
+            lookAhead->stop();
+            lookAheadActive = false;
+        }
+        else if (stream)
+            stream->stop();
+        stopped = true;
+    }
+    bool isFastThrough() const;
 };
 typedef IArrayOf<CThorInput> CThorInputArray;
 
@@ -150,6 +210,12 @@ protected:
 
 protected:
     unsigned __int64 queryLocalCycles() const;
+    bool ensureStartFTLookAhead(unsigned index);
+    bool isInputFastThrough(unsigned index) const;
+    bool hasLookAhead(unsigned index) const;
+    void setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead, bool persistent);
+    void startLookAhead(unsigned index);
+    bool isLookAheadActive(unsigned index) const;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CActivityBase)
@@ -168,8 +234,6 @@ public:
     virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) override;
     virtual void connectInputStreams(bool consumerOrdered);
 
-    void setLookAhead(unsigned index, IStartableEngineRowStream *lookAhead);
-    IEngineRowStream *replaceInputStream(unsigned index, IEngineRowStream *_inputStream);
     IThorDataLink *queryOutput(unsigned index) const;
     IThorDataLink *queryInput(unsigned index) const;
     IEngineRowStream *queryInputStream(unsigned index) const;
@@ -187,12 +251,15 @@ public:
     void stopAllInputs();
     virtual void serializeStats(MemoryBuffer &mb);
     void debugRequest(unsigned edgeIdx, MemoryBuffer &msg);
+    bool canStall() const;
+    bool isFastThrough() const;
+
 
 // IThorDataLink
     virtual CSlaveActivity *queryFromActivity() override { return this; }
     virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
     virtual void setOutputStream(unsigned index, IEngineRowStream *stream) override;
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override { }
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { }
     virtual bool isGrouped() const override;
     virtual IOutputMetaData * queryOutputMeta() const;
     virtual void dataLinkSerialize(MemoryBuffer &mb) const override;
@@ -497,4 +564,7 @@ extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescript
 extern graphslave_decl IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc);
 extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
 
+extern graphslave_decl bool canStall(IThorDataLink *input);
+
+
 #endif

+ 1 - 1
thorlcr/slave/slave.hpp

@@ -91,7 +91,7 @@ interface IThorDataLink : extends IInterface
 {
     virtual void start() = 0; // prepares input
     virtual CSlaveActivity *queryFromActivity() = 0; // activity that has this as an output
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) = 0;
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const = 0;
     virtual bool isGrouped() const { return false; }
     virtual IOutputMetaData * queryOutputMeta() const = 0;
     virtual bool isInputOrdered(bool consumerOrdered) const = 0;