Pārlūkot izejas kodu

HPCC-27386 Clean up Thor temp file usage

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 3 gadi atpakaļ
vecāks
revīzija
940089266e

+ 4 - 4
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -1150,7 +1150,7 @@ public:
         if (allowSpill)
         {
             StringBuffer temp;
-            GetTempName(temp,"hddrecvbuff", true);
+            GetTempFilePath(temp,"hddrecvbuff");
             piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
         }
         else
@@ -1814,7 +1814,7 @@ public:
                 if (!cachefileio.get())
                 {
                     StringBuffer tempname;
-                    GetTempName(tempname,"hashdistspill",true);
+                    GetTempFilePath(tempname,"hashdistspill");
                     cachefile.setown(createIFile(tempname.str()));
                     cachefileio.setown(cachefile->open(IFOcreaterw));
                     if (!cachefileio)
@@ -2257,7 +2257,7 @@ public:
             unsigned rwFlags = DEFAULT_RWFLAGS;
             sz = 0;
             StringBuffer tempname;
-            GetTempName(tempname,"hdprop",true); // use alt temp dir
+            GetTempFilePath(tempname,"hdprop");
             tempfile.setown(createIFile(tempname.str()));
             {
                 ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "REDISTRIBUTE size unknown, spilling to disk");
@@ -2665,7 +2665,7 @@ public:
         count = 0;
         StringBuffer tempname, prefix("hashdedup_bucket");
         prefix.append(bucketN).append('_').append(desc);
-        GetTempName(tempname, prefix.str(), true);
+        GetTempFilePath(tempname, prefix.str());
         OwnedIFile iFile = createIFile(tempname.str());
         spillFile.setown(new CFileOwner(iFile.getLink()));
         if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -3634,7 +3634,7 @@ public:
         };
 
         StringBuffer tmpFileName;
-        GetTempName(tmpFileName, "kjgroup");
+        GetTempFilePath(tmpFileName, "kjgroup");
         Owned<IFile> iFile = createIFile(tmpFileName);
         Owned<IRowStreamWithFpos> rowStream = new CRowStreamWithFpos(iFile, keyLookupReplyOutputMetaRowIf, preserveOrder ? totalIndexParts : 0);
 

+ 2 - 2
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1877,7 +1877,7 @@ protected:
 
                         VStringBuffer tempPrefix("spill_%d", container.queryId());
                         StringBuffer tempName;
-                        GetTempName(tempName, tempPrefix.str(), true);
+                        GetTempFilePath(tempName, tempPrefix.str());
                         file.setown(new CFileOwner(createIFile(tempName.str())));
                         VStringBuffer spillPrefixStr("clearAllNonLocalRows(%d)", SPILL_PRIORITY_SPILLABLE_STREAM);
                         // 3rd param. is skipNulls = true, the row arrays may have had the non-local rows delete already.
@@ -2941,7 +2941,7 @@ public:
             rwFlags |= spillCompInfo;
         }
         StringBuffer tempFilename;
-        GetTempName(tempFilename, "lookup_local", true);
+        GetTempFilePath(tempFilename, "lookup_local");
         ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
         OwnedIFile iFile = createIFile(tempFilename.str());
         overflowWriteFile.setown(new CFileOwner(iFile.getLink()));

+ 1 - 1
thorlcr/activities/merge/thmergeslave.cpp

@@ -320,7 +320,7 @@ public:
         chunkmaxsize = MERGE_TRANSFER_BUFFER_SIZE;
         Owned<IRowStream> merged = createRowStreamMerger(streams.ordinality(), streams.getArray(), helper->queryCompare(),helper->dedup(), linkcounter);
         StringBuffer tmpname;
-        GetTempName(tmpname,"merge",true); // use alt temp dir
+        GetTempFilePath(tmpname,"merge");
         tmpfile.setown(createIFile(tmpname.str()));
         Owned<IRowWriter> writer =  createRowWriter(tmpfile, this);
         CThorKeyArray sample(*this, this, helper->querySerialize(), helper->queryCompare(), helper->queryCompareKey(), helper->queryCompareRowKey());

+ 1 - 1
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -226,7 +226,7 @@ public:
                     if (spill)
                     {
                         StringBuffer tempname;
-                        GetTempName(tempname, "nsplit", true); // use alt temp dir
+                        GetTempFilePath(tempname, "nsplit");
                         smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input)));
                         ActPrintLog("Using temp spill file: %s", tempname.str());
                     }

+ 7 - 4
thorlcr/activities/thactivityutil.cpp

@@ -96,7 +96,7 @@ public:
         {
             StringBuffer temp;
             if (allowspill)
-                GetTempName(temp,"lookahd",true);
+                GetTempFilePath(temp,"lookahd");
             assertex(bufsize);
             if (allowspill)
                 smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
@@ -747,15 +747,18 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
     else
     {
         // use temp name
-        GetTempName(outLocationName, "partial");
         if (rfn.isLocal() || (twFlags & TW_External))
-        { // ensure local tmp in same directory as target
+        {
+            // ensure local tmp in same directory (and plane) as target
             StringBuffer dir;
             splitDirTail(primaryName, dir);
             addPathSepChar(dir);
-            dir.append(pathTail(outLocationName));
+            GetTempFileName(dir, "partial");
             outLocationName.swapWith(dir);
         }
+        else
+            GetTempFilePath(outLocationName, "partial");
+
         assertex(outLocationName.length());
         ensureDirectoryForFile(outLocationName.str());
     }

+ 2 - 2
thorlcr/activities/thdiskbaseslave.cpp

@@ -236,7 +236,7 @@ void CDiskReadSlaveActivityBase::init(MemoryBuffer &data, MemoryBuffer &slaveDat
 
         if (helper->getFlags() & TDXtemporary)
         {
-            // put temp files in individual slave temp dirs (incl port)
+            // put temp files in temp dir
             if (!container.queryJob().queryUseCheckpoints())
                 partDescs.item(0).queryOwner().setDefaultDir(queryTempDir());
         }
@@ -520,7 +520,7 @@ void CDiskWriteSlaveActivityBase::init(MemoryBuffer &data, MemoryBuffer &slaveDa
     }
     partDesc.setown(deserializePartFileDescriptor(data));
 
-    // put temp files in individual slave temp dirs (incl port)
+    // put temp files in temp dir
     if ((diskHelperBase->getFlags() & TDXtemporary) && (!container.queryJob().queryUseCheckpoints()))
         partDesc->queryOwner().setDefaultDir(queryTempDir());
 

+ 1 - 1
thorlcr/graph/thgraph.cpp

@@ -2842,7 +2842,6 @@ CActivityBase &CJobBase::queryChannelActivity(unsigned c, graph_id gid, activity
 void CJobBase::startJob()
 {
     LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
-    ClearTempDirs();
     perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3)));
     setPerformanceMonitorHook(perfmonhook);
     PrintMemoryStatusLog();
@@ -2922,6 +2921,7 @@ void CJobBase::endJob()
             exceptions.setown(makeMultiException());
         exceptions->append(*LINK(e));
     }
+    ClearTempDir();
     if (exceptions && exceptions->ordinality())
         throw exceptions.getClear();
 }

+ 6 - 0
thorlcr/graph/thgraphmaster.cpp

@@ -1420,6 +1420,12 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     slaveMsgHandler.setown(new CSlaveMessageHandler(*this, slavemptag));
     tmpHandler.setown(createTempHandler(true));
     xgmml.set(graphXGMML);
+
+    StringBuffer tempDir(globals->queryProp("@thorTempDirectory"));
+    // multiple thor jobs can be running on same node, sharing same local disk for temp storage.
+    // make unique by adding wuid+graphName+worker-num
+    VStringBuffer uniqueSubDir("%s_%s_0", workunit->queryWuid(), graphName); // 0 denotes master (workers = 1..N)
+    SetTempDir(tempDir, uniqueSubDir, "thtmp");
 }
 
 void CJobMaster::endJob()

+ 6 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1709,6 +1709,12 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
      */
     if (queryMaxLfnBlockTimeMins() >= actInitWaitTimeMins)
         actInitWaitTimeMins = queryMaxLfnBlockTimeMins()+1;
+
+    StringBuffer tempDir(globals->queryProp("@thorTempDirectory"));
+    // multiple thor jobs can be running on same node, sharing same local disk for temp storage.
+    // make unique by adding wuid+graphName+worker-num
+    VStringBuffer uniqueSubDir("%s_%s_%u", wuid.str(), graphName, globals->getPropInt("@slavenum"));
+    SetTempDir(tempDir, uniqueSubDir, "thtmp");
 }
 
 CJobChannel *CJobSlave::addChannel(IMPServer *mpServer)

+ 0 - 11
thorlcr/master/thmastermain.cpp

@@ -901,19 +901,8 @@ int main( int argc, const char *argv[]  )
             }
         }
 
-#ifdef _CONTAINERIZED
-        // multiple thor jobs can be running on same node, sharing same local disk for temp storage.
-        // make unique by adding wuid+graphName
-        addPathSepChar(tempDirStr).append(workunit);
-        addPathSepChar(tempDirStr).append(graphName);
-#endif
         // NB: set into globals, serialized and used by worker processes.
         globals->setProp("@thorTempDirectory", tempDirStr);
-        logDiskSpace(); // Log before temp space is cleared
-        StringBuffer tempPrefix("thtmp");
-        tempPrefix.append(getMasterPortBase()).append("_");
-        SetTempDir(0, tempDirStr.str(), tempPrefix.str(), true);
-        DBGLOG("Temp directory: %s", queryTempDir());
 
         startLogMsgParentReceiver();    
         connectLogMsgManagerToDali();

+ 1 - 1
thorlcr/mfilemanager/thmfilemanager.cpp

@@ -507,7 +507,7 @@ public:
             StringBuffer dir;
             bool dirPerPart = false;
             if (temporary && !job.queryUseCheckpoints()) 
-                dir.append(queryTempDir(false));
+                dir.append(queryTempDir());
             else
             {
                 StringBuffer planeDir;

+ 2 - 2
thorlcr/msort/tsorts.cpp

@@ -97,7 +97,7 @@ class CWriteIntercept : public CSimpleInterface
             {
                 // right create idx
                 StringBuffer tempname;
-                GetTempName(tempname.clear(),"srtidx",false);
+                GetTempFilePath(tempname.clear(),"srtidx");
                 idxFile.setown(createIFile(tempname.str()));
                 idxFileIO.setown(idxFile->open(IFOcreaterw));
                 if (!idxFileIO.get())
@@ -185,7 +185,7 @@ public:
     offset_t write(IRowStream *input)
     {
         StringBuffer tempname;
-        GetTempName(tempname,"srtmrg",false);
+        GetTempFilePath(tempname,"srtmrg");
         dataFile.setown(createIFile(tempname.str()));
         Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf);
 

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -1866,7 +1866,7 @@ public:
                         if (!rfn.isLocal())
                         {
                             IWARNLOG("Cannot load shared object directly from remote path, creating temporary local copy: %s", soPath.str());
-                            GetTempName(tempSo,"so",true);
+                            GetTempFilePath(tempSo,"so");
                             copyFile(tempSo.str(), soPath.str());
                             soPath.clear().append(tempSo.str());
                         }

+ 5 - 13
thorlcr/slave/thslavemain.cpp

@@ -116,11 +116,13 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
         mySlaveNum = (unsigned)processGroup->rank(queryMyNode());
         assertex(NotFound != mySlaveNum);
         mySlaveNum++; // 1 based;
-        unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound);
-        if (NotFound != configSlaveNum)
-            assertex(mySlaveNum == configSlaveNum);
 
+        unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound);
         globals.setown(createPTree(msg));
+        if (NotFound == configSlaveNum)
+            globals->setPropInt("@slavenum", mySlaveNum);
+        else
+            assertex(mySlaveNum == configSlaveNum);
 
         /* NB: preserve command line option overrides
          * Not sure if any cmdline options are actually needed by this stage..
@@ -487,18 +489,9 @@ int main( int argc, const char *argv[]  )
             }
 #endif
 
-            // NB: master has set, and serialized in globals
-            StringBuffer tempDirStr(globals->queryProp("@thorTempDirectory"));
-            addPathSepChar(tempDirStr).append(mySlaveNum);
-
-            logDiskSpace(); // Log before temp space is cleared
-            SetTempDir(mySlaveNum, tempDirStr.str(), "thtmp", true);
-
             useMemoryMappedRead(globals->getPropBool("@useMemoryMappedRead"));
 
             LOG(MCdebugProgress, thorJob, "ThorSlave Version LCR - %d.%d started",THOR_VERSION_MAJOR,THOR_VERSION_MINOR);
-            StringBuffer url;
-            LOG(MCdebugProgress, thorJob, "Slave %s - temporary dir set to : %s", slfEp.getUrlStr(url).str(), queryTempDir());
 #ifdef _WIN32
             ULARGE_INTEGER userfree;
             ULARGE_INTEGER total;
@@ -583,7 +576,6 @@ int main( int argc, const char *argv[]  )
 #ifndef _CONTAINERIZED
     stopPerformanceMonitor();
 #endif
-    ClearTempDirs();
 
     if (multiThorMemoryThreshold)
         setMultiThorMemoryNotify(0,NULL);

+ 2 - 2
thorlcr/thorutil/thmem.cpp

@@ -246,7 +246,7 @@ protected:
 
         StringBuffer tempName;
         VStringBuffer tempPrefix("streamspill_%d", activity.queryId());
-        GetTempName(tempName, tempPrefix.str(), true);
+        GetTempFilePath(tempName, tempPrefix.str());
         spillFile.setown(createIFile(tempName.str()));
 
         VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
@@ -1653,7 +1653,7 @@ protected:
             tempPrefix.append("srt");
         }
         tempPrefix.appendf("spill_%d", activity.queryId());
-        GetTempName(tempName, tempPrefix.str(), true);
+        GetTempFilePath(tempName, tempPrefix.str());
         Owned<IFile> iFile = createIFile(tempName.str());
         VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority);
         spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows

+ 68 - 82
thorlcr/thorutil/thormisc.cpp

@@ -625,99 +625,77 @@ class CTempNameHandler
 {
 public:
     unsigned num;
-    StringAttr tempdir, tempPrefix;
-    StringAttr alttempdir; // only set if needed
+    StringBuffer rootDir, subDirName, prefix, subDirPath;
     CriticalSection crit;
-    bool altallowed;
-    bool cleardir;
-    unsigned slaveNum = 0;
 
     CTempNameHandler() // the directory-related members start empty; only the counter needs initialising
     {
         num = 0; // running counter; getTempName() appends ++num to each generated name
-        altallowed = false;
-        cleardir = false;
     }
-    ~CTempNameHandler()
-    {
-        if (cleardir) 
-            clearDirs(false);       // don't log as jlog may have closed
-    }
-    const char *queryTempDir(bool alt) 
+    const char *queryTempDir() // returns the temp directory path composed by setTempDir()
     { 
-        if (alt&&altallowed) 
-            return alttempdir;
-        return tempdir; 
+        return subDirPath; // rootDir + subDirName; note clear() resets this to empty
     }
-    void setTempDir(unsigned _slaveNum, const char *name, const char *_tempPrefix, bool clear)
+    void setTempDir(const char *_rootDir, const char *_subDirName, const char *_prefix) // records root/subdir/prefix and creates <_rootDir>/<_subDirName>
     {
-        assertex(name && *name);
+        assertex(!isEmptyString(_rootDir) && !isEmptyString(_prefix) && !isEmptyString(_subDirName)); // all three arguments are mandatory
         CriticalBlock block(crit);
-        slaveNum = _slaveNum;
-        assertex(tempdir.isEmpty()); // should only be called once
-        tempPrefix.set(_tempPrefix);
-        StringBuffer base(name);
-        addPathSepChar(base);
-        tempdir.set(base.str());
-        recursiveCreateDirectory(tempdir);
-#ifdef _WIN32
-        altallowed = false;
-#else
-        altallowed = globals->getPropBool("@thor_dual_drive",true);
-#endif
-        if (altallowed)
-        {
-            unsigned d = getPathDrive(tempdir);
-            if (d>1)
-                altallowed = false;
-            else
-            {
-                StringBuffer p(tempdir);
-                alttempdir.set(setPathDrive(p,d?0:1).str());
-                recursiveCreateDirectory(alttempdir);
-            }
-        }
-        cleardir = clear;
-        if (clear)
-            clearDirs(true);
+        assertex(subDirPath.isEmpty()); // a directory must not already be set; clear() empties subDirPath again
+        rootDir.set(_rootDir);
+        addPathSepChar(rootDir); // presumably guarantees a trailing separator, which the "%s%s" join below relies on
+        subDirName.set(_subDirName);
+        prefix.set(_prefix);
+        subDirPath.setf("%s%s", rootDir.str(), subDirName.str());
+        recursiveCreateDirectory(subDirPath);
     }
-    static void clearDir(const char *dir, bool log)
+    void clear(bool log) // deletes the files in subDirPath, then the directory itself; 'log' enables tracing of each step
     {
-        if (dir&&*dir)
+        assertex(subDirPath.length()); // setTempDir() must have been called first
+        Owned<IDirectoryIterator> iter = createDirectoryIterator(subDirPath);
+        ForEach (*iter)
         {
-            Owned<IDirectoryIterator> iter = createDirectoryIterator(dir);
-            ForEach (*iter)
+            IFile &file = iter->query();
+            if (file.isFile()==fileBool::foundYes) // only entries reported as files are removed; anything else is left in place
             {
-                IFile &file = iter->query();
-                if (file.isFile()==fileBool::foundYes)
+                if (log)
+                    LOG(MCdebugInfo, thorJob, "Deleting %s", file.queryFilename());
+                try { file.remove(); }
+                catch (IException *e) // best effort: a failed delete is logged (if requested) and the loop continues
                 {
                     if (log)
-                        LOG(MCdebugInfo, thorJob, "Deleting %s", file.queryFilename());
-                    try { file.remove(); }
-                    catch (IException *e)
-                    {
-                        if (log)
-                            FLLOG(MCwarning, thorJob, e);
-                        e->Release();
-                    }
+                        FLLOG(MCwarning, thorJob, e);
+                    e->Release();
                 }
             }
         }
+        try
+        {
+            Owned<IFile> dirIFile = createIFile(subDirPath);
+            bool success = dirIFile->remove();
+            if (log)
+                PROGLOG("%s to delete temp directory: %s", success ? "succeeded" : "failed", subDirPath.str()); // outcome first, then path, matching the format string
+        }
+        catch (IException *e)
+        {
+            if (log)
+                FLLOG(MCwarning, thorJob, e);
+            e->Release();
+        }
+        subDirPath.clear(); // reset so that setTempDir(), which asserts subDirPath is empty, can be called again
     }
-    void clearDirs(bool log)
-    {
-        clearDir(tempdir,log);
-        clearDir(alttempdir,log);
-    }
-    void getTempName(StringBuffer &name, const char *suffix,bool alt)
+    void getTempName(StringBuffer &name, const char *suffix, bool inTempDir)
     {
         CriticalBlock block(crit);
-        assertex(!tempdir.isEmpty()); // should only be called once
-        if (alt && altallowed)
-            name.append(alttempdir);
+        assertex(!subDirPath.isEmpty());
+        if (inTempDir)
+        {
+            name.append(rootDir);
+            name.append(subDirName);
+            addPathSepChar(name);
+        }
         else
-            name.append(tempdir);
-        name.append(tempPrefix).append((unsigned)GetCurrentProcessId()).append('_').append(slaveNum).append('_').append(++num);
+            name.append(subDirName).append('_');
+        name.append(prefix).append('_').append(++num);
         if (suffix)
             name.append("__").append(suffix);
         name.append(".tmp");
@@ -726,31 +704,39 @@ public:
 
 
 
-void GetTempName(StringBuffer &name, const char *prefix,bool altdisk)
+void GetTempFileName(StringBuffer &name, const char *suffix) // appends a new temp file name, without any directory part, to 'name'
 {
-    TempNameHandler.getTempName(name, prefix, altdisk);
+    TempNameHandler.getTempName(name, suffix, false); // false => bare name of the form <subDirName>_<prefix>_<n>[__<suffix>].tmp
 }
 
-void SetTempDir(unsigned slaveNum, const char *name, const char *tempPrefix, bool clear)
+void GetTempFilePath(StringBuffer &name, const char *suffix) // appends a new temp file path, located inside the temp directory, to 'name'
 {
-    TempNameHandler.setTempDir(slaveNum, name, tempPrefix, clear);
+    TempNameHandler.getTempName(name, suffix, true); // true => name is prefixed with rootDir + subDirName + path separator
 }
 
-void ClearDir(const char *dir)
+void SetTempDir(const char *rootTempDir, const char *uniqueSubDir, const char *tempPrefix) // configures the shared TempNameHandler and logs the chosen settings
 {
-    CTempNameHandler::clearDir(dir,true);
+    TempNameHandler.setTempDir(rootTempDir, uniqueSubDir, tempPrefix); // asserts all three are non-empty and that no temp dir is currently set
+    LOG(MCdebugProgress, thorJob, "temporary rootTempdir: %s, uniqueSubDir: %s, prefix: %s", rootTempDir, uniqueSubDir, tempPrefix);
 }
 
-void ClearTempDirs()
+void ClearTempDir() // removes the current temp directory contents and directory; IExceptions are logged here, not propagated
 {
-    TempNameHandler.clearDirs(true);
-    LOG(MCthorDetailedDebugInfo, thorJob, "temp directory cleared");
+    try
+    {
+        TempNameHandler.clear(true); // true => log each deletion
+        LOG(MCthorDetailedDebugInfo, thorJob, "temp directory cleared");
+    }
+    catch (IException *e) // e.g. clear() asserts when no temp directory is set
+    {
+        EXCLOG(e, "ClearTempDir");
+        e->Release();
+    }
 }
 
-
-const char *queryTempDir(bool altdisk)
+const char *queryTempDir() // returns the temp directory path held by the shared TempNameHandler
 {
-    return TempNameHandler.queryTempDir(altdisk);
+    return TempNameHandler.queryTempDir(); // pointer refers to the handler's own buffer, which clear() empties
 }
 
 class DECL_EXCEPTION CBarrierAbortException: public CSimpleInterface, public IBarrierException

+ 5 - 5
thorlcr/thorutil/thormisc.hpp

@@ -505,11 +505,11 @@ extern graph_decl IThorException *MakeThorFatal(IException *e, int code, const c
 extern graph_decl IThorException *ThorWrapException(IException *e, const char *msg, ...) __attribute__((format(printf, 2, 3)));
 extern graph_decl void setExceptionActivityInfo(CGraphElementBase &container, IThorException *e);
 
-extern graph_decl void GetTempName(StringBuffer &name, const char *prefix=NULL,bool altdisk=false);
-extern graph_decl void SetTempDir(unsigned slaveNum, const char *name, const char *tempPrefix, bool clear);
-extern graph_decl void ClearDir(const char *dir);
-extern graph_decl void ClearTempDirs();
-extern graph_decl const char *queryTempDir(bool altdisk=false);  
+extern graph_decl void GetTempFilePath(StringBuffer &name, const char *suffix);
+extern graph_decl void GetTempFileName(StringBuffer &name, const char *suffix);
+extern graph_decl void SetTempDir(const char *rootTempDir, const char *uniqueSubDir, const char *tempPrefix);
+extern graph_decl void ClearTempDir();
+extern graph_decl const char *queryTempDir();
 extern graph_decl void loadCmdProp(IPropertyTree *tree, const char *cmdProp);
 
 extern graph_decl void ensureDirectoryForFile(const char *fName);