소스 검색

Merge pull request #13379 from wangkx/h23307

HPCC-23307 Add Archive WUs to ESP services

Reviewed-By: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 년 전
부모
커밋
31f4bca39a

+ 1 - 0
esp/scm/ws_fs.ecm

@@ -22,6 +22,7 @@ ESPenum DFUWUActions : string
     Unprotect("Unprotect"),
     Restore("Restore"),
     SetToFailed("SetToFailed"),
+    Archive("Archive"),
 };
 
 ESPStruct [nil_remove] DFUWorkunit

+ 1 - 0
esp/scm/ws_workunits_struct.ecm

@@ -34,6 +34,7 @@ ESPenum ECLWUActions : string
     Restore("Restore"),
     Resume("Resume"),
     SetToFailed("SetToFailed"),
+    Archive("Archive"),
 };
 
 ESPenum EclDefinitionActions : string

+ 2 - 1
esp/services/ws_dfu/CMakeLists.txt

@@ -67,7 +67,8 @@ include_directories (
          ./../../../fs/dafsstream
          ./../../../common/roxiecommlib
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 1 - 0
esp/services/ws_esdlconfig/CMakeLists.txt

@@ -35,6 +35,7 @@ set (    SRCS
 
 include_directories (
          ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/dali/sasha
          ${HPCC_SOURCE_DIR}/common/environment
          ${HPCC_SOURCE_DIR}/common/dllserver
          ${HPCC_SOURCE_DIR}/system/include

+ 34 - 35
esp/services/ws_fs/ws_fsService.cpp

@@ -1560,8 +1560,8 @@ bool markWUFailed(IDFUWorkUnitFactory *f, const char *wuid)
     return false;
 }
 
-static unsigned NumOfDFUWUActionNames = 5;
-static const char *DFUWUActionNames[] = { "Delete", "Protect" , "Unprotect" , "Restore" , "SetToFailed" };
+static unsigned NumOfDFUWUActionNames = 6;
+static const char *DFUWUActionNames[] = { "Delete", "Protect" , "Unprotect" , "Restore" , "SetToFailed", "Archive" };
 
 bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsActionRequest &req, IEspDFUWorkunitsActionResponse &resp)
 {
@@ -1572,35 +1572,48 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
         CDFUWUActions action = req.getType();
         if (action == DFUWUActions_Undefined)
             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Action not defined.");
+        const char* actionStr = (action < NumOfDFUWUActionNames) ? DFUWUActionNames[action] : "Unknown";
 
         StringArray& wuids = req.getWuids();
         if (!wuids.ordinality())
             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit not defined.");
 
-        Owned<INode> node;
-        Owned<ISashaCommand> cmd;
-        StringBuffer sashaAddress;
-        if (action == CDFUWUActions_Restore)
+        if ((action == CDFUWUActions_Restore) || (action == CDFUWUActions_Archive))
         {
-            IArrayOf<IConstTpSashaServer> sashaservers;
-            CTpWrapper dummy;
-            dummy.getTpSashaServers(sashaservers);
-            ForEachItemIn(i, sashaservers)
+            StringBuffer msg;
+            ForEachItemIn(i, wuids)
             {
-                IConstTpSashaServer& sashaserver = sashaservers.item(i);
-                IArrayOf<IConstTpMachine> &sashaservermachine = sashaserver.getTpMachines();
-                sashaAddress.append(sashaservermachine.item(0).getNetaddress());
+                StringBuffer wuidStr(wuids.item(i));
+                const char* wuid = wuidStr.trim().str();
+                if (isEmptyString(wuid))
+                    msg.appendf("Empty Workunit ID at %u. ", i);
             }
-            if (sashaAddress.length() < 1)
+            if (!msg.isEmpty())
+                throw makeStringException(ECLWATCH_INVALID_INPUT, msg);
+
+            Owned<ISashaCommand> cmd = archiveOrRestoreWorkunits(wuids, nullptr, action == CDFUWUActions_Archive, true);
+            IArrayOf<IEspDFUActionResult> results;
+            ForEachItemIn(x, wuids)
             {
-                throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found.");
-            }
+                Owned<IEspDFUActionResult> res = createDFUActionResult();
+                res->setID(wuids.item(x));
+                res->setAction(actionStr);
 
-            SocketEndpoint ep(sashaAddress.str(), DEFAULT_SASHA_PORT);
-            node.setown(createINode(ep));
-            cmd.setown(createSashaCommand());
-            cmd->setAction(SCA_RESTORE);
-            cmd->setDFU(true);
+                StringBuffer reply;
+                if (action == CDFUWUActions_Restore)
+                    reply.set("Restore ID: ");
+                else
+                    reply.set("Archive ID: ");
+
+                if (cmd->getId(x, reply))
+                    res->setResult(reply.str());
+                else
+                    res->setResult("Failed to get Archive/restore ID.");
+
+                results.append(*res.getLink());
+            }
+            resp.setDFUActionResults(results);
+            return true;
         }
 
         IArrayOf<IEspDFUActionResult> results;
@@ -1608,7 +1621,6 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
         for(unsigned i = 0; i < wuids.ordinality(); ++i)
         {
             const char* wuid = wuids.item(i);
-            const char* actionStr = (action < NumOfDFUWUActionNames) ? DFUWUActionNames[action] : "Unknown";
 
             Owned<IEspDFUActionResult> res = createDFUActionResult("", "");
             res->setID(wuid);
@@ -1626,19 +1638,6 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
                         throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Failed in deleting workunit.");
                     res->setResult("Success");
                     break;
-                case CDFUWUActions_Restore:
-                    cmd->addId(wuid);
-                    if (!cmd->send(node,1*60*1000))
-                        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
-                            "Sasha (%s) took too long to respond from: Restore workunit %s.",
-                            sashaAddress.str(), wuid);
-                    {
-                        StringBuffer reply("Restore ID: ");
-                        if (!cmd->getId(0, reply))
-                            throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Failed to get ID.");
-                        res->setResult(reply.str());
-                    }
-                    break;
                 case CDFUWUActions_Protect:
                 case CDFUWUActions_Unprotect:
                 case CDFUWUActions_SetToFailed:

+ 2 - 1
esp/services/ws_machine/CMakeLists.txt

@@ -50,7 +50,8 @@ include_directories (
          ./../../../common/roxiecommlib
          ./../../../common/workunit
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../../bindings 
          ./../../bindings/SOAP/xpp 
          ./../../../esp/platform 

+ 2 - 1
esp/services/ws_smc/CMakeLists.txt

@@ -50,7 +50,8 @@ include_directories (
          ./../../../common/remote 
          ./../../clients 
          ./../../../tools/swapnode 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 1 - 0
esp/services/ws_sql/CMakeLists.txt

@@ -82,6 +82,7 @@ if(WSSQL_SERVICE)
         ${HPCC_SOURCE_DIR}/system/mp
         ${HPCC_SOURCE_DIR}/dali/dfu
         ${HPCC_SOURCE_DIR}/dali/base/
+        ${HPCC_SOURCE_DIR}/dali/sasha
         ${HPCC_SOURCE_DIR}/common/workunit
         ${HPCC_SOURCE_DIR}/common/remote
         ${HPCC_SOURCE_DIR}/common/environment

+ 2 - 1
esp/services/ws_topology/CMakeLists.txt

@@ -49,7 +49,8 @@ include_directories (
          ./../../../common/remote 
          ./../../clients 
          ./../../../tools/swapnode 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 24 - 0
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -88,6 +88,30 @@ SecAccessFlags getWsWorkunitAccess(IEspContext& ctx, IConstWorkUnit& cw)
     return accessFlag;
 }
 
+/**
+ * Report whether the current user has at least minAccess on the given workunit.
+ *
+ * @param ctx       ESP context supplying the security manager, user and user ID.
+ * @param wuid      ID of the workunit to open; must be non-empty.
+ * @param minAccess Minimum access level to test for.
+ * @return Whatever ctx.validateFeatureAccess() returns for the workunit's access type.
+ * @throws The exception built by MakeStringException (ECLWATCH_CANNOT_OPEN_WORKUNIT)
+ *         when wuid is null/empty or the workunit cannot be opened.
+ */
+bool validateWsWorkunitAccess(IEspContext& ctx, const char* wuid, SecAccessFlags minAccess)
+{
+    // Reject a missing ID up front: a null pointer must not reach the "%s" below.
+    if (!wuid || !*wuid)
+        throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Failed to open workunit: empty workunit ID when validating workunit access");
+
+    Owned<IWorkUnitFactory> wf = getWorkUnitFactory(ctx.querySecManager(), ctx.queryUser());
+    Owned<IConstWorkUnit> cw = wf->openWorkUnit(wuid);
+    if (!cw)
+        throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Failed to open workunit %s when validating workunit access", wuid);
+    // NOTE(review): the trailing 'false' presumably suppresses throwing on denial - confirm.
+    return ctx.validateFeatureAccess(getWuAccessType(*cw, ctx.queryUserId()), minAccess, false);
+}
+
 void ensureWsWorkunitAccessByOwnerId(IEspContext& ctx, const char* owner, SecAccessFlags minAccess)
 {
     if (!ctx.validateFeatureAccess(getWuAccessType(owner, ctx.queryUserId()), minAccess, false))

+ 1 - 0
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -75,6 +75,7 @@ void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags
 void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess);
 void ensureWsWorkunitAccessByOwnerId(IEspContext& context, const char* owner, SecAccessFlags minAccess);
 void ensureWsCreateWorkunitAccess(IEspContext& cxt);
+bool validateWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess);
 
 const char *getGraphNum(const char *s,unsigned &num);
 

+ 43 - 48
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -48,6 +48,7 @@
 #include "fvresultset.ipp"
 #include "ws_wudetails.hpp"
 #include "wuerror.hpp"
+#include "TpWrapper.hpp"
 
 #include "rtlformat.hpp"
 
@@ -60,7 +61,7 @@
 
 #define ESP_WORKUNIT_DIR "workunits/"
 
-#define SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
+#define WU_SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
 const unsigned CHECK_QUERY_STATUS_THREAD_POOL_SIZE = 25;
 const unsigned MAX_ZAP_BUFFER_SIZE = 10000000; //10M
 
@@ -84,9 +85,9 @@ public:
 };
 
 //The ECLWUActionNames[] has to match with the ESPenum ECLWUActions in the ecm file.
-static unsigned NumOfECLWUActionNames = 11;
+static unsigned NumOfECLWUActionNames = 12;
 static const char *ECLWUActionNames[] = { "Abort", "Delete", "Deschedule", "Reschedule", "Pause",
-    "PauseNow", "Protect", "Unprotect", "Restore", "Resume", "SetToFailed", NULL };
+    "PauseNow", "Protect", "Unprotect", "Restore", "Resume", "SetToFailed", "Archive", nullptr };
 
 class CECLWUActionsEx : public SoapEnumParamNew<CECLWUActions>
 {
@@ -112,7 +113,38 @@ bool doAction(IEspContext& context, StringArray& wuids, CECLWUActions action, IP
     if (!wuids.length())
         return true;
 
-    Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+    if ((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive))
+    {
+        StringBuffer msg;
+        ForEachItemIn(i, wuids)
+        {
+            StringBuffer wuidStr(wuids.item(i));
+            const char* wuid = wuidStr.trim().str();
+            if (isEmpty(wuid))
+            {
+                msg.appendf("Empty Workunit ID at %u. ", i);
+                continue;
+            }
+            if ((action == CECLWUActions_Archive) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
+                msg.appendf("Access denied for Workunit %s. ", wuid);
+        }
+        if (!msg.isEmpty())
+            throw makeStringException(ECLWATCH_INVALID_INPUT, msg);
+
+        Owned<ISashaCommand> cmd = archiveOrRestoreWorkunits(wuids, params, action == CECLWUActions_Archive, false);
+        ForEachItemIn(idx, wuids)
+        {
+            StringBuffer reply;
+            cmd->getId(idx, reply);
+
+            const char* wuid = wuids.item(idx);
+            if ((action == CECLWUActions_Restore) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
+                reply.appendf("Access denied for Workunit %s. ", wuid);
+
+            AuditSystemAccess(context.queryUserId(), true, "%s", reply.str());
+        }
+        return true;
+    }
 
     bool bAllSuccess = true;
     const char* strAction = (action < NumOfECLWUActionNames) ? ECLWUActionNames[action] : "Unknown Action";
@@ -132,50 +164,13 @@ bool doAction(IEspContext& context, StringArray& wuids, CECLWUActions action, IP
                 throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
 
             PROGLOG("%s %s", strAction, wuid);
-            if ((action == CECLWUActions_Restore) || (action == CECLWUActions_EventDeschedule))
+            if (action == CECLWUActions_EventDeschedule)
             {
-                switch(action)
-                {
-                case CECLWUActions_Restore:
-                {
-                    SocketEndpoint ep;
-                    if (params->hasProp("sashaServerIP"))
-                        ep.set(params->queryProp("sashaServerIP"), params->getPropInt("sashaServerPort"));
-                    else
-                        getSashaNode(ep);
-
-                    Owned<ISashaCommand> cmd = createSashaCommand();
-                    cmd->setAction(SCA_RESTORE);
-                    cmd->addId(wuid);
-
-                    Owned<INode> node = createINode(ep);
-                    if (!node)
-                        throw MakeStringException(ECLWATCH_INODE_NOT_FOUND,"INode not found.");
-
-                    StringBuffer s;
-                    if (!cmd->send(node, 1*60*1000))
-                        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
-                            "Sasha (%s) took too long to respond from: Restore workunit %s.",
-                            ep.getUrlStr(s).str(), wuid);
-
-                    if (cmd->numIds()==0)
-                        throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Could not Archive/restore %s",wuid);
-
-                    StringBuffer reply;
-                    cmd->getId(0,reply);
-
-                    AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
+                if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
+                    || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
                     ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
-                    break;
-                }
-                case CECLWUActions_EventDeschedule:
-                    if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
-                        || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
-                        ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
-                    descheduleWorkunit(wuid);
-                    AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
-                    break;
-                }
+                descheduleWorkunit(wuid);
+                AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
             }
             else
             {
@@ -317,7 +312,7 @@ bool doUnProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<ICo
 
 static void checkUpdateQuerysetLibraries()
 {
-    Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+    Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, WU_SDS_LOCK_TIMEOUT);
     if (!globalLock)
         return;
 
@@ -706,7 +701,7 @@ bool CWsWorkunitsEx::onWUAction(IEspContext &context, IEspWUActionRequest &req,
 
         Owned<IProperties> params = createProperties(true);
         params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
-        if ((action == CECLWUActions_Restore) && !sashaServerIp.isEmpty())
+        if (((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive)) && !sashaServerIp.isEmpty())
         {
             params->setProp("sashaServerIP", sashaServerIp.get());
             params->setProp("sashaServerPort", sashaServerPort);

+ 2 - 1
esp/smc/SMCLib/CMakeLists.txt

@@ -49,7 +49,8 @@ include_directories (
          ./../../../common/workunit 
          ./../../../common/remote 
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../../clients/LoggingClient 
          ./../../bindings 
          ./../../bindings/SOAP/xpp 

+ 61 - 0
esp/smc/SMCLib/TpWrapper.cpp

@@ -24,6 +24,8 @@
 #include <stdio.h>
 #include "workunit.hpp"
 #include "exception_util.hpp"
+#include "portlist.h"
+
 
 const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information.";
 
@@ -1968,3 +1970,62 @@ void CTpWrapper::getAttPath(const char* Path,StringBuffer& returnStr)
     JBASE64_Decode(Path, returnStr);
 }
 
+/**
+ * Send one Sasha command that archives or restores a batch of workunits.
+ *
+ * The Sasha endpoint comes from params ("sashaServerIP" / "sashaServerPort")
+ * when "sashaServerIP" is present; otherwise the first machine of the first
+ * server returned by CTpWrapper::getTpSashaServers() is used on DEFAULT_SASHA_PORT.
+ *
+ * @param wuids   IDs to act on; each entry is added to the same command.
+ * @param params  Optional endpoint overrides; may be null.
+ * @param archive true selects SCA_ARCHIVE, false selects SCA_RESTORE.
+ * @param dfu     true calls setDFU(true) on the command.
+ * @return The command after send(); released to the caller via getClear().
+ * @throws ECLWATCH_ARCHIVE_SERVER_NOT_FOUND if no Sasha server, machine or address
+ *         is available; ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER if send() fails.
+ */
+extern TPWRAPPER_API ISashaCommand* archiveOrRestoreWorkunits(StringArray& wuids, IProperties* params, bool archive, bool dfu)
+{
+    StringBuffer sashaAddress;
+    unsigned port = DEFAULT_SASHA_PORT;
+    if (params && params->hasProp("sashaServerIP"))
+    {
+        sashaAddress.set(params->queryProp("sashaServerIP"));
+        port = params->getPropInt("sashaServerPort", DEFAULT_SASHA_PORT);
+    }
+    else
+    {
+        IArrayOf<IConstTpSashaServer> sashaservers;
+        CTpWrapper dummy;
+        dummy.getTpSashaServers(sashaservers);
+        if (sashaservers.ordinality() == 0)
+            throw makeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Sasha server not found");
+
+        // Guard the machine list too: item(0) on an empty array is out of range.
+        IArrayOf<IConstTpMachine>& sashaservermachine = sashaservers.item(0).getTpMachines();
+        if (sashaservermachine.ordinality() == 0)
+            throw makeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Sasha address not found");
+        sashaAddress.set(sashaservermachine.item(0).getNetaddress());
+    }
+    // Validate after both branches so an empty "sashaServerIP" override is rejected as well.
+    if (sashaAddress.isEmpty())
+        throw makeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Sasha address not found");
+
+    SocketEndpoint ep(sashaAddress.str(), port);
+    Owned<INode> node = createINode(ep);
+    Owned<ISashaCommand> cmd = createSashaCommand();
+    cmd->setAction(archive ? SCA_ARCHIVE : SCA_RESTORE);
+    if (dfu)
+        cmd->setDFU(true);
+
+    ForEachItemIn(i, wuids)
+        cmd->addId(wuids.item(i));
+
+    // NOTE(review): 1*60*1000 is presumably a one-minute timeout in milliseconds - confirm.
+    if (!cmd->send(node, 1*60*1000))
+        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
+            "Sasha (%s) took too long to respond for Archive/restore workunit.",
+            sashaAddress.str());
+    return cmd.getClear();
+}

+ 3 - 0
esp/smc/SMCLib/TpWrapper.hpp

@@ -41,6 +41,7 @@
 #include "ws_topology.hpp"
 #include <string>
 #include <set>
+#include "sacmd.hpp"
 
 
 using std::set;
@@ -209,5 +210,7 @@ private:
     Owned<IRemoteConnection> conn;
 };
 
+extern TPWRAPPER_API ISashaCommand* archiveOrRestoreWorkunits(StringArray& wuids, IProperties* params, bool archive, bool dfu);
+
 #endif //_ESPWIZ_TpWrapper_HPP__