Browse Source

Merge branch 'hpcc-9694' into candidate-4.0.2

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
f57c4f6f77
2 changed files with 23 additions and 20 deletions
  1. 1 2
      thorlcr/activities/loop/thloop.cpp
  2. 22 18
      thorlcr/activities/loop/thloopslave.cpp

+ 1 - 2
thorlcr/activities/loop/thloop.cpp

@@ -61,7 +61,6 @@ protected:
             if (0 == slaveEmptyIterations) // either 1st or has been reset, i.e. non-empty
                 allEmptyIterations = false;
         }
-        assertex(loopEnds==0 || loopEnds==nodes); // Not sure possible in global graph, for some to finish and not others 
         bool final = loopEnds == nodes; // final
         msg.clear();
         if (allEmptyIterations)
@@ -69,7 +68,7 @@ protected:
         else
             emptyIterations = 0;
         bool ok = emptyIterations <= maxEmptyLoopIterations;
-        msg.append(ok);
+        msg.append(ok && !final); // This is to tell slave whether it should continue or not
         n = nodes;
         while (n--) // a barrier really
             container.queryJob().queryJobComm().send(msg, n+1, mpTag, LONGTIMEOUT);

+ 22 - 18
thorlcr/activities/loop/thloopslave.cpp

@@ -309,14 +309,7 @@ public:
                     {
                         ret.setown(curInput->nextRow()); // more cope with groups somehow....
                         if (!ret)
-                        {
-                            if (finishedLooping)
-                            {
-                                eof = true;
-                                return NULL;
-                            }
                             break;
-                        }
                     }
 
                     if (finishedLooping || 
@@ -335,15 +328,14 @@ public:
                 {
                 case TAKloopdataset:
                     assertex(flags & IHThorLoopArg::LFnewloopagain);
+                    // NB: finishedLooping set at end of loop, based on loopAgain result
                     break;
                 case TAKlooprow:
                     if (0 == loopPendingCount)
-                    {
-                        sendEndLooping();
-                        finishedLooping = true;
-                        eof = true;
-                        return NULL;
-                    }
+                        finishedLooping = true; // This slave has finished
+                    break;
+                case TAKloopcount:
+                    // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
                     break;
                 }
 
@@ -364,8 +356,22 @@ public:
                     }
                 }
 
-                if (!sendLoopingCount(loopCounter, emptyIterations)) // only if global
+                if (global)
+                {
+                    // 0 signals this slave has finished, but don't stop until all have
+                    if (!sendLoopingCount(finishedLooping ? 0 : loopCounter, finishedLooping ? 0 : emptyIterations))
+                    {
+                        sentEndLooping = true; // prevent sendEndLooping() sending end again
+                        eof = true;
+                        return NULL;
+                    }
+                }
+                else if (finishedLooping)
+                {
+                    eof = true;
                     return NULL;
+                }
+
                 loopPending->flush();
 
                 IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
@@ -401,15 +407,13 @@ public:
                     assertex(row);
                     //Result is a row which contains a single boolean field.
                     if (!((const bool *)row.get())[0])
-                        finishedLooping = true;
+                        finishedLooping = true; // NB: will finish when loopPending has been consumed
                 }
                 loopPending.setown(createOverflowableBuffer(*this, this, false, true));
                 loopPendingCount = 0;
                 ++loopCounter;
                 if ((container.getKind() == TAKloopcount) && (loopCounter > maxIterations))
-                    finishedLooping = true;
-                if (finishedLooping)
-                    sendEndLooping();
+                    finishedLooping = true; // NB: will finish when loopPending has been consumed
             }
         }
         return NULL;