Sfoglia il codice sorgente

Merge pull request #1876 from jakesmith/lookupjoin-stop

Lookup join stop race deadlock

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 anni fa
parent
commit
511a7144a9

+ 113 - 60
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -462,6 +462,8 @@ public:
     }
     virtual void stop()
     {
+        if (!gotRHS)
+            getRHS(true);
         rhs.reset(false);
         stopRightInput();
         stopInput(left);
@@ -594,10 +596,7 @@ public:
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);
         if (!gotRHS)
-        {
-            getRHS();
-            gotRHS = true;
-        }
+            getRHS(false);
         if (!abortSoon && !eos)
         {
             if (doRightOuter)
@@ -838,13 +837,15 @@ public:
         info.unknownRowsOutput = true;
         info.canStall = true;
     }
-    void sendToBroadcastSlave()
+    bool sendToBroadcastSlave(bool stopping)
     {
 #ifdef _TRACEBROADCAST
         ActPrintLog("%s: sendToBroadcastSlave sending to slave %d", joinStr.get(), broadcastSlave);
 #endif
+        bool allRequestStop = false;
         try
         {
+            bool sentStop = false;
             MemoryBuffer mb;
             CMemoryRowSerializer mbs(mb);
 
@@ -860,19 +861,30 @@ public:
                 }
                 CMessageBuffer msg;
                 if (!receiveMsg(msg, broadcastSlave, mpTag))
-                    return;
-                if (0 != mb.length())
+                    return false;
+                msg.read(allRequestStop);
+                msg.clear();
+                if (!allRequestStop && 0 != mb.length())
                 {
-                    ThorCompress(mb.toByteArray(), mb.length(), msg.clear());
+                    msg.append(stopping);
+                    ThorCompress(mb.toByteArray(), mb.length(), msg);
 #ifdef _TRACEBROADCAST
                     ActPrintLog("sendToBroadcastSlave Compressing buf from %d to %d",mb.length(),msg.length());
 #endif
 #ifdef _TRACEBROADCAST
                     ActPrintLog("sendToBroadcastSlave sending reply to %d on tag %d",(int)broadcastSlave,(int)mpTag);
+                    if (stopping)
+                        sentStop = true;
 #endif
                 }
+                else
+                {
+                    // prevent stop at this point unless have already sent stop,
+                    // to prevent allRequestStop at broadcaster, before this slave knows about it.
+                    msg.append(sentStop && stopping);
+                }
                 if (!container.queryJob().queryJobComm().reply(msg))
-                    return;
+                    return false;
                 if (0 == mb.length())
                     break;
                 mb.clear();
@@ -886,6 +898,7 @@ public:
             ActPrintLog(e, "CLookupJoinActivity::sendToBroadcastSlave: exception");
             throw;
         }
+        return !allRequestStop;
     }
     void processRows(MemoryBuffer &mb)
     {
@@ -913,20 +926,29 @@ public:
         stopRightInput();
 #endif
     }
-    void getRHS()
+    void getRHS(bool stopping)
     {
+        if (gotRHS)
+            return;
+        gotRHS = true;
         Owned<IException> exception;
         try
         {
-            gatherLocal();
             if (!container.queryLocal() && container.queryJob().querySlaves() > 1)
             {
+                bool allRequestStop = false;
+                gatherLocal();
                 broadcaster.init(container.queryJob().queryMyRank(), container.queryJob().querySlaves(), &container.queryJob().queryJobComm(), mpTag, broadcastSlave);
                 if (container.queryJob().queryMyRank()==broadcastSlave)
                 {
                     unsigned fromNode = 1;
                     Owned<IBitSet> slavesDone = createBitSet();
-
+                    Owned<IBitSet> slavesStopping = createBitSet();
+                    slavesDone->testSet(broadcastSlave-1, true);
+                    slavesStopping->testSet(broadcastSlave-1, true);
+                    // loop, requesting data from all other slaves (in chunks)
+                    // track slaves which have finished sending in slavesDone (signalled via 0 len. packet)
+                    // NB: collates from slaves serially, probably should be in parallel
                     MemoryBuffer tmp;
                     CMessageBuffer msg;
                     bool allDone = false;
@@ -936,19 +958,28 @@ public:
                         ActPrintLog("getRHS Receiving");
 #endif
                         if (fromNode == broadcastSlave)
-                        {
-                            slavesDone->testSet(broadcastSlave-1, true);
                             ++fromNode;
-                        }
-                        {
+
+                        { // request more
                             BooleanOnOff onOff(receiving);
+                            msg.append(allRequestStop);
                             container.queryJob().queryJobComm().sendRecv(msg, fromNode, mpTag);
                         }
 #ifdef _TRACEBROADCAST
                         ActPrintLog("getRHS got %d from %d",msg.length(),(int)fromNode);
 #endif
-                        if (0 == msg.length())
+                        bool slaveStopRequest; // only true if slave stopping without needing RHS
+                        msg.read(slaveStopRequest);
+                        slavesStopping->testSet(fromNode-1, slaveStopRequest);
+                        if (stopping)
+                        {
+                            // can only stop if I'm stopping and all other slaves are
+                            allRequestStop = slavesStopping->scan(0, false) == container.queryJob().querySlaves();
+                            // if true, next request will tell slaves to stop sending
+                        }
+                        if (0 == msg.remaining()) // slave signalled no more data
                         {
+                            msg.clear();
                             bool done = slavesDone->testSet(fromNode-1, true);
                             assertex(false == done);
                             if (slavesDone->scan(0, false) == container.queryJob().querySlaves()) // i.e. got all
@@ -957,73 +988,95 @@ public:
                         }
                         else
                         {
-                            ThorExpand(msg, tmp);
+                            if (allRequestStop)
+                            {
+                                // only here, if all stopping, 1st packet from slave and signalled stopping
+                                msg.clear(); // no longer wanted
+                            }
+                            else
+                            {
+                                ThorExpand(msg, tmp);
 #ifdef _TRACEBROADCAST
-                            ActPrintLog("getRHS expanding.1 %d to %d",msg.length(), tmp.length());
+                                ActPrintLog("getRHS expanding.1 %d to %d",msg.length(), tmp.length());
 #endif
-                            msg.clear();
-                            processRows(tmp);
-                            tmp.clear();
+                                msg.clear();
+                                processRows(tmp);
+                                tmp.clear();
+                            }
                         }
                         if (allDone)
                             break;
                     }
-
-                    // now all (global) rows in this (broadcast node) rhs HT.
-                    CMemoryRowSerializer mbs(tmp.clear());
-                    allDone = false;
-                    unsigned r=0;
-                    while (!abortSoon)
+                    if (!allRequestStop)
                     {
-                        loop
+                        // now all (global) RHS rows on this (broadcast) node
+                        CMemoryRowSerializer mbs(tmp.clear());
+                        allDone = false;
+                        unsigned r=0;
+                        while (!abortSoon)
                         {
-                            if (r == rhs.ordinality())
+                            loop
                             {
-                                allDone = true;
-                                break;
+                                if (r == rhs.ordinality())
+                                {
+                                    allDone = true;
+                                    break;
+                                }
+                                const byte *row = (const byte *)rhs.item(r++);
+                                rightSerializer->serialize(mbs, row);
+                                if (tmp.length() > 0x80000)
+                                    break;
                             }
-                            const byte *row = (const byte *)rhs.item(r++);
-                            rightSerializer->serialize(mbs, row);
-                            if (tmp.length() > 0x80000)
-                                break;
-                        }
-                        if (0 != tmp.length())
-                        {
-                            ThorCompress(tmp, msg);
+                            if (0 != tmp.length())
+                            {
+                                ThorCompress(tmp, msg);
 #ifdef _TRACEBROADCAST
-                            ActPrintLog("getRHS compress.1 %d to %d",tmp.length(), msg.length());
+                                ActPrintLog("getRHS compress.1 %d to %d",tmp.length(), msg.length());
 #endif
-                            tmp.clear();
+                                tmp.clear();
 
-                            broadcaster.broadcast(msg);
-                            msg.clear();
+                                broadcaster.broadcast(msg);
+                                msg.clear();
+                            }
+                            if (allDone)
+                                break;
                         }
-                        if (allDone)
-                            break;
                     }
                 }
                 else
                 {
-                    sendToBroadcastSlave();
-                    rhs.clear();
-                    MemoryBuffer buf;
-                    MemoryBuffer expBuf;
-                    while (broadcaster.receive(buf))
+                    if (!sendToBroadcastSlave(stopping))
+                        allRequestStop = true;
+                    else
                     {
-                        ThorExpand(buf, expBuf);
+                        rhs.clear();
+                        MemoryBuffer buf;
+                        MemoryBuffer expBuf;
+                        while (broadcaster.receive(buf))
+                        {
+                            ThorExpand(buf, expBuf);
 #ifdef _TRACEBROADCAST
-                        ActPrintLog("Expanding received buf from %d to %d",buf.length(),expBuf.length());
+                            ActPrintLog("Expanding received buf from %d to %d",buf.length(),expBuf.length());
 #endif
-                        processRows(expBuf);
-                        expBuf.clear();
-                        broadcaster.broadcast(buf); // will swap buf
-                        buf.clear();
+                            processRows(expBuf);
+                            expBuf.clear();
+                            broadcaster.broadcast(buf); // will swap buf
+                            buf.clear();
+                        }
                     }
                 }
-                broadcaster.endBroadcast();     // send final
-                broadcaster.clear();
+                if (!allRequestStop)
+                {
+                    broadcaster.endBroadcast();     // send final
+                    broadcaster.clear();
+                    prepareRHS();
+                }
+            }
+            else if (!stopping)
+            {   // single node or local
+                gatherLocal();
+                prepareRHS();
             }
-            prepareRHS();
         }
         catch (IOutOfMemException *e) { exception.setown(e); }
         catch (IThorRowArrayException *e) { exception.setown(e); }

+ 3 - 1
thorlcr/thorutil/thcompressutil.cpp

@@ -87,7 +87,9 @@ size32_t ThorExpand(const void * src, size32_t srcSz, MemoryBuffer & dest)
 
 size32_t ThorExpand(MemoryBuffer & src, MemoryBuffer & dest)
 {
-    return ThorExpand((const void *)src.toByteArray(), src.length(), dest);
+    size32_t len = src.remaining();
+    const void *pSrc = src.readDirect(len);
+    return ThorExpand(pSrc, len, dest);
 }