Browse Source

Merge remote-tracking branch 'origin/candidate-3.10.x' into candidate-4.0.0

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman, 12 years ago
parent: (hashes not present in this capture)
commit: b837f3fe4f

+ 2 - 0
system/jlib/jthread.cpp

@@ -564,6 +564,8 @@ void CThreadedPersistent::main()
         }
         catch (IException *e)
         {
+            VStringBuffer errMsg("CThreadedPersistent (%s)", athread.getName());
+            EXCLOG(e, errMsg.str());
             exception.setown(e);
             joinSem.signal(); // leave in running state, signal to join to handle
             continue;

+ 11 - 9
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3152,7 +3152,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, public CThorDataLink, imple
     bool leftdone;
     mptag_t mptag;
     mptag_t mptag2;
-    Owned<IHashDistributor> distributor;
+    Owned<IHashDistributor> lhsDistributor, rhsDistributor;
 
 public:
 
@@ -3197,25 +3197,27 @@ public:
         IHash *ihashR = joinargs->queryHashRight();
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareR = joinargs->queryCompareRight();
-        if (!distributor)
-            distributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
-        Owned<IRowStream> reader = distributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
+        if (!lhsDistributor)
+            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
+        Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
         loaderL.clear();
         reader.clear();
         stopInputL();
-        distributor->disconnect(false);
-        distributor->join();
+        lhsDistributor->disconnect(false);
+        lhsDistributor->join();
         leftdone = true;
-        reader.setown(distributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
+        if (!rhsDistributor)
+            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, abortSoon,false, this));
+        reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
         loaderR.clear();
         reader.clear();
         stopInputR();
-        distributor->disconnect(false);
-        distributor->join();
+        rhsDistributor->disconnect(false);
+        rhsDistributor->join();
         { CriticalBlock b(joinHelperCrit);
             switch(container.getKind())
             {

+ 123 - 32
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -25,6 +25,7 @@
 #include "thbufdef.hpp"
 #include "jbuff.hpp"
 #include "jset.hpp"
+#include "jisem.hpp"
 
 #include "thorxmlwrite.hpp"
 
@@ -86,7 +87,7 @@ class CBroadcaster : public CSimpleInterface
     mptag_t mpTag;
     unsigned myNode, slaves;
     IBCastReceive *recvInterface;
-    Semaphore allDoneSem;
+    InterruptableSemaphore allDoneSem;
     CriticalSection allDoneLock, bcastOtherCrit;
     bool allDone, allDoneWaiting, allRequestStop, stopping, stopRecv;
     Owned<IBitSet> slavesDone, slavesStopping;
@@ -95,51 +96,96 @@ class CBroadcaster : public CSimpleInterface
     {
         CBroadcaster &broadcaster;
         CThreadedPersistent threaded;
+        bool aborted;
     public:
         CRecv(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CRecv", this), broadcaster(_broadcaster)
         {
+            aborted = false;
         }
-        void start() { threaded.start(); }
-        void stop()
+        void start()
         {
+            aborted = false;
+            threaded.start();
+        }
+        void abort(bool join)
+        {
+            if (aborted)
+                return;
+            aborted = true;
             broadcaster.cancelReceive();
-            threaded.join();
+            if (join)
+                threaded.join();
         }
         void wait()
         {
             threaded.join();
         }
     // IThreaded
-        virtual void main() { broadcaster.recvLoop(); }
+        virtual void main()
+        {
+            try
+            {
+                broadcaster.recvLoop();
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, "CRecv");
+                abort(false);
+                broadcaster.cancel(e);
+                e->Release();
+            }
+        }
     } receiver;
     class CSend : implements IThreaded
     {
         CBroadcaster &broadcaster;
         CThreadedPersistent threaded;
         SimpleInterThreadQueueOf<CSendItem, true> broadcastQueue;
+        Owned<IException> exception;
+        bool aborted;
+        void clearQueue()
+        {
+            loop
+            {
+                Owned<CSendItem> sendItem = broadcastQueue.dequeueNow();
+                if (NULL == sendItem)
+                    break;
+            }
+        }
     public:
         CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster)
         {
+            aborted = false;
         }
         ~CSend()
         {
-            stop();
+            clearQueue();
         }
         void addBlock(CSendItem *sendItem)
         {
+            if (exception)
+            {
+                if (sendItem)
+                    sendItem->Release();
+                throw exception.getClear();
+            }
             broadcastQueue.enqueue(sendItem); // will block if queue full
         }
-        void start() { threaded.start(); }
-        void stop()
+        void start()
         {
+            aborted = false;
+            exception.clear();
+            threaded.start();
+        }
+        void abort(bool join)
+        {
+            if (aborted)
+                return;
+            aborted = true;
             broadcastQueue.stop();
-            loop
-            {
-                Owned<CSendItem> sendItem = broadcastQueue.dequeueNow();
-                if (NULL == sendItem)
-                    break;
-            }
-            threaded.join();
+            clearQueue();
+            if (join)
+                threaded.join();
         }
         void wait()
         {
@@ -150,12 +196,22 @@ class CBroadcaster : public CSimpleInterface
     // IThreaded
         virtual void main()
         {
-            while (!broadcaster.activity.queryAbortSoon())
+            try
             {
-                Owned<CSendItem> sendItem = broadcastQueue.dequeue();
-                if (NULL == sendItem)
-                    break;
-                broadcaster.broadcastToOthers(sendItem);
+                while (!broadcaster.activity.queryAbortSoon())
+                {
+                    Owned<CSendItem> sendItem = broadcastQueue.dequeue();
+                    if (NULL == sendItem)
+                        break;
+                    broadcaster.broadcastToOthers(sendItem);
+                }
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, "CSend");
+                exception.setown(e);
+                abort(false);
+                broadcaster.cancel(e);
             }
             ActPrintLog(&broadcaster.activity, "Sender stopped");
         }
@@ -345,13 +401,19 @@ public:
         receiver.wait(); // terminates when received stop from all others
         sender.wait(); // terminates when any remaining packets, including final stop packets have been re-broadcast
     }
-    void cancel()
+    void cancel(IException *e=NULL)
     {
         allDoneWaiting = false;
         allDone = true;
-        receiver.stop();
-        sender.stop();
-        allDoneSem.signal();
+        receiver.abort(true);
+        sender.abort(true);
+        if (e)
+        {
+            allDoneSem.interrupt(LINK(e));
+            activity.fireException(LINK(e));
+        }
+        else
+            allDoneSem.signal();
     }
     bool send(CSendItem *sendItem)
     {
@@ -468,6 +530,7 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
         CLookupJoinActivity &owner;
         bool stopped;
         SimpleInterThreadQueueOf<CSendItem, true> blockQueue;
+        Owned<IException> exception;
     public:
         CRowProcessor(CLookupJoinActivity &_owner) : threaded("CRowProcessor", this), owner(_owner)
         {
@@ -485,23 +548,50 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
             }
             wait();
         }
-        void start() { threaded.start(); }
+        void start()
+        {
+            stopped = false;
+            exception.clear();
+            threaded.start();
+        }
+        void abort()
+        {
+            if (!stopped)
+            {
+                stopped = true;
+                blockQueue.enqueue(NULL);
+            }
+        }
         void wait() { threaded.join(); }
         void addBlock(CSendItem *sendItem)
         {
+            if (exception)
+            {
+                if (sendItem)
+                    sendItem->Release();
+                throw exception.getClear();
+            }
             blockQueue.enqueue(sendItem); // will block if queue full
         }
     // IThreaded
         virtual void main()
         {
-            while (!stopped)
+            try
             {
-                Owned<CSendItem> sendItem = blockQueue.dequeue();
-                if (NULL == sendItem)
-                    break;
-                MemoryBuffer expandedMb;
-                ThorExpand(sendItem->queryMsg(), expandedMb);
-                owner.processRHSRows(sendItem->queryOrigin(), expandedMb);
+                while (!stopped)
+                {
+                    Owned<CSendItem> sendItem = blockQueue.dequeue();
+                    if (stopped || (NULL == sendItem))
+                        break;
+                    MemoryBuffer expandedMb;
+                    ThorExpand(sendItem->queryMsg(), expandedMb);
+                    owner.processRHSRows(sendItem->queryOrigin(), expandedMb);
+                }
+            }
+            catch (IException *e)
+            {
+                exception.setown(e);
+                EXCLOG(e, "CRowProcessor");
             }
         }
     } rowProcessor;
@@ -743,6 +833,7 @@ public:
         gotOtherROs.signal();
         cancelReceiveMsg(RANK_ALL, mpTag);
         broadcaster.cancel();
+        rowProcessor.abort();
     }
     virtual void stop()
     {

+ 9 - 3
thorlcr/graph/thgraph.cpp

@@ -2812,9 +2812,15 @@ IRowInterfaces *CActivityBase::getRowInterfaces()
 bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
 {
     BooleanOnOff onOff(receiving);
-    if (cancelledReceive)
-        return false;
-    return container.queryJob().queryJobComm().recv(mb, rank, mpTag, sender, timeout);
+    CTimeMon t(timeout);
+    unsigned remaining = timeout;
+    // check 'cancelledReceive' every 10 secs
+    while (!cancelledReceive && ((MP_WAIT_FOREVER==timeout) || !t.timedout(&remaining)))
+    {
+        if (container.queryJob().queryJobComm().recv(mb, rank, mpTag, sender, remaining>10000?10000:remaining))
+            return true;
+    }
+    return false;
 }
 
 void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)