@@ -160,37 +160,43 @@ class CKeyedJoinMaster : public CMasterActivity
INode &groupNode = dfsGroup.queryNode(gn);
if ((partNode->equals(&groupNode)))
{
- /* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching node's in the dfsGroup
- * Which means a copy of a part may already be assigned to a cluster slave map. This check avoid handling it again if it has.
- */
- if (!partsOnSlaves->testSet(groupSize*p+gn))
+ RemoteFilename rfn;
+ part->getFilename(c, rfn);
+ Owned<IFile> file = createIFile(rfn);
+ if (file->exists()) // skip if copy doesn't exist
{
- std::vector<unsigned> &slaveParts = querySlaveParts(gn);
- if (NotFound == mappedPos)
+                    /* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching nodes in the dfsGroup,
+                     * which means a copy of a part may already be assigned to a cluster slave map. This check avoids handling it again if it has been.
+                     */
+ if (!partsOnSlaves->testSet(groupSize*p+gn))
{
- /* NB: to avoid all parts being mapped to same remote slave process (significant if slavesPerNode>1)
- * or (conditionally) all accessible locals being added to all slaves (which may have detrimental effect on key node caching)
- * inc. group start pos for beginning of next search.
- */
- slaveParts.push_back(partCopy);
- if (activity.queryContainer().queryJob().queryChannelsPerSlave()>1)
- mappedPos = gn % queryNodeClusterWidth();
- else
- mappedPos = gn;
- nextGroupStartPos = gn+1;
- if (nextGroupStartPos == groupSize)
- nextGroupStartPos = 0;
-
- /* NB: normally if the part is within the cluster, the copy will be 0 (i.e. primary)
- * But it's possible that a non-primary copy from another logical cluster is local to
- * this cluster, in which case, must capture which copy it is here in the map, so the
- * slaves can send the requests to the correct slave and tell it to deal with the
- * correct copy.
- */
- mappedPos |= (c << slaveBits); // encode which copy into mappedPos
+ std::vector<unsigned> &slaveParts = querySlaveParts(gn);
+ if (NotFound == mappedPos)
+ {
+                        /* NB: to avoid all parts being mapped to the same remote slave process (significant if slavesPerNode>1),
+                         * or (conditionally) all accessible locals being added to all slaves (which may have a detrimental effect on key node caching),
+                         * increment the group start pos for the beginning of the next search.
+                         */
+ slaveParts.push_back(partCopy);
+ if (activity.queryContainer().queryJob().queryChannelsPerSlave()>1)
+ mappedPos = gn % queryNodeClusterWidth();
+ else
+ mappedPos = gn;
+ nextGroupStartPos = gn+1;
+ if (nextGroupStartPos == groupSize)
+ nextGroupStartPos = 0;
+
+                        /* NB: normally, if the part is within the cluster, the copy will be 0 (i.e. the primary).
+                         * But it's possible that a non-primary copy from another logical cluster is local to
+                         * this cluster, in which case we must capture which copy it is here in the map, so the
+                         * slaves can send the requests to the correct slave and tell it to deal with the
+                         * correct copy.
+                         */
+ mappedPos |= (c << slaveBits); // encode which copy into mappedPos
+ }
+ else if (allLocal) // all slaves get all locally accessible parts
+ slaveParts.push_back(partCopy);
}
- else if (allLocal) // all slaves get all locally accessible parts
- slaveParts.push_back(partCopy);
}
}
gn++;
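
For context on the partsOnSlaves->testSet(groupSize*p+gn) guard above: it treats the bitset as a flat (part, group-node) matrix, and testSet returns the bit's previous value while setting it, so only the first pass over a given part/group-node pairing registers it. A minimal standalone sketch of that idea, using std::vector<bool> in place of jlib's IBitSet (the sizes and names here are illustrative, not the class's actual state):

#include <cstdio>
#include <vector>

int main()
{
    const unsigned groupSize = 4;   // illustrative: nodes in the dfs group
    const unsigned numParts = 3;    // illustrative: parts in the file
    std::vector<bool> partsOnSlaves(groupSize * numParts, false);

    // testSet analogue: report the bit's previous value, then set it
    auto testSet = [&](unsigned bit) { bool was = partsOnSlaves[bit]; partsOnSlaves[bit] = true; return was; };

    unsigned p = 1, gn = 2;         // part 1 matched at group node 2
    if (!testSet(groupSize * p + gn))
        printf("first match: part %u assigned via group node %u\n", p, gn);
    // a later copy of the same part resolving to the same group node hits the same bit
    if (testSet(groupSize * p + gn))
        printf("part %u already assigned at group node %u, skipped\n", p, gn);
    return 0;
}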
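Similarly, a minimal sketch of the mappedPos packing done by mappedPos |= (c << slaveBits): the low slaveBits bits carry the slave position and the high bits carry the copy number, so one unsigned tells the slaves both where to route the request and which physical copy to open. The slaveBits value and helper names below are illustrative assumptions, not the actual members of CKeyedJoinMaster:

#include <cassert>
#include <cstdio>

static const unsigned slaveBits = 10; // assumption: the real code sizes this to fit the slave count
static const unsigned slaveMask = (1u << slaveBits) - 1;

// pack: slave position in the low bits, copy number in the high bits
static unsigned encodeMappedPos(unsigned slavePos, unsigned copy)
{
    assert(slavePos <= slaveMask); // the slave position must fit below the copy field
    return slavePos | (copy << slaveBits);
}

static unsigned decodeSlave(unsigned mappedPos) { return mappedPos & slaveMask; }
static unsigned decodeCopy(unsigned mappedPos) { return mappedPos >> slaveBits; }

int main()
{
    unsigned mappedPos = encodeMappedPos(7, 2); // slave 7 holds copy 2 (a non-primary local copy)
    printf("slave=%u copy=%u\n", decodeSlave(mappedPos), decodeCopy(mappedPos)); // prints slave=7 copy=2
    return 0;
}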