Jelajahi Sumber

HPCC-15121 Add WsWorkunits.WUGetNumFileToCopy

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 8 tahun lalu
induk
melakukan
35f3cba366

+ 20 - 1
esp/scm/ws_workunits.ecm

@@ -302,6 +302,13 @@ ESPStruct NetworkNode
     string id;
 };
 
+ESPStruct [nil_remove] ClusterEndpoint
+{
+    string URL;
+    string Status;
+    int NumQueryFileToCopy;
+};
+
 ESPStruct LogicalFileUpload
 {
     int     Type;
@@ -1767,9 +1774,20 @@ ESPresponse [exceptions_inline, nil_remove] WUGetArchiveFileResponse
     string Message;
 };
 
+ESPrequest [nil_remove] WUGetNumFileToCopyRequest
+{
+    string ClusterName; //for AllNodes
+    string ClusterURL;
+};
+
+ESPresponse [exceptions_inline, nil_remove] WUGetNumFileToCopyResponse
+{
+    ESParray<ESPstruct ClusterEndpoint, Endpoint> Endpoints;
+};
+
 ESPservice [
     auth_feature("DEFERRED"), //This declares that the method logic handles feature level authorization
-    version("1.60"), default_client_version("1.60"),
+    version("1.61"), default_client_version("1.61"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);
@@ -1846,6 +1864,7 @@ ESPservice [
     ESPmethod WUGetStats(WUGetStatsRequest, WUGetStatsResponse);
     ESPmethod [min_ver("1.57")] WUListArchiveFiles(WUListArchiveFilesRequest, WUListArchiveFilesResponse);
     ESPmethod [min_ver("1.57")] WUGetArchiveFile(WUGetArchiveFileRequest, WUGetArchiveFileResponse);
+    ESPmethod [min_ver("1.61")] WUGetNumFileToCopy(WUGetNumFileToCopyRequest, WUGetNumFileToCopyResponse);
 };
 
 

+ 66 - 0
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -27,6 +27,7 @@
 #include "dfuutil.hpp"
 #include "dautils.hpp"
 #include "httpclient.hpp"
+#include "portlist.h" //ROXIE_SERVER_PORT
 
 #define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1)  // 15 seconds
 
@@ -2740,3 +2741,68 @@ bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQuer
     }
     return true;
 }
+
+bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
+{
+    try
+    {
+        Owned<IPropertyTree> result;
+        StringBuffer clusterName = req.getClusterName();
+        if (!clusterName.isEmpty())
+        {
+            SocketEndpointArray servers;
+            getRoxieProcessServers(clusterName.str(), servers);
+            if (servers.length() < 1)
+                throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Process Server not found for %s", clusterName.str());
+            result.setown(sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT));
+        }
+        else
+        {
+            StringBuffer url = req.getClusterURL();
+            if (url.isEmpty())
+                throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
+
+            int port;
+            StringBuffer netAddress;
+            const char* start = url.str();
+            const char* split = strchr(start, ':');
+            if (!split)
+            {
+                netAddress.set(start);
+                port = ROXIE_SERVER_PORT;
+            }
+            else
+            {
+                netAddress.append(split - start, start);
+                port = atoi(split+1);
+            }
+
+            SocketEndpoint ep;
+            SocketEndpointArray eps;
+            ep.set(netAddress.str(), port);
+            eps.append(ep);
+            Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIELOCKCONNECTIONTIMEOUT);
+            result.setown(sendRoxieControlQuery(sock, "<control:numfilestoprocess/>", ROXIELOCKCONNECTIONTIMEOUT));
+        }
+        if (!result)
+            throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Empty result received for cluster %s", clusterName.str());
+
+        IArrayOf<IEspClusterEndpoint> endpoints;
+        Owned<IPropertyTreeIterator> it = result->getElements("Endpoint");
+        ForEach(*it)
+        {
+            IPropertyTree& item = it->query();
+            Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
+            endpoint->setURL(item.queryProp("@ep"));
+            endpoint->setStatus(item.queryProp("Status"));
+            endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
+            endpoints.append(*endpoint.getClear());
+        }
+        resp.setEndpoints(endpoints);
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -243,6 +243,7 @@ public:
     bool onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp);
     bool onWUGraphTiming(IEspContext& context, IEspWUGraphTimingRequest& req, IEspWUGraphTimingResponse& resp);
     bool onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp);
+    bool onWUGetNumFileToCopy(IEspContext &context, IEspWUGetNumFileToCopyRequest &req, IEspWUGetNumFileToCopyResponse &resp);
 
     bool onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp);
     bool onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp);