|
@@ -39,7 +39,7 @@ enum join_t { JT_Undefined, JT_Inner, JT_LeftOuter, JT_RightOuter, JT_LeftOnly,
|
|
|
#define MAX_QUEUE_BLOCKS 5
|
|
|
|
|
|
enum broadcast_code { bcast_none, bcast_send, bcast_sendStopping, bcast_stop };
|
|
|
-enum broadcast_flags { bcastflag_null=0, bcastflag_spilt=0x100, bcastflag_standardjoin=0x200 };
|
|
|
+enum broadcast_flags { bcastflag_null=0, bcastflag_spilt=0x100 };
|
|
|
#define BROADCAST_CODE_MASK 0x00FF
|
|
|
#define BROADCAST_FLAG_MASK 0xFF00
|
|
|
class CSendItem : public CSimpleInterface
|
|
@@ -79,7 +79,7 @@ public:
|
|
|
|
|
|
interface IBCastReceive
|
|
|
{
|
|
|
- virtual void bCastReceive(CSendItem *sendItem) = 0;
|
|
|
+ virtual void bCastReceive(CSendItem *sendItem, bool stop) = 0;
|
|
|
};
|
|
|
|
|
|
/*
|
|
@@ -94,14 +94,15 @@ class CBroadcaster : public CSimpleInterface
|
|
|
ICommunicator &comm;
|
|
|
CActivityBase &activity;
|
|
|
mptag_t mpTag;
|
|
|
- unsigned myNode, nodes, mySlave, slaves;
|
|
|
+ unsigned myNode, nodes, mySlave, slaves, senders, mySender;
|
|
|
IBCastReceive *recvInterface;
|
|
|
InterruptableSemaphore allDoneSem;
|
|
|
- CriticalSection allDoneLock, bcastOtherCrit, stopCrit;
|
|
|
- bool allDone, allRequestStop, stopping, stopRecv;
|
|
|
+ CriticalSection allDoneLock, stopCrit;
|
|
|
+ CriticalSection *broadcastLock;
|
|
|
+ bool allRequestStop, stopping, stopRecv, receiving, nodeBroadcast;
|
|
|
unsigned waitingAtAllDoneCount;
|
|
|
broadcast_flags stopFlag;
|
|
|
- Owned<IBitSet> slavesDone, broadcastersStopping;
|
|
|
+ Owned<IBitSet> sendersDone, broadcastersStopping;
|
|
|
|
|
|
class CRecv : implements IThreaded
|
|
|
{
|
|
@@ -228,30 +229,27 @@ class CBroadcaster : public CSimpleInterface
|
|
|
}
|
|
|
} sender;
|
|
|
|
|
|
- // NB: returns true if all except me(myNode) are done
|
|
|
- bool slaveStop(unsigned slave)
|
|
|
+ // NB: returns true if all done. Sets allDoneExceptSelf if all except this sender are done
|
|
|
+ bool senderStop(unsigned sender)
|
|
|
{
|
|
|
- CriticalBlock b(allDoneLock);
|
|
|
- bool done = slavesDone->testSet(slave, true);
|
|
|
+ bool done = sendersDone->testSet(sender, true);
|
|
|
assertex(false == done);
|
|
|
- unsigned which = slavesDone->scan(0, false);
|
|
|
- if (which == slaves) // i.e. got all
|
|
|
- {
|
|
|
- allDone = true;
|
|
|
- if (waitingAtAllDoneCount)
|
|
|
- {
|
|
|
- allDoneSem.signal(waitingAtAllDoneCount);
|
|
|
- waitingAtAllDoneCount = 0;
|
|
|
- }
|
|
|
- receiver.abort(false);
|
|
|
- recvInterface->bCastReceive(NULL);
|
|
|
- }
|
|
|
- else if (which == mySlave)
|
|
|
+ unsigned which = sendersDone->scan(0, false);
|
|
|
+ if (which != senders)
|
|
|
+ return false;
|
|
|
+ // all have signalled stop
|
|
|
+ activity.ActPrintLog("CBroadcaster::senderStop() all done - waitingAtAllDoneCount=%u", waitingAtAllDoneCount);
|
|
|
+ if (waitingAtAllDoneCount)
|
|
|
{
|
|
|
- if (slavesDone->scan(which+1, false) == slaves)
|
|
|
- return true; // all done except me
|
|
|
+ allDoneSem.signal(waitingAtAllDoneCount);
|
|
|
+ waitingAtAllDoneCount = 0;
|
|
|
}
|
|
|
- return false;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ bool senderStop(CSendItem &sendItem)
|
|
|
+ {
|
|
|
+ unsigned sender = nodeBroadcast?sendItem.queryNode():sendItem.querySlave();
|
|
|
+ return senderStop(sender);
|
|
|
}
|
|
|
unsigned target(unsigned i, unsigned node)
|
|
|
{
|
|
@@ -269,12 +267,13 @@ class CBroadcaster : public CSimpleInterface
|
|
|
}
|
|
|
void broadcastToOthers(CSendItem *sendItem)
|
|
|
{
|
|
|
+ dbgassertex(broadcastLock);
|
|
|
mptag_t rt = ::createReplyTag();
|
|
|
unsigned origin = sendItem->queryNode();
|
|
|
unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;
|
|
|
CMessageBuffer replyMsg;
|
|
|
// sends to all in 1st pass, then waits for ack from all
|
|
|
- CriticalBlock b(bcastOtherCrit);
|
|
|
+ CriticalBlock b(*broadcastLock); // prevent other channels overlapping, otherwise causes queue ordering issues with MP multi packet messages to same dst.
|
|
|
for (unsigned sendRecv=0; sendRecv<2 && !activity.queryAbortSoon(); sendRecv++)
|
|
|
{
|
|
|
unsigned i = 0;
|
|
@@ -291,7 +290,7 @@ class CBroadcaster : public CSimpleInterface
|
|
|
if (0 == sendRecv) // send
|
|
|
{
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog(&activity, "Broadcast node %d Sending to node %d, origin %d, size %d, code=%d", myNode+1, t, origin, sendLen, (unsigned)sendItem->queryCode());
|
|
|
+ ActPrintLog(&activity, "Broadcast node %d Sending to node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, t, origin+1, sendItem->querySlave()+1, sendLen, (unsigned)sendItem->queryCode());
|
|
|
#endif
|
|
|
CMessageBuffer &msg = sendItem->queryMsg();
|
|
|
msg.setReplyTag(rt); // simulate sendRecv
|
|
@@ -300,36 +299,52 @@ class CBroadcaster : public CSimpleInterface
|
|
|
else // recv reply
|
|
|
{
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog(&activity, "Broadcast node %d Waiting for reply from node %d, origin %d, size %d, code=%d, replyTag=%d", myNode+1, t, origin, sendLen, (unsigned)sendItem->queryCode(), (unsigned)rt);
|
|
|
+ ActPrintLog(&activity, "Broadcast node %d Waiting for reply from node %d, origin node %d, origin slave %d, size %d, code=%d, replyTag=%d", myNode+1, t, origin+1, sendItem->querySlave()+1, sendLen, (unsigned)sendItem->queryCode(), (unsigned)rt);
|
|
|
#endif
|
|
|
if (!activity.receiveMsg(comm, replyMsg, t, rt))
|
|
|
break;
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog(&activity, "Broadcast node %d Sent to node %d, origin %d, size %d, code=%d - received ack", myNode+1, t, origin, sendLen, (unsigned)sendItem->queryCode());
|
|
|
+ ActPrintLog(&activity, "Broadcast node %d Sent to node %d, origin node %d, origin slave %d, size %d, code=%d - received ack", myNode+1, t, origin+1, sendItem->querySlave()+1, sendLen, (unsigned)sendItem->queryCode());
|
|
|
#endif
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- // called by CRecv thread
|
|
|
void cancelReceive()
|
|
|
{
|
|
|
stopRecv = true;
|
|
|
- activity.cancelReceiveMsg(comm, RANK_ALL, mpTag);
|
|
|
+ if (receiving)
|
|
|
+ comm.cancel(RANK_ALL, mpTag);
|
|
|
+ }
|
|
|
+ bool receiveMsg(CMessageBuffer &mb, rank_t *sender)
|
|
|
+ {
|
|
|
+ BooleanOnOff onOff(receiving);
|
|
|
+ // check 'cancelledReceive' every 10 secs
|
|
|
+ while (!stopRecv)
|
|
|
+ {
|
|
|
+ if (comm.recv(mb, RANK_ALL, mpTag, sender, 10000))
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
void recvLoop()
|
|
|
{
|
|
|
+ ActPrintLog(&activity, "Start of recvLoop()");
|
|
|
+ senderStop(mySender); // my sender is implicitly stopped (never sends to self)
|
|
|
CMessageBuffer msg;
|
|
|
while (!stopRecv && !activity.queryAbortSoon())
|
|
|
{
|
|
|
rank_t sendRank;
|
|
|
- if (!activity.receiveMsg(comm, msg, RANK_ALL, mpTag, &sendRank))
|
|
|
+ if (!receiveMsg(msg, &sendRank))
|
|
|
+ {
|
|
|
+ ActPrintLog(&activity, "recvLoop() - receiveMsg cancelled");
|
|
|
break;
|
|
|
+ }
|
|
|
mptag_t replyTag = msg.getReplyTag();
|
|
|
CMessageBuffer ackMsg;
|
|
|
Owned<CSendItem> sendItem = new CSendItem(msg);
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog(&activity, "Broadcast node %d received from node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode());
|
|
|
+ ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode());
|
|
|
#endif
|
|
|
comm.send(ackMsg, sendRank, replyTag); // send ack
|
|
|
#ifdef _TRACEBROADCAST
|
|
@@ -342,10 +357,12 @@ class CBroadcaster : public CSimpleInterface
|
|
|
case bcast_stop:
|
|
|
{
|
|
|
CriticalBlock b(allDoneLock);
|
|
|
- if (slaveStop(sendItem->querySlave()) || allDone)
|
|
|
+ ActPrintLog(&activity, "recvLoop - received bcast_stop, from : node=%u, slave=%u", sendItem->queryNode()+1, sendItem->querySlave()+1);
|
|
|
+ bool stop = senderStop(*sendItem);
|
|
|
+ recvInterface->bCastReceive(sendItem.getLink(), stop);
|
|
|
+ if (stop)
|
|
|
{
|
|
|
- recvInterface->bCastReceive(sendItem.getClear());
|
|
|
- ActPrintLog(&activity, "recvLoop, received last slaveStop");
|
|
|
+ ActPrintLog(&activity, "recvLoop, received last senderStop, node=%u, slave=%u", sendItem->queryNode()+1, sendItem->querySlave()+1);
|
|
|
// NB: this slave has nothing more to receive.
|
|
|
// However the sender will still be re-broadcasting some packets, including these stop packets
|
|
|
return;
|
|
@@ -360,13 +377,14 @@ class CBroadcaster : public CSimpleInterface
|
|
|
case bcast_send:
|
|
|
{
|
|
|
if (!allRequestStop) // don't care if all stopping
|
|
|
- recvInterface->bCastReceive(sendItem.getClear());
|
|
|
+ recvInterface->bCastReceive(sendItem.getClear(), false);
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
|
throwUnexpected();
|
|
|
}
|
|
|
}
|
|
|
+ ActPrintLog(&activity, "End of recvLoop()");
|
|
|
}
|
|
|
inline void _setStopping(unsigned node)
|
|
|
{
|
|
@@ -377,20 +395,36 @@ class CBroadcaster : public CSimpleInterface
|
|
|
public:
|
|
|
CBroadcaster(CActivityBase &_activity) : activity(_activity), receiver(*this), sender(*this), comm(_activity.queryJob().queryNodeComm())
|
|
|
{
|
|
|
- allDone = allRequestStop = stopping = stopRecv = false;
|
|
|
+ allRequestStop = stopping = stopRecv = false;
|
|
|
waitingAtAllDoneCount = 0;
|
|
|
myNode = activity.queryJob().queryMyNodeRank()-1; // 0 based
|
|
|
mySlave = activity.queryJobChannel().queryMyRank()-1; // 0 based
|
|
|
nodes = activity.queryJob().queryNodes();
|
|
|
slaves = activity.queryJob().querySlaves();
|
|
|
- slavesDone.setown(createThreadSafeBitSet());
|
|
|
+ mySender = mySlave;
|
|
|
+ senders = slaves;
|
|
|
+ sendersDone.setown(createThreadSafeBitSet());
|
|
|
broadcastersStopping.setown(createThreadSafeBitSet());
|
|
|
mpTag = TAG_NULL;
|
|
|
recvInterface = NULL;
|
|
|
stopFlag = bcastflag_null;
|
|
|
+ broadcastLock = NULL;
|
|
|
+ receiving = false;
|
|
|
+ nodeBroadcast = false;
|
|
|
}
|
|
|
- void start(IBCastReceive *_recvInterface, mptag_t _mpTag, bool _stopping)
|
|
|
+ void start(IBCastReceive *_recvInterface, mptag_t _mpTag, bool _stopping, bool _nodeBroadcast)
|
|
|
{
|
|
|
+ nodeBroadcast = _nodeBroadcast;
|
|
|
+ if (nodeBroadcast)
|
|
|
+ {
|
|
|
+ mySender = myNode;
|
|
|
+ senders = nodes;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ mySender = mySlave;
|
|
|
+ senders = slaves;
|
|
|
+ }
|
|
|
stopping = _stopping;
|
|
|
recvInterface = _recvInterface;
|
|
|
stopRecv = false;
|
|
@@ -403,10 +437,10 @@ public:
|
|
|
}
|
|
|
void reset()
|
|
|
{
|
|
|
- allDone = allRequestStop = stopping = false;
|
|
|
+ allRequestStop = stopping = false;
|
|
|
waitingAtAllDoneCount = 0;
|
|
|
stopFlag = bcastflag_null;
|
|
|
- slavesDone->reset();
|
|
|
+ sendersDone->reset();
|
|
|
broadcastersStopping->reset();
|
|
|
}
|
|
|
CSendItem *newSendItem(broadcast_code code)
|
|
@@ -415,25 +449,28 @@ public:
|
|
|
code = bcast_sendStopping;
|
|
|
return new CSendItem(code, myNode, mySlave);
|
|
|
}
|
|
|
+ void setBroadcastLock(CriticalSection *_broadcastLock)
|
|
|
+ {
|
|
|
+ broadcastLock = _broadcastLock;
|
|
|
+ }
|
|
|
void resetSendItem(CSendItem *sendItem)
|
|
|
{
|
|
|
sendItem->reset();
|
|
|
}
|
|
|
- void waitReceiverDone(unsigned slave)
|
|
|
+ void waitReceiverDone(unsigned sender)
|
|
|
{
|
|
|
{
|
|
|
CriticalBlock b(allDoneLock);
|
|
|
- slaveStop(slave);
|
|
|
- if (allDone)
|
|
|
+ if (senderStop(sender))
|
|
|
+ {
|
|
|
+ receiver.abort(false);
|
|
|
+ recvInterface->bCastReceive(NULL, true);
|
|
|
return;
|
|
|
+ }
|
|
|
waitingAtAllDoneCount++;
|
|
|
}
|
|
|
allDoneSem.wait();
|
|
|
}
|
|
|
- void waitReceiverDone()
|
|
|
- {
|
|
|
- waitReceiverDone(mySlave);
|
|
|
- }
|
|
|
void end()
|
|
|
{
|
|
|
receiver.wait(); // terminates when received stop from all others
|
|
@@ -441,7 +478,6 @@ public:
|
|
|
}
|
|
|
void cancel(IException *e=NULL)
|
|
|
{
|
|
|
- allDone = true;
|
|
|
receiver.abort(true);
|
|
|
sender.abort(true);
|
|
|
if (e)
|
|
@@ -597,7 +633,7 @@ public:
|
|
|
if (0 == threadCount)
|
|
|
threadCount = getAffinityCpus();
|
|
|
}
|
|
|
- bool init(rowidx_t rowCount)
|
|
|
+ bool init(rowidx_t rowCount, roxiemem::IRowManager *rowManager)
|
|
|
{
|
|
|
bool threadSafeBitSet = activity.getOptBool("threadSafeBitSet", false); // for testing only
|
|
|
if (threadSafeBitSet)
|
|
@@ -608,7 +644,7 @@ public:
|
|
|
else
|
|
|
{
|
|
|
size32_t bitSetMemSz = getBitSetMemoryRequirement(rowCount);
|
|
|
- void *pBitSetMem = activity.queryRowManager().allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
|
|
|
+ void *pBitSetMem = rowManager->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
|
|
|
if (!pBitSetMem)
|
|
|
return false;
|
|
|
|
|
@@ -729,7 +765,11 @@ public:
|
|
|
class CThorRowArrayWithFlushMarker : public CThorSpillableRowArray
|
|
|
{
|
|
|
public:
|
|
|
- CThorRowArrayWithFlushMarker(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep)
|
|
|
+ CThorRowArrayWithFlushMarker(CActivityBase &activity) : CThorSpillableRowArray(activity)
|
|
|
+ {
|
|
|
+ flushMarker = 0;
|
|
|
+ }
|
|
|
+ CThorRowArrayWithFlushMarker(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep)
|
|
|
: CThorSpillableRowArray(activity, rowIf, allowNulls, stableSort, initialSize, commitDelta)
|
|
|
{
|
|
|
flushMarker = 0;
|
|
@@ -865,6 +905,7 @@ protected:
|
|
|
|
|
|
Owned<CBroadcaster> broadcaster;
|
|
|
CBroadcaster *channel0Broadcaster;
|
|
|
+ CriticalSection *broadcastLock;
|
|
|
rowidx_t rhsTableLen;
|
|
|
Owned<HTHELPER> table; // NB: only channel 0 uses table, unless failing over to local lookup join
|
|
|
Linked<HTHELPER> tableProxy; // Channels >1 will reference channel 0 table unless failed over
|
|
@@ -873,9 +914,12 @@ protected:
|
|
|
|
|
|
IThorDataLink *leftITDL, *rightITDL;
|
|
|
Owned<IRowStream> left, right;
|
|
|
+ IThorAllocator *rightThorAllocator;
|
|
|
+ roxiemem::IRowManager *rightRowManager;
|
|
|
+ Owned<IThorRowInterfaces> sharedRightRowInterfaces;
|
|
|
Owned<IEngineRowAllocator> rightAllocator;
|
|
|
Owned<IEngineRowAllocator> leftAllocator;
|
|
|
- Owned<IEngineRowAllocator> allocator;
|
|
|
+ Owned<IEngineRowAllocator> allocator; // allocator for output transform
|
|
|
Owned<IOutputRowSerializer> rightSerializer;
|
|
|
Owned<IOutputRowDeserializer> rightDeserializer;
|
|
|
bool gotRHS;
|
|
@@ -887,6 +931,7 @@ protected:
|
|
|
const void *rhsNext;
|
|
|
CThorExpandingRowArray rhs;
|
|
|
Owned<IOutputMetaData> outputMeta;
|
|
|
+ IOutputMetaData *rightOutputMeta;
|
|
|
PointerArrayOf<CThorRowArrayWithFlushMarker> rhsSlaveRows;
|
|
|
IArrayOf<IRowStream> gatheredRHSNodeStreams;
|
|
|
|
|
@@ -1006,7 +1051,7 @@ protected:
|
|
|
* if it never spills, but will make flushing non-locals simpler if spilling occurs.
|
|
|
*/
|
|
|
CThorSpillableRowArray &rows = *rhsSlaveRows.item(slave);
|
|
|
- RtlDynamicRowBuilder rowBuilder(rightAllocator);
|
|
|
+ RtlDynamicRowBuilder rowBuilder(rightAllocator); // NB: rightAllocator is the shared allocator
|
|
|
CThorStreamDeserializerSource memDeserializer(mb.length(), mb.toByteArray());
|
|
|
while (!memDeserializer.eos())
|
|
|
{
|
|
@@ -1033,7 +1078,9 @@ protected:
|
|
|
if (!row)
|
|
|
break;
|
|
|
|
|
|
- // Add all locally read right rows to channel0 directly
|
|
|
+ /* Add all locally read right rows to channel0 directly
|
|
|
+ * NB: these rows remain on their channel allocator.
|
|
|
+ */
|
|
|
if (0 == queryJobChannelNumber())
|
|
|
{
|
|
|
if (!addRHSRow(localRhsRows, row)) // may cause broadcaster to be told to stop (for isStopping() to become true)
|
|
@@ -1237,7 +1284,7 @@ public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
|
|
|
CInMemJoinBase(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this),
|
|
|
- HELPERBASE((HELPER *)queryHelper()), rhs(*this, NULL)
|
|
|
+ HELPERBASE((HELPER *)queryHelper()), rhs(*this)
|
|
|
{
|
|
|
gotRHS = false;
|
|
|
nextRhsRow = 0;
|
|
@@ -1273,6 +1320,16 @@ public:
|
|
|
channel0Broadcaster = NULL;
|
|
|
channelActivitiesAssigned = false;
|
|
|
table.setown(new HTHELPER);
|
|
|
+ rightOutputMeta = NULL;
|
|
|
+ if (getOptBool("lkjoinUseSharedAllocator", true))
|
|
|
+ {
|
|
|
+ ActPrintLog("Using shared row manager for RHS");
|
|
|
+ rightThorAllocator = queryJob().querySharedAllocator();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ rightThorAllocator = queryJobChannel().queryThorAllocator();
|
|
|
+ rightRowManager = rightThorAllocator->queryRowManager();
|
|
|
+ broadcastLock = NULL;
|
|
|
}
|
|
|
~CInMemJoinBase()
|
|
|
{
|
|
@@ -1285,6 +1342,8 @@ public:
|
|
|
if (rows)
|
|
|
delete rows;
|
|
|
}
|
|
|
+ if (broadcastLock)
|
|
|
+ delete broadcastLock;
|
|
|
}
|
|
|
}
|
|
|
HTHELPER *queryTable() { return table; }
|
|
@@ -1304,18 +1363,37 @@ public:
|
|
|
unsigned slaves = container.queryJob().querySlaves();
|
|
|
rhsSlaveRows.ensure(slaves);
|
|
|
for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
|
|
|
- rhsSlaveRows.append(new CThorRowArrayWithFlushMarker(*this, NULL));
|
|
|
+ rhsSlaveRows.append(new CThorRowArrayWithFlushMarker(*this));
|
|
|
channels.allocateN(queryJob().queryJobChannels());
|
|
|
broadcaster.setown(new CBroadcaster(*this));
|
|
|
if (0 == queryJobChannelNumber())
|
|
|
+ {
|
|
|
rowProcessor = new CRowProcessor(*this);
|
|
|
+ broadcastLock = new CriticalSection;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
virtual void start()
|
|
|
{
|
|
|
assertex(inputs.ordinality() == 2);
|
|
|
+
|
|
|
+ gotRHS = false;
|
|
|
+ nextRhsRow = 0;
|
|
|
+ joined = 0;
|
|
|
+ joinCounter = 0;
|
|
|
+ leftMatch = false;
|
|
|
+ rhsNext = NULL;
|
|
|
+ rhsTableLen = 0;
|
|
|
+ leftITDL = inputs.item(0);
|
|
|
+ rightITDL = inputs.item(1);
|
|
|
+ rightOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
|
|
|
+ rightAllocator.setown(rightThorAllocator->getRowAllocator(rightOutputMeta, container.queryId()));
|
|
|
+
|
|
|
if (isGlobal())
|
|
|
{
|
|
|
+ sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), &queryJobChannel().querySharedMemCodeContext()));
|
|
|
+ rhs.setup(sharedRightRowInterfaces);
|
|
|
+
|
|
|
// It is not until here, that it is guaranteed all channel slave activities have been initialized.
|
|
|
if (!channelActivitiesAssigned)
|
|
|
{
|
|
@@ -1327,18 +1405,18 @@ public:
|
|
|
}
|
|
|
}
|
|
|
channel0Broadcaster = channels[0]->broadcaster;
|
|
|
+ if (0 == queryJobChannelNumber())
|
|
|
+ {
|
|
|
+ for (unsigned c=0; c<queryJob().queryJobChannels(); c++)
|
|
|
+ {
|
|
|
+ CInMemJoinBase &channel = (CInMemJoinBase &)queryChannelActivity(c);
|
|
|
+ channel.broadcaster->setBroadcastLock(broadcastLock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // NB: use sharedRightRowInterfaces, so that expanding ptr array is using shared allocator
|
|
|
for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
|
|
|
- rhsSlaveRows.item(s)->setup(queryRowInterfaces(rightITDL), false, stableSort_none, true);
|
|
|
+ rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, true);
|
|
|
}
|
|
|
- gotRHS = false;
|
|
|
- nextRhsRow = 0;
|
|
|
- joined = 0;
|
|
|
- joinCounter = 0;
|
|
|
- leftMatch = false;
|
|
|
- rhsNext = NULL;
|
|
|
- rhsTableLen = 0;
|
|
|
- leftITDL = inputs.item(0);
|
|
|
- rightITDL = inputs.item(1);
|
|
|
allocator.set(queryRowAllocator());
|
|
|
leftAllocator.set(::queryRowAllocator(leftITDL));
|
|
|
outputMeta.set(leftITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
|
|
@@ -1349,7 +1427,6 @@ public:
|
|
|
currentHashEntry.count = 0;
|
|
|
|
|
|
right.set(rightITDL);
|
|
|
- rightAllocator.set(::queryRowAllocator(rightITDL));
|
|
|
rightSerializer.set(::queryRowSerializer(rightITDL));
|
|
|
rightDeserializer.set(::queryRowDeserializer(rightITDL));
|
|
|
|
|
@@ -1426,23 +1503,22 @@ public:
|
|
|
if (0 == queryJobChannelNumber())
|
|
|
{
|
|
|
rowProcessor->start();
|
|
|
- broadcaster->start(this, mpTag, stopping);
|
|
|
+ broadcaster->start(this, mpTag, stopping, false); // slaves broadcasting
|
|
|
broadcastRHS();
|
|
|
- broadcaster->waitReceiverDone();
|
|
|
broadcaster->end();
|
|
|
rowProcessor->wait();
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- broadcaster->start(NULL, mpTag, stopping); // pass NULL for IBCastReceive, since only channel 0 receives
|
|
|
+ broadcaster->start(NULL, mpTag, stopping, false); // pass NULL for IBCastReceive, since only channel 0 receives
|
|
|
broadcastRHS();
|
|
|
channel0Broadcaster->waitReceiverDone(mySlaveNum);
|
|
|
}
|
|
|
}
|
|
|
- void doBroadcastStop(mptag_t tag, broadcast_flags flag)
|
|
|
+ void doBroadcastStop(mptag_t tag, broadcast_flags flag) // only called on channel 0
|
|
|
{
|
|
|
broadcaster->reset();
|
|
|
- broadcaster->start(this, tag, false);
|
|
|
+ broadcaster->start(this, tag, false, true); // nodes broadcasting
|
|
|
Owned<CSendItem> sendItem = broadcaster->newSendItem(bcast_stop);
|
|
|
if (flag)
|
|
|
sendItem->setFlag(flag);
|
|
@@ -1480,13 +1556,9 @@ public:
|
|
|
}
|
|
|
|
|
|
// IBCastReceive (only used if global)
|
|
|
- virtual void bCastReceive(CSendItem *sendItem) // NB: only called on channel 0
|
|
|
+ virtual void bCastReceive(CSendItem *sendItem, bool stop)
|
|
|
{
|
|
|
- if (sendItem && (bcast_stop == sendItem->queryCode()))
|
|
|
- {
|
|
|
- sendItem->Release();
|
|
|
- sendItem = NULL; // NB: NULL indicates end
|
|
|
- }
|
|
|
+ dbgassertex((sendItem==NULL) == stop); // if sendItem==NULL stop must = true, if sendItem != NULL stop must = false;
|
|
|
rowProcessor->addBlock(sendItem);
|
|
|
}
|
|
|
// ISmartBufferNotify
|
|
@@ -1530,7 +1602,7 @@ public:
|
|
|
interface IChannelDistributor
|
|
|
{
|
|
|
virtual void putRow(const void *row) = 0;
|
|
|
- virtual bool spill() = 0;
|
|
|
+ virtual bool spill(bool critical) = 0;
|
|
|
virtual roxiemem::IBufferedRowCallback *queryCallback() = 0;
|
|
|
};
|
|
|
|
|
@@ -1565,8 +1637,10 @@ protected:
|
|
|
using PARENT::defaultRight;
|
|
|
using PARENT::grouped;
|
|
|
using PARENT::abortSoon;
|
|
|
- using PARENT::leftAllocator;
|
|
|
+ using PARENT::rightRowManager;
|
|
|
using PARENT::rightAllocator;
|
|
|
+ using PARENT::sharedRightRowInterfaces;
|
|
|
+ using PARENT::leftAllocator;
|
|
|
using PARENT::returnMany;
|
|
|
using PARENT::fuzzyMatch;
|
|
|
using PARENT::keepLimit;
|
|
@@ -1756,7 +1830,7 @@ protected:
|
|
|
}
|
|
|
try
|
|
|
{
|
|
|
- table->setup(this, size, leftHash, rightHash, compareLeftRight);
|
|
|
+ table->setup(this, rightRowManager, size, leftHash, rightHash, compareLeftRight);
|
|
|
}
|
|
|
catch (IException *e)
|
|
|
{
|
|
@@ -1840,7 +1914,7 @@ protected:
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- if (!marker.init(rhs.ordinality()))
|
|
|
+ if (!marker.init(rhs.ordinality(), queryRowManager()))
|
|
|
return false;
|
|
|
}
|
|
|
catch (IException *e)
|
|
@@ -1874,7 +1948,7 @@ protected:
|
|
|
overflowWriteCount = 0;
|
|
|
overflowWriteFile.clear();
|
|
|
overflowWriteStream.clear();
|
|
|
- queryRowManager().addRowBuffer(this);
|
|
|
+ rightRowManager->addRowBuffer(this);
|
|
|
}
|
|
|
doBroadcastRHS(stopping);
|
|
|
|
|
@@ -1892,7 +1966,7 @@ protected:
|
|
|
bool success=false;
|
|
|
try
|
|
|
{
|
|
|
- if (marker.init(rhsRows)) // May fail if insufficient memory available
|
|
|
+ if (marker.init(rhsRows, rightRowManager)) // May fail if insufficient memory available
|
|
|
{
|
|
|
// NB: If marker.init() returned false, it will have called the MM callbacks and have setup hasFailedOverToLocal() already
|
|
|
success = rhs.resize(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
|
|
@@ -1962,14 +2036,16 @@ protected:
|
|
|
* Need to remove spill callback and broadcast one last message to know.
|
|
|
*/
|
|
|
|
|
|
- queryRowManager().removeRowBuffer(this);
|
|
|
+ rightRowManager->removeRowBuffer(this);
|
|
|
|
|
|
- ActPrintLog("Broadcasting final split status");
|
|
|
- broadcaster->reset();
|
|
|
+ ActPrintLog("Broadcasting final spilt status: %s", hasFailedOverToLocal() ? "spilt" : "did not spill");
|
|
|
// NB: Will cause other slaves to flush non-local if any have and failedOverToLocal will be set on all
|
|
|
doBroadcastStop(broadcast2MpTag, hasFailedOverToLocal() ? bcastflag_spilt : bcastflag_null);
|
|
|
}
|
|
|
InterChannelBarrier();
|
|
|
+ ActPrintLog("Shared memory manager memory report");
|
|
|
+ rightRowManager->reportMemoryUsage(false);
|
|
|
+ ActPrintLog("End of shared manager memory report");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -1977,19 +2053,21 @@ protected:
|
|
|
if (isSmart())
|
|
|
{
|
|
|
/* Add IBufferedRowCallback to all channels, because memory pressure can come on any IRowManager
|
|
|
- * However, all invoked callbacks are handled by ch0
|
|
|
+ * However, all invoked callbacks need to be handled by ch0
|
|
|
*/
|
|
|
- queryRowManager().addRowBuffer(lkJoinCh0);
|
|
|
+ rightRowManager->addRowBuffer(lkJoinCh0);
|
|
|
}
|
|
|
doBroadcastRHS(stopping);
|
|
|
InterChannelBarrier(); // wait for channel 0, which will have marked rhsCollated and broadcast spilt status to all others
|
|
|
if (isSmart())
|
|
|
- queryRowManager().removeRowBuffer(lkJoinCh0);
|
|
|
+ rightRowManager->removeRowBuffer(lkJoinCh0);
|
|
|
if (lkJoinCh0->hasFailedOverToLocal())
|
|
|
setFailoverToLocal(true);
|
|
|
rhsCollated = lkJoinCh0->isRhsCollated();
|
|
|
-
|
|
|
}
|
|
|
+ ActPrintLog("Channel memory manager report");
|
|
|
+ queryRowManager()->reportMemoryUsage(false);
|
|
|
+ ActPrintLog("End of channel memory manager report");
|
|
|
return !hasFailedOverToLocal();
|
|
|
}
|
|
|
/*
|
|
@@ -2027,6 +2105,7 @@ protected:
|
|
|
IChannelDistributor **channelDistributors;
|
|
|
unsigned nextSpillChannel;
|
|
|
CriticalSection crit;
|
|
|
+ atomic_t spilt;
|
|
|
public:
|
|
|
CChannelDistributor(CLookupJoinActivityBase &_owner, ICompare *cmp) : owner(_owner)
|
|
|
{
|
|
@@ -2035,6 +2114,7 @@ protected:
|
|
|
channelDistributors = ((CLookupJoinActivityBase *)owner.channels[0])->channelDistributors;
|
|
|
channelDistributors[owner.queryJobChannelNumber()] = this;
|
|
|
nextSpillChannel = 0;
|
|
|
+ atomic_set(&spilt, 0);
|
|
|
//NB: all channels will have done this, before rows are added
|
|
|
}
|
|
|
void process(IRowStream *right)
|
|
@@ -2059,7 +2139,7 @@ protected:
|
|
|
OwnedConstThorRow row = right->nextRow();
|
|
|
if (!row)
|
|
|
break;
|
|
|
- channelCollectorWriter->putRow(row.getClear());
|
|
|
+ putRow(row.getClear());
|
|
|
}
|
|
|
}
|
|
|
IRowStream *getStream(CThorExpandingRowArray *rhs=NULL)
|
|
@@ -2074,7 +2154,7 @@ protected:
|
|
|
unsigned startSpillChannel = nextSpillChannel;
|
|
|
loop
|
|
|
{
|
|
|
- bool res = channelDistributors[nextSpillChannel]->spill();
|
|
|
+ bool res = channelDistributors[nextSpillChannel]->spill(critical);
|
|
|
++nextSpillChannel;
|
|
|
if (nextSpillChannel == owner.queryJob().queryJobChannels())
|
|
|
nextSpillChannel = 0;
|
|
@@ -2096,11 +2176,18 @@ protected:
|
|
|
// IChannelDistributor impl.
|
|
|
virtual void putRow(const void *row)
|
|
|
{
|
|
|
+ if (atomic_cas(&spilt, 0, 1))
|
|
|
+ {
|
|
|
+ StringBuffer traceInfo;
|
|
|
+ if (channelCollector->shrink(&traceInfo)) // grab back some valuable table array space
|
|
|
+ owner.ActPrintLog("CChannelDistributor %s", traceInfo.str());
|
|
|
+ }
|
|
|
channelCollectorWriter->putRow(row);
|
|
|
}
|
|
|
- virtual bool spill()
|
|
|
+ virtual bool spill(bool critical) // called from OOM callback
|
|
|
{
|
|
|
- return channelCollector->spill();
|
|
|
+ atomic_set(&spilt, 1);
|
|
|
+ return channelCollector->spill(critical);
|
|
|
}
|
|
|
virtual roxiemem::IBufferedRowCallback *queryCallback() { return this; }
|
|
|
} channelDistributor(*this, cmp);
|
|
@@ -2114,7 +2201,7 @@ protected:
|
|
|
* However, all invoked callbacks are handled by ch0 and round-robin freeing channel collectors
|
|
|
*/
|
|
|
roxiemem::IBufferedRowCallback *callback = ((CLookupJoinActivityBase *)channels[0])->channelDistributors[0]->queryCallback();
|
|
|
- queryRowManager().addRowBuffer(callback);
|
|
|
+ queryRowManager()->addRowBuffer(callback);
|
|
|
Owned<IRowStream> stream;
|
|
|
Owned<IException> exception;
|
|
|
try
|
|
@@ -2129,7 +2216,7 @@ protected:
|
|
|
InterChannelBarrier(); // wait for channel[0] to process in mem rows 1st
|
|
|
|
|
|
if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
|
|
|
- channelDistributor.spill();
|
|
|
+ channelDistributor.spill(false);
|
|
|
|
|
|
Owned<IRowStream> distChannelStream;
|
|
|
if (!rhsCollated) // there may be some more undistributed rows
|
|
@@ -2144,7 +2231,7 @@ protected:
|
|
|
EXCLOG(e, "During channel distribution");
|
|
|
exception.setown(e);
|
|
|
}
|
|
|
- queryRowManager().removeRowBuffer(callback);
|
|
|
+ queryRowManager()->removeRowBuffer(callback);
|
|
|
InterChannelBarrier(); // need barrier point to ensure all have removed callback before channelDistributor is destroyed
|
|
|
if (exception)
|
|
|
throw exception.getClear();
|
|
@@ -2244,7 +2331,9 @@ protected:
|
|
|
ActPrintLog("Global getRHS stopped");
|
|
|
return;
|
|
|
}
|
|
|
- if (!ok)
|
|
|
+ if (ok)
|
|
|
+ ActPrintLog("RHS global rows fitted in memory in this channel, count: %" RIPF "d", rhs.ordinality());
|
|
|
+ else
|
|
|
{
|
|
|
ActPrintLog("Spilt whilst broadcasting, will attempt distributed local lookup join");
|
|
|
|
|
@@ -2287,31 +2376,36 @@ protected:
|
|
|
ActPrintLog("Local SMART JOIN spilt to disk. Failing over to regular local join");
|
|
|
setFailoverToStandard(true);
|
|
|
}
|
|
|
+ else
|
|
|
+ ActPrintLog("RHS local rows fitted in memory in this channel, count: %" RIPF "d", rhs.ordinality());
|
|
|
}
|
|
|
if (!rightStream)
|
|
|
{
|
|
|
// All RHS rows fitted in memory, rows were transferred out back into 'rhs' and sorted
|
|
|
|
|
|
- ActPrintLog("RHS local rows fitted in memory in this channel, count: %" RIPF "d", rhs.ordinality());
|
|
|
- if (hasFailedOverToLocal())
|
|
|
- marker.reset();
|
|
|
- if (!prepareLocalHT(marker))
|
|
|
+ if (isLocal() || hasFailedOverToLocal())
|
|
|
{
|
|
|
- ActPrintLog("Out of memory trying to prepare [LOCAL] hashtable for a SMART join (%" RIPF "d rows), will now failover to a std hash join", rhs.ordinality());
|
|
|
- Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
|
|
|
- collector->transferRowsIn(rhs); // can spill after this
|
|
|
- rightStream.setown(collector->getStream());
|
|
|
+ if (hasFailedOverToLocal())
|
|
|
+ marker.reset();
|
|
|
+ if (!prepareLocalHT(marker))
|
|
|
+ {
|
|
|
+ ActPrintLog("Out of memory trying to prepare [LOCAL] hashtable for a SMART join (%" RIPF "d rows), will now failover to a std hash join", rhs.ordinality());
|
|
|
+ Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
|
|
|
+ collector->transferRowsIn(rhs); // can spill after this
|
|
|
+ rightStream.setown(collector->getStream());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (rightStream)
|
|
|
{
|
|
|
- ActPrintLog("Performing standard join");
|
|
|
+ ActPrintLog("Performing STANDARD JOIN");
|
|
|
setupStandardJoin(rightStream); // NB: rightStream is sorted
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
if (isLocal() || hasFailedOverToLocal())
|
|
|
{
|
|
|
+ ActPrintLog("Performing LOCAL LOOKUP JOIN: rhs size=%u, lookup table size = %" RIPF "u", rhs.ordinality(), rhsTableLen);
|
|
|
table->addRows(rhs, marker);
|
|
|
tableProxy.set(table);
|
|
|
}
|
|
@@ -2319,6 +2413,7 @@ protected:
|
|
|
{
|
|
|
if (0 == queryJobChannelNumber()) // only ch0 has table, ch>0 will share ch0's table.
|
|
|
{
|
|
|
+ ActPrintLog("Performing GLOBAL LOOKUP JOIN: rhs size=%u, lookup table size = %" RIPF "u", rhs.ordinality(), rhsTableLen);
|
|
|
table->addRows(rhs, marker);
|
|
|
tableProxy.set(table);
|
|
|
InterChannelBarrier();
|
|
@@ -2330,7 +2425,6 @@ protected:
|
|
|
rhsTableLen = tableProxy->queryTableSize();
|
|
|
}
|
|
|
}
|
|
|
- ActPrintLog("rhs table: %d elements", rhsTableLen);
|
|
|
}
|
|
|
}
|
|
|
catch (IException *e)
|
|
@@ -2470,7 +2564,7 @@ public:
|
|
|
if (isGlobal())
|
|
|
{
|
|
|
for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
|
|
|
- rhsSlaveRows.item(s)->setup(queryRowInterfaces(rightITDL), false, stableSort_none, false);
|
|
|
+ rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, false);
|
|
|
setFailoverToLocal(false);
|
|
|
rhsCollated = rhsCompacted = false;
|
|
|
}
|
|
@@ -2481,7 +2575,6 @@ public:
|
|
|
bool inputGrouped = leftITDL->isGrouped();
|
|
|
dbgassertex(inputGrouped == grouped); // std. lookup join expects these to match
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
@@ -2555,7 +2648,7 @@ public:
|
|
|
{
|
|
|
return isSmart() ? false : inputs.item(0)->isGrouped();
|
|
|
}
|
|
|
- virtual void bCastReceive(CSendItem *sendItem) // NB: only called on channel 0
|
|
|
+ virtual void bCastReceive(CSendItem *sendItem, bool stop) // NB: only called on channel 0
|
|
|
{
|
|
|
if (sendItem)
|
|
|
{
|
|
@@ -2564,13 +2657,15 @@ public:
|
|
|
VStringBuffer msg("Notification that node %d spilt", sendItem->queryNode());
|
|
|
clearAllNonLocalRows(msg.str());
|
|
|
}
|
|
|
- else if (0 != (sendItem->queryFlags() & bcastflag_standardjoin))
|
|
|
+ if (bcast_stop == sendItem->queryCode())
|
|
|
{
|
|
|
- VStringBuffer msg("Notification that node %d required standard join", sendItem->queryNode());
|
|
|
- setFailoverToStandard(true);
|
|
|
+ sendItem->Release();
|
|
|
+ if (!stop)
|
|
|
+ return;
|
|
|
+ sendItem = NULL; // fall through, base signals stop to rowProcessor
|
|
|
}
|
|
|
}
|
|
|
- PARENT::bCastReceive(sendItem);
|
|
|
+ PARENT::bCastReceive(sendItem, stop);
|
|
|
}
|
|
|
// IBufferedRowCallback
|
|
|
virtual unsigned getSpillCost() const
|
|
@@ -2659,13 +2754,13 @@ public:
|
|
|
{
|
|
|
reset();
|
|
|
}
|
|
|
- void setup(CSlaveActivity *activity, rowidx_t size, IHash *_leftHash, IHash *_rightHash, ICompare *_compareLeftRight)
|
|
|
+ void setup(CSlaveActivity *activity, roxiemem::IRowManager *rowManager, rowidx_t size, IHash *_leftHash, IHash *_rightHash, ICompare *_compareLeftRight)
|
|
|
{
|
|
|
unsigned __int64 _sz = sizeof(const void *) * ((unsigned __int64)size);
|
|
|
memsize_t sz = (memsize_t)_sz;
|
|
|
if (sz != _sz) // treat as OOM exception for handling purposes.
|
|
|
throw MakeStringException(ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Unsigned overflow, trying to allocate hash table of size: %" I64F "d ", _sz);
|
|
|
- void *ht = activity->queryRowManager().allocate(sz, activity->queryContainer().queryId(), SPILL_PRIORITY_LOW);
|
|
|
+ void *ht = rowManager->allocate(sz, activity->queryContainer().queryId(), SPILL_PRIORITY_LOW);
|
|
|
memset(ht, 0, sz);
|
|
|
htMemory.setown(ht);
|
|
|
tableSize = size;
|
|
@@ -2716,10 +2811,10 @@ public:
|
|
|
{
|
|
|
releaseHTRows();
|
|
|
}
|
|
|
- void setup(CLookupJoinActivityBase<CLookupHT> *_activity, rowidx_t size, IHash *leftHash, IHash *rightHash, ICompare *compareLeftRight)
|
|
|
+ void setup(CLookupJoinActivityBase<CLookupHT> *_activity, roxiemem::IRowManager *rowManager, rowidx_t size, IHash *leftHash, IHash *rightHash, ICompare *compareLeftRight)
|
|
|
{
|
|
|
activity = _activity;
|
|
|
- CHTBase::setup(activity, size, leftHash, rightHash, compareLeftRight);
|
|
|
+ CHTBase::setup(activity, rowManager, size, leftHash, rightHash, compareLeftRight);
|
|
|
ht = (const void **)htMemory.get();
|
|
|
}
|
|
|
void reset()
|
|
@@ -2811,10 +2906,10 @@ public:
|
|
|
{
|
|
|
reset();
|
|
|
}
|
|
|
- void setup(CLookupJoinActivityBase<CLookupManyHT> *_activity, rowidx_t size, IHash *leftHash, IHash *rightHash, ICompare *compareLeftRight)
|
|
|
+ void setup(CLookupJoinActivityBase<CLookupManyHT> *_activity, roxiemem::IRowManager *rowManager, rowidx_t size, IHash *leftHash, IHash *rightHash, ICompare *compareLeftRight)
|
|
|
{
|
|
|
activity = _activity;
|
|
|
- CHTBase::setup(activity, size, leftHash, rightHash, compareLeftRight);
|
|
|
+ CHTBase::setup(activity, rowManager, size, leftHash, rightHash, compareLeftRight);
|
|
|
ht = (HtEntry *)htMemory.get();
|
|
|
}
|
|
|
inline void addEntry(const void *row, unsigned hash, rowidx_t index, rowidx_t count)
|