Browse Source

HPCC-23307 Add Archive WUs to ESP services

Add an Archive action for workunits to ws_workunits and ws_fs.
Consolidate the workunit archive/restore code into one call,
archiveOrRestoreWorkunits() in TpWrapper.cpp, which is shared by
ws_workunits and ws_fs. The existing code sends a command to the
Sasha server for each workunit. The new code sends only one command
to the Sasha server for multiple workunits. Change the
TpWrapper-related CMakeLists files for the new code. In the
ws_workunits code, rename SDS_LOCK_TIMEOUT to WU_SDS_LOCK_TIMEOUT
because a different SDS_LOCK_TIMEOUT exists in TpWrapper.cpp.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 5 years ago
parent
commit
8a3e73967f

+ 1 - 0
esp/scm/ws_fs.ecm

@@ -22,6 +22,7 @@ ESPenum DFUWUActions : string
     Unprotect("Unprotect"),
     Restore("Restore"),
     SetToFailed("SetToFailed"),
+    Archive("Archive"),
 };
 
 ESPStruct [nil_remove] DFUWorkunit

+ 1 - 0
esp/scm/ws_workunits_struct.ecm

@@ -34,6 +34,7 @@ ESPenum ECLWUActions : string
     Restore("Restore"),
     Resume("Resume"),
     SetToFailed("SetToFailed"),
+    Archive("Archive"),
 };
 
 ESPenum EclDefinitionActions : string

+ 2 - 1
esp/services/ws_dfu/CMakeLists.txt

@@ -67,7 +67,8 @@ include_directories (
          ./../../../fs/dafsstream
          ./../../../common/roxiecommlib
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 1 - 0
esp/services/ws_esdlconfig/CMakeLists.txt

@@ -35,6 +35,7 @@ set (    SRCS
 
 include_directories (
          ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/dali/sasha
          ${HPCC_SOURCE_DIR}/common/environment
          ${HPCC_SOURCE_DIR}/common/dllserver
          ${HPCC_SOURCE_DIR}/system/include

+ 34 - 35
esp/services/ws_fs/ws_fsService.cpp

@@ -1560,8 +1560,8 @@ bool markWUFailed(IDFUWorkUnitFactory *f, const char *wuid)
     return false;
 }
 
-static unsigned NumOfDFUWUActionNames = 5;
-static const char *DFUWUActionNames[] = { "Delete", "Protect" , "Unprotect" , "Restore" , "SetToFailed" };
+static unsigned NumOfDFUWUActionNames = 6;
+static const char *DFUWUActionNames[] = { "Delete", "Protect" , "Unprotect" , "Restore" , "SetToFailed", "Archive" };
 
 bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsActionRequest &req, IEspDFUWorkunitsActionResponse &resp)
 {
@@ -1572,35 +1572,48 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
         CDFUWUActions action = req.getType();
         if (action == DFUWUActions_Undefined)
             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Action not defined.");
+        const char* actionStr = (action < NumOfDFUWUActionNames) ? DFUWUActionNames[action] : "Unknown";
 
         StringArray& wuids = req.getWuids();
         if (!wuids.ordinality())
             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit not defined.");
 
-        Owned<INode> node;
-        Owned<ISashaCommand> cmd;
-        StringBuffer sashaAddress;
-        if (action == CDFUWUActions_Restore)
+        if ((action == CDFUWUActions_Restore) || (action == CDFUWUActions_Archive))
         {
-            IArrayOf<IConstTpSashaServer> sashaservers;
-            CTpWrapper dummy;
-            dummy.getTpSashaServers(sashaservers);
-            ForEachItemIn(i, sashaservers)
+            StringBuffer msg;
+            ForEachItemIn(i, wuids)
             {
-                IConstTpSashaServer& sashaserver = sashaservers.item(i);
-                IArrayOf<IConstTpMachine> &sashaservermachine = sashaserver.getTpMachines();
-                sashaAddress.append(sashaservermachine.item(0).getNetaddress());
+                StringBuffer wuidStr(wuids.item(i));
+                const char* wuid = wuidStr.trim().str();
+                if (isEmptyString(wuid))
+                    msg.appendf("Empty Workunit ID at %u. ", i);
             }
-            if (sashaAddress.length() < 1)
+            if (!msg.isEmpty())
+                throw makeStringException(ECLWATCH_INVALID_INPUT, msg);
+
+            Owned<ISashaCommand> cmd = archiveOrRestoreWorkunits(wuids, nullptr, action == CDFUWUActions_Archive, true);
+            IArrayOf<IEspDFUActionResult> results;
+            ForEachItemIn(x, wuids)
             {
-                throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND,"Archive server not found.");
-            }
+                Owned<IEspDFUActionResult> res = createDFUActionResult();
+                res->setID(wuids.item(x));
+                res->setAction(actionStr);
 
-            SocketEndpoint ep(sashaAddress.str(), DEFAULT_SASHA_PORT);
-            node.setown(createINode(ep));
-            cmd.setown(createSashaCommand());
-            cmd->setAction(SCA_RESTORE);
-            cmd->setDFU(true);
+                StringBuffer reply;
+                if (action == CDFUWUActions_Restore)
+                    reply.set("Restore ID: ");
+                else
+                    reply.set("Archive ID: ");
+
+                if (cmd->getId(x, reply))
+                    res->setResult(reply.str());
+                else
+                    res->setResult("Failed to get Archive/restore ID.");
+
+                results.append(*res.getLink());
+            }
+            resp.setDFUActionResults(results);
+            return true;
         }
 
         IArrayOf<IEspDFUActionResult> results;
@@ -1608,7 +1621,6 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
         for(unsigned i = 0; i < wuids.ordinality(); ++i)
         {
             const char* wuid = wuids.item(i);
-            const char* actionStr = (action < NumOfDFUWUActionNames) ? DFUWUActionNames[action] : "Unknown";
 
             Owned<IEspDFUActionResult> res = createDFUActionResult("", "");
             res->setID(wuid);
@@ -1626,19 +1638,6 @@ bool CFileSprayEx::onDFUWorkunitsAction(IEspContext &context, IEspDFUWorkunitsAc
                         throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT, "Failed in deleting workunit.");
                     res->setResult("Success");
                     break;
-                case CDFUWUActions_Restore:
-                    cmd->addId(wuid);
-                    if (!cmd->send(node,1*60*1000))
-                        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
-                            "Sasha (%s) took too long to respond from: Restore workunit %s.",
-                            sashaAddress.str(), wuid);
-                    {
-                        StringBuffer reply("Restore ID: ");
-                        if (!cmd->getId(0, reply))
-                            throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Failed to get ID.");
-                        res->setResult(reply.str());
-                    }
-                    break;
                 case CDFUWUActions_Protect:
                 case CDFUWUActions_Unprotect:
                 case CDFUWUActions_SetToFailed:

+ 2 - 1
esp/services/ws_machine/CMakeLists.txt

@@ -50,7 +50,8 @@ include_directories (
          ./../../../common/roxiecommlib
          ./../../../common/workunit
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../../bindings 
          ./../../bindings/SOAP/xpp 
          ./../../../esp/platform 

+ 2 - 1
esp/services/ws_smc/CMakeLists.txt

@@ -50,7 +50,8 @@ include_directories (
          ./../../../common/remote 
          ./../../clients 
          ./../../../tools/swapnode 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 1 - 0
esp/services/ws_sql/CMakeLists.txt

@@ -82,6 +82,7 @@ if(WSSQL_SERVICE)
         ${HPCC_SOURCE_DIR}/system/mp
         ${HPCC_SOURCE_DIR}/dali/dfu
         ${HPCC_SOURCE_DIR}/dali/base/
+        ${HPCC_SOURCE_DIR}/dali/sasha
         ${HPCC_SOURCE_DIR}/common/workunit
         ${HPCC_SOURCE_DIR}/common/remote
         ${HPCC_SOURCE_DIR}/common/environment

+ 2 - 1
esp/services/ws_topology/CMakeLists.txt

@@ -49,7 +49,8 @@ include_directories (
          ./../../../common/remote 
          ./../../clients 
          ./../../../tools/swapnode 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../ws_workunits 
          ./../../bindings 
          ./../../smc/SMCLib 

+ 9 - 0
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -88,6 +88,15 @@ SecAccessFlags getWsWorkunitAccess(IEspContext& ctx, IConstWorkUnit& cw)
     return accessFlag;
 }
 
+bool validateWsWorkunitAccess(IEspContext& ctx, const char* wuid, SecAccessFlags minAccess)
+{   // Opens wuid and returns ctx.validateFeatureAccess() for its access type and minAccess; throws if it cannot be opened.
+    Owned<IWorkUnitFactory> factory = getWorkUnitFactory(ctx.querySecManager(), ctx.queryUser());
+    Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
+    if (workunit)
+        return ctx.validateFeatureAccess(getWuAccessType(*workunit, ctx.queryUserId()), minAccess, false);
+    throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Failed to open workunit %s when validating workunit access", wuid);
+}
+
 void ensureWsWorkunitAccessByOwnerId(IEspContext& ctx, const char* owner, SecAccessFlags minAccess)
 {
     if (!ctx.validateFeatureAccess(getWuAccessType(owner, ctx.queryUserId()), minAccess, false))

+ 1 - 0
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -75,6 +75,7 @@ void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags
 void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess);
 void ensureWsWorkunitAccessByOwnerId(IEspContext& context, const char* owner, SecAccessFlags minAccess);
 void ensureWsCreateWorkunitAccess(IEspContext& cxt);
+bool validateWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess);
 
 const char *getGraphNum(const char *s,unsigned &num);
 

+ 43 - 48
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -48,6 +48,7 @@
 #include "fvresultset.ipp"
 #include "ws_wudetails.hpp"
 #include "wuerror.hpp"
+#include "TpWrapper.hpp"
 
 #include "rtlformat.hpp"
 
@@ -60,7 +61,7 @@
 
 #define ESP_WORKUNIT_DIR "workunits/"
 
-#define SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
+#define WU_SDS_LOCK_TIMEOUT (5*60*1000) // 5 mins
 const unsigned CHECK_QUERY_STATUS_THREAD_POOL_SIZE = 25;
 const unsigned MAX_ZAP_BUFFER_SIZE = 10000000; //10M
 
@@ -84,9 +85,9 @@ public:
 };
 
 //The ECLWUActionNames[] has to match with the ESPenum ECLWUActions in the ecm file.
-static unsigned NumOfECLWUActionNames = 11;
+static unsigned NumOfECLWUActionNames = 12;
 static const char *ECLWUActionNames[] = { "Abort", "Delete", "Deschedule", "Reschedule", "Pause",
-    "PauseNow", "Protect", "Unprotect", "Restore", "Resume", "SetToFailed", NULL };
+    "PauseNow", "Protect", "Unprotect", "Restore", "Resume", "SetToFailed", "Archive", nullptr };
 
 class CECLWUActionsEx : public SoapEnumParamNew<CECLWUActions>
 {
@@ -112,7 +113,38 @@ bool doAction(IEspContext& context, StringArray& wuids, CECLWUActions action, IP
     if (!wuids.length())
         return true;
 
-    Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+    if ((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive))
+    {
+        StringBuffer msg;
+        ForEachItemIn(i, wuids)
+        {
+            StringBuffer wuidStr(wuids.item(i));
+            const char* wuid = wuidStr.trim().str();
+            if (isEmpty(wuid))
+            {
+                msg.appendf("Empty Workunit ID at %u. ", i);
+                continue;
+            }
+            if ((action == CECLWUActions_Archive) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
+                msg.appendf("Access denied for Workunit %s. ", wuid);
+        }
+        if (!msg.isEmpty())
+            throw makeStringException(ECLWATCH_INVALID_INPUT, msg);
+
+        Owned<ISashaCommand> cmd = archiveOrRestoreWorkunits(wuids, params, action == CECLWUActions_Archive, false);
+        ForEachItemIn(idx, wuids)
+        {
+            StringBuffer reply;
+            cmd->getId(idx, reply);
+
+            const char* wuid = wuids.item(idx);
+            if ((action == CECLWUActions_Restore) && !validateWsWorkunitAccess(context, wuid, SecAccess_Full))
+                reply.appendf("Access denied for Workunit %s. ", wuid);
+
+            AuditSystemAccess(context.queryUserId(), true, "%s", reply.str());
+        }
+        return true;
+    }
 
     bool bAllSuccess = true;
     const char* strAction = (action < NumOfECLWUActionNames) ? ECLWUActionNames[action] : "Unknown Action";
@@ -132,50 +164,13 @@ bool doAction(IEspContext& context, StringArray& wuids, CECLWUActions action, IP
                 throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid);
 
             PROGLOG("%s %s", strAction, wuid);
-            if ((action == CECLWUActions_Restore) || (action == CECLWUActions_EventDeschedule))
+            if (action == CECLWUActions_EventDeschedule)
             {
-                switch(action)
-                {
-                case CECLWUActions_Restore:
-                {
-                    SocketEndpoint ep;
-                    if (params->hasProp("sashaServerIP"))
-                        ep.set(params->queryProp("sashaServerIP"), params->getPropInt("sashaServerPort"));
-                    else
-                        getSashaNode(ep);
-
-                    Owned<ISashaCommand> cmd = createSashaCommand();
-                    cmd->setAction(SCA_RESTORE);
-                    cmd->addId(wuid);
-
-                    Owned<INode> node = createINode(ep);
-                    if (!node)
-                        throw MakeStringException(ECLWATCH_INODE_NOT_FOUND,"INode not found.");
-
-                    StringBuffer s;
-                    if (!cmd->send(node, 1*60*1000))
-                        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
-                            "Sasha (%s) took too long to respond from: Restore workunit %s.",
-                            ep.getUrlStr(s).str(), wuid);
-
-                    if (cmd->numIds()==0)
-                        throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Could not Archive/restore %s",wuid);
-
-                    StringBuffer reply;
-                    cmd->getId(0,reply);
-
-                    AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
+                if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
+                    || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
                     ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
-                    break;
-                }
-                case CECLWUActions_EventDeschedule:
-                    if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Full, false)
-                        || !context.validateFeatureAccess(OTHERS_WU_ACCESS, SecAccess_Full, false))
-                        ensureWsWorkunitAccess(context, wuid, SecAccess_Full);
-                    descheduleWorkunit(wuid);
-                    AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
-                    break;
-                }
+                descheduleWorkunit(wuid);
+                AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid);
             }
             else
             {
@@ -317,7 +312,7 @@ bool doUnProtectWorkunits(IEspContext& context, StringArray& wuids, IArrayOf<ICo
 
 static void checkUpdateQuerysetLibraries()
 {
-    Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+    Owned<IRemoteConnection> globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, WU_SDS_LOCK_TIMEOUT);
     if (!globalLock)
         return;
 
@@ -706,7 +701,7 @@ bool CWsWorkunitsEx::onWUAction(IEspContext &context, IEspWUActionRequest &req,
 
         Owned<IProperties> params = createProperties(true);
         params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer());
-        if ((action == CECLWUActions_Restore) && !sashaServerIp.isEmpty())
+        if (((action == CECLWUActions_Restore) || (action == CECLWUActions_Archive)) && !sashaServerIp.isEmpty())
         {
             params->setProp("sashaServerIP", sashaServerIp.get());
             params->setProp("sashaServerPort", sashaServerPort);

+ 2 - 1
esp/smc/SMCLib/CMakeLists.txt

@@ -49,7 +49,8 @@ include_directories (
          ./../../../common/workunit 
          ./../../../common/remote 
          ./../../clients 
-         ./../../../dali/base 
+         ./../../../dali/base
+         ./../../../dali/sasha
          ./../../clients/LoggingClient 
          ./../../bindings 
          ./../../bindings/SOAP/xpp 

+ 42 - 0
esp/smc/SMCLib/TpWrapper.cpp

@@ -24,6 +24,8 @@
 #include <stdio.h>
 #include "workunit.hpp"
 #include "exception_util.hpp"
+#include "portlist.h"
+
 
 const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information.";
 
@@ -1968,3 +1970,43 @@ void CTpWrapper::getAttPath(const char* Path,StringBuffer& returnStr)
     JBASE64_Decode(Path, returnStr);
 }
 
+extern TPWRAPPER_API ISashaCommand* archiveOrRestoreWorkunits(StringArray& wuids, IProperties* params, bool archive, bool dfu)
+{   // Sends ONE Sasha command that archives (archive=true) or restores all wuids; dfu sets the command's DFU flag.
+    // params may be null; "sashaServerIP"/"sashaServerPort", when present, override the environment's first Sasha server.
+    StringBuffer sashaAddress;
+    unsigned port = DEFAULT_SASHA_PORT;
+    if (params && params->hasProp("sashaServerIP"))
+    {
+        sashaAddress.set(params->queryProp("sashaServerIP"));
+        port = params->getPropInt("sashaServerPort", DEFAULT_SASHA_PORT);
+    }
+    else
+    {
+        IArrayOf<IConstTpSashaServer> sashaservers;
+        CTpWrapper dummy;
+        dummy.getTpSashaServers(sashaservers);
+        if (sashaservers.ordinality() == 0)
+            throw makeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Sasha server not found");
+        IArrayOf<IConstTpMachine>& sashaservermachine = sashaservers.item(0).getTpMachines();
+        if (sashaservermachine.ordinality() > 0) // item(0) on an empty machine list would be out of range
+            sashaAddress.set(sashaservermachine.item(0).getNetaddress());
+        if (sashaAddress.isEmpty()) // also reached when the server entry lists no machines
+            throw makeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Sasha address not found");
+    }
+
+    SocketEndpoint ep(sashaAddress.str(), port);
+    Owned<INode> node = createINode(ep);
+    Owned<ISashaCommand> cmd = createSashaCommand();
+    cmd->setAction(archive ? SCA_ARCHIVE : SCA_RESTORE);
+    if (dfu)
+        cmd->setDFU(true);
+
+    ForEachItemIn(i, wuids) // every ID rides on the same command, so Sasha is contacted once
+        cmd->addId(wuids.item(i));
+
+    if (!cmd->send(node, 1*60*1000)) // 60000 - presumably a timeout in ms, given the error text below
+        throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,
+            "Sasha (%s) took too long to respond for Archive/restore workunit.",
+            sashaAddress.str());
+    return cmd.getClear(); // ownership passes to the caller, which reads the replies via getId()
+}

+ 3 - 0
esp/smc/SMCLib/TpWrapper.hpp

@@ -41,6 +41,7 @@
 #include "ws_topology.hpp"
 #include <string>
 #include <set>
+#include "sacmd.hpp"
 
 
 using std::set;
@@ -209,5 +210,7 @@ private:
     Owned<IRemoteConnection> conn;
 };
 
+extern TPWRAPPER_API ISashaCommand* archiveOrRestoreWorkunits(StringArray& wuids, IProperties* params, bool archive, bool dfu);
+
 #endif //_ESPWIZ_TpWrapper_HPP__