|
@@ -1008,7 +1008,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
for (auto &partCopy : parts)
|
|
|
{
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
IPartDescriptor &pd = allParts->item(partNo);
|
|
|
RemoteFilename rfn;
|
|
|
pd.getFilename(copy, rfn);
|
|
@@ -1215,7 +1215,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
limiter->dec(); // unblocks any requests to start lookup threads
|
|
|
}
|
|
|
};
|
|
|
- static const unsigned partMask = 0x00ffffff;
|
|
|
|
|
|
class CKeyLookupLocalBase : public CLookupHandler
|
|
|
{
|
|
@@ -1336,7 +1335,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
{
|
|
|
unsigned partCopy = parts[selected];
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
IKeyManager *&keyManager = keyManagers[selected];
|
|
|
if (!keyManager) // delayed until actually needed
|
|
|
{
|
|
@@ -1366,7 +1365,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
for (auto &partCopy: parts)
|
|
|
{
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy, false);
|
|
|
partKeySet->addIndex(keyIndex.getClear());
|
|
|
}
|
|
@@ -1546,7 +1545,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
{
|
|
|
unsigned partCopy = parts[selected];
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
|
|
|
unsigned numRows = processing.ordinality();
|
|
|
|
|
@@ -1701,7 +1700,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
{
|
|
|
unsigned partCopy = parts[selected];
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
|
|
|
ScopedAtomic<unsigned __int64> diskRejected(activity.statsArr[AS_DiskRejected]);
|
|
|
ScopedAtomic<unsigned __int64> diskSeeks(activity.statsArr[AS_DiskSeeks]);
|
|
@@ -1852,7 +1851,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
{
|
|
|
unsigned partCopy = parts[selected];
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
|
|
|
CMessageBuffer msg;
|
|
|
prepAndSend(msg, processing, selected, partNo, copy);
|
|
@@ -2140,6 +2139,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
|
|
|
CriticalSection fetchFileCrit;
|
|
|
std::vector<PartIO> openFetchParts;
|
|
|
+ const unsigned unknownCopyNum = 0xff; // in a partCopy denotes that a copy is unknown.
|
|
|
|
|
|
PartIO getFetchPartIO(unsigned partNo, unsigned copy, bool compressed, bool encrypted)
|
|
|
{
|
|
@@ -2612,26 +2612,37 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
}
|
|
|
return nullptr;
|
|
|
}
|
|
|
-
|
|
|
bool transferToDoneList(CJoinGroup *joinGroup, bool markBlocked) // NB: always coming from pendingJoinGroupList
|
|
|
{
|
|
|
doneJoinGroupList.addToTail(joinGroup);
|
|
|
pendingKeyLookupLimiter.dec();
|
|
|
return doneListLimiter.preIncNonBlocking(markBlocked);
|
|
|
}
|
|
|
-
|
|
|
void addPartToHandler(CHandlerContainer &handlerContainer, const std::vector<unsigned> &partToSlaveMap, unsigned partCopy, HandlerType hType, std::vector<unsigned> &handlerCounts, std::vector<std::vector<CLookupHandler *>> &slaveHandlers, std::vector<unsigned> &slaveHandlersRR)
|
|
|
{
|
|
|
// NB: This is called in partNo ascending order
|
|
|
|
|
|
unsigned partNo = partCopy & partMask;
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
unsigned slave = 0;
|
|
|
if (partToSlaveMap.size())
|
|
|
{
|
|
|
slave = partToSlaveMap[partNo];
|
|
|
if (NotFound == slave) // part not local to cluster, part is handled locally/directly.
|
|
|
slave = handlerCounts.size()-1; // last one reserved for out of cluster part handling.
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (unknownCopyNum == copy)// this means that this is a part for a remote slave, and copy is unknown
|
|
|
+ {
|
|
|
+ /* The partToSlaveMap encodes the target slave and the copy.
|
|
|
+ * Extract the copy, so that the correct copy is used when request
|
|
|
+ * arrives at the remote slave.
|
|
|
+ */
|
|
|
+ copy = slave >> slaveBits;
|
|
|
+ partCopy = partNo | (copy << partBits);
|
|
|
+ }
|
|
|
+ slave = slave & slaveMask;
|
|
|
+ }
|
|
|
}
|
|
|
unsigned max = queryMaxHandlers(hType);
|
|
|
unsigned &handlerCount = handlerCounts[slave];
|
|
@@ -2723,11 +2734,11 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
partNo = partCopy & partMask;
|
|
|
if (superFdesc)
|
|
|
{
|
|
|
- unsigned copy = partCopy >> 24;
|
|
|
+ unsigned copy = partCopy >> partBits;
|
|
|
unsigned subfile, subpartnum;
|
|
|
superFdesc->mapSubPart(partNo, subfile, subpartnum);
|
|
|
partNo = superWidth*subfile+subpartnum;
|
|
|
- partCopy = partNo | (copy << 24);
|
|
|
+ partCopy = partNo | (copy << partBits);
|
|
|
}
|
|
|
}
|
|
|
else
|
|
@@ -2740,7 +2751,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
|
|
|
while (p<partNo)
|
|
|
{
|
|
|
if (remoteLookup) // NB: only relevant if ,LOCAL and only some parts avail. otherwise if !remoteLookup all parts will have been sent
|
|
|
- addPartToHandler(handlerContainer, partToSlaveMap, p, missingHandlerType, handlerCounts, slaveHandlers, slaveHandlersRR);
|
|
|
+ {
|
|
|
+ unsigned remotePartCopy = p | (unknownCopyNum << partBits); // copy will be looked up via map in addPartToHandler
|
|
|
+ addPartToHandler(handlerContainer, partToSlaveMap, remotePartCopy, missingHandlerType, handlerCounts, slaveHandlers, slaveHandlersRR);
|
|
|
+ }
|
|
|
else // no handler if local KJ and part not local
|
|
|
handlerContainer.partIdxToHandler.push_back(nullptr);
|
|
|
++p;
|