Bladeren bron

HPCC-27289 Remove legacy IDiskUsage associated code.

supplanted by jstats disk tracking.
Was only used by Thor and inefficiently looked up
existing workunit files (e.g. on a rerun) on an overwrite.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 3 jaren geleden
bovenliggende
commit
cea824fc46
36 gewijzigde bestanden met toevoegingen van 37 en 233 verwijderingen
  1. 0 39
      common/workunit/workunit.cpp
  2. 0 2
      common/workunit/workunit.hpp
  3. 0 2
      common/workunit/workunit.ipp
  4. 1 1
      plugins/cassandra/cassandrawu.cpp
  5. 2 2
      thorlcr/activities/choosesets/thchoosesetsslave.cpp
  6. 1 1
      thorlcr/activities/countproject/thcountprojectslave.cpp
  7. 1 1
      thorlcr/activities/enth/thenthslave.cpp
  8. 1 1
      thorlcr/activities/firstn/thfirstnslave.cpp
  9. 1 5
      thorlcr/activities/indexwrite/thindexwriteslave.cpp
  10. 1 1
      thorlcr/activities/iterate/thiterateslave.cpp
  11. 3 3
      thorlcr/activities/join/thjoinslave.cpp
  12. 0 4
      thorlcr/activities/keydiff/thkeydiffslave.cpp
  13. 0 4
      thorlcr/activities/keypatch/thkeypatchslave.cpp
  14. 1 1
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  15. 2 2
      thorlcr/activities/msort/thmsortslave.cpp
  16. 1 1
      thorlcr/activities/nsplitter/thnsplitterslave.cpp
  17. 1 1
      thorlcr/activities/pull/thpullslave.cpp
  18. 1 1
      thorlcr/activities/rollup/throllupslave.cpp
  19. 1 1
      thorlcr/activities/selectnth/thselectnthslave.cpp
  20. 2 2
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  21. 0 2
      thorlcr/activities/spill/thspillslave.cpp
  22. 3 3
      thorlcr/activities/thactivityutil.cpp
  23. 1 2
      thorlcr/activities/thactivityutil.ipp
  24. 1 3
      thorlcr/activities/thdiskbaseslave.cpp
  25. 1 1
      thorlcr/activities/wuidwrite/thwuidwriteslave.cpp
  26. 0 12
      thorlcr/graph/thgraph.cpp
  27. 1 18
      thorlcr/graph/thgraph.hpp
  28. 0 75
      thorlcr/graph/thgraphmaster.cpp
  29. 0 5
      thorlcr/graph/thgraphmaster.ipp
  30. 0 1
      thorlcr/graph/thgraphslave.cpp
  31. 0 2
      thorlcr/graph/thgraphslave.hpp
  32. 0 14
      thorlcr/mfilemanager/thmfilemanager.cpp
  33. 3 3
      thorlcr/msort/tsorts.cpp
  34. 1 2
      thorlcr/msort/tsorts.hpp
  35. 5 13
      thorlcr/thorutil/thbuf.cpp
  36. 1 2
      thorlcr/thorutil/thbuf.hpp

+ 0 - 39
common/workunit/workunit.cpp

@@ -4470,10 +4470,6 @@ public:
             { c->deleteTempFiles(graph, deleteOwned, deleteJobOwned); }
     virtual void deleteTemporaries()
             { c->deleteTemporaries(); }
-    virtual void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId)
-            { c->addDiskUsageStats(avgNodeUsage, minNode, minNodeUsage, maxNode, maxNodeUsage, graphId); }
-    virtual IPropertyTree * getDiskUsageStats()
-            { return c->getDiskUsageStats(); }
     virtual IPropertyTreeIterator & getFileIterator() const
             { return c->getFileIterator(); }
     virtual IPropertyTreeIterator & getFilesReadIterator() const
@@ -9510,41 +9506,6 @@ bool CLocalWorkUnit::getFieldUsageArray(StringArray & filenames, StringArray & c
     return true;
 }
 
-IPropertyTree *CLocalWorkUnit::getDiskUsageStats()
-{
-    return p->getPropTree("DiskUsageStats");
-}
-
-void CLocalWorkUnit::addDiskUsageStats(__int64 _avgNodeUsage, unsigned _minNode, __int64 _minNodeUsage, unsigned _maxNode, __int64 _maxNodeUsage, __int64 _graphId)
-{
-    IPropertyTree *stats = p->queryPropTree("DiskUsageStats");
-    offset_t maxNodeUsage;
-    if (stats)
-        maxNodeUsage = stats->getPropInt64("@maxNodeUsage");
-    else
-    {
-        stats = p->addPropTree("DiskUsageStats");
-        maxNodeUsage = 0;
-    }
-
-    if ((offset_t)_maxNodeUsage > maxNodeUsage)
-    {
-        // record all details at time of max node usage.
-        stats->setPropInt("@minNode", _minNode);
-        stats->setPropInt("@maxNode", _maxNode);
-        stats->setPropInt64("@minNodeUsage", _minNodeUsage);
-        stats->setPropInt64("@maxNodeUsage", _maxNodeUsage);
-        stats->setPropInt64("@graphId", _graphId);
-        if (_avgNodeUsage)
-        {
-            unsigned _skewHi = (unsigned)((100 * (_maxNodeUsage-_avgNodeUsage))/_avgNodeUsage);
-            unsigned _skewLo = (unsigned)((100 * (_avgNodeUsage-_minNodeUsage))/_avgNodeUsage);
-            stats->setPropInt("@skewHi", _skewHi);
-            stats->setPropInt("@skewLo", _skewLo);
-        }
-    }
-}
-
 IPropertyTreeIterator & CLocalWorkUnit::getFileIterator() const
 {
     CriticalBlock block(crit);

+ 0 - 2
common/workunit/workunit.hpp

@@ -1266,7 +1266,6 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual unsigned getCodeVersion() const = 0;
     virtual unsigned getWuidVersion() const  = 0;
     virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const = 0;
-    virtual IPropertyTree * getDiskUsageStats() = 0;
     virtual IPropertyTreeIterator & getFileIterator() const = 0;
     virtual bool getCloneable() const = 0;
     virtual IUserDescriptor * queryUserDescriptor() const = 0;
@@ -1364,7 +1363,6 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual void setCodeVersion(unsigned version, const char * buildVersion, const char * eclVersion) = 0;
     virtual void deleteTempFiles(const char * graph, bool deleteOwned, bool deleteJobOwned) = 0;
     virtual void deleteTemporaries() = 0;
-    virtual void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId) = 0;
     virtual void setCloneable(bool value) = 0;
     virtual void setIsClone(bool value) = 0;
     virtual void setTimeScheduled(const IJlibDateTime & val) = 0;

+ 0 - 2
common/workunit/workunit.ipp

@@ -295,7 +295,6 @@ public:
     virtual unsigned getCodeVersion() const;
     virtual unsigned getWuidVersion() const;
     virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const;
-    virtual IPropertyTree * getDiskUsageStats();
     virtual IPropertyTreeIterator & getFileIterator() const;
     virtual bool archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned,bool exportAssociatedFiles);
     virtual IJlibDateTime & getTimeScheduled(IJlibDateTime &val) const;
@@ -373,7 +372,6 @@ public:
     void resetBeforeGeneration();
     void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned);
     void deleteTemporaries();
-    void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId);
     void setTimeScheduled(const IJlibDateTime &val);
     virtual void subscribe(WUSubscribeOptions options) {};
 

+ 1 - 1
plugins/cassandra/cassandrawu.cpp

@@ -891,7 +891,7 @@ static const CassandraXmlMapping workunitsMappings [] =
     // These are catchalls for anything not processed above or in a child table
 
     {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
-    {"subtrees", "map<text, text>", "@DiskUsageStats@Parameters@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
+    {"subtrees", "map<text, text>", "@Parameters@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
 
     { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };

+ 2 - 2
thorlcr/activities/choosesets/thchoosesetsslave.cpp

@@ -156,7 +156,7 @@ public:
         PARENT::start();
 
         if (ensureStartFTLookAhead(0))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), CHOOSESETS_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL), false);
 
         first = true;
         done = false;
@@ -276,7 +276,7 @@ public:
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
         inputCounter->setInputStream(inputStream);
-        setLookAhead(0, createRowStreamLookAhead(this, inputCounter.get(), queryRowInterfaces(input), CHOOSESETSPLUS_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), true); // read all input
+        setLookAhead(0, createRowStreamLookAhead(this, inputCounter.get(), queryRowInterfaces(input), CHOOSESETSPLUS_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this), true); // read all input
     }
     virtual void start() override
     {

+ 1 - 1
thorlcr/activities/countproject/thcountprojectslave.cpp

@@ -153,7 +153,7 @@ public:
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
         PARENT::setInputStream(index, _input, consumerOrdered);
-        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), COUNTPROJECT_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), true); // could spot disk write output here?
+        setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), COUNTPROJECT_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this), true); // could spot disk write output here?
     }
     virtual void start()
     {

+ 1 - 1
thorlcr/activities/enth/thenthslave.cpp

@@ -84,7 +84,7 @@ protected:
 
                 // NB: this is post base start()
                 if (!hasLookAhead(0))
-                    setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+                    setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this), false);
                 else
                     startLookAhead(0);
             }

+ 1 - 1
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -270,7 +270,7 @@ public:
             {
                 rowcount_t maxRead = (totallimit>(RCUNBOUND-_skipCount))?RCUNBOUND:totallimit+_skipCount;
                 setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), FIRSTN_SMART_BUFFER_SIZE, ::canStall(input), false,
-                                                                                  maxRead, this, &container.queryJob().queryIDiskUsage()), false); // if a very large limit don't bother truncating
+                                                                                  maxRead, this), false); // if a very large limit don't bother truncating
                 lastTotalLimitState = totallimit;
                 lastSkipCountState = _skipCount;
             }

+ 1 - 5
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -281,7 +281,7 @@ public:
         start();
 
         if (ensureStartFTLookAhead(0))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this), false);
 
         if (refactor)
         {
@@ -603,8 +603,6 @@ public:
             Owned<IFile> ifile = createIFile(partFname.str());
             offset_t sz = ifile->size();
             mb.append(sz);
-            if ((offset_t)-1 != sz)
-                container.queryJob().queryIDiskUsage().increase(sz);
             CDateTime createTime, modifiedTime, accessedTime;
             ifile->getTime(&createTime, &modifiedTime, &accessedTime);
             modifiedTime.serialize(mb);
@@ -617,8 +615,6 @@ public:
                 ifile.setown(createIFile(path.str()));
                 sz = ifile->size();
                 mb.append(sz);
-                if ((offset_t)-1 != sz)
-                    container.queryJob().queryIDiskUsage().increase(sz);
                 ifile->getTime(&createTime, &modifiedTime, &accessedTime);
                 modifiedTime.serialize(mb);
             }

+ 1 - 1
thorlcr/activities/iterate/thiterateslave.cpp

@@ -82,7 +82,7 @@ public:
         if (global) // only want lookahead if global (hence serial)
         {
             if (ensureStartFTLookAhead(0))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), ENTH_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL), false);
         }
         count = 0;
         eof = nextPut = false;

+ 3 - 3
thorlcr/activities/join/thjoinslave.cpp

@@ -216,7 +216,7 @@ public:
             portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             ActPrintLog("SortJoinSlaveActivity::init portbase = %d, mpTagRPC=%d",portbase,(int)mpTagRPC);
             server.setLocalHost(portbase); 
-            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+            sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
             server.serialize(slaveData);
         }
 
@@ -272,7 +272,7 @@ public:
                 IThorDataLink *secondaryInput = queryInput(secondaryInputIndex);
                 // NB: lookahead told not to preserveGroups
                 setLookAhead(secondaryInputIndex, createRowStreamLookAhead(this, secondaryInputStream, queryRowInterfaces(secondaryInput), JOIN_SMART_BUFFER_SIZE, ::canStall(secondaryInput),
-                             false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+                             false, RCUNBOUND, this), false);
             }
             secondaryInputStream = queryInputStream(secondaryInputIndex); // either lookahead or underlying stream, depending on whether active or not
         }
@@ -402,7 +402,7 @@ public:
         PARENT::reset();
         if (sorter) return; // JCSMORE loop - shouldn't have to recreate sorter between loop iterations
         if (!islocal && TAG_NULL != mpTagRPC)
-            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+            sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
     }
     virtual void kill() override
     {

+ 0 - 4
thorlcr/activities/keydiff/thkeydiffslave.cpp

@@ -160,8 +160,6 @@ public:
         StringBuffer tmpStr;
         Owned<IFile> ifile = createIFile(getPartFilename(*patchPart, 0, tmpStr).str());
         offset_t sz = ifile->size();
-        if ((offset_t)-1 != sz)
-            container.queryJob().queryIDiskUsage().increase(sz);
         mb.append(sz);
 
         CDateTime createTime, modifiedTime, accessedTime;
@@ -177,8 +175,6 @@ public:
         {
             Owned<IFile> ifile = createIFile(getPartFilename(*patchTlkPart, 0, tmpStr.clear()).str());
             offset_t sz = ifile->size();
-            if ((offset_t)-1 != sz)
-                container.queryJob().queryIDiskUsage().increase(sz);
             mb.append(sz);
 
             CDateTime createTime, modifiedTime, accessedTime;

+ 0 - 4
thorlcr/activities/keypatch/thkeypatchslave.cpp

@@ -153,8 +153,6 @@ public:
         StringBuffer newIndexFilePath;
         Owned<IFile> ifile = createIFile(getPartFilename(*newIndexPart, 0, newIndexFilePath).str());
         offset_t sz = ifile->size();
-        if ((offset_t)-1 != sz)
-            container.queryJob().queryIDiskUsage().increase(sz);
         mb.append(sz);
 
         CDateTime createTime, modifiedTime, accessedTime;
@@ -170,8 +168,6 @@ public:
             StringBuffer filePath;
             Owned<IFile> ifile = createIFile(getPartFilename(*newIndexTlkPart, 0, filePath).str());
             offset_t sz = ifile->size();
-            if ((offset_t)-1 != sz)
-                container.queryJob().queryIDiskUsage().increase(sz);
             mb.append(sz);
 
             CDateTime createTime, modifiedTime, accessedTime;

+ 1 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1390,7 +1390,7 @@ public:
         {
             startInput(0);
             if (ensureStartFTLookAhead(0))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()), false);
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this), false);
             left.set(inputStream); // can be replaced by loader stream
         }
         catch(IException *e)

+ 2 - 2
thorlcr/activities/msort/thmsortslave.cpp

@@ -76,7 +76,7 @@ public:
         ActPrintLog("MSortSlaveActivity::init portbase = %d, mpTagRPC = %d",portbase,(int)mpTagRPC);
         server.setLocalHost(portbase); 
         helper = (IHThorSortArg *)queryHelper();
-        sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+        sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
         server.serialize(slaveData);
     }
     virtual void start() override
@@ -169,7 +169,7 @@ public:
     {
         PARENT::reset();
         if (sorter) return; // JCSMORE loop - shouldn't have to recreate sorter between loop iterations
-        sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+        sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
     }
     virtual void kill() override
     {

+ 1 - 1
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -227,7 +227,7 @@ public:
                     {
                         StringBuffer tempname;
                         GetTempName(tempname, "nsplit", true); // use alt temp dir
-                        smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
+                        smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input)));
                         ActPrintLog("Using temp spill file: %s", tempname.str());
                     }
                     else

+ 1 - 1
thorlcr/activities/pull/thpullslave.cpp

@@ -38,7 +38,7 @@ public:
         ActivityTimer s(slaveTimerStats, timeActivities);
         PARENT::start();
         if (ensureStartFTLookAhead(0))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PULL_SMART_BUFFER_SIZE, true, false, RCUNBOUND, NULL), false);
     }
     const void * nextRow() override
     {

+ 1 - 1
thorlcr/activities/rollup/throllupslave.cpp

@@ -182,7 +182,7 @@ public:
         if (global)
         {
             if (ensureStartFTLookAhead(0))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), rollup?ROLLUP_SMART_BUFFER_SIZE:DEDUP_SMART_BUFFER_SIZE, ::canStall(input), false, RCUNBOUND, NULL), false);
         }
         needFirstRow = true;
         rowif.set(queryRowInterfaces(input));

+ 1 - 1
thorlcr/activities/selectnth/thselectnthslave.cpp

@@ -101,7 +101,7 @@ public:
         if (!isLocal && rowN)
         {
             if (ensureStartFTLookAhead(0))
-                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, ::canStall(input), false, rowN, this, &container.queryJob().queryIDiskUsage()), false);
+                setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, ::canStall(input), false, rowN, this), false);
         }
 
         seenNth = false;

+ 2 - 2
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -117,7 +117,7 @@ public:
             barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
             portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             server.setLocalHost(portbase);
-            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+            sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
             server.serialize(slaveData);
         }
         compare = helper->queryCompareLeft();                   // NB not CompareLeftRight
@@ -134,7 +134,7 @@ public:
         PARENT::reset();
         if (sorter) return; // JCSMORE loop - shouldn't have to recreate sorter between loop iterations
         if (!isLocal && TAG_NULL != mpTagRPC)
-            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+            sorter.setown(CreateThorSorter(this, server, &queryJobChannel().queryJobComm(), mpTagRPC));
     }
     virtual void kill() override
     {

+ 0 - 2
thorlcr/activities/spill/thspillslave.cpp

@@ -137,8 +137,6 @@ public:
     {
         Owned<IFile> ifile = createIFile(fileName.str());
         offset_t sz = ifile->size();
-        if ((offset_t)-1 != sz)
-            container.queryJob().queryIDiskUsage().increase(sz);        
         mb.append(getDataLinkCount()).append(compress?uncompressedBytesWritten:sz).append(sz);
         unsigned crc = compress?~0:fileCRC.get();
         mb.append(crc);

+ 3 - 3
thorlcr/activities/thactivityutil.cpp

@@ -193,7 +193,7 @@ public:
         return 0;
     }
 
-    CRowStreamLookAhead(CSlaveActivity &_activity, IEngineRowStream *_inputStream, IThorRowInterfaces *_rowIf, size32_t _bufsize, bool _allowspill, bool _preserveGrouping, rowcount_t _required, ILookAheadStopNotify *_notify, IDiskUsage *_iDiskUsage)
+    CRowStreamLookAhead(CSlaveActivity &_activity, IEngineRowStream *_inputStream, IThorRowInterfaces *_rowIf, size32_t _bufsize, bool _allowspill, bool _preserveGrouping, rowcount_t _required, ILookAheadStopNotify *_notify)
         : thread(*this), activity(_activity), inputStream(_inputStream), rowIf(_rowIf)
     {
 #ifdef _FULL_TRACE
@@ -267,9 +267,9 @@ public:
 };
 
 
-IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool allowspill, bool preserveGrouping, rowcount_t maxcount, ILookAheadStopNotify *notify, IDiskUsage *iDiskUsage)
+IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool allowspill, bool preserveGrouping, rowcount_t maxcount, ILookAheadStopNotify *notify)
 {
-    return new CRowStreamLookAhead(*activity, inputStream, rowIf, bufsize, allowspill, preserveGrouping, maxcount, notify, iDiskUsage);
+    return new CRowStreamLookAhead(*activity, inputStream, rowIf, bufsize, allowspill, preserveGrouping, maxcount, notify);
 }
 
 void initMetaInfo(ThorDataLinkMetaInfo &info)

+ 1 - 2
thorlcr/activities/thactivityutil.ipp

@@ -77,8 +77,7 @@ interface ILookAheadStopNotify
 {
     virtual void onInputFinished(rowcount_t count) = 0;
 };
-interface IDiskUsage;
-IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool spillenabled, bool preserveGrouping=true, rowcount_t maxcount=RCUNBOUND, ILookAheadStopNotify *notify=NULL, IDiskUsage *_diskUsage=NULL); //maxcount is maximum rows to read set to RCUNBOUND for all
+IStartableEngineRowStream *createRowStreamLookAhead(CSlaveActivity *activity, IEngineRowStream *inputStream, IThorRowInterfaces *rowIf, size32_t bufsize, bool spillenabled, bool preserveGrouping=true, rowcount_t maxcount=RCUNBOUND, ILookAheadStopNotify *notify=NULL); //maxcount is maximum rows to read set to RCUNBOUND for all
 
 
 StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath);

+ 1 - 3
thorlcr/activities/thdiskbaseslave.cpp

@@ -335,7 +335,7 @@ void CDiskWriteSlaveActivityBase::open()
         if (hasLookAhead(0))
             startLookAhead(0);
         else
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PROCESS_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), PROCESS_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL), false);
         if (!rfsQueryParallel)
         {
             ActPrintLog("Blocked, waiting for previous part to complete write");
@@ -653,8 +653,6 @@ void CDiskWriteSlaveActivityBase::processDone(MemoryBuffer &mb)
     rowcount_t _processed = processed & THORDATALINK_COUNT_MASK;
     Owned<IFile> ifile = createIFile(fName);
     offset_t sz = ifile->size();
-    if ((offset_t)-1 != sz)
-        container.queryJob().queryIDiskUsage().increase(sz);
     mb.append(_processed).append(compress?uncompressedBytesWritten:sz).append(sz);
     // NB: block compressed output has implicit crc of 0.
     unsigned crc = compress?~0:fileCRC.get();

+ 1 - 1
thorlcr/activities/wuidwrite/thwuidwriteslave.cpp

@@ -93,7 +93,7 @@ public:
         processed = THORDATALINK_STARTED;
 
         if (ensureStartFTLookAhead(0))
-            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL, &container.queryJob().queryIDiskUsage()), false);
+            setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), WORKUNITWRITE_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, NULL), false);
 
         ::ActPrintLog(this, thorDetailedLogLevel, "WORKUNITWRITE: processing first block");
 

+ 0 - 12
thorlcr/graph/thgraph.cpp

@@ -2712,7 +2712,6 @@ public:
 
 CJobBase::CJobBase(ILoadedDllEntry *_querySo, const char *_graphName) : querySo(_querySo), graphName(_graphName)
 {
-    maxDiskUsage = diskUsage = 0;
     dirty = true;
     aborted = false;
     queryMemoryMB = 0;
@@ -2982,17 +2981,6 @@ void CJobBase::abort(IException *e)
     }
 }
 
-void CJobBase::increase(offset_t usage, const char *key)
-{
-    diskUsage += usage;
-    if (diskUsage > maxDiskUsage) maxDiskUsage = diskUsage;
-}
-
-void CJobBase::decrease(offset_t usage, const char *key)
-{
-    diskUsage -= usage;
-}
-
 // these getX methods for property in workunit settings, then global setting, defaulting to provided 'dft' if not present
 StringBuffer &CJobBase::getOpt(const char *opt, StringBuffer &out)
 {

+ 1 - 18
thorlcr/graph/thgraph.hpp

@@ -117,12 +117,6 @@ interface ICodeContextExt : extends ICodeContext
     virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) = 0;
 };
 
-interface IDiskUsage : extends IInterface
-{
-    virtual void increase(offset_t usage, const char *key=NULL) = 0;
-    virtual void decrease(offset_t usage, const char *key=NULL) = 0;
-};
-
 interface IBackup;
 interface IFileInProgressHandler;
 interface IThorFileCache;
@@ -834,13 +828,12 @@ interface ILoadedDllEntry;
 interface IConstWorkUnit;
 class CThorCodeContextBase;
 
-class graph_decl CJobBase : public CInterface, implements IDiskUsage, implements IExceptionHandler
+class graph_decl CJobBase : public CInterfaceOf<IInterface>, implements IExceptionHandler
 {
 protected:
     CriticalSection crit;
     Linked<ILoadedDllEntry> querySo;
     IUserDescriptor *userDesc;
-    offset_t maxDiskUsage, diskUsage;
     StringAttr key, graphName;
     bool aborted, pausing, resumed;
     StringBuffer wuid, user, scope, token;
@@ -890,8 +883,6 @@ protected:
     SafePluginMap *pluginMap;
     virtual void endJob();
 public:
-    IMPLEMENT_IINTERFACE;
-
     CJobBase(ILoadedDllEntry *querySo, const char *graphName);
     virtual void beforeDispose() override;
 
@@ -940,9 +931,6 @@ public:
     const char *queryWuid() const { return wuid.str(); }
     const char *queryUser() const { return user.str(); }
     const char *queryScope() const { return scope.str(); }
-    IDiskUsage &queryIDiskUsage() const { return *(IDiskUsage *)this; }
-    void setDiskUsage(offset_t _diskUsage) { diskUsage = _diskUsage; }
-    offset_t queryMaxDiskUsage() const { return maxDiskUsage; }
     mptag_t querySlaveMpTag() const { return slavemptag; }
     unsigned querySlaves() const { return slaveGroup->ordinality(); }
     unsigned queryNodes() const { return nodeGroup->ordinality()-1; }
@@ -968,11 +956,6 @@ public:
 
 //
     virtual void addCreatedFile(const char *file) { assertex(false); }
-    virtual __int64 addNodeDiskUsage(unsigned node, __int64 sz) { assertex(false); return 0; }
-
-// IDiskUsage
-    virtual void increase(offset_t usage, const char *key=NULL);
-    virtual void decrease(offset_t usage, const char *key=NULL);
 
 // IExceptionHandler
     virtual bool fireException(IException *e) = 0;

+ 0 - 75
thorlcr/graph/thgraphmaster.cpp

@@ -1394,7 +1394,6 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     resumed = WUActionResume == workunit->getAction();
     fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));
     querySent = spillsSaved = false;
-    nodeDiskUsageCached = false;
 
     StringBuffer pluginsDir;
     globals->getProp("@pluginsPath", pluginsDir);
@@ -1566,35 +1565,6 @@ void CJobMaster::broadcast(ICommunicator &comm, CMessageBuffer &msg, mptag_t mpt
     }
 }
 
-void CJobMaster::initNodeDUCache()
-{
-    if (!nodeDiskUsageCached)
-    {
-        nodeDiskUsageCached = true;
-        Owned<IPropertyTreeIterator> fileIter = &workunit->getFileIterator();
-        ForEach (*fileIter)
-        {
-            Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(fileIter->query().queryProp("@name"), userDesc, false, false, false, nullptr, defaultPrivilegedUser);
-            if (f)
-            {
-                unsigned n = f->numParts();
-                for (unsigned i=0;i<n;i++)
-                {
-                    Owned<IDistributedFilePart> part = f->getPart(i);
-                    offset_t sz = part->getFileSize(false, false);
-                    if (i>=nodeDiskUsage.ordinality())
-                        nodeDiskUsage.append(sz);
-                    else
-                    {
-                        sz += nodeDiskUsage.item(i);
-                        nodeDiskUsage.add(sz, i);
-                    }
-                }
-            }
-        }
-    }
-}
-
 IPropertyTree *CJobMaster::prepareWorkUnitInfo()
 {
     Owned<IPropertyTree> workUnitInfo = createPTree("workUnitInfo");
@@ -1938,28 +1908,6 @@ void CJobMaster::pause(bool doAbort)
     }
 }
 
-__int64 CJobMaster::queryNodeDiskUsage(unsigned node)
-{
-    initNodeDUCache();
-    if (!nodeDiskUsage.isItem(node)) return 0;
-    return nodeDiskUsage.item(node);
-}
-
-void CJobMaster::setNodeDiskUsage(unsigned node, __int64 sz)
-{
-    initNodeDUCache();
-    while (nodeDiskUsage.ordinality() <= node)
-        nodeDiskUsage.append(0);
-    nodeDiskUsage.replace(sz, node);
-}
-
-__int64 CJobMaster::addNodeDiskUsage(unsigned node, __int64 sz)
-{
-    sz += queryNodeDiskUsage(node);
-    setNodeDiskUsage(node, sz);
-    return sz;
-}
-
 bool CJobMaster::queryCreatedFile(const char *file)
 {
     StringBuffer scopedName;
@@ -2650,10 +2598,6 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
 
 void CMasterGraph::getFinalProgress()
 {
-    offset_t totalDiskUsage = 0;
-    offset_t minNodeDiskUsage = 0, maxNodeDiskUsage = 0;
-    unsigned maxNode = (unsigned)-1, minNode = (unsigned)-1;
-
     CMessageBuffer msg;
     mptag_t replyTag = queryJobChannel().queryMPServer().createReplyTag();
     msg.setReplyTag(replyTag);
@@ -2727,27 +2671,8 @@ void CMasterGraph::getFinalProgress()
                     }
                 }
             }
-            offset_t nodeDiskUsage;
-            msg.read(nodeDiskUsage);
-            jobM->setNodeDiskUsage(n, nodeDiskUsage);
-            if (nodeDiskUsage > maxNodeDiskUsage)
-            {
-                maxNodeDiskUsage = nodeDiskUsage;
-                maxNode = n;
-            }
-            if ((unsigned)-1 == minNode || nodeDiskUsage < minNodeDiskUsage)
-            {
-                minNodeDiskUsage = nodeDiskUsage;
-                minNode = n;
-            }
-            totalDiskUsage += nodeDiskUsage;
         }
     }
-    if (totalDiskUsage)
-    {
-        Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
-        wu->addDiskUsageStats(totalDiskUsage/queryJob().querySlaves(), minNode, minNodeDiskUsage, maxNode, maxNodeDiskUsage, queryGraphId());
-    }
 }
 
 void CMasterGraph::done()

+ 0 - 5
thorlcr/graph/thgraphmaster.ipp

@@ -156,8 +156,6 @@ class graphmaster_decl CJobMaster : public CJobBase
     Linked<IConstWorkUnit> workunit;
     Owned<IFatalHandler> fatalHandler;
     bool querySent, sendSo, spillsSaved;
-    Int64Array nodeDiskUsage;
-    bool nodeDiskUsageCached;
     StringArray createdFiles;
     Owned<CSlaveMessageHandler> slaveMsgHandler;
     SocketEndpoint agentEp;
@@ -214,10 +212,7 @@ public:
     virtual bool fireException(IException *e);
 
     virtual void addCreatedFile(const char *file);
-    virtual __int64 addNodeDiskUsage(unsigned node, __int64 sz);
 
-    __int64 queryNodeDiskUsage(unsigned node);
-    void setNodeDiskUsage(unsigned node, __int64 sz);
     bool queryCreatedFile(const char *file);
 
     virtual IFatalHandler *clearFatalHandler();

+ 0 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1320,7 +1320,6 @@ void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
                 if (globals->getPropBool("@watchdogProgressEnabled"))
                     jobS->queryProgressHandler()->stopGraph(*this, &doneInfoMb);
             }
-            doneInfoMb.append(job.queryMaxDiskUsage());
         }
         catch (IException *)
         {

+ 0 - 2
thorlcr/graph/thgraphslave.hpp

@@ -480,8 +480,6 @@ class graphslave_decl CJobSlave : public CJobBase
     unsigned actInitWaitTimeMins = DEFAULT_MAX_ACTINITWAITTIME_MINS;
 
 public:
-    IMPLEMENT_IINTERFACE;
-
     CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _slavemptag);
 
     virtual CJobChannel *addChannel(IMPServer *mpServer) override;

+ 0 - 14
thorlcr/mfilemanager/thmfilemanager.cpp

@@ -433,21 +433,7 @@ public:
                     }
                 }
                 if (found)
-                {
                     workunit->releaseFile(logicalName);
-                    Owned<IDistributedFile> f = timedLookup(job, dlfn, false, true, job.queryMaxLfnBlockTimeMins() * 60000);
-                    if (f)
-                    {
-                        unsigned p, parts = f->numParts();
-                        for (p=0; p<parts; p++)
-                        {
-                            Owned<IDistributedFilePart> part = f->getPart(p);
-                            offset_t sz = part->getFileSize(false, false);
-                            if ((offset_t)-1 != sz)
-                                job.addNodeDiskUsage(p, -(__int64)sz);
-                        }
-                    }
-                }
             }
             if (efile.get())
             {

+ 3 - 3
thorlcr/msort/tsorts.cpp

@@ -775,7 +775,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorSorter(CActivityBase *_activity, SocketEndpoint &ep, IDiskUsage *_iDiskUsage, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
+    CThorSorter(CActivityBase *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
         : activity(_activity), myendpoint(ep), clusterComm(_clusterComm), mpTagRPC(_mpTagRPC),
           rowArray(*_activity, _activity), threaded("CThorSorter", this), spillStats(spillStatistics)
     {
@@ -1363,8 +1363,8 @@ public:
 
 //==============================================================================
 
-THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep,IDiskUsage *iDiskUsage,ICommunicator *clusterComm, mptag_t _mpTagRPC)
+THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC)
 {
-    return new CThorSorter(activity, ep, iDiskUsage, clusterComm, _mpTagRPC);
+    return new CThorSorter(activity, ep, clusterComm, _mpTagRPC);
 }
 

+ 1 - 2
thorlcr/msort/tsorts.hpp

@@ -65,7 +65,6 @@ public:
     virtual rowcount_t getGlobalCount() const = 0;
 };
 
-interface IDiskUsage;
 interface ICommunicator;
 interface IOutputRowSerializer;
 
@@ -76,7 +75,7 @@ interface ISocketRowWriter: extends IRowWriter
 };
 
 class CActivityBase;
-THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep,IDiskUsage *iDiskUsage,ICommunicator *clusterComm, mptag_t _mpTagRPC);
+THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC);
 IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket);
 ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs);
 #define SOCKETSERVERINC                    1

+ 5 - 13
thorlcr/thorutil/thbuf.cpp

@@ -1197,7 +1197,6 @@ bool CRowSet::Release() const
 
 class CSharedWriteAheadDisk : public CSharedWriteAheadBase
 {
-    IDiskUsage *iDiskUsage;
     Owned<IFile> spillFile;
     Owned<IFileIO> spillFileIO;
     CIArrayOf<Chunk> freeChunks;
@@ -1316,14 +1315,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
         ActPrintLog(activity, "getOutOffset: got new upper offset # = %" I64F "d", highOffset);
 #endif
         Chunk *chunk = new Chunk(highOffset, required);
-        if (iDiskUsage)
-        {
-            iDiskUsage->decrease(highOffset);
-            highOffset += required; // NB next
-            iDiskUsage->increase(highOffset);
-        }
-        else
-            highOffset += required; // NB next
+        highOffset += required; // NB next
         return chunk;
     }
     inline void mergeFreeChunk(Chunk *dst, const Chunk *src)
@@ -1511,8 +1503,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
         return ssz.size()+1; // space on disk, +1 = eog marker
     }
 public:
-    CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IThorRowInterfaces *rowIf, IDiskUsage *_iDiskUsage) : CSharedWriteAheadBase(activity, outputCount, rowIf),
-        allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
+    CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IThorRowInterfaces *rowIf) : CSharedWriteAheadBase(activity, outputCount, rowIf),
+        allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
     {
         assertex(spillName);
         spillFile.setown(createIFile(spillName));
@@ -1548,9 +1540,9 @@ public:
     }
 };
 
-ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf, IDiskUsage *iDiskUsage)
+ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf)
 {
-    return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf, iDiskUsage);
+    return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf);
 }
 
 class CSharedWriteAheadMem : public CSharedWriteAheadBase

+ 1 - 2
thorlcr/thorutil/thbuf.hpp

@@ -72,8 +72,7 @@ interface ISharedSmartBuffer : extends IRowWriter
 };
 
 extern graph_decl ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IThorRowInterfaces *rowif, unsigned buffSize=((unsigned)-1));
-interface IDiskUsage;
-extern graph_decl ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *tempname, unsigned outputs, IThorRowInterfaces *rowif, IDiskUsage *iDiskUsage=NULL);
+extern graph_decl ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *tempname, unsigned outputs, IThorRowInterfaces *rowif);
 
 
 interface IRowWriterMultiReader : extends IRowWriter