Browse Source

Merge branch 'candidate-7.8.x' into candidate-7.10.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 years ago
parent
commit
8d78421884

+ 1 - 0
roxie/ccd/ccdmain.cpp

@@ -1019,6 +1019,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         roxiemem::setTotalMemoryLimit(allowHugePages, allowTransparentHugePages, retainMemory, totalMemoryLimit, 0, NULL, NULL);
         roxiemem::setTotalMemoryLimit(allowHugePages, allowTransparentHugePages, retainMemory, totalMemoryLimit, 0, NULL, NULL);
 
 
         traceStartStop = topology->getPropBool("@traceStartStop", false);
         traceStartStop = topology->getPropBool("@traceStartStop", false);
+        watchActivityId = topology->getPropInt("@watchActivityId", 0);
         traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
         traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
         traceTranslations = topology->getPropBool("@traceTranslations", true);
         traceTranslations = topology->getPropBool("@traceTranslations", true);
         defaultTimeActivities = topology->getPropBool("@timeActivities", true);
         defaultTimeActivities = topology->getPropBool("@timeActivities", true);

+ 60 - 9
roxie/ccd/ccdserver.cpp

@@ -817,8 +817,10 @@ public:
 
 
     virtual bool isSink() const
     virtual bool isSink() const
     {
     {
+        //If an internal result has as many dependencies (within the graph) as uses (which includes from outside the graph) then don't execute it unconditionally.
+        bool internalSpillAllUsesWithinGraph = (isInternal && dependentCount && dependentCount==usageCount);
         //only a sink if a root activity
         //only a sink if a root activity
-        return isRoot && !(isInternal && dependentCount && dependentCount==usageCount); // MORE - it's possible for this to get the answer wrong still, since usageCount does not include references from main procedure. Gavin?
+        return isRoot && !internalSpillAllUsesWithinGraph;
     }
     }
 
 
     virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
     virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
@@ -1715,7 +1717,17 @@ public:
         IFinalRoxieInput *saveInput = input;
         IFinalRoxieInput *saveInput = input;
         Owned<IStrandJunction> saveJunction = junction.getClear();
         Owned<IStrandJunction> saveJunction = junction.getClear();
         input = NULL;   // Make sure parent does not start the chain yet
         input = NULL;   // Make sure parent does not start the chain yet
-        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        try
+        {
+            CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        }
+        catch (...)
+        {
+            // Make sure we restore these even if there is an exception thrown during start
+            input = saveInput;
+            junction.setown(saveJunction.getClear());
+            throw;
+        }
         input = saveInput;
         input = saveInput;
         junction.setown(saveJunction.getClear());
         junction.setown(saveJunction.getClear());
     }
     }
@@ -2088,7 +2100,16 @@ public:
     {
     {
         IFinalRoxieInput *save = input;
         IFinalRoxieInput *save = input;
         input = NULL;   // Make sure parent does not start the chain yet - but we do want to do the dependencies (because the decision about whether to start may depend on them)
         input = NULL;   // Make sure parent does not start the chain yet - but we do want to do the dependencies (because the decision about whether to start may depend on them)
-        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        try
+        {
+            CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        }
+        catch (...)
+        {
+            // Make sure we restore these even if there is an exception thrown during start
+            input = save;
+            throw;
+        }
         input = save;
         input = save;
     }
     }
 
 
@@ -2163,7 +2184,8 @@ protected:
     IEngineRowStream *inputStream;
     IEngineRowStream *inputStream;
     IRecordPullerCallback *helper;
     IRecordPullerCallback *helper;
     Semaphore started;                      // MORE: GH->RKC I'm pretty sure this can be deleted, since handled by RestartableThread
     Semaphore started;                      // MORE: GH->RKC I'm pretty sure this can be deleted, since handled by RestartableThread
-    bool groupAtOnce, eof, eog;
+    bool groupAtOnce, eog;
+    std::atomic<bool> eof;
     CriticalSection crit;
     CriticalSection crit;
 
 
 public:
 public:
@@ -2256,6 +2278,7 @@ public:
             CriticalBlock c(crit); // stop is called on our consumer's thread. We need to take care calling stop for our input to make sure it is not in mid-nextRow etc etc.
             CriticalBlock c(crit); // stop is called on our consumer's thread. We need to take care calling stop for our input to make sure it is not in mid-nextRow etc etc.
             if (inputStream)
             if (inputStream)
                 inputStream->stop();
                 inputStream->stop();
+            eof = true;
         }
         }
         RestartableThread::join();
         RestartableThread::join();
     }
     }
@@ -2297,12 +2320,13 @@ public:
     {
     {
         if (eof)
         if (eof)
             return false;
             return false;
-        while (preload)
+        while (preload && !eof)
         {
         {
-            const void * row;
+            const void * row = nullptr;
             {
             {
                 CriticalBlock c(crit); // See comments in stop for why this is needed
                 CriticalBlock c(crit); // See comments in stop for why this is needed
-                row = inputStream->nextRow();
+                if (!eof)
+                    row = inputStream->nextRow();
             }
             }
             if (row)
             if (row)
             {
             {
@@ -2331,10 +2355,11 @@ public:
         unsigned rowsDone = 0;
         unsigned rowsDone = 0;
         while (preload && !eof)
         while (preload && !eof)
         {
         {
-            const void *row;
+            const void *row = nullptr;
             {
             {
                 CriticalBlock c(crit);
                 CriticalBlock c(crit);
-                row = inputStream->nextRow();
+                if (!eof)
+                    row = inputStream->nextRow();
             }
             }
             if (row)
             if (row)
             {
             {
@@ -5296,6 +5321,17 @@ public:
     {
     {
     }
     }
 
 
+    //override execute() to ensure that start is nevr called on any input activities.
+    virtual void execute(unsigned parentExtractSize, const byte * parentExtract) override
+    {
+        CriticalBlock b(ecrit);
+        if (!executed)
+        {
+            executed = true;
+            stop();
+        }
+    }
+
     virtual void onExecute() override
     virtual void onExecute() override
     {
     {
     }
     }
@@ -21630,6 +21666,7 @@ public:
 class CRoxieServerWorkUnitWriteActivityFactory : public CRoxieServerInternalSinkFactory
 class CRoxieServerWorkUnitWriteActivityFactory : public CRoxieServerInternalSinkFactory
 {
 {
     bool isReread;
     bool isReread;
+    bool isUnused = false;
 
 
 public:
 public:
     CRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
     CRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
@@ -21638,13 +21675,27 @@ public:
         isReread = usageCount > 0;
         isReread = usageCount > 0;
         Owned<IHThorWorkUnitWriteArg> helper = (IHThorWorkUnitWriteArg *) helperFactory();
         Owned<IHThorWorkUnitWriteArg> helper = (IHThorWorkUnitWriteArg *) helperFactory();
         isInternal = (helper->getSequence()==ResultSequenceInternal);
         isInternal = (helper->getSequence()==ResultSequenceInternal);
+        isUnused = isInternal && (usageCount == 0);
     }
     }
 
 
     virtual IRoxieServerActivity *createActivity(IRoxieAgentContext *_ctx, IProbeManager *_probeManager) const
     virtual IRoxieServerActivity *createActivity(IRoxieAgentContext *_ctx, IProbeManager *_probeManager) const
     {
     {
+        if (isUnused && !CRoxieServerInternalSinkFactory::isSink())
+        {
+            if (_ctx->queryTraceLevel() > 2)
+                DBGLOG("Workunit write %u is unused - create null activity", id);
+            //Create a null sink activity that is always executed to ensure that stop() is called on the input.
+            return createRoxieServerNullSinkActivity(_ctx, this, _probeManager);
+        }
+
         return new CRoxieServerWorkUnitWriteActivity(_ctx, this, _probeManager, isReread, usageCount);
         return new CRoxieServerWorkUnitWriteActivity(_ctx, this, _probeManager, isReread, usageCount);
     }
     }
 
 
+    virtual bool isSink() const
+    {
+        return isUnused || CRoxieServerInternalSinkFactory::isSink();
+    }
+
 };
 };
 
 
 IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
 IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)

+ 49 - 0
testing/regress/ecl/globalresult.ecl

@@ -0,0 +1,49 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2020 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+//Ensure that references to temporaries outside of the graph are handled correctly
+ds := DATASET(10, transform({unsigned id}, SELF.id := COUNTER)) : GLOBAL(FEW);
+o1 := output(NOTHOR(count(ds)));
+
+
+idRec := {unsigned id};
+ds2base := DATASET(10, transform(idRec, SELF.id := COUNTER));
+
+//ds2 creates a global workunit write, which is then no longer needed once the child query is optimized.
+ds2 := ds2base : GLOBAL(FEW);
+
+
+ds3 := DATASET(20, transform({unsigned x, unsigned y}, SELF.x := COUNTER, SELF.y := 20));
+
+
+dsx(unsigned x) := DATASET(30, transform({DATASET(idRec) f1, unsigned f2}, SELF.f1 := ds2, SELF.f2 := x + COUNTER));
+
+ds3 t(ds3 l) := TRANSFORM
+    SELF.x := l.x;
+    SELF.y := COUNT(dsx(l.y));
+END;
+
+ds4 := project(nocombine(ds3), t(LEFT));
+
+sequential(
+    o1,
+    parallel(
+        output(ds4);
+        output(ds2base);
+    )
+);

+ 37 - 0
testing/regress/ecl/key/globalresult.xml

@@ -0,0 +1,37 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>10</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><x>1</x><y>30</y></Row>
+ <Row><x>2</x><y>30</y></Row>
+ <Row><x>3</x><y>30</y></Row>
+ <Row><x>4</x><y>30</y></Row>
+ <Row><x>5</x><y>30</y></Row>
+ <Row><x>6</x><y>30</y></Row>
+ <Row><x>7</x><y>30</y></Row>
+ <Row><x>8</x><y>30</y></Row>
+ <Row><x>9</x><y>30</y></Row>
+ <Row><x>10</x><y>30</y></Row>
+ <Row><x>11</x><y>30</y></Row>
+ <Row><x>12</x><y>30</y></Row>
+ <Row><x>13</x><y>30</y></Row>
+ <Row><x>14</x><y>30</y></Row>
+ <Row><x>15</x><y>30</y></Row>
+ <Row><x>16</x><y>30</y></Row>
+ <Row><x>17</x><y>30</y></Row>
+ <Row><x>18</x><y>30</y></Row>
+ <Row><x>19</x><y>30</y></Row>
+ <Row><x>20</x><y>30</y></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><id>1</id></Row>
+ <Row><id>2</id></Row>
+ <Row><id>3</id></Row>
+ <Row><id>4</id></Row>
+ <Row><id>5</id></Row>
+ <Row><id>6</id></Row>
+ <Row><id>7</id></Row>
+ <Row><id>8</id></Row>
+ <Row><id>9</id></Row>
+ <Row><id>10</id></Row>
+</Dataset>