|
@@ -25,6 +25,7 @@
|
|
|
#include "thbufdef.hpp"
|
|
|
#include "jbuff.hpp"
|
|
|
#include "jset.hpp"
|
|
|
+#include "jisem.hpp"
|
|
|
|
|
|
#include "thorxmlwrite.hpp"
|
|
|
|
|
@@ -86,7 +87,7 @@ class CBroadcaster : public CSimpleInterface
|
|
|
mptag_t mpTag;
|
|
|
unsigned myNode, slaves;
|
|
|
IBCastReceive *recvInterface;
|
|
|
- Semaphore allDoneSem;
|
|
|
+ InterruptableSemaphore allDoneSem;
|
|
|
CriticalSection allDoneLock, bcastOtherCrit;
|
|
|
bool allDone, allDoneWaiting, allRequestStop, stopping, stopRecv;
|
|
|
Owned<IBitSet> slavesDone, slavesStopping;
|
|
@@ -95,51 +96,96 @@ class CBroadcaster : public CSimpleInterface
|
|
|
{
|
|
|
CBroadcaster &broadcaster;
|
|
|
CThreadedPersistent threaded;
|
|
|
+ bool aborted;
|
|
|
public:
|
|
|
CRecv(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CRecv", this), broadcaster(_broadcaster)
|
|
|
{
|
|
|
+ aborted = false;
|
|
|
}
|
|
|
- void start() { threaded.start(); }
|
|
|
- void stop()
|
|
|
+ void start()
|
|
|
{
|
|
|
+ aborted = false;
|
|
|
+ threaded.start();
|
|
|
+ }
|
|
|
+ void abort(bool join)
|
|
|
+ {
|
|
|
+ if (aborted)
|
|
|
+ return;
|
|
|
+ aborted = true;
|
|
|
broadcaster.cancelReceive();
|
|
|
- threaded.join();
|
|
|
+ if (join)
|
|
|
+ threaded.join();
|
|
|
}
|
|
|
void wait()
|
|
|
{
|
|
|
threaded.join();
|
|
|
}
|
|
|
// IThreaded
|
|
|
- virtual void main() { broadcaster.recvLoop(); }
|
|
|
+ virtual void main()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ broadcaster.recvLoop();
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ EXCLOG(e, "CRecv");
|
|
|
+ abort(false);
|
|
|
+ broadcaster.cancel(e);
|
|
|
+ e->Release();
|
|
|
+ }
|
|
|
+ }
|
|
|
} receiver;
|
|
|
class CSend : implements IThreaded
|
|
|
{
|
|
|
CBroadcaster &broadcaster;
|
|
|
CThreadedPersistent threaded;
|
|
|
SimpleInterThreadQueueOf<CSendItem, true> broadcastQueue;
|
|
|
+ Owned<IException> exception;
|
|
|
+ bool aborted;
|
|
|
+ void clearQueue()
|
|
|
+ {
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ Owned<CSendItem> sendItem = broadcastQueue.dequeueNow();
|
|
|
+ if (NULL == sendItem)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
public:
|
|
|
CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster)
|
|
|
{
|
|
|
+ aborted = false;
|
|
|
}
|
|
|
~CSend()
|
|
|
{
|
|
|
- stop();
|
|
|
+ clearQueue();
|
|
|
}
|
|
|
void addBlock(CSendItem *sendItem)
|
|
|
{
|
|
|
+ if (exception)
|
|
|
+ {
|
|
|
+ if (sendItem)
|
|
|
+ sendItem->Release();
|
|
|
+ throw exception.getClear();
|
|
|
+ }
|
|
|
broadcastQueue.enqueue(sendItem); // will block if queue full
|
|
|
}
|
|
|
- void start() { threaded.start(); }
|
|
|
- void stop()
|
|
|
+ void start()
|
|
|
{
|
|
|
+ aborted = false;
|
|
|
+ exception.clear();
|
|
|
+ threaded.start();
|
|
|
+ }
|
|
|
+ void abort(bool join)
|
|
|
+ {
|
|
|
+ if (aborted)
|
|
|
+ return;
|
|
|
+ aborted = true;
|
|
|
broadcastQueue.stop();
|
|
|
- loop
|
|
|
- {
|
|
|
- Owned<CSendItem> sendItem = broadcastQueue.dequeueNow();
|
|
|
- if (NULL == sendItem)
|
|
|
- break;
|
|
|
- }
|
|
|
- threaded.join();
|
|
|
+ clearQueue();
|
|
|
+ if (join)
|
|
|
+ threaded.join();
|
|
|
}
|
|
|
void wait()
|
|
|
{
|
|
@@ -150,12 +196,22 @@ class CBroadcaster : public CSimpleInterface
|
|
|
// IThreaded
|
|
|
virtual void main()
|
|
|
{
|
|
|
- while (!broadcaster.activity.queryAbortSoon())
|
|
|
+ try
|
|
|
{
|
|
|
- Owned<CSendItem> sendItem = broadcastQueue.dequeue();
|
|
|
- if (NULL == sendItem)
|
|
|
- break;
|
|
|
- broadcaster.broadcastToOthers(sendItem);
|
|
|
+ while (!broadcaster.activity.queryAbortSoon())
|
|
|
+ {
|
|
|
+ Owned<CSendItem> sendItem = broadcastQueue.dequeue();
|
|
|
+ if (NULL == sendItem)
|
|
|
+ break;
|
|
|
+ broadcaster.broadcastToOthers(sendItem);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ EXCLOG(e, "CSend");
|
|
|
+ exception.setown(e);
|
|
|
+ abort(false);
|
|
|
+ broadcaster.cancel(e);
|
|
|
}
|
|
|
ActPrintLog(&broadcaster.activity, "Sender stopped");
|
|
|
}
|
|
@@ -345,13 +401,19 @@ public:
|
|
|
receiver.wait(); // terminates when received stop from all others
|
|
|
sender.wait(); // terminates when any remaining packets, including final stop packets have been re-broadcast
|
|
|
}
|
|
|
- void cancel()
|
|
|
+ void cancel(IException *e=NULL)
|
|
|
{
|
|
|
allDoneWaiting = false;
|
|
|
allDone = true;
|
|
|
- receiver.stop();
|
|
|
- sender.stop();
|
|
|
- allDoneSem.signal();
|
|
|
+ receiver.abort(true);
|
|
|
+ sender.abort(true);
|
|
|
+ if (e)
|
|
|
+ {
|
|
|
+ allDoneSem.interrupt(LINK(e));
|
|
|
+ activity.fireException(LINK(e));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ allDoneSem.signal();
|
|
|
}
|
|
|
bool send(CSendItem *sendItem)
|
|
|
{
|
|
@@ -468,6 +530,7 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
|
|
|
CLookupJoinActivity &owner;
|
|
|
bool stopped;
|
|
|
SimpleInterThreadQueueOf<CSendItem, true> blockQueue;
|
|
|
+ Owned<IException> exception;
|
|
|
public:
|
|
|
CRowProcessor(CLookupJoinActivity &_owner) : threaded("CRowProcessor", this), owner(_owner)
|
|
|
{
|
|
@@ -485,23 +548,50 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
|
|
|
}
|
|
|
wait();
|
|
|
}
|
|
|
- void start() { threaded.start(); }
|
|
|
+ void start()
|
|
|
+ {
|
|
|
+ stopped = false;
|
|
|
+ exception.clear();
|
|
|
+ threaded.start();
|
|
|
+ }
|
|
|
+ void abort()
|
|
|
+ {
|
|
|
+ if (!stopped)
|
|
|
+ {
|
|
|
+ stopped = true;
|
|
|
+ blockQueue.enqueue(NULL);
|
|
|
+ }
|
|
|
+ }
|
|
|
void wait() { threaded.join(); }
|
|
|
void addBlock(CSendItem *sendItem)
|
|
|
{
|
|
|
+ if (exception)
|
|
|
+ {
|
|
|
+ if (sendItem)
|
|
|
+ sendItem->Release();
|
|
|
+ throw exception.getClear();
|
|
|
+ }
|
|
|
blockQueue.enqueue(sendItem); // will block if queue full
|
|
|
}
|
|
|
// IThreaded
|
|
|
virtual void main()
|
|
|
{
|
|
|
- while (!stopped)
|
|
|
+ try
|
|
|
{
|
|
|
- Owned<CSendItem> sendItem = blockQueue.dequeue();
|
|
|
- if (NULL == sendItem)
|
|
|
- break;
|
|
|
- MemoryBuffer expandedMb;
|
|
|
- ThorExpand(sendItem->queryMsg(), expandedMb);
|
|
|
- owner.processRHSRows(sendItem->queryOrigin(), expandedMb);
|
|
|
+ while (!stopped)
|
|
|
+ {
|
|
|
+ Owned<CSendItem> sendItem = blockQueue.dequeue();
|
|
|
+ if (stopped || (NULL == sendItem))
|
|
|
+ break;
|
|
|
+ MemoryBuffer expandedMb;
|
|
|
+ ThorExpand(sendItem->queryMsg(), expandedMb);
|
|
|
+ owner.processRHSRows(sendItem->queryOrigin(), expandedMb);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ exception.setown(e);
|
|
|
+ EXCLOG(e, "CRowProcessor");
|
|
|
}
|
|
|
}
|
|
|
} rowProcessor;
|
|
@@ -743,6 +833,7 @@ public:
|
|
|
gotOtherROs.signal();
|
|
|
cancelReceiveMsg(RANK_ALL, mpTag);
|
|
|
broadcaster.cancel();
|
|
|
+ rowProcessor.abort();
|
|
|
}
|
|
|
virtual void stop()
|
|
|
{
|