فهرست منبع

Merge branch 'candidate-7.6.x' into candidate-7.8.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 سال پیش
والد
کامیت
c6686b0a48
1 فایلهای تغییر یافته به همراه 146 افزوده شده و 0 حذف شده
  1. 146 0
      dali/daliadmin/daliadmin.cpp

+ 146 - 0
dali/daliadmin/daliadmin.cpp

@@ -15,6 +15,9 @@
     limitations under the License.
 ############################################################################## */
 
+#include <unordered_map>
+#include <string>
+
 #include "platform.h"
 #include "portlist.h"
 #include "jlib.hpp"
@@ -28,6 +31,7 @@
 #include "jexcept.hpp"
 #include "jset.hpp"
 #include "jprop.hpp"
+#include "jregexp.hpp"
 
 #include "mpbase.hpp"
 #include "mpcomm.hpp"
@@ -131,6 +135,7 @@ void usage(const char *exe)
   printf("  xmlsize <filename> [<percentage>] --  analyse size usage in xml file, display individual items above 'percentage' \n");
   printf("  migratefiles <src-group> <target-group> [<filemask>] [dryrun] [createmaps] [listonly] [verbose]\n");
   printf("  translatetoxpath logicalfile [File|SuperFile|Scope]\n");
+  printf("  cleanglobalwuid [dryrun] [noreconstruct]\n");
   printf("\n");
   printf("Common options\n");
   printf("  server=<dali-server-ip>         -- server ip\n");
@@ -3219,7 +3224,133 @@ void testThorRunningWUs()
 
 
 
+/*
+ * Scan the "Variable" entries held under /WorkUnits/global/Variables and drop those
+ * whose logical file can no longer be found in Dali.
+ *
+ * Entries are grouped by a base name derived from their @name attribute, so that an
+ * entry and any associated entries sharing the same base are kept or dropped together.
+ *
+ * dryrun      - when true, nothing is written back: the global workunit is connected
+ *               without a write lock and only the counts are reported.
+ * reconstruct - when true, a replacement Variables branch holding only the surviving
+ *               entries is built and swapped in; when false, orphaned entries are
+ *               removed one at a time from the existing branch.
+ *
+ * Progress is logged every 'reportInterval' base names, plus a final summary line.
+ * A base name whose lookup throws is logged and skipped (its entries are kept).
+ */
+void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct)
+{
+    unsigned reportInterval = 100;
+    VStringBuffer wuRoot("/WorkUnits/global");
+    CCycleTimer timer;
+    Owned<IRemoteConnection> wuConn = querySDS().connect(wuRoot, myProcessSession(), dryrun ? 0 : RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
+    if (!wuConn)
+    {
+        PROGLOG("Failed to connect to: %s", wuRoot.str());
+        return;
+    }
+    PROGLOG("Time to get connect to global workunit: %u ms", timer.elapsedMs());
+
+    timer.reset();
+    IPropertyTree *variables = wuConn->queryRoot()->queryBranch("Variables");
+    if (!variables)
+    {
+        PROGLOG("Global workunit has no Variables: %s", wuRoot.str());
+        return;
+    }
+    PROGLOG("Time to get Variables branch: %u ms", timer.elapsedMs());
+
+    Owned<IPropertyTree> newVariables = createPTree(); // only used if reconstruct=true
 
+    // Splits a name of the form <base>$<letters> into its two parts.
+    // NOTE(review): presumably {..} marks a sub-expression and "#1" substitutes the first
+    // one in jlib's RegExpr syntax - confirm against jregexp.hpp.
+    RegExpr RE("^{.*}{\\$[a-zA-Z]*}$");
+    std::unordered_map<std::string, std::vector<IPropertyTree *>> varMap;
+    Owned<IPropertyTreeIterator> varIter = variables->getElements("*");
+    StringBuffer nameBase;
+    ForEach(*varIter)
+    {
+        IPropertyTree &variable = varIter->query();
+        // A Variable without @name cannot be checked against a file; treat it like any
+        // other non-Variable child so that it is preserved rather than dereferencing null.
+        const char *name = streq("Variable", variable.queryName()) ? variable.queryProp("@name") : nullptr;
+        if (name)
+        {
+            const char *lfn = name;
+            if (RE.find(name))
+            {
+                RE.substitute(nameBase.clear(), "#1");
+                lfn = nameBase.str();
+            }
+            varMap[lfn].push_back(&variable);
+        }
+        else if (reconstruct)
+            newVariables->addPropTree(variable.queryName(), LINK(&variable));
+    }
+    unsigned total = (unsigned)varMap.size();
+    PROGLOG("Found %u global workunit entries. Time taken: %u ms", total, timer.elapsedMs());
+
+    timer.reset();
+    unsigned checked = 0, deletes = 0, auxDeletes = 0, existingPersists = 0;
+    CCycleTimer per100Timer;
+
+    auto varMapIt = varMap.begin();
+    StringBuffer lfnXpath;
+    while (varMapIt != varMap.end())
+    {
+        const char *name = varMapIt->first.c_str();
+        bool dropFromMap = false; // set only when reconstructing and the file is gone
+
+        try
+        {
+            CDfsLogicalFileName lfn;
+            lfn.set(name);
+            lfn.makeFullnameQuery(lfnXpath.clear(), DXB_File);
+
+            Owned<IRemoteConnection> fConn = querySDS().connect(lfnXpath, myProcessSession(), RTM_LOCK_READ, daliConnectTimeoutMs);
+            if (!fConn) // doesn't exist, clear up Variable and associates
+            {
+                ++deletes;
+
+                if (reconstruct)
+                {
+                    // Entries left out of the rebuilt branch count as deleted too, so the
+                    // report is comparable between the two modes.
+                    auxDeletes += (unsigned)varMapIt->second.size();
+                    dropFromMap = true;
+                }
+                else
+                {
+                    for (auto &t: varMapIt->second)
+                    {
+                        if (!dryrun)
+                            verifyex(variables->removeTree(t));
+                        ++auxDeletes;
+                    }
+                }
+            }
+            else
+                ++existingPersists;
+        }
+        catch (IException *e)
+        {
+            VStringBuffer errMsg("Skipping: %s", name);
+            EXCLOG(e, errMsg.str());
+            e->Release();
+        }
+
+        // Advance exactly once per pass, including after an exception; otherwise a name
+        // that always throws would be retried forever. 'name' must not be used past here
+        // because erase() destroys the key it points into.
+        if (dropFromMap)
+            varMapIt = varMap.erase(varMapIt);
+        else
+            ++varMapIt;
+        ++checked;
+
+        if (0 == (checked % reportInterval))
+        {
+            cycle_t perVarCycles = per100Timer.elapsedCycles() / reportInterval;
+
+            per100Timer.reset();
+            PROGLOG("%sChecked: [%u / %u] - deletes: %u, auxDeletes: %u, persists: %u. [Avg ms: %2.2f]", dryrun?"DRYRUN:":"", checked, total, deletes, auxDeletes, existingPersists, static_cast<unsigned>(cycle_to_microsec(perVarCycles))/1000.0);
+        }
+    }
+
+    if (reconstruct)
+    {
+        // Whatever is still in varMap survived the check (or was skipped); re-add it.
+        for (auto &e: varMap)
+        {
+            auto &list = e.second;
+            for (auto &t: list)
+                newVariables->addPropTree("Variable", LINK(t));
+        }
+        if (!dryrun)
+            wuConn->queryRoot()->setPropTree("Variables", newVariables.getClear());
+    }
+
+    // Final summary, unless the last periodic report already covered every entry.
+    // checked is non-zero here, hence so is total, so the division is safe.
+    if (0 != (checked % reportInterval))
+    {
+        cycle_t avgCycles = timer.elapsedCycles() / total;
+        PROGLOG("%sChecked: [%u / %u] - deletes: %u, auxDeletes: %u, persists: %u. [Avg ms: %2.2f]", dryrun?"DRYRUN:":"", checked, total, deletes, auxDeletes, existingPersists, static_cast<unsigned>(cycle_to_microsec(avgCycles))/1000.0);
+    }
+}
 
 
 
@@ -3636,6 +3767,21 @@ int main(int argc, char* argv[])
                         else
                             dumpWorkunitAttr(params.item(1), nullptr);
                     }
+                    else if (strieq(cmd, "cleanglobalwuid"))
+                    {
+                        CHECKPARAMS(0, 2);
+                        bool dryrun = false;
+                        bool reconstruct = true;
+                        for (unsigned i=1; i<params.ordinality(); i++)
+                        {
+                            const char *param = params.item(i);
+                            if (strieq("dryrun", param))
+                                dryrun = true;
+                            else if (strieq("noreconstruct", param))
+                                reconstruct = false;
+                        }
+                        removeOrphanedGlobalVariables(dryrun, reconstruct);
+                    }
                     else
                         UERRLOG("Unknown command %s",cmd);
                 }