|
@@ -329,8 +329,13 @@ class CBroadcaster : public CSimpleInterface
|
|
|
}
|
|
|
void recvLoop()
|
|
|
{
|
|
|
+ // my sender is implicitly stopped (never sends to self)
|
|
|
+ if (senderStop(mySender))
|
|
|
+ {
|
|
|
+ recvInterface->bCastReceive(NULL, true);
|
|
|
+ return;
|
|
|
+ }
|
|
|
ActPrintLog(&activity, "Start of recvLoop()");
|
|
|
- senderStop(mySender); // my sender is implicitly stopped (never sends to self)
|
|
|
CMessageBuffer msg;
|
|
|
while (!stopRecv && !activity.queryAbortSoon())
|
|
|
{
|
|
@@ -1345,6 +1350,10 @@ public:
|
|
|
delete broadcastLock;
|
|
|
}
|
|
|
}
|
|
|
+ CriticalSection *queryBroadcastLock()
|
|
|
+ {
|
|
|
+ return broadcastLock;
|
|
|
+ }
|
|
|
HTHELPER *queryTable() { return table; }
|
|
|
void startLeftInput()
|
|
|
{
|
|
@@ -1417,16 +1426,9 @@ public:
|
|
|
CInMemJoinBase &channel = (CInMemJoinBase &)queryChannelActivity(c);
|
|
|
channels[c] = &channel;
|
|
|
}
|
|
|
+ broadcaster->setBroadcastLock(channels[0]->queryBroadcastLock());
|
|
|
}
|
|
|
channel0Broadcaster = channels[0]->broadcaster;
|
|
|
- if (0 == queryJobChannelNumber())
|
|
|
- {
|
|
|
- for (unsigned c=0; c<queryJob().queryJobChannels(); c++)
|
|
|
- {
|
|
|
- CInMemJoinBase &channel = (CInMemJoinBase &)queryChannelActivity(c);
|
|
|
- channel.broadcaster->setBroadcastLock(broadcastLock);
|
|
|
- }
|
|
|
- }
|
|
|
// NB: use sharedRightRowInterfaces, so that expanding ptr array is using shared allocator
|
|
|
for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
|
|
|
rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, true);
|