Kaynağa Gözat

Merge pull request #13182 from jakesmith/hpcc-23163-kj-failedtoopen

HPCC-23163 KJ fix for remote handling of parts from other clusters

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 yıl önce
ebeveyn
işleme
c07376c95c

+ 10 - 2
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -24,6 +24,7 @@
 
 #include "../fetch/thfetchcommon.hpp"
 #include "../hashdistrib/thhashdistrib.ipp"
+#include "thkeyedjoincommon.hpp"
 #include "thkeyedjoin.ipp"
 #include "jhtree.hpp"
 
@@ -46,7 +47,6 @@ class CKeyedJoinMaster : public CMasterActivity
     // CMap contains mappings and lists of parts for each slave
     class CMap
     {
-        static const unsigned partMask = 0x00ffffff;
     public:
         std::vector<unsigned> allParts;
         std::vector<std::vector<unsigned>> slavePartMap; // vector of slave parts (IPartDescriptor's slavePartMap[<slave>] serialized to each slave)
@@ -152,7 +152,7 @@ class CKeyedJoinMaster : public CMasterActivity
                         for (unsigned c=0; c<part->numCopies(); c++)
                         {
                             INode *partNode = part->queryNode(c);
-                            unsigned partCopy = p | (c << 24);
+                            unsigned partCopy = p | (c << partBits);
                             unsigned start=nextGroupStartPos;
                             unsigned gn=start;
                             do
@@ -180,6 +180,14 @@ class CKeyedJoinMaster : public CMasterActivity
                                             nextGroupStartPos = gn+1;
                                             if (nextGroupStartPos == groupSize)
                                                 nextGroupStartPos = 0;
+
+                                            /* NB: normally if the part is within the cluster, the copy will be 0 (i.e. primary)
+                                             * But it's possible that a non-primary copy from another logical cluster is local to
+                                             * this cluster, in which case, must capture which copy it is here in the map, so the
+                                             * slaves can send the requests to the correct slave and tell it to deal with the
+                                             * correct copy.
+                                             */
+                                            mappedPos |= (c << slaveBits); // encode which copy into mappedPos
                                         }
                                         else if (allLocal) // all slaves get all locally accessible parts
                                             slaveParts.push_back(partCopy);

+ 7 - 0
thorlcr/activities/keyedjoin/thkeyedjoincommon.hpp

@@ -50,4 +50,11 @@ enum KJServiceCmds:byte { kjs_nop, kjs_keyopen, kjs_keyread, kjs_keyclose, kjs_f
 enum KJFetchFlags:byte { kjf_nop=0x0, kjf_compressed=0x1, kjf_encrypted=0x2 };
 enum KJServiceErrorCode:byte { kjse_nop, kjse_exception, kjse_unknownhandle };
 
+constexpr unsigned partBits = 24;
+constexpr unsigned partMask = 0x00ffffff;
+// the same as part, but for clarify has own symbols
+constexpr unsigned slaveBits = 24;
+constexpr unsigned slaveMask = 0x00ffffff;
+
+
 #endif

+ 27 - 13
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -561,7 +561,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
             for (auto &partCopy : parts)
             {
                 unsigned partNo = partCopy & partMask;
-                unsigned copy = partCopy >> 24;
+                unsigned copy = partCopy >> partBits;
                 IPartDescriptor &pd = allParts->item(partNo);
                 RemoteFilename rfn;
                 pd.getFilename(copy, rfn);
@@ -761,7 +761,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
                 limiter->dec(); // unblocks any requests to start lookup threads
         }
     };
-    static const unsigned partMask = 0x00ffffff;
 
     class CKeyLookupLocalBase : public CLookupHandler
     {
@@ -882,7 +881,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         {
             unsigned partCopy = parts[selected];
             unsigned partNo = partCopy & partMask;
-            unsigned copy = partCopy >> 24;
+            unsigned copy = partCopy >> partBits;
             IKeyManager *&keyManager = keyManagers[selected];
             if (!keyManager) // delayed until actually needed
             {
@@ -912,7 +911,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
                 for (auto &partCopy: parts)
                 {
                     unsigned partNo = partCopy & partMask;
-                    unsigned copy = partCopy >> 24;
+                    unsigned copy = partCopy >> partBits;
                     Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy, false);
                     partKeySet->addIndex(keyIndex.getClear());
                 }
@@ -1094,7 +1093,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         {
             unsigned partCopy = parts[selected];
             unsigned partNo = partCopy & partMask;
-            unsigned copy = partCopy >> 24;
+            unsigned copy = partCopy >> partBits;
 
             unsigned numRows = processing.ordinality();
 
@@ -1248,7 +1247,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         {
             unsigned partCopy = parts[selected];
             unsigned partNo = partCopy & partMask;
-            unsigned copy = partCopy >> 24;
+            unsigned copy = partCopy >> partBits;
 
             ScopedAtomic<unsigned __int64> diskRejected(activity.statsArr[AS_DiskRejected]);
             ScopedAtomic<unsigned __int64> diskSeeks(activity.statsArr[AS_DiskSeeks]);
@@ -1396,7 +1395,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         {
             unsigned partCopy = parts[selected];
             unsigned partNo = partCopy & partMask;
-            unsigned copy = partCopy >> 24;
+            unsigned copy = partCopy >> partBits;
 
             CMessageBuffer msg;
             prepAndSend(msg, processing, selected, partNo, copy);
@@ -1663,6 +1662,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
 
     CriticalSection fetchFileCrit;
     std::vector<PartIO> openFetchParts;
+    const unsigned unknownCopyNum = 0xff; // in a partCopy denotes that a copy is unknown.
 
     PartIO getFetchPartIO(unsigned partNo, unsigned copy, bool compressed, bool encrypted)
     {
@@ -2126,26 +2126,37 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         }
         return nullptr;
     }
-
     bool transferToDoneList(CJoinGroup *joinGroup)
     {
         doneJoinGroupList.addToTail(joinGroup);
         pendingKeyLookupLimiter.dec();
         return doneListLimiter.preIncNonBlocking();
     }
-
     void addPartToHandler(CHandlerContainer &handlerContainer, const std::vector<unsigned> &partToSlaveMap, unsigned partCopy, HandlerType hType, std::vector<unsigned> &handlerCounts, std::vector<std::vector<CLookupHandler *>> &slaveHandlers, std::vector<unsigned> &slaveHandlersRR)
     {
         // NB: This is called in partNo ascending order
 
         unsigned partNo = partCopy & partMask;
-        unsigned copy = partCopy >> 24;
+        unsigned copy = partCopy >> partBits;
         unsigned slave = 0;
         if (partToSlaveMap.size())
         {
             slave = partToSlaveMap[partNo];
             if (NotFound == slave) // part not local to cluster, part is handled locally/directly.
                 slave = handlerCounts.size()-1; // last one reserved for out of cluster part handling.
+            else
+            {
+                if (unknownCopyNum == copy)// this means that this is a part for a remote slave, and copy is unknown
+                {
+                    /* The partToSlaveMap encodes the target slave and the copy.
+                     * Extract the copy, so that the correct copy is used when request
+                     * arrives at the remote slave.
+                     */
+                    copy = slave >> slaveBits;
+                    partCopy = partNo | (copy << partBits);
+                }
+                slave = slave & slaveMask;
+            }
         }
         unsigned max = queryMaxHandlers(hType);
         unsigned &handlerCount = handlerCounts[slave];
@@ -2237,11 +2248,11 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
                     partNo = partCopy & partMask;
                     if (superFdesc)
                     {
-                        unsigned copy = partCopy >> 24;
+                        unsigned copy = partCopy >> partBits;
                         unsigned subfile, subpartnum;
                         superFdesc->mapSubPart(partNo, subfile, subpartnum);
                         partNo = superWidth*subfile+subpartnum;
-                        partCopy = partNo | (copy << 24);
+                        partCopy = partNo | (copy << partBits);
                     }
                 }
                 else
@@ -2254,7 +2265,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
                 while (p<partNo)
                 {
                     if (remoteLookup) // NB: only relevant if ,LOCAL and only some parts avail. otherwise if !remoteLookup all parts will have been sent
-                        addPartToHandler(handlerContainer, partToSlaveMap, p, missingHandlerType, handlerCounts, slaveHandlers, slaveHandlersRR);
+                    {
+                        unsigned remotePartCopy = p | (unknownCopyNum << partBits); // copy will be looked up via map in addPartToHandler
+                        addPartToHandler(handlerContainer, partToSlaveMap, remotePartCopy, missingHandlerType, handlerCounts, slaveHandlers, slaveHandlersRR);
+                    }
                     else // no handler if local KJ and part not local
                         handlerContainer.partIdxToHandler.push_back(nullptr);
                     ++p;