浏览代码

HPCC-14442 Initial work on global/slave memory managers

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 年之前
父节点
当前提交
cd11b85e23

+ 4 - 0
common/thorhelper/roxiehelper.cpp

@@ -1307,6 +1307,10 @@ public:
             return 20;
         return 10;
     }
+    virtual unsigned getActivityId() const
+    {
+        return activityId;
+    }
     virtual bool freeBufferedRows(bool critical)
     {
         roxiemem::RoxieOutputRowArrayLock block(rowsToSort);

+ 1 - 1
common/thorhelper/roxierow.cpp

@@ -177,7 +177,7 @@ public:
     {
         return meta.queryOriginal();
     }
-    virtual unsigned queryActivityId()
+    virtual unsigned queryActivityId() const
     {
         return activityId;
     }

+ 1 - 1
common/thorhelper/thorcommon.cpp

@@ -1105,7 +1105,7 @@ IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICode
         {
             return meta;
         }
-        unsigned queryActivityId()
+        unsigned queryActivityId() const
         {
             return actid;
         }

+ 1 - 1
common/thorhelper/thorcommon.hpp

@@ -71,7 +71,7 @@ interface IRowInterfaces: extends IInterface
     virtual IOutputRowSerializer * queryRowSerializer()=0; 
     virtual IOutputRowDeserializer * queryRowDeserializer()=0; 
     virtual IOutputMetaData *queryRowMetaData()=0;
-    virtual unsigned queryActivityId()=0;
+    virtual unsigned queryActivityId() const=0;
     virtual ICodeContext *queryCodeContext()=0;
 };
 

+ 1 - 1
ecl/hqlcpp/hqlcppds.cpp

@@ -2921,7 +2921,7 @@ public:
     virtual void * finalizeRow(size32_t newSize, void * row, size32_t oldSize) { throwUnexpected(); }
 
     virtual IOutputMetaData * queryOutputMeta() { return NULL; }
-    virtual unsigned queryActivityId() { return 0; }
+    virtual unsigned queryActivityId() const { return 0; }
     virtual StringBuffer &getId(StringBuffer & out) { return out; }
     virtual IOutputRowSerializer *createDiskSerializer(ICodeContext *ctx = NULL) { throwUnexpected(); }
     virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) { throwUnexpected(); }

+ 6 - 0
ecl/hthor/hthor.cpp

@@ -3894,6 +3894,12 @@ unsigned CHThorGroupSortActivity::getSpillCost() const
     return 10;
 }
 
+
+unsigned CHThorGroupSortActivity::getActivityId() const
+{
+    return activityId;
+}
+
 bool CHThorGroupSortActivity::freeBufferedRows(bool critical)
 {
     roxiemem::RoxieOutputRowArrayLock block(sorter->getRowArray());

+ 1 - 0
ecl/hthor/hthor.ipp

@@ -1067,6 +1067,7 @@ public:
 
     //interface roxiemem::IBufferedRowCallback
     virtual unsigned getSpillCost() const;
+    virtual unsigned getActivityId() const;
     virtual bool freeBufferedRows(bool critical);
 
 private:

文件差异内容过多而无法显示
+ 610 - 198
roxie/roxiemem/roxiemem.cpp


+ 3 - 1
roxie/roxiemem/roxiemem.hpp

@@ -93,6 +93,7 @@ const static unsigned SpillAllCost = (unsigned)-1;
 interface IBufferedRowCallback
 {
     virtual unsigned getSpillCost() const = 0; // lower values get freed up first.
+    virtual unsigned getActivityId() const = 0;
     virtual bool freeBufferedRows(bool critical) = 0; // return true if and only if managed to free something.
 };
 
@@ -460,7 +461,6 @@ interface IRowManager : extends IInterface
     virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize, unsigned activityId) = 0;
     virtual unsigned allocated() = 0;
     virtual unsigned numPagesAfterCleanup(bool forceFreeAll) = 0; // calls releaseEmptyPages() then returns
-    virtual bool releaseEmptyPages(bool forceFreeAll) = 0; // ensures any empty pages are freed back to the heap
     virtual unsigned getMemoryUsage() = 0;
     virtual bool attachDataBuff(DataBuffer *dataBuff) = 0 ;
     virtual void noteDataBuffReleased(DataBuffer *dataBuff) = 0 ;
@@ -490,6 +490,7 @@ interface IRowManager : extends IInterface
     virtual void setMinimizeFootprint(bool value, bool critical) = 0;
     //If set, and changes to the callback list always triggers the callbacks to be called.
     virtual void setReleaseWhenModifyCallback(bool value, bool critical) = 0;
+    virtual IRowManager * querySlaveRowManager(unsigned slave) = 0;  // 0..numSlaves-1
 };
 
 extern roxiemem_decl void setDataAlignmentSize(unsigned size);
@@ -512,6 +513,7 @@ interface IActivityMemoryUsageMap : public IInterface
 };
 
 extern roxiemem_decl IRowManager *createRowManager(memsize_t memLimit, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks = false, bool outputOOMReports = false);
+extern roxiemem_decl IRowManager *createGlobalRowManager(memsize_t memLimit, memsize_t globalLimit, unsigned numSlaves, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks, bool outputOOMReports);
 
 // Fixed size aggregated link-counted zero-overhead data Buffer manager
 

+ 1 - 1
rtl/include/eclhelper.hpp

@@ -254,7 +254,7 @@ interface IEngineRowAllocator : extends IInterface
     virtual void * finalizeRow(size32_t newSize, void * row, size32_t oldSize) = 0;
 
     virtual IOutputMetaData * queryOutputMeta() = 0;
-    virtual unsigned queryActivityId() = 0;
+    virtual unsigned queryActivityId() const = 0;
     virtual StringBuffer &getId(StringBuffer &) = 0;
     virtual IOutputRowSerializer *createDiskSerializer(ICodeContext *ctx = NULL) = 0;
     virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) = 0;

+ 1 - 0
thorlcr/activities/filter/thfilterslave.cpp

@@ -249,6 +249,7 @@ public:
     CFilterGroupSlaveActivity(CGraphElementBase *container) : CFilterSlaveActivityBase(container), CThorSteppable(this)
     {
         groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
+        helper = NULL;
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 1 - 1
thorlcr/activities/group/thgroupslave.cpp

@@ -88,7 +88,7 @@ public:
 
         if (rolloverEnabled && !firstNode())  // 1st node can have nothing to send
         {
-            Owned<IThorRowCollector> collector = createThorRowCollector(*this, NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_SPILLABLE_STREAM);
+            Owned<IThorRowCollector> collector = createThorRowCollector(*this, this, NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_SPILLABLE_STREAM);
             Owned<IRowWriter> writer = collector->getWriter();
             if (next)
             {

+ 12 - 0
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2678,6 +2678,10 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
         {
             return SPILL_PRIORITY_HASHDEDUP_BUCKET_POSTSPILL;
         }
+        virtual unsigned getActivityId() const
+        {
+            return owner.getActivityId();
+        }
         virtual bool freeBufferedRows(bool critical)
         {
             if (NotFound == owner.nextSpilledBucketFlush)
@@ -2755,6 +2759,7 @@ public:
     {
         return SPILL_PRIORITY_HASHDEDUP;
     }
+    virtual unsigned getActivityId() const;
     virtual bool freeBufferedRows(bool critical)
     {
         return spillBucket(critical);
@@ -3259,6 +3264,8 @@ CBucketHandler::CBucketHandler(HashDedupSlaveActivityBase &_owner, IRowInterface
     nextToSpill = NotFound;
     peakKeyCount = RCIDXMAX;
     nextSpilledBucketFlush = NotFound;
+    numBuckets = 0;
+    buckets = NULL;
 }
 
 CBucketHandler::~CBucketHandler()
@@ -3356,6 +3363,11 @@ unsigned CBucketHandler::getBucketEstimate(rowcount_t totalRows) const
     return retBuckets;
 }
 
+unsigned CBucketHandler::getActivityId() const
+{
+    return owner.queryActivityId();
+}
+
 void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
 {
     numBuckets = _numBuckets;

+ 4 - 0
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2152,6 +2152,10 @@ public:
     {
         return SPILL_PRIORITY_LOOKUPJOIN;
     }
+    virtual unsigned getActivityId() const
+    {
+        return this->queryActivityId();
+    }
     virtual bool freeBufferedRows(bool critical)
     {
         // NB: only installed if lookup join and global

+ 7 - 0
thorlcr/activities/rollup/throllupslave.cpp

@@ -87,6 +87,10 @@ public:
         in = NULL;
         helper = NULL;
         abort = NULL;
+        dedupIdx = dedupCount = 0;
+        dedupArray = NULL;
+        iStopInput = NULL;
+        keepLeft = true;
         rowLoader.setown(createThorRowLoader(*activity, NULL, stableSort_none, rc_allMem));
     }
 
@@ -568,6 +572,9 @@ public:
 
     CRollupGroupSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this)
     {
+        eoi = false;
+        input = NULL;
+        helper = NULL;
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -1048,7 +1048,7 @@ public:
     virtual IOutputRowSerializer * queryRowSerializer(); 
     virtual IOutputRowDeserializer * queryRowDeserializer(); 
     virtual IOutputMetaData *queryRowMetaData() { return baseHelper->queryOutputMeta(); }
-    virtual unsigned queryActivityId() { return (unsigned)queryId(); }
+    virtual unsigned queryActivityId() const { return (unsigned)queryId(); }
     virtual ICodeContext *queryCodeContext() { return container.queryCodeContext(); }
 
     StringBuffer &getOpt(const char *prop, StringBuffer &out) const { return container.getOpt(prop, out); }

+ 11 - 8
thorlcr/thorutil/thmem.cpp

@@ -205,13 +205,14 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
-        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
+    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
+        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriority)
     {
         assertex(inRows.isFlushed());
         rows.swap(inRows);
         useCompression = false;
         mmRegistered = false;
+        ownsRows = false;
     }
     ~CSpillableStreamBase()
     {
@@ -225,6 +226,10 @@ public:
     {
         return spillPriority;
     }
+    virtual unsigned getActivityId() const
+    {
+        return activity.queryActivityId();
+    }
     virtual bool freeBufferedRows(bool critical)
     {
         if (spillFile) // i.e. if spilt already. NB: this is thread-safe, as 'spillFile' only set by spillRows() call below and can't be called on multiple threads concurrently.
@@ -1755,6 +1760,10 @@ public:
     {
         return spillPriority;
     }
+    virtual unsigned getActivityId() const
+    {
+        return activity.queryActivityId();
+    }
     virtual bool freeBufferedRows(bool critical)
     {
         if (!spillingEnabled())
@@ -1910,12 +1919,6 @@ IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterface
     return collector.getClear();
 }
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
-{
-    return createThorRowCollector(activity, &activity, iCompare, stableSort, diskMemMix, spillPriority, preserveGrouping);
-}
-
-
 void setThorInABox(unsigned num)
 {
 }

+ 0 - 1
thorlcr/thorutil/thmem.hpp

@@ -536,7 +536,6 @@ interface IThorRowCollector : extends IThorRowCollectorCommon
 extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
 extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
 extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);