瀏覽代碼

HPCC-14854 Add Roxie implementation of parallel PROJECT and PARSE

Use a unique allocator per strand.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父節點
當前提交
8def4d730a
共有 4 個文件被更改,包括 19 次插入5 次删除
  1. 2 2
      roxie/ccd/ccdactivities.cpp
  2. 5 0
      roxie/ccd/ccdcontext.cpp
  3. 2 0
      roxie/ccd/ccdcontext.hpp
  4. 10 3
      roxie/ccd/ccdserver.cpp

+ 2 - 2
roxie/ccd/ccdactivities.cpp

@@ -585,9 +585,9 @@ public:
         return logctx;
     }
 
-    virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const 
+    virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
     {
-        return queryContext->queryCodeContext()->getRowAllocator(meta, activityId); 
+        return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
     }
     virtual const char *cloneVString(const char *str) const
     {

+ 5 - 0
roxie/ccd/ccdcontext.cpp

@@ -1578,6 +1578,11 @@ public:
         return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
     }
 
+    virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+    {
+        return allocatorMetaCache->ensure(meta, activityId, flags);
+    }
+
     virtual const char *cloneVString(const char *str) const
     {
         return rowManager->cloneVString(str);

+ 2 - 0
roxie/ccd/ccdcontext.hpp

@@ -69,6 +69,8 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IConstWorkUnit *queryWorkUnit() const = 0;
     virtual IRoxieServerContext *queryServerContext() = 0;
     virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped) = 0;
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const = 0;
+
 };
 
 interface IRoxieServerContext : extends IInterface

+ 10 - 3
roxie/ccd/ccdserver.cpp

@@ -316,6 +316,12 @@ public:
     {
         return ctx->getWorkunitRowReader(wuid, name, sequence, xmlTransformer, rowAllocator, isGrouped);
     }
+    virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+    {
+        return ctx->getRowAllocatorEx(meta, activityId, flags);
+    }
+
+
 protected:
     IRoxieSlaveContext * ctx;
 };
@@ -13240,6 +13246,7 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
     class ProjectProcessor : public CInterfaceOf<IEngineRowStream>
     {
         CRoxieServerParallelProjectActivity &parent;
+        IEngineRowAllocator *rowAllocator;
 
         // All these probably should go in a common base class StrandProcessor. Might even want that class to replace the corresponding fields in CRoxieServerActivity
         IEngineRowStream *inputStream;
@@ -13255,6 +13262,7 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
           : parent(_parent), inputStream(_inputStream), basehelper(parent.basehelper)
         {
             timeActivities = parent.timeActivities;
+            rowAllocator = parent.ctx->getRowAllocatorEx(parent.meta.queryOriginal(), parent.activityId, roxiemem::RHFunique);
         }
         virtual const void * nextRow()
         {
@@ -13275,7 +13283,7 @@ class CRoxieServerParallelProjectActivity : public CRoxieServerActivity
 
                 try
                 {
-                    RtlDynamicRowBuilder rowBuilder(parent.rowAllocator);
+                    RtlDynamicRowBuilder rowBuilder(rowAllocator);
                     size32_t outSize;
                     outSize = ((IHThorProjectArg &) basehelper).transform(rowBuilder, in);
                     if (outSize)
@@ -13312,6 +13320,7 @@ public:
     {
         if (!blockSize)
             blockSize = ctx->queryOptions().strandBlockSize;
+
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -13372,8 +13381,6 @@ public:
         return recombiner.getClear();
     }
 
-    virtual bool needsAllocator() const { return true; }
-
     virtual const void * nextRow()
     {
         throwUnexpected();