Browse Source

HPCC-17256 Add paging/sorting to WsWorkunits.WUGetNumFileToCopy

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 8 years ago
parent
commit
c54a1af2fb
2 changed files with 76 additions and 12 deletions
  1. 9 1
      esp/scm/ws_workunits.ecm
  2. 67 11
      esp/services/ws_workunits/ws_workunitsQuerySets.cpp

+ 9 - 1
esp/scm/ws_workunits.ecm

@@ -1809,16 +1809,24 @@ ESPresponse [exceptions_inline, nil_remove] WUGetArchiveFileResponse
 ESPrequest [nil_remove] WUGetNumFileToCopyRequest
 {
     string ClusterName;
+    [min_ver("1.69")] int64 PageSize(0);
+    [min_ver("1.69")] int64 PageStartFrom(0);
+    [min_ver("1.69")] string Sortby;
+    [min_ver("1.69")] bool Descending(false);
+    [min_ver("1.69")] int64 CacheHint;
+    
 };
 
 ESPresponse [exceptions_inline, nil_remove] WUGetNumFileToCopyResponse
 {
     ESParray<ESPstruct ClusterEndpoint, Endpoint> Endpoints;
+    [min_ver("1.69")] int64 CacheHint;
+    [min_ver("1.69")] int64 Total;
 };
 
 ESPservice [
     auth_feature("DEFERRED"), //This declares that the method logic handles feature level authorization
-    version("1.68"), default_client_version("1.68"),
+    version("1.69"), default_client_version("1.69"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);

+ 67 - 11
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -2835,33 +2835,89 @@ bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQuer
 
 bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
 {
+    class CWUGetNumFileToCopyPager : public CSimpleInterface, implements IElementsPager
+    {
+        StringAttr clusterName;
+        StringAttr sortOrder;
+    public:
+        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+        CWUGetNumFileToCopyPager(const char* _clusterName, const char *_sortOrder)
+            : clusterName(_clusterName), sortOrder(_sortOrder) { };
+
+        virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
+        {
+            SocketEndpointArray servers;
+            getRoxieProcessServers(clusterName.get(), servers);
+            if (servers.length() < 1)
+            {
+                PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
+                return NULL;
+            }
+            Owned<IPropertyTree> result = sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT);
+            if (!result)
+            {
+                PROGLOG("WUGetNumFileToCopy: Empty result received for cluster %s", clusterName.get());
+                return NULL;
+            }
+            Owned<IPropertyTreeIterator> iter = result->getElements("*");
+            if (!iter)
+                return NULL;
+
+            StringArray unknownAttributes;
+            sortElements(iter, sortOrder.get(), NULL, NULL, unknownAttributes, elements);
+            return NULL;
+        }
+        virtual bool allMatchingElementsReceived() { return true; } //For now, roxie always returns all of matched items.
+    };
+
     try
     {
-        Owned<IPropertyTree> result;
         StringBuffer clusterName = req.getClusterName();
         if (clusterName.isEmpty())
             throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
 
-        SocketEndpointArray servers;
-        getRoxieProcessServers(clusterName.str(), servers);
-        if (servers.length() < 1)
-            throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Process Server not found for %s", clusterName.str());
-        result.setown(sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT));
-        if (!result)
-            throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Empty result received for cluster %s", clusterName.str());
+        StringBuffer so;
+        bool descending = req.getDescending();
+        if (descending)
+            so.set("-");
+        const char *sortBy =  req.getSortby();
+        if (!isEmptyString(sortBy) && strieq(sortBy, "URL"))
+            so.append("?@ep");
+        else if (!isEmptyString(sortBy) && strieq(sortBy, "Status"))
+            so.append("?Status");
+        else
+            so.append("#FilesToProcess/@value");
+
+        unsigned pageSize = req.getPageSize();
+        unsigned pageStartFrom = req.getPageStartFrom();
+        if(pageSize < 1)
+            pageSize = 100;
+
+        __int64 cacheHint = 0;
+        if (!req.getCacheHint_isNull())
+            cacheHint = req.getCacheHint();
+
+        unsigned numberOfEndpoints = 0;
+        IArrayOf<IPropertyTree> results;
+        Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(clusterName.str(), so.str());
+        getElementsPaged(elementsPager, pageStartFrom, pageSize, NULL, "", &cacheHint, results, &numberOfEndpoints, NULL, false);
 
         IArrayOf<IEspClusterEndpoint> endpoints;
-        Owned<IPropertyTreeIterator> it = result->getElements("Endpoint");
-        ForEach(*it)
+        ForEachItemIn(i, results)
         {
-            IPropertyTree& item = it->query();
+            IPropertyTree &item = results.item(i);
+
             Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
             endpoint->setURL(item.queryProp("@ep"));
             endpoint->setStatus(item.queryProp("Status"));
             endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
             endpoints.append(*endpoint.getClear());
         }
+
         resp.setEndpoints(endpoints);
+        resp.setCacheHint(cacheHint);
+        resp.setTotal(numberOfEndpoints);
     }
     catch(IException* e)
     {