Browse Source

Merge branch 'hpcc-9618' into candidate-4.0.2

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
661741bb4b

+ 11 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -174,12 +174,17 @@ public:
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
-        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, abortSoon, false, this);
+        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, false, this);
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
     }
     virtual IRowStream *queryOutput() { return this; }
     virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
+    virtual void abort()
+    {
+        if (distributor)
+            distributor->abort();
+    }
 
     // IStopInput
     virtual void stopInput()
@@ -459,6 +464,11 @@ public:
         fetchStreamOut->stop();
         dataLinkStop();
     }
+    virtual void abort()
+    {
+        if (fetchStream)
+            fetchStream->abort();
+    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);

+ 1 - 0
thorlcr/activities/fetch/thfetchslave.ipp

@@ -36,6 +36,7 @@ interface IFetchStream : extends IInterface
     virtual IRowStream *queryOutput() = 0;
     virtual IFileIO *queryPartIO(unsigned part) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
+    virtual void abort() = 0;
 };
 
 IFetchStream *createFetchStream(CSlaveActivity &owner, IRowInterfaces *keyRowIf, IRowInterfaces *fetchRowIf, bool &abortSoon, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);

+ 121 - 40
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -82,7 +82,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     IEngineRowAllocator *allocator;
     IOutputRowSerializer *serializer;
     IOutputMetaData *meta;
-    const bool &abort;
     IHash *ihash;
     Owned<IRowStream> input;
 
@@ -204,18 +203,21 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             CDistributorBase &distributor;
             Owned<CSendBucket> _sendBucket;
             unsigned nextPending;
+            bool aborted;
 
         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
             CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
             {
+                aborted = false;
             }
             void init(void *startInfo)
             {
                 nextPending = getRandom()%distributor.numnodes;
                 _sendBucket.setown((CSendBucket *)startInfo);
                 owner.setActiveWriter(_sendBucket->queryDestination(), this);
+                aborted = false;
             }
             void main()
             {
@@ -224,7 +226,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 MemoryBuffer mb;
-                loop
+                while (!aborted)
                 {
                     writerTotalSz += sendBucket->querySize();
                     owner.dedup(sendBucket); // conditional
@@ -250,7 +252,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                                 continue; // NB: it will flow into else "remote" arm
                             }
                         }
-                        loop
+                        while (!aborted)
                         {
                             // JCSMORE check if worth compressing
                             CMessageBuffer msg;
@@ -279,6 +281,10 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             }
             bool canReuse() { return true; }
             bool stop() { return true; }
+            void abort()
+            {
+                aborted = true;
+            }
         } **activeWriters;
 
         CDistributorBase &owner;
@@ -588,10 +594,13 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             rowcount_t totalSent = 0;
             try
             {
-                do
+                while (!aborted && numFinished < owner.numnodes)
                 {
                     while (queryTotalSz() >= owner.inputBufferSize)
                     {
+                        if (aborted)
+                            break;
+
                         HDSendPrintLog("process exceeded inputBufferSize");
                         bool doSelf;
                         unsigned which = getSendCandidate(doSelf);
@@ -623,6 +632,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
 
                                 if (senderFullSem.wait(10000))
                                     break;
+                                if (aborted)
+                                    break;
                             }
                         }
                     }
@@ -651,7 +662,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                         }
                     }
                 }
-                while (numFinished < owner.numnodes);
             }
             catch (IException *e)
             {
@@ -684,6 +694,23 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
 
             ActPrintLog(owner.activity, "HDIST: Send loop %s %"RCPF"d rows sent", exception.get()?"aborted":"finished", totalSent);
         }
+        void abort()
+        {
+            if (aborted)
+                return;
+            aborted = true;
+            senderFullSem.signal();
+            if (initialized)
+            {
+                CriticalBlock b(activeWritersLock);
+                for (unsigned w=0; w<owner.numnodes; w++)
+                {
+                    CWriteHandler *writer = activeWriters[w];
+                    if (writer)
+                        writer->abort();
+                }
+            }
+        }
     // IThreadFactory impl.
         virtual IPooledThread *createNew()
         {
@@ -695,9 +722,9 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             ActPrintLog(owner.activity, e, "HDIST: CSender");
             if (!aborted)
             {
+                abort();
                 exception.set(e);
-                aborted = true;
-                senderFullSem.signal(); // send regardless, because senderFull could be about to be set.
+                senderFullSem.signal();
             }
             return owner.fireException(e);
         }
@@ -769,15 +796,15 @@ protected:
     unsigned self;
     unsigned numnodes;
     CriticalSection putsect;
-    bool pull;
+    bool pull, aborted;
     CSender sender;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CDistributorBase(CActivityBase *_activity, const bool &_abort,bool _doDedup, IStopInput *_istop)
-        : activity(_activity), abort(_abort), recvthread(this), sendthread(this), sender(*this)
+    CDistributorBase(CActivityBase *_activity, bool _doDedup, IStopInput *_istop)
+        : activity(_activity), recvthread(this), sendthread(this), sender(*this)
     {
-        connected = false;
+        aborted = connected = false;
         doDedup = _doDedup;
         self = activity->queryJob().queryMyRank() - 1;
         numnodes = activity->queryJob().querySlaves();
@@ -857,6 +884,7 @@ public:
         pipewr.set(piperd->queryWriter());
         connected = true;
         selfstopped = false;
+        aborted = false;
 
         sendException.clear();
         recvException.clear();
@@ -897,7 +925,14 @@ public:
         ihash = NULL;
         iCompare = NULL;
     }
-
+    virtual void abort()
+    {
+        if (!aborted)
+        {
+            aborted = true;
+            sender.abort();
+        }
+    }
     virtual void recvloop()
     {
         CCycleTimer timer;
@@ -910,7 +945,7 @@ public:
             CThorStreamDeserializerSource rowSource;
             rowSource.setStream(stream);
             unsigned left=numnodes-1;
-            while (left)
+            while (left && !aborted)
             {
 #ifdef _FULL_TRACE
                 ActPrintLog("HDIST: Receiving block");
@@ -934,7 +969,7 @@ public:
                     }
                     {
                         CriticalBlock block(putsect);
-                        while (!rowSource.eos())
+                        while (!rowSource.eos() && !aborted)
                         {
                             timer.reset();
                             RtlDynamicRowBuilder rowBuilder(allocator);
@@ -1048,11 +1083,23 @@ public:
     void setRecvExc(IException *e)
     {
         ActPrintLog(activity, e, "HDIST: recvloop");
+        abort();
         if (recvException.get())
             e->Release();
         else
             recvException.setown(e);
     }
+    bool sendRecv(ICommunicator &comm, CMessageBuffer &mb, rank_t r, mptag_t tag)
+    {
+        loop
+        {
+            if (aborted)
+                return false;
+            if (comm.sendRecv(mb, r, tag, MEDIUMTIMEOUT))
+                return true;
+            // try again
+        }
+    }
     virtual unsigned recvBlock(CMessageBuffer &mb,unsigned i=(unsigned)-1) = 0;
     virtual void stopRecv() = 0;
     virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;
@@ -1086,8 +1133,8 @@ public:
 
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, const bool &abort, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, abort, doDedup, istop), comm(_comm), tag(_tag)
+    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
+        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
     {
         stopping = false;
     }
@@ -1164,7 +1211,8 @@ Restart:
             ActPrintLog(activity, "HDIST MP sending RTS to %d",i+1);
 #endif
 
-            comm.sendRecv(rts, i+1, tag);
+            if (!sendRecv(comm, rts, i+1, tag))
+                return false;
             rts.read(flag);
 #ifdef _FULL_TRACE
             ActPrintLog(activity, "HDIST MP got CTS from %d, %d",i+1,(int)flag);
@@ -1181,7 +1229,8 @@ Restart:
         }
         // this branch not yet used
         assertex(false);
-        comm.sendRecv(msg, i+1, tag);
+        if (!sendRecv(comm, msg, i+1, tag))
+            return false;
         msg.read(flag);             // whether stopped
         return flag!=0;
     }
@@ -1198,6 +1247,12 @@ Restart:
     {
         stopping = false;
     }
+    virtual void abort()
+    {
+        CDistributorBase::abort();
+        stopRecv();
+        comm.cancel(RANK_ALL, tag);
+    }
 };
 
 class CRowPullDistributor: public CDistributorBase
@@ -1338,8 +1393,8 @@ class CRowPullDistributor: public CDistributorBase
         selfdone.reinit();
     }
 public:
-    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, const bool &abort, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, abort, doDedup, istop), comm(_comm), tag(_tag)
+    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
+        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
     {
         pull = true;
         tag = _tag;
@@ -1385,7 +1440,7 @@ public:
         {
             Owned<cSortedDistributeMerger> merger = new cSortedDistributeMerger(*this, numnodes, queryCompare(), queryAllocator(), deserializer);
             ActPrintLog(activity, "Read loop start");
-            loop
+            while (!aborted)
             {
                 const void *row = merger->merged().nextRow();
                 if (!row)
@@ -1465,11 +1520,13 @@ public:
         {
             msg.clear();
             selfready.wait();
+            if (aborted)
+                return (unsigned)-1;
         }
         else
         {
             msg.clear().append((byte)1); // rts
-            if (!comm.sendRecv(msg, i+1, tag))
+            if (!sendRecv(comm, msg, i+1, tag))
             {
                 return i;
             }
@@ -1614,6 +1671,11 @@ public:
         stopping = true;
         selfready.signal();
     }
+    virtual void abort()
+    {
+        CDistributorBase::abort();
+        comm.cancel(RANK_ALL, tag);
+    }
 };
 
 //==================================================================================================
@@ -1621,14 +1683,14 @@ public:
 //==================================================================================================
 
 
-IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, const bool &abort,bool doDedup,IStopInput *istop)
+IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
 {
-    return new CRowDistributor(activity, comm, tag, abort, doDedup, istop);
+    return new CRowDistributor(activity, comm, tag, doDedup, istop);
 }
 
-IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, const bool &abort,bool doDedup,IStopInput *istop)
+IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
 {
-    return new CRowPullDistributor(activity, comm, tag,  abort, doDedup, istop);
+    return new CRowPullDistributor(activity, comm, tag, doDedup, istop);
 }
 
 
@@ -1687,9 +1749,9 @@ public:
         ActPrintLog("HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
 
         if (mergecmp)
-            distributor = createPullHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon, false, this);
+            distributor = createPullHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this);
         else
-            distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon, false, this);
+            distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this);
         inputstopped = true;
     }
     void stopInput()
@@ -1751,7 +1813,12 @@ public:
         ActPrintLog("HASHDISTRIB: kill");
         CSlaveActivity::kill();
     }
-
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (distributor)
+            distributor->abort();
+    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities, NULL); // careful not to call again in derivatives
@@ -3150,7 +3217,6 @@ public:
         distributor = NULL;
         mptag = TAG_NULL;
     }
-
     ~GlobalHashDedupSlaveActivity()
     {
         instrm.clear();
@@ -3161,20 +3227,17 @@ public:
             distributor->Release();
         }
     }
-
     void stopInput()
     {
         CriticalBlock block(stopsect);  // can be called async by distribute
         HashDedupSlaveActivityBase::stopInput();
     }
-
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         HashDedupSlaveActivityBase::init(data, slaveData);
         mptag = container.queryJob().deserializeMPTag(data);
-        distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,true, this);
+        distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, true, this);
     }
-
     void start()
     {
         HashDedupSlaveActivityBase::start();
@@ -3183,7 +3246,6 @@ public:
         instrm.setown(distributor->connect(myRowIf, input, iHash, iCompare));
         input = instrm.get();
     }
-
     void stop()
     {
         ActPrintLog("%s: stopping", actTxt);
@@ -3197,7 +3259,12 @@ public:
         stopInput();
         dataLinkStop();
     }
-
+    void abort()
+    {
+        HashDedupSlaveActivityBase::abort();
+        if (distributor)
+            distributor->abort();
+    }
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         initMetaInfo(info);
@@ -3274,7 +3341,7 @@ public:
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareR = joinargs->queryCompareRight();
         if (!lhsDistributor)
-            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
+            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
@@ -3285,7 +3352,7 @@ public:
         lhsDistributor->join();
         leftdone = true;
         if (!rhsDistributor)
-            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, abortSoon,false, this));
+            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, false, this));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
@@ -3358,6 +3425,14 @@ public:
         ActPrintLog("HASHJOIN: kill");
         CSlaveActivity::kill();
     }
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (lhsDistributor)
+            lhsDistributor->abort();
+        if (rhsDistributor)
+            rhsDistributor->abort();
+    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);
@@ -3458,7 +3533,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
             }
         } nodeCompare(helperExtra.queryHashElement());
         if (!distributor)
-            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, activity.queryAbortSoon(), false, NULL));
+            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
         strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
         loop
         {
@@ -3492,7 +3567,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
         };
         Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(localAggTable);
         if (!distributor)
-            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, activity.queryAbortSoon(), false, NULL));
+            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
         Owned<IRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
         strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
         loop
@@ -3592,6 +3667,12 @@ public:
         stopInput(input);
         dataLinkStop();
     }
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (distributor)
+            distributor->abort();
+    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);

+ 2 - 6
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -23,17 +23,14 @@
 #include "slave.ipp"
 #include "thactivityutil.ipp"
 
-interface IRowStreamWithMetaData: extends IRowStream
-{   // currently fixed size data only
-    virtual bool nextRow(const void *&row,void *meta)=0;
-};
 
-interface IHashDistributor: extends IInterface
+interface IHashDistributor : extends IInterface
 {
     virtual IRowStream *connect(IRowInterfaces *rowIf, IRowStream *in, IHash *ihash, ICompare *icompare)=0;
     virtual void disconnect(bool stop)=0;
     virtual void join()=0;
     virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
+    virtual void abort()=0;
 };
 
 interface IStopInput;
@@ -41,7 +38,6 @@ IHashDistributor *createHashDistributor(
     CActivityBase *activity,
     ICommunicator &comm, 
     mptag_t tag, 
-    const bool &abort,
     bool dedup,
     IStopInput *istop);
 

+ 2 - 1
thorlcr/graph/thgraph.cpp

@@ -1276,7 +1276,8 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
         start();
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
             GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
-        graphDone = true;
+        else
+            graphDone = true;
     }
     catch (IException *e)
     {

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -632,7 +632,7 @@ void CSlaveGraph::done()
         progressActive = false;
         progressToCollect = true; // NB: ensure collected after end of graph
     }
-    if (!aborted && (!queryOwner() || isGlobal()))
+    if (!aborted && graphDone && (!queryOwner() || isGlobal()))
         getDoneSem.wait(); // must wait on master
     if (!queryOwner())
     {