Explorar el Código

HPCC-18950 Roxie aggregate performance-suite issues

Move allocation of rowbuilder objects onto the Roxie heap
(which is really more appropriate anyway) to avoid heap
fragmentation issues using glibc malloc.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 7 años
padre
commit
0493950613

+ 13 - 5
common/thorhelper/thorcommon.cpp

@@ -36,8 +36,13 @@
 #endif
 
 #include "thorstep.hpp"
+#include "roxiemem.hpp"
 
 #define ROWAGG_PERROWOVERHEAD (sizeof(AggregateRowBuilder))
+
+void AggregateRowBuilder::Link() const { LinkRoxieRow(this); }  // link through LinkRoxieRow: the object is placement-constructed in a row obtained from rowBuilderAllocator
+bool AggregateRowBuilder::Release() const { ReleaseRoxieRow(this); return false; }  // MORE - return value is iffy: false is returned unconditionally, whatever ReleaseRoxieRow did
+
 RowAggregator::RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : helper(_helper)
 {
     comparer = _extra.queryCompareRowElement();
@@ -54,9 +59,12 @@ RowAggregator::~RowAggregator()
     reset();
 }
 
-void RowAggregator::start(IEngineRowAllocator *_rowAllocator)
+static CClassMeta<AggregateRowBuilder> AggregateRowBuilderMeta;  // metadata describing AggregateRowBuilder itself; passed to getRowAllocatorEx() in RowAggregator::start()
+
+void RowAggregator::start(IEngineRowAllocator *_rowAllocator, ICodeContext *ctx, unsigned activityId)  // ctx and activityId are used only to obtain rowBuilderAllocator
 {
     rowAllocator.set(_rowAllocator);  // allocator later handed to each AggregateRowBuilder constructor
+    rowBuilderAllocator.setown(ctx->getRowAllocatorEx(&AggregateRowBuilderMeta, activityId, roxiemem::RHFunique|roxiemem::RHFnofragment|roxiemem::RHFdelayrelease));  // second allocator: supplies the storage in which AggregateRowBuilder objects are placement-constructed. NOTE(review): ctx is dereferenced without a null check
 }
 
 void RowAggregator::reset()
@@ -65,9 +73,9 @@ void RowAggregator::reset()
     {
         AggregateRowBuilder *n = nextResult();
         if (n)
-            n->Release();
+            ReleaseRoxieRow(n);
     }
-    SuperHashTable::_releaseAll();
+    _releaseAll();
     eof = false;
     cursor = NULL;
     rowAllocator.clear();
@@ -89,7 +97,7 @@ AggregateRowBuilder &RowAggregator::addRow(const void * row)
     }
     else
     {
-        Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
+        Owned<AggregateRowBuilder> rowBuilder = new (rowBuilderAllocator->createRow()) AggregateRowBuilder(rowAllocator, hash);
         helper.clearAggregate(*rowBuilder);
         size32_t sz = helper.processFirst(*rowBuilder, row);
         rowBuilder->setSize(sz);
@@ -115,7 +123,7 @@ void RowAggregator::mergeElement(const void * otherElement)
     }
     else
     {
-        Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
+        Owned<AggregateRowBuilder> rowBuilder = new (rowBuilderAllocator->createRow()) AggregateRowBuilder(rowAllocator, hash);
         rowBuilder->setSize(cloneRow(*rowBuilder, otherElement, rowAllocator->queryOutputMeta()));
         addNew(rowBuilder.getClear(), hash);
     }

+ 28 - 2
common/thorhelper/thorcommon.ipp

@@ -18,6 +18,7 @@
 #ifndef THORCOMMON_IPP
 #define THORCOMMON_IPP
 
+#include <type_traits>
 #include "eclrtl.hpp"
 #include "thorcommon.hpp"
 #include "jsort.hpp"
@@ -119,6 +120,27 @@ private:
     unsigned metaFlags;
 };
 
+template<class T> class CClassMeta : implements CInterfaceOf<IOutputMetaData>  // IOutputMetaData describing a fixed-size C++ object of type T: every size query answers sizeof(T)
+{
+public:
+    virtual size32_t getRecordSize(const void *rec)         { return sizeof(T); }  // rec is ignored
+    virtual size32_t getMinRecordSize() const               { return sizeof(T); }
+    virtual size32_t getFixedSize() const                   { return sizeof(T); }
+    virtual void toXML(const byte * self, IXmlWriter & out) { }  // writes nothing
+    virtual unsigned getVersion() const                     { return OUTPUTMETADATA_VERSION; }
+    virtual unsigned getMetaFlags()                         { return std::is_pod<T>() ? 0 : MDFneeddestruct; }  // 0 for POD types, MDFneeddestruct otherwise - presumably what makes the allocator call destruct() (TODO confirm)
+    virtual void destruct(byte * self)                      { reinterpret_cast<T *>(self)->~T(); }  // runs T's destructor in place; does not free the memory
+    virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }  // no serializer is provided
+    virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }  // no deserializer is provided
+    virtual ISourceRowPrefetcher * createDiskPrefetcher() { return NULL; }  // no prefetcher is provided
+    virtual IOutputMetaData * querySerializedDiskMeta() { return this; }  // returns this same object
+    virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }  // no serializer is provided
+    virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }  // no deserializer is provided
+    virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}  // visits nothing
+    virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }  // no child metadata for any index
+    virtual const RtlRecord &queryRecordAccessor(bool expand) const { throwUnexpected(); } // not supported: only calls throwUnexpected() - could provide a static implementation if needed
+};
+
 //------------------------------------------------------------------------------------------------
 
 
@@ -154,9 +176,12 @@ protected:
 
 //------------------------------------------------------------------------------------------------
 
-class AggregateRowBuilder : public RtlDynamicRowBuilder, public CInterface
+class THORHELPER_API AggregateRowBuilder : public RtlDynamicRowBuilder
 {
 public:
+    void Link() const;
+    bool Release() const;
+
     AggregateRowBuilder(IEngineRowAllocator *_rowAllocator, unsigned _elementHash)
         : RtlDynamicRowBuilder(_rowAllocator, true), elementHash(_elementHash)
     {
@@ -194,7 +219,7 @@ public:
     IMPLEMENT_IINTERFACE
 
     void reset();
-    void start(IEngineRowAllocator *rowAllocator);
+    void start(IEngineRowAllocator *rowAllocator, ICodeContext *ctx, unsigned activityId);
     AggregateRowBuilder &addRow(const void * row);
     void mergeElement(const void * otherElement);
     AggregateRowBuilder *nextResult();
@@ -227,6 +252,7 @@ private:
     const void * cursor;
     bool eof;
     Owned<IEngineRowAllocator> rowAllocator;
+    Owned<IEngineRowAllocator> rowBuilderAllocator;
     memsize_t totalSize, overhead;
 };
 

+ 3 - 3
ecl/hthor/hthor.cpp

@@ -3349,7 +3349,7 @@ const void * CHThorHashAggregateActivity::nextRow()
     if (!gathered)
     {
         bool eog = true;
-        aggregated.start(rowAllocator);
+        aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
         for (;;)
         {
             OwnedConstRoxieRow next(input->nextRow());
@@ -7884,7 +7884,7 @@ void CHThorChildGroupAggregateActivity::ready()
     CHThorSimpleActivityBase::ready();
     eof = false;
     gathered = false;
-    aggregated.start(rowAllocator);
+    aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
 }
 
 void CHThorChildGroupAggregateActivity::stop()
@@ -8937,7 +8937,7 @@ void CHThorDiskGroupAggregateActivity::gatherInfo(IFileDescriptor * fd)
 {
     PARENT::gatherInfo(fd);
     assertex(!grouped);
-    aggregated.start(rowAllocator);
+    aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
 }
 
 void CHThorDiskGroupAggregateActivity::processRow(const void * next)

+ 1 - 1
ecl/hthor/hthorkey.cpp

@@ -1510,7 +1510,7 @@ void CHThorIndexGroupAggregateActivity::ready()
     eof = false;
     gathered = false;
     aggregated.reset();
-    aggregated.start(rowAllocator);
+    aggregated.start(rowAllocator, agent.queryCodeContext(), activityId);
 }
 
 void CHThorIndexGroupAggregateActivity::processRow(const void * next)

+ 3 - 3
roxie/ccd/ccdactivities.cpp

@@ -2161,7 +2161,7 @@ public:
           results(*helper, *helper)
     {
         onCreate();
-        results.start(rowAllocator);
+        results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
     }
 
     virtual bool needsRowAllocator()
@@ -2203,7 +2203,7 @@ public:
         resultAggregator(*helper, *helper)
     {
         onCreate();
-        resultAggregator.start(rowAllocator);
+        resultAggregator.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
         if (meta.needsSerializeDisk())
         {
             // MORE - avoiding serializing to dummy would be more efficient...
@@ -3400,7 +3400,7 @@ public:
           results(*aggregateHelper, *aggregateHelper), kind(_kind)
     {
         onCreate();
-        results.start(rowAllocator);
+        results.start(rowAllocator, queryContext->queryCodeContext(), basefactory->queryId());
         assertex(!resent);
         groupSegCount = 0;
     }

+ 1 - 0
roxie/ccd/ccdmain.cpp

@@ -548,6 +548,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
             topology->setPropInt("@allFilesDynamic", globals->getPropInt("--allFilesDynamic", 1));
             topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
+            topology->setPropInt64("@totalMemoryLimit", globals->getPropInt("--totalMemoryLimitMb", 0) * (memsize_t) 0x100000);
             topology->setProp("@disableLocalOptimizations", globals->queryProp("--disableLocalOptimizations"));
         }
         if (topology->hasProp("PreferredCluster"))

+ 5 - 5
roxie/ccd/ccdserver.cpp

@@ -5518,7 +5518,7 @@ public:
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
         CRoxieServerChildBaseActivity::start(parentExtractSize, parentExtract, paused);
-        aggregated.start(rowAllocator);
+        aggregated.start(rowAllocator, ctx->queryCodeContext(), activityId);
     }
 
     virtual void reset()
@@ -11182,7 +11182,7 @@ public:
 
         if (!gathered)
         {
-            aggregated.start(rowAllocator);
+            aggregated.start(rowAllocator, ctx->queryCodeContext(), activityId);
             bool eog = true;
             for (;;)
             {
@@ -22400,7 +22400,7 @@ public:
     {
         gathered= false;
         CRoxieServerDiskAggregateBaseActivity::start(parentExtractSize, parentExtract, paused);
-        resultAggregator.start(rowAllocator);
+        resultAggregator.start(rowAllocator, ctx->queryCodeContext(), activityId);
     }
 
     virtual void reset()
@@ -24174,7 +24174,7 @@ public:
         groupSegCount = 0;
         if (!paused)
             processAllKeys();
-        resultAggregator.start(rowAllocator);
+        resultAggregator.start(rowAllocator, ctx->queryCodeContext(), activityId);
     }
 
     virtual bool needsAllocator() const { return true; }
@@ -24203,7 +24203,7 @@ public:
     virtual bool processSingleKey(IKeyIndex *key, const IDynamicTransform * trans) override
     {
         Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), meta.isVariableSize());
-        singleAggregator.start(rowAllocator);
+        singleAggregator.start(rowAllocator, ctx->queryCodeContext(), activityId);
         ThorActivityKind kind = factory->getKind();
         while (tlk->lookup(true))
         {

+ 1 - 1
thorlcr/activities/loop/thloopslave.cpp

@@ -1125,7 +1125,7 @@ public:
         gathered = eos = false;
         aggregated.clear();
         aggregated.setown(new RowAggregator(*helper, *helper));
-        aggregated->start(queryRowAllocator());
+        aggregated->start(queryRowAllocator(), queryCodeContext(), queryId());
     }
     CATCH_NEXTROW()
     {