Ver código fonte

Merge pull request #10229 from jakesmith/hpcc-17991

HPCC-17991 Avoid using a smart lookahead reader if input is fastThrough

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 anos atrás
pai
commit
5b35891c22

+ 2 - 1
thorlcr/activities/choosesets/thchoosesetsslave.cpp

@@ -152,7 +152,8 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
+        if (!isFastThrough(input)) // install the lookahead on input 0 only when isFastThrough() reports false for this input
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
     }
     virtual void start() override
     {

+ 9 - 6
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -231,12 +231,15 @@ public:
         firstget = true;
         input->getMetaInfo(inputMeta);
         totallimit = (rowcount_t)helper->getLimit();
-        rowcount_t _skipCount = validRC(helper->numToSkip()); // max
-        rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
-        IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
-                                                                              maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
-        originalInputStream.setown(replaceInputStream(0, lookAhead));
-        lookAhead->start();
+        if (!isFastThrough(input))
+        {
+            rowcount_t _skipCount = validRC(helper->numToSkip()); // max
+            rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
+            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false,
+                                                                                  maxRead, this, &container.queryJob().queryIDiskUsage()); // if a very large limit don't bother truncating
+            originalInputStream.setown(replaceInputStream(0, lookAhead));
+            lookAhead->start();
+        }
     }
     virtual void stop() override
     {

+ 3 - 1
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -148,7 +148,9 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
+        // JCSMORE - not sure why you ever want a look ahead on a sink like this?
+        if (!isFastThrough(input)) // install the lookahead on input 0 only when isFastThrough() reports false for this input
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
     }
     void open(IPartDescriptor &partDesc, bool isTopLevel, bool isVariable)
     {

+ 4 - 1
thorlcr/activities/iterate/thiterateslave.cpp

@@ -49,7 +49,10 @@ public:
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
         if (global) // only want lookahead if global (hence serial)
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
+        {
+            if (!isFastThrough(input))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
+        }
     }
     const void *getFirst() // for global, not called on 1st slave
     {

+ 8 - 5
thorlcr/activities/join/thjoinslave.cpp

@@ -204,11 +204,14 @@ public:
         if ((rightpartition && (0 == index)) || (!rightpartition && (1 == index)))
         {
             secondaryInputIndex = index;
-            IEngineRowStream *secondaryStream = queryInputStream(secondaryInputIndex);
-            IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, secondaryStream, queryRowInterfaces(_input.itdl), JOIN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(_input.itdl->queryFromActivity()),
-                                                        false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
-            setLookAhead(secondaryInputIndex, lookAhead),
-            secondaryInputStream = lookAhead;
+            secondaryInputStream = queryInputStream(secondaryInputIndex);
+            if (!isFastThrough(_input.itdl))
+            {
+                IStartableEngineRowStream *lookAhead = createRowStreamLookAhead(this, secondaryInputStream, queryRowInterfaces(_input.itdl), JOIN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(_input.itdl->queryFromActivity()),
+                                                            false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage());
+                setLookAhead(secondaryInputIndex, lookAhead),
+                secondaryInputStream = lookAhead;
+            }
         }
         else
         {

+ 4 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1441,7 +1441,10 @@ public:
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
         if (0 == index)
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(input->queryFromActivity()), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
+        {
+            if (!isFastThrough(input))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(input->queryFromActivity()), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
+        }
         else
         {
             dbgassertex(1 == index);

+ 2 - 1
thorlcr/activities/pull/thpullslave.cpp

@@ -36,7 +36,8 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
+        if (!isFastThrough(input)) // install the lookahead on input 0 only when isFastThrough() reports false for this input
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
     }
     virtual void start() override
     {

+ 4 - 1
thorlcr/activities/rollup/throllupslave.cpp

@@ -182,7 +182,10 @@ public:
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
         if (global)
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage())); // only allow spill if input can stall
+        {
+            if (!isFastThrough(input))
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage())); // only allow spill if input can stall
+        }
     }
     virtual void start()
     {

+ 6 - 3
thorlcr/activities/selectnth/thselectnthslave.cpp

@@ -102,9 +102,12 @@ public:
         IStartableEngineRowStream *lookAhead = nullptr;
         if (!isLocal && rowN)
         {
-            lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage());
-            originalInputStream.setown(replaceInputStream(0, lookAhead));
-            lookAhead->start();
+            if (!isFastThrough(input))
+            {
+                lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage());
+                originalInputStream.setown(replaceInputStream(0, lookAhead));
+                lookAhead->start();
+            }
         }
 
         seenNth = false;

+ 1 - 0
thorlcr/activities/temptable/thtmptableslave.cpp

@@ -100,6 +100,7 @@ public:
         info.isSource = true;
         info.unknownRowsOutput = false;
         info.totalRowsMin = info.totalRowsMax = maxRow - startRow;
+        info.fastThrough = true;
         if (helper->getFlags() & TTFfiltered)
             info.totalRowsMin = 0;
     }

+ 22 - 0
thorlcr/activities/thactivityutil.cpp

@@ -425,6 +425,28 @@ void calcMetaInfoSize(ThorDataLinkMetaInfo &info, ThorDataLinkMetaInfo *infos, u
         info.totalRowsMin = 0; // a good bet
 }
 
+bool isFastThrough(IThorDataLink *input) // true when the activity behind 'input' and every activity reachable through its inputs report fastThrough; also true when 'input' has no originating activity
+{
+    CSlaveActivity *act = (CSlaveActivity *)input->queryFromActivity(); // NOTE(review): unchecked C-style downcast; assumes the originating activity is a CSlaveActivity and that 'input' is non-null - confirm against callers
+    if (act)
+    {
+        ThorDataLinkMetaInfo info;
+        act->getMetaInfo(info); // ask the upstream activity to describe its output
+        if (!info.fastThrough)
+            return false; // this activity is not fastThrough, so the chain is not either
+        unsigned i=0;
+        while (true) // visit inputs 0, 1, 2, ... until queryInput() yields null
+        {
+            input = act->queryInput(i++); // the parameter is reused as the cursor over act's inputs
+            if (!input)
+                break;
+            if (!isFastThrough(input)) // recurse: one non-fastThrough ancestor makes the whole chain false
+                return false;
+        }
+    }
+    return true; // no originating activity, or every activity checked reported fastThrough
+}
+
 static bool canStall(CActivityBase *act)
 {
     if (!act)

+ 1 - 0
thorlcr/activities/thactivityutil.ipp

@@ -83,6 +83,7 @@ IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IE
 
 
 bool isSmartBufferSpillNeeded(CActivityBase *act);
+bool isFastThrough(IThorDataLink *input);
 
 StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath);
 void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress=NULL);

+ 3 - 1
thorlcr/activities/wuidwrite/thwuidwriteslave.cpp

@@ -89,7 +89,9 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
+        // JCSMORE - not sure why you ever want a look ahead on a sink like this?
+        if (!isFastThrough(input)) // install the lookahead on input 0 only when isFastThrough() reports false for this input
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()));
     }
     virtual void process() override
     {