|
@@ -3162,7 +3162,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, public CThorDataLink, imple
|
|
|
bool leftdone;
|
|
|
mptag_t mptag;
|
|
|
mptag_t mptag2;
|
|
|
- Owned<IHashDistributor> distributor;
|
|
|
+ Owned<IHashDistributor> lhsDistributor, rhsDistributor;
|
|
|
|
|
|
public:
|
|
|
|
|
@@ -3207,25 +3207,27 @@ public:
|
|
|
IHash *ihashR = joinargs->queryHashRight();
|
|
|
ICompare *icompareL = joinargs->queryCompareLeft();
|
|
|
ICompare *icompareR = joinargs->queryCompareRight();
|
|
|
- if (!distributor)
|
|
|
- distributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
|
|
|
- Owned<IRowStream> reader = distributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
|
|
|
+ if (!lhsDistributor)
|
|
|
+ lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
|
|
|
+ Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
|
|
|
Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
|
|
|
strmL.setown(loaderL->load(reader, abortSoon));
|
|
|
loaderL.clear();
|
|
|
reader.clear();
|
|
|
stopInputL();
|
|
|
- distributor->disconnect(false);
|
|
|
- distributor->join();
|
|
|
+ lhsDistributor->disconnect(false);
|
|
|
+ lhsDistributor->join();
|
|
|
leftdone = true;
|
|
|
- reader.setown(distributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
|
|
|
+ if (!rhsDistributor)
|
|
|
+ rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, abortSoon,false, this));
|
|
|
+ reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
|
|
|
Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
|
|
|
strmR.setown(loaderR->load(reader, abortSoon));
|
|
|
loaderR.clear();
|
|
|
reader.clear();
|
|
|
stopInputR();
|
|
|
- distributor->disconnect(false);
|
|
|
- distributor->join();
|
|
|
+ rhsDistributor->disconnect(false);
|
|
|
+ rhsDistributor->join();
|
|
|
{ CriticalBlock b(joinHelperCrit);
|
|
|
switch(container.getKind())
|
|
|
{
|