Browse Source

HPCC-23211 Fix remote KJ's in CQ's that depend on parent context

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 years ago
parent
commit
2839a58c2c

+ 3 - 0
testing/regress/ecl/key/keyed_join5.xml

@@ -55,3 +55,6 @@
 <Dataset name='Result 10'>
  <Row><key>0</key><f1>limit hit </f1><f2>          </f2></Row>
 </Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>59</Result_11></Row>
+</Dataset>

+ 13 - 0
testing/regress/ecl/keyed_join5.ecl

@@ -89,6 +89,17 @@ j5 := TABLE(JOIN(GROUP(lhs, someid), i, LEFT.lhsKey=RIGHT.key, KEEP(2)), { lhsKe
 // test helper->getRowLimit, generated inside KJ by enclosing within LIMIT()
 j6 := LIMIT(JOIN(lhs, i, LEFT.lhsKey=RIGHT.key, doHKJoinTrans(LEFT, RIGHT), KEEP(3)), 2, onFail(TRANSFORM(rhsRec, SELF.f1 := 'limit hit'; SELF := [])));
 
+
+childFunc(unsigned v) := FUNCTION
+ j := JOIN(lhs, i, v>20 AND v<80 AND LEFT.someid=RIGHT.key);
+ RETURN IF(COUNT(j)>0, j[1].key, 0);
+END;
+
+parentDs := DATASET(100, TRANSFORM({unsigned4 id1; unsigned4 id2}, SELF.id1 := COUNTER; SELF.id2 := 0));
+j7 := PROJECT(parentDs, TRANSFORM(RECORDOF(parentDs), SELF.id2 := childFunc(LEFT.id1); SELF := LEFT));
+j7sumid2 := SUM(j7, id2);
+
+
 SEQUENTIAL(
  OUTPUT(rhs, , '~REGRESS::'+WORKUNIT+'::rhsDs', OVERWRITE);
  BUILD(i, OVERWRITE);
@@ -102,5 +113,7 @@ SEQUENTIAL(
 
   OUTPUT(j5);
   OUTPUT(j6);
+
+  OUTPUT(j7sumid2);
  );
 );

+ 10 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -158,7 +158,7 @@ static const unsigned defaultFetchLookupProcessBatchLimit = 10000;
 class CJoinGroup;
 
 
-enum AllocatorTypes { AT_Transform=1, AT_LookupWithJG, AT_LookupWithJGRef, AT_JoinFields, AT_FetchRequest, AT_FetchResponse, AT_JoinGroup, AT_JoinGroupRhsRows, AT_FetchDisk, AT_LookupResponse };
+enum AllocatorTypes { AT_Transform=1, AT_LookupWithJG, AT_JoinFields, AT_FetchRequest, AT_FetchResponse, AT_JoinGroup, AT_JoinGroupRhsRows, AT_FetchDisk, AT_LookupResponse };
 
 
 struct Row
@@ -1496,6 +1496,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                 activity.queryHelper()->serializeCreateContext(msg);
                 sizeMark.write();
 
+                size32_t parentExtractSz;
+                const byte *parentExtract = activity.queryGraph().queryParentExtract(parentExtractSz);
+                msg.append(parentExtractSz);
+                msg.append(parentExtractSz, parentExtract);
+                msg.append(activity.startCtxMb.length());
+                msg.append(activity.startCtxMb.length(), activity.startCtxMb.toByteArray());
+
                 msg.append(activity.messageCompression);
                 // NB: potentially translation per part could be different if dealing with superkeys
                 IPropertyTree &props = part.queryOwner().queryProperties();
@@ -2145,6 +2152,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     roxiemem::IRowManager *rowManager = nullptr;
     unsigned currentAdded = 0;
     unsigned currentJoinGroupSize = 0;
+    MemoryBuffer startCtxMb;
 
     Owned<IThorRowInterfaces> fetchInputMetaRowIf; // fetch request rows, header + fetch fields
     Owned<IThorRowInterfaces> fetchOutputMetaRowIf; // fetch request reply rows, header + [join fields as child row]
@@ -3028,6 +3036,7 @@ public:
         rowLimit = (rowcount_t)helper->getRowLimit();
         if (rowLimit < keepLimit)
             keepLimit = rowLimit+1; // if keepLimit is small, let it reach rowLimit+1, but any more is pointless and a waste of time/resources.
+        helper->serializeStartContext(startCtxMb.clear());
 
         inputHelper.set(input->queryFromActivity()->queryContainer().queryHelper());
         preserveOrder = 0 == (joinFlags & JFreorderable);

+ 2 - 0
thorlcr/activities/loop/thloopslave.cpp

@@ -517,6 +517,8 @@ public:
                 if (condLoopCounter)
                     boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
                 sendLoopingCount(loopCounter, 0);
+                size32_t parentExtractSz;
+                const byte *parentExtract = queryGraph().queryParentExtract(parentExtractSz);
                 boundGraph->queryGraph()->executeChild(parentExtractSz, parentExtract, results, loopResults);
             }
             int iNumResults = loopResults->count();

+ 1 - 2
thorlcr/graph/thgraph.cpp

@@ -1381,6 +1381,7 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
         throw MakeGraphException(this, 0, "subgraph aborted");
     }
     GraphPrintLog("Processing graph");
+    setParentCtx(parentExtractSz, parentExtract);
     Owned<IException> exception;
     try
     {
@@ -3142,8 +3143,6 @@ CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_contai
     mpTag = TAG_NULL;
     abortSoon = receiving = cancelledReceive = initialized = reInit = false;
     baseHelper.set(container.queryHelper());
-    parentExtractSz = 0;
-    parentExtract = NULL;
 
     defaultRoxieMemHeapFlags = (roxiemem::RoxieHeapFlags)container.getOptInt("heapflags", defaultHeapFlags);
     if (container.queryJob().queryUsePackedAllocators())

+ 5 - 3
thorlcr/graph/thgraph.hpp

@@ -658,6 +658,11 @@ public:
         parentExtractMb.swapWith(newParentExtract);
         return (const byte *)parentExtractMb.toByteArray();
     }
+    const byte *queryParentExtract(size32_t &sz) const
+    {
+        sz = parentExtractSz;
+        return (const byte *)parentExtractMb.toByteArray();
+    }
     virtual ICodeContext *queryCodeContext() { return &graphCodeContext; }
     void setLoopCounter(unsigned _counter) { counter = _counter; }
     unsigned queryLoopCounter() const { return counter; }
@@ -1071,8 +1076,6 @@ protected:
     mptag_t mpTag; // to be used by any direct inter master<->slave communication
     bool abortSoon;
     bool timeActivities; // purely for access efficiency
-    size32_t parentExtractSz;
-    const byte *parentExtract;
     bool receiving, cancelledReceive, initialized, reInit;
     Owned<IThorGraphResults> ownedResults; // NB: probably only to be used by loop results
 
@@ -1096,7 +1099,6 @@ public:
     inline bool queryTimeActivities() const { return timeActivities; }
     inline roxiemem::RoxieHeapFlags queryHeapFlags() const { return defaultRoxieMemHeapFlags; }
 
-    void onStart(size32_t _parentExtractSz, const byte *_parentExtract) { parentExtractSz = _parentExtractSz; parentExtract = _parentExtract; }
     bool receiveMsg(ICommunicator &comm, CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     void cancelReceiveMsg(ICommunicator &comm, const rank_t rank, const mptag_t mpTag);

+ 33 - 4
thorlcr/slave/slavmain.cpp

@@ -454,6 +454,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         Linked<CKeyLookupContext> ctx;
         Owned<IKeyManager> keyManager;
         unsigned handle = 0;
+        Owned<IHThorKeyedJoinArg> helper;
     public:
         CKMContainer(CKJService &_service, CKeyLookupContext *_ctx)
             : service(_service), ctx(_ctx)
@@ -464,6 +465,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
             if (translator)
                 keyManager->setLayoutTranslator(translator);
             handle = service.getUniqId();
+            helper.set(ctx->queryHelper());
         }
         ~CKMContainer()
         {
@@ -472,6 +474,14 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CKeyLookupContext &queryCtx() const { return *ctx; }
         IKeyManager *queryKeyManager() const { return keyManager; }
         unsigned queryHandle() const { return handle; }
+        void setContexts(MemoryBuffer &parentCtxMb, MemoryBuffer &startCtxMb, MemoryBuffer &createCtxMb)
+        {
+            // Only create a new helper, if either parent or start are present, in which case onStart evaluation may vary.
+            if (parentCtxMb.length() || startCtxMb.length())
+                helper.setown(service.createHelper(*service.currentJob, ctx->queryKey().id, createCtxMb));
+            helper->onStart((const byte *)parentCtxMb.toByteArray(), startCtxMb.length() ? &startCtxMb : nullptr);
+        }
+        inline IHThorKeyedJoinArg *queryHelper() const { return helper; }
     };
     template<class KEY, class ITEM>
     class CKeyedCacheEntry : public CInterface
@@ -505,7 +515,6 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     {
     protected:
         Linked<CActivityContext> activityCtx;
-        IHThorKeyedJoinArg *helper;
         std::vector<const void *> rows;
         rank_t sender;
         mptag_t replyTag;
@@ -516,7 +525,6 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CLookupRequest(CActivityContext *_activityCtx, rank_t _sender, mptag_t _replyTag)
             : activityCtx(_activityCtx), sender(_sender), replyTag(_replyTag)
         {
-            helper = activityCtx->queryHelper();
         }
         ~CLookupRequest()
         {
@@ -655,6 +663,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     class CKeyLookupRequest : public CLookupRequest
     {
         CKJService &service;
+        IHThorKeyedJoinArg *helper = nullptr;
         Linked<CKMContainer> kmc;
 
         rowcount_t abortLimit = 0;
@@ -720,6 +729,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CKeyLookupRequest(CKJService &_service, CKeyLookupContext *_ctx, CKMContainer *_kmc, rank_t _sender, mptag_t _replyTag)
             : CLookupRequest(_ctx->queryActivityCtx(), _sender, _replyTag), kmc(_kmc), service(_service)
         {
+            helper = kmc->queryHelper();
             allocator = activityCtx->queryLookupInputAllocator();
             deserializer = activityCtx->queryLookupInputDeserializer();
             joinFieldsAllocator = activityCtx->queryJoinFieldsAllocator();
@@ -820,6 +830,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     class CFetchLookupRequest : public CLookupRequest
     {
         CKJService &service;
+        IHThorKeyedJoinArg *helper = nullptr;
         Linked<CFetchContext> fetchContext;
         const unsigned defaultMaxFetchLookupReplySz = 0x100000;
         const IDynamicTransform *translator = nullptr;
@@ -880,6 +891,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
             StringBuffer tracing;
             translator = fetchContext->queryTranslator(fetchContext->queryKey().getTracing(tracing));
             prefetcher = fetchContext->queryPrefetcher();
+            helper = queryCtx().queryHelper();
         }
         virtual void process(bool &abortSoon) override
         {
@@ -981,7 +993,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     CriticalSection kMCrit, lCCrit;
     Owned<IThreadPool> processorPool;
 
-    CActivityContext *createActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
+    IHThorKeyedJoinArg *createHelper(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
     {
         VStringBuffer helperName("fAc%u", (unsigned)id);
         EclHelperFactory helperFactory = (EclHelperFactory) job.queryDllEntry().getEntry(helperName.str());
@@ -991,7 +1003,13 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
         Owned<IHThorKeyedJoinArg> helper = static_cast<IHThorKeyedJoinArg *>(helperFactory());
         helper->onCreate(&codeCtx, nullptr, &createCtxMb); // JCS->GH - will I ever need colocalParent here?
-        return new CActivityContext(*this, id, helper.getClear(), &codeCtx);
+        return helper.getClear();
+    }
+    CActivityContext *createActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
+    {
+        IHThorKeyedJoinArg *helper = createHelper(job, id, createCtxMb);
+        ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
+        return new CActivityContext(*this, id, helper, &codeCtx);
     }
     CActivityContext *ensureActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
     {
@@ -1294,6 +1312,16 @@ public:
                         MemoryBuffer createCtxMb;
                         createCtxMb.setBuffer(createCtxSz, (void *)msg.readDirect(createCtxSz)); // NB: read only
 
+                        size32_t parentCtxSz;
+                        msg.read(parentCtxSz);
+                        MemoryBuffer parentCtxMb;
+                        parentCtxMb.setBuffer(parentCtxSz, (void *)msg.readDirect(parentCtxSz)); // NB: read only
+
+                        size32_t startCtxSz;
+                        msg.read(startCtxSz);
+                        MemoryBuffer startCtxMb;
+                        startCtxMb.setBuffer(startCtxSz, (void *)msg.readDirect(startCtxSz)); // NB: read only
+
                         bool created;
                         Owned<CKeyLookupContext> keyLookupContext = ensureKeyLookupContext(*currentJob, key, createCtxMb, &created); // ensure entry in keyLookupContextsHT, will be removed by last CKMContainer
                         bool messageCompression;
@@ -1317,6 +1345,7 @@ public:
                                 keyLookupContext->setTranslation(translationMode, publishedFormat, publishedFormatCrc, projectedFormat);
                         }
                         Owned<CKMContainer> kmc = createActiveKeyManager(keyLookupContext); // owns keyLookupContext
+                        kmc->setContexts(parentCtxMb, startCtxMb, createCtxMb);
                         processKeyLookupRequest(msg, kmc, sender, replyTag);
                         break;
                     }