Browse Source

Merge pull request #6562 from jakesmith/hpcc-12441

HPCC-12441 Unordered join deadlock if stopped early

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
62c467bf64

+ 3 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3399,7 +3399,7 @@ public:
                     throwUnexpected();
             }
         }
-        joinhelper->init(strmL, strmR, ::queryRowAllocator(inL), ::queryRowAllocator(inR), ::queryRowMetaData(inL), &abortSoon);
+        joinhelper->init(strmL, strmR, ::queryRowAllocator(inL), ::queryRowAllocator(inR), ::queryRowMetaData(inL));
         dataLinkStart();
     }
     void stopInput()
@@ -3452,6 +3452,8 @@ public:
             lhsDistributor->abort();
         if (rhsDistributor)
             rhsDistributor->abort();
+        if (joinhelper)
+            joinhelper->stop();
     }
     CATCH_NEXTROW()
     {

+ 7 - 1
thorlcr/activities/join/thjoinslave.cpp

@@ -306,7 +306,7 @@ public:
         }
         if (!leftStream.get()||!rightStream.get())
             throw MakeActivityException(this, TE_FailedToStartJoinStreams, "Failed to start join streams");
-        joinhelper->init(leftStream, rightStream, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)), &abortSoon);
+        joinhelper->init(leftStream, rightStream, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)));
     }
     void stopLeftInput()
     {
@@ -336,6 +336,12 @@ public:
         else
             stopRightInput();
     }
+    void abort() // Abort hook: runs CSlaveActivity's abort handling, then stops the join helper if one is set.
+    {
+        CSlaveActivity::abort();
+        if (joinhelper) // joinhelper may be unset; presumably abort can arrive before the helper is created - confirm
+            joinhelper->stop(); // e.g. CJoinHelper::stop() sets the abort flag that its row loop checks before each step
+    }
     void stop() 
     {
         stopLeftInput();

+ 3 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1712,7 +1712,7 @@ protected:
                     default:
                         throwUnexpected();
                 }
-                joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL), &abortSoon);
+                joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL));
                 return;
             }
             else
@@ -1879,6 +1879,8 @@ public:
             rhsDistributor->abort();
         if (lhsDistributor)
             lhsDistributor->abort();
+        if (joinHelper)
+            joinHelper->stop();
     }
     virtual void stop()
     {

+ 29 - 25
thorlcr/activities/msort/thsortu.cpp

@@ -319,7 +319,7 @@ class CJoinHelper : public CSimpleInterface, implements IJoinHelper
     OwnedConstThorRow defaultRight;
     Linked<IRowStream> strmL;
     Linked<IRowStream> strmR;
-    bool *abort;
+    bool abort;
     bool nextleftgot;
     bool nextrightgot;
     unsigned atmost;
@@ -390,7 +390,6 @@ public:
             IEngineRowAllocator *_allocatorL,
             IEngineRowAllocator *_allocatorR,
             IOutputMetaData * _outputmeta,
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept)
     {
         //DebugBreak();
@@ -462,7 +461,7 @@ public:
             size32_t sz = helper->createDefaultRight(r);
             defaultRight.setown(r.finalizeRowClear(sz));
         }
-        abort = _abort;
+        abort = false;
         atmost = helper->getJoinLimit();
         if (atmost)
             assertex(!rightouter);
@@ -803,7 +802,7 @@ public:
     retry:
             ret.clear();
             do {
-                if (*abort) 
+                if (abort)
                     return NULL;
                 switch (state) {
                 case JSonfail:
@@ -965,7 +964,7 @@ public:
         CATCH_MEMORY_EXCEPTIONS
         return ret.getClear();;
     }
-    virtual void stop() { }
+    virtual void stop() { abort = true; } // Sets the flag the row loop tests (`if (abort) return NULL;`). NOTE(review): plain bool; if stop() can run on a different thread than the row loop, confirm this unsynchronized write is acceptable.
     virtual rowcount_t getLhsProgress() const { return lhsProgressCount; }
     virtual rowcount_t getRhsProgress() const { return rhsProgressCount; }
 };
@@ -993,7 +992,7 @@ class SelfJoinHelper: public CSimpleInterface, implements IJoinHelper
     OwnedConstThorRow defaultLeft;
     OwnedConstThorRow defaultRight;
     Owned<IRowStream> strm;
-    bool *abort;
+    bool abort;
     unsigned atmost;
     rowcount_t progressCount;
     unsigned joinCounter;
@@ -1025,7 +1024,6 @@ public:
             IEngineRowAllocator *_allocatorL,
             IEngineRowAllocator *,
             IOutputMetaData * _outputmeta,
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept)
     {
         //DebugBreak();
@@ -1070,7 +1068,7 @@ public:
             size32_t sz = helper->createDefaultRight(r);
             defaultRight.setown(r.finalizeRowClear(sz));
         }
-        abort = _abort;
+        abort = false;
         atmost = helper->getJoinLimit();
         if (atmost)
             assertex(!rightouter);
@@ -1113,7 +1111,7 @@ public:
 retry:
             ret.clear();
             do {
-                if (*abort) 
+                if (abort)
                     return NULL;
                 switch (state) {
                 case JSonfail:
@@ -1304,7 +1302,7 @@ retry:
         CATCH_MEMORY_EXCEPTIONS
         return ret.getClear();
     }
-    virtual void stop() { }
+    virtual void stop() { abort = true; } // Sets the flag the row loop tests (`if (abort) return NULL;`). NOTE(review): plain bool; if stop() can run on a different thread than the row loop, confirm this unsynchronized write is acceptable.
     virtual rowcount_t getLhsProgress() const { return progressCount; }
     virtual rowcount_t getRhsProgress() const { return progressCount; }
 };
@@ -1594,11 +1592,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!jhelper->init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!jhelper->init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         if (rightouter) {
             RtlDynamicRowBuilder r(allocatorL);
@@ -1626,6 +1623,10 @@ public:
             return jhelper->getRhsProgress();
         return 0;
     }
+    virtual void stop() // Forwards the stop request to the wrapped join helper.
+    {
+        jhelper->stop(); // no null check here: assumes jhelper is always set by the time stop() is called - TODO confirm
+    }
 };
 
 
@@ -1754,11 +1755,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         stopped = false;
         for (unsigned i=0;i<numworkers;i++)
@@ -1802,6 +1802,7 @@ public:
         if (stopped)
             return;
         stopped = true;
+        CMultiCoreJoinHelperBase::stop();
         for (unsigned i=0;i<numworkers;i++)
             workers[i]->rowStream->stop();
     }
@@ -1930,20 +1931,12 @@ public:
 
     ~CMultiCoreUnorderedJoinHelper()
     {
-        if (!reader.join(1000*60))
-            ERRLOG("~CMulticoreUnorderedJoinHelper reader join timed out");
-        for (unsigned i=0;i<numworkers;i++) {
-            if (!workers[i]->join(1000*60))
-                ERRLOG("~CMulticoreUnorderedJoinHelper worker[%d] join timed out",i);
-        }
-        while (workqueue.ordinality())
-            delete workqueue.dequeue();
+        stop(); // replaces the inline reader/worker joins and queue draining removed above; presumably this class's own stop() now performs them - confirm
         for (unsigned i=0;i<numworkers;i++) 
             delete workers[i];
         delete [] workers;
         ::Release(jhelper);
     }
-
     void stopWorkers()
     {
         for (unsigned i=0;i<numworkers;i++)
@@ -1958,11 +1951,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         workqueue.setLimit(numworkers+1);
         rowWriter.setown(multiWriter->getWriter());
@@ -1989,6 +1981,18 @@ public:
     }
     virtual void stop() // Stops the base helper, stops the work queue, aborts the writer, then joins the reader and each worker and drains the queue.
     {
+        CMultiCoreJoinHelperBase::stop();
+        workqueue.stop(); // presumably releases anything blocked on the queue - confirm against the queue class
+        multiWriter->abort();
+        if (!reader.join(1000*60)) // bounded wait; 1000*60 is presumably milliseconds (one minute) - confirm
+            ERRLOG("~CMulticoreUnorderedJoinHelper reader join timed out"); // NOTE(review): message keeps the destructor-style "~" prefix although it is logged from stop()
+        for (unsigned i=0;i<numworkers;i++)
+        {
+            if (!workers[i]->join(1000*60)) // same bounded wait for each worker
+                ERRLOG("~CMulticoreUnorderedJoinHelper worker[%d] join timed out",i);
+        }
+        while (workqueue.ordinality()) // delete whatever is still queued
+            delete workqueue.dequeueNow();
     } // NOTE(review): no "already stopped" guard is visible here; confirm that repeated calls (e.g. from abort and later from the destructor) are safe
 
 // IMulticoreIntercept impl.

+ 0 - 1
thorlcr/activities/msort/thsortu.hpp

@@ -53,7 +53,6 @@ interface IJoinHelper: public IRowStream
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *mcoreintercept=NULL
         )=0;
 

+ 7 - 1
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -180,9 +180,15 @@ public:
         strm.setown(isLightweight? doLightweightSelfJoin() : (isLocal ? doLocalSelfJoin() : doGlobalSelfJoin()));
         assertex(strm);
 
-        joinhelper->init(strm, NULL, ::queryRowAllocator(inputs.item(0)), ::queryRowAllocator(inputs.item(0)), ::queryRowMetaData(inputs.item(0)), &abortSoon);
+        joinhelper->init(strm, NULL, ::queryRowAllocator(inputs.item(0)), ::queryRowAllocator(inputs.item(0)), ::queryRowMetaData(inputs.item(0)));
     }
 
+    virtual void abort() // Abort hook: runs CSlaveActivity's abort handling, then stops the join helper if one is set.
+    {
+        CSlaveActivity::abort();
+        if (joinhelper) // joinhelper may be unset; presumably abort can arrive before the helper is created - confirm
+            joinhelper->stop(); // e.g. SelfJoinHelper::stop() sets the abort flag that its row loop checks before each step
+    }
     virtual void stop()
     {
         if (input)