ソースを参照

Merge pull request #10870 from jakesmith/hpcc-19087

HPCC-19087 Delay IKeyManager::setKey calls to ensure lazy open

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年 前
コミット
06fff20cbb
1 ファイル変更51 行追加60 行削除
  1. 51 60
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

+ 51 - 60
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1160,9 +1160,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
         IKeyIndex *currentTlk;
         CJoinGroup *currentJG;
         RtlDynamicRowBuilder indexReadFieldsRow;
-        IArrayOf<IKeyManager> partKeyManagers;
+        Owned<IKeyManager> partManager;
+        IArrayOf<IKeyIndex> partKeyIndexes;
 
-        IKeyManager *currentPartKeyManager = nullptr;
+        IKeyIndex *currentPart = nullptr;
         unsigned nextPart;
         unsigned candidateCount;
         __int64 lastSeeks, lastScans;
@@ -1190,7 +1191,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             currentTlk = NULL;
             lastSeeks = lastScans = 0;
             nextPart = 0; // only used for superkeys of single part keys
-            currentPartKeyManager = nullptr;
+            currentPart = nullptr;
             candidateCount = 0;
         }
     public:
@@ -1199,10 +1200,18 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
         CKeyLocalLookup(CKeyedJoinSlave &_owner) : owner(_owner), indexReadFieldsRow(_owner.indexInputAllocator)
         {
             tlkManager.setown(owner.keyHasTlk ? createLocalKeyManager(nullptr, owner.fixedRecordSize, nullptr) : nullptr);
-
-            if (owner.getKeyManagers(partKeyManagers)) // true signifies that dealing with a local mergable set of index parts
-                currentPartKeyManager = &partKeyManagers.item(0);
             reset();
+            owner.getKeyIndexes(partKeyIndexes);
+            if (owner.localKey && (partKeyIndexes.ordinality() > 1))
+            {
+                Owned<IKeyIndexSet> partKeySet = createKeyIndexSet();
+                ForEachItemIn(i, partKeyIndexes)
+                    partKeySet->addIndex(&partKeyIndexes.item(i));
+                currentPart = &partKeyIndexes.item(0);
+                partManager.setown(createKeyMerger(partKeySet, owner.fixedRecordSize, 0, nullptr));
+            }
+            else
+                partManager.setown(createLocalKeyManager(nullptr, owner.fixedRecordSize, nullptr));
         }
         ~CKeyLocalLookup()
         {
@@ -1281,16 +1290,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             {
                 for (;;)
                 {
-                    if (currentPartKeyManager)
+                    if (currentPart)
                     {
-                        while (currentPartKeyManager->lookup(true))
+                        while (partManager->lookup(true))
                         {
                             ++candidateCount;
                             if (candidateCount > owner.atMost)
                                 break;
-                            KLBlobProviderAdapter adapter(currentPartKeyManager);
+                            KLBlobProviderAdapter adapter(partManager);
                             offset_t fpos;
-                            byte const * keyRow = currentPartKeyManager->queryKeyBuffer(fpos);
+                            byte const * keyRow = partManager->queryKeyBuffer(fpos);
                             if (owner.helper->indexReadMatch(indexReadFieldsRow.getSelf(), keyRow, fpos, &adapter))
                             {
                                 if (currentJG->rowsSeen() >= owner.keepLimit)
@@ -1316,7 +1325,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 #ifdef TRACE_JOINGROUPS
                                 ::ActPrintLog(&owner, "CJoinGroup [result] %x from %d", currentJG, __LINE__);
 #endif
-                                noteStats(currentPartKeyManager->querySeeks(), currentPartKeyManager->queryScans());
+                                noteStats(partManager->querySeeks(), partManager->queryScans());
                                 size32_t lorsz = owner.keyLookupAllocator->queryOutputMeta()->getRecordSize(lookupRow.getSelf());
                                 // must be easier way
                                 return lookupRow.finalizeRowClear(lorsz);
@@ -1327,20 +1336,21 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                                 owner.statsArr[AS_PostFiltered]++;
                             }
                         }
-                        currentPartKeyManager->releaseSegmentMonitors();
-                        noteStats(currentPartKeyManager->querySeeks(), currentPartKeyManager->queryScans());
-                        currentPartKeyManager = nullptr;
+                        partManager->releaseSegmentMonitors();
+                        noteStats(partManager->querySeeks(), partManager->queryScans());
+                        currentPart = nullptr;
                         if (owner.localKey)
                         { // merger done
                         }
                         else if (!owner.keyHasTlk)
                         {
-                            if (nextPart < partKeyManagers.ordinality())
+                            if (nextPart < partKeyIndexes.ordinality())
                             {
-                                currentPartKeyManager = &partKeyManagers.item(nextPart++);
-                                owner.helper->createSegmentMonitors(currentPartKeyManager, indexReadFieldsRow.getSelf());
-                                currentPartKeyManager->finishSegmentMonitors();
-                                currentPartKeyManager->reset();
+                                currentPart = &partKeyIndexes.item(nextPart++);
+                                partManager->setKey(currentPart);
+                                owner.helper->createSegmentMonitors(partManager, indexReadFieldsRow.getSelf());
+                                partManager->finishSegmentMonitors();
+                                partManager->reset();
                             }
                         }
                     }
@@ -1354,14 +1364,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                                 unsigned partNo = (unsigned)tlkManager->queryFpos();
                                 partNo = owner.superWidth ? owner.superWidth*nextTlk+(partNo-1) : partNo-1;
 
-                                currentPartKeyManager = &partKeyManagers.item(partNo);
-                                owner.helper->createSegmentMonitors(currentPartKeyManager, indexReadFieldsRow.getSelf());
-                                currentPartKeyManager->finishSegmentMonitors();
-                                currentPartKeyManager->reset();
+                                currentPart = &partKeyIndexes.item(partNo);
+                                partManager->setKey(currentPart);
+                                owner.helper->createSegmentMonitors(partManager, indexReadFieldsRow.getSelf());
+                                partManager->finishSegmentMonitors();
+                                partManager->reset();
                                 break;
                             }
                         }
-                        if (!currentPartKeyManager)
+                        if (!currentPart)
                         {
                             if (++nextTlk < owner.tlkKeySet->numParts())
                             {
@@ -1402,8 +1413,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 #ifdef TRACE_JOINGROUPS
                             ::ActPrintLog(&owner, "CJoinGroup [end marker returned] %x from %d", currentJG, __LINE__);
 #endif
-                            if (currentPartKeyManager)
-                                noteStats(currentPartKeyManager->querySeeks(), currentPartKeyManager->queryScans());
+                            if (currentPart)
+                                noteStats(partManager->querySeeks(), partManager->queryScans());
                             currentJG = NULL;
                             size32_t lorsz = owner.keyLookupAllocator->queryOutputMeta()->getRecordSize(lookupRow.getSelf());
                             // must be easier way
@@ -1418,18 +1429,21 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                         currentJG->notePendingEndCandidate();
 
                         candidateCount = 0;
-                        if (0 == partKeyManagers.ordinality()) // if empty key
+                        if (0 == partKeyIndexes.ordinality()) // if empty key
                         {
                             // will terminate row/group next cycle
                         }
                         else if (!owner.keyHasTlk)
                         {
-                            currentPartKeyManager = &partKeyManagers.item(0);
-                            if (!owner.localKey || 1 == partKeyManagers.ordinality())
+                            currentPart = &partKeyIndexes.item(0);
+                            if (!owner.localKey || 1 == partKeyIndexes.ordinality())
+                            {
                                 nextPart = 1;
-                            owner.helper->createSegmentMonitors(currentPartKeyManager, indexReadFieldsRow.getSelf());
-                            currentPartKeyManager->finishSegmentMonitors();
-                            currentPartKeyManager->reset();
+                                partManager->setKey(currentPart);
+                            }
+                            owner.helper->createSegmentMonitors(partManager, indexReadFieldsRow.getSelf());
+                            partManager->finishSegmentMonitors();
+                            partManager->reset();
                         }
                         else
                         {
@@ -1448,8 +1462,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                 ::ActPrintLog(&owner, e);
                 throw;
             }
-            if (currentPartKeyManager)
-                noteStats(currentPartKeyManager->querySeeks(), currentPartKeyManager->queryScans());
+            if (currentPart)
+                noteStats(partManager->querySeeks(), partManager->queryScans());
             return NULL;
         }
 
@@ -1559,11 +1573,9 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     friend class CKeyLookupPoolMember;
     };
 
-    bool getKeyManagers(IArrayOf<IKeyManager> &keyManagers)
+    void getKeyIndexes(IArrayOf<IKeyIndex> &keyIndexes)
     {
         unsigned numIndexParts = indexParts.ordinality();
-        bool localMergedKey = localKey && (numIndexParts > 1);
-        Owned<IKeyIndexSet> partKeySet;
         for (unsigned ip=0; ip<numIndexParts; ip++)
         {
             IPartDescriptor &filePart = indexParts.item(ip);
@@ -1576,30 +1588,9 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
             Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, indexName, filePart);
 
-            Owned<IKeyManager> klManager;
-            if (localMergedKey)
-            {
-                Owned<IKeyIndex> partIndex = createKeyIndex(filename.str(), crc, *lfile, false, false);
-                if (!partKeySet)
-                    partKeySet.setown(createKeyIndexSet());
-                partKeySet->addIndex(partIndex.getClear());
-            }
-            else
-            {
-                bool allowRemote = getOptBool("remoteKeyFilteringEnabled");
-                bool forceRemote = allowRemote ? getOptBool("forceDafilesrv") : false; // can only force remote, if forceDafilesrv and remoteKeyFilteringEnabled are enabled.
-                klManager.setown(createKeyManager(filename, fixedRecordSize, crc, lfile, allowRemote, forceRemote));
-                keyManagers.append(*klManager.getClear());
-            }
+            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *lfile, false, false);
+            keyIndexes.append(*keyIndex.getClear());
         }
-        if (localMergedKey)
-        {
-            dbgassertex(0 == keyManagers.ordinality());
-            keyManagers.append(*createKeyMerger(partKeySet, fixedRecordSize, 0, nullptr));
-            return true;
-        }
-        else
-            return false;
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);