Browse Source

Merge pull request #5858 from wangkx/h11063a

HPCC-11063 Refactor doWUQueryFromArchive() of WsWorkunits

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
b515afe102

+ 4 - 4
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -2107,7 +2107,7 @@ DataCacheElement* DataCache::lookup(IEspContext &context, const char* filter, un
         if (list_iter == cache.end())
             break;
 
-        DataCacheElement* awu = list_iter->getLink();
+        DataCacheElement* awu = list_iter->get();
         if (!awu || (awu->m_timeCached > timeNow))
             break;
 
@@ -2174,7 +2174,7 @@ ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char
         if (list_iter == cache.end())
             break;
 
-        ArchivedWuCacheElement* awu = list_iter->getLink();
+        ArchivedWuCacheElement* awu = list_iter->get();
         if (awu && !stricmp(sashaUpdatedWhen, awu->m_sashaUpdatedWhen.c_str()) && (awu->m_timeCached > timeNow))
             break;
 
@@ -2192,12 +2192,12 @@ ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char
     return NULL;
 }
 
-void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, IArrayOf<IEspECLWorkunit>& wus)
+void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, unsigned numWUsReturned, IArrayOf<IEspECLWorkunit>& wus)
 {
     CriticalBlock block(crit);
 
     //Save new data
-    Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, /*data.str(),*/ wus);
+    Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, numWUsReturned, wus);
     if (cacheSize > 0)
     {
         if (cache.size() >= cacheSize)

+ 11 - 3
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -261,11 +261,18 @@ struct DataCache: public CInterface, implements IInterface
     size32_t cacheSize;
 };
 
+interface IArchivedWUsReader : extends IInterface
+{
+    virtual void getArchivedWUs(IArrayOf<IEspECLWorkunit>& results) = 0;
+    virtual bool getHasMoreWU() = 0;
+    virtual unsigned getNumberOfWUsReturned() = 0;
+};
+
 struct ArchivedWuCacheElement: public CInterface, implements IInterface
 {
     IMPLEMENT_IINTERFACE;
-    ArchivedWuCacheElement(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, /*const char* data,*/ IArrayOf<IEspECLWorkunit>& wus):m_filter(filter),
-        m_sashaUpdatedWhen(sashaUpdatedWhen), m_hasNextPage(hasNextPage)/*, m_data(data)*/
+    ArchivedWuCacheElement(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, unsigned _numWUsReturned, IArrayOf<IEspECLWorkunit>& wus):m_filter(filter),
+        m_sashaUpdatedWhen(sashaUpdatedWhen), m_hasNextPage(hasNextPage), numWUsReturned(_numWUsReturned)
     {
         m_timeCached.setNow();
         if (wus.length() > 0)
@@ -283,6 +290,7 @@ struct ArchivedWuCacheElement: public CInterface, implements IInterface
     std::string m_filter;
     std::string m_sashaUpdatedWhen;
     bool m_hasNextPage;
+    unsigned numWUsReturned;
 
     CDateTime m_timeCached;
     IArrayOf<IEspECLWorkunit> m_results;
@@ -295,7 +303,7 @@ struct ArchivedWuCache: public CInterface, implements IInterface
     ArchivedWuCache(size32_t _cacheSize=0): cacheSize(_cacheSize){}
     ArchivedWuCacheElement* lookup(IEspContext &context, const char* filter, const char* sashaUpdatedWhen, unsigned timeOutMin);
 
-    void add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, IArrayOf<IEspECLWorkunit>& wus);
+    void add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, unsigned numWUsReturned, IArrayOf<IEspECLWorkunit>& wus);
 
     std::list<Linked<ArchivedWuCacheElement> > cache;
     CriticalSection crit;

+ 203 - 189
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1886,122 +1886,183 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue
 }
 
 void doWUQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsigned sashaServerPort,
-       ArchivedWuCache &archivedWuCache, int cacheTime, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
-{
-    SecAccessFlags accessOwn;
-    SecAccessFlags accessOthers;
-    getUserWuAccessFlags(context, accessOwn, accessOthers, true);
+       ArchivedWuCache &archivedWuCache, unsigned cacheMinutes, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
+{
+    class CArchivedWUsReader : public CInterface, implements IArchivedWUsReader
+    {
+        IEspContext& context;
+        IEspWUQueryRequest& req;
+        unsigned pageFrom, pageSize;
+        StringAttr sashaServerIP;
+        unsigned sashaServerPort;
+        unsigned cacheMinutes;
+        StringBuffer filterStr;
+        ArchivedWuCache& archivedWuCache;
+        unsigned numberOfWUsReturned;
+        bool hasMoreWU;
+
+        void readDateFilters(StringBuffer& from, StringBuffer& to)
+        {
+            CDateTime timeFrom, timeTo;
+            if(notEmpty(req.getEndDate()))
+                timeTo.setString(req.getEndDate(), NULL, true);
+            else
+                timeTo.setNow();
 
-    __int64 pageSize = req.getPageSize();
-    if(pageSize < 1)
-        pageSize=100;
-    __int64 displayStart = req.getPageStartFrom();
-    __int64 displayEnd = displayStart + pageSize;
-    unsigned dateLimit = 0;
-    bool hasNextPage = true;
+            unsigned year, month, day, hour, minute, second, nano;
+            timeTo.getDate(year, month, day, true);
+            timeTo.getTime(hour, minute, second, nano, true);
+            to.setf("%4d%02d%02d%02d%02d", year, month, day, hour, minute);
 
-    SocketEndpoint ep;
-    if (sashaServerIP && *sashaServerIP)
-        ep.set(sashaServerIP, sashaServerPort);
-    else
-        getSashaNode(ep);
+            if(!notEmpty(req.getStartDate()))
+                return;
 
-    Owned<INode> sashaserver = createINode(ep);
+            timeFrom.setString(req.getStartDate(), NULL, true);
+            if (timeFrom >= timeTo)
+                return;
 
-    CDateTime wuTimeFrom, wuTimeTo;
-    if(notEmpty(req.getEndDate()))
-        wuTimeTo.setString(req.getEndDate(), NULL, true);
-    else
-        wuTimeTo.setNow();
+            unsigned year0, month0, day0, hour0, minute0, second0, nano0;
+            timeFrom.getDate(year0, month0, day0, true);
+            timeFrom.getTime(hour0, minute0, second0, nano0, true);
+            from.setf("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0);
 
-    if(notEmpty(req.getStartDate()))
-    {
-        wuTimeFrom.setString(req.getStartDate(), NULL, true);
-        dateLimit = 1;
-    }
+            return;
+        }
 
-    IArrayOf<IEspECLWorkunit> results;
+        bool addToFilterString(const char *name, const char *value)
+        {
+            if (isEmpty(name) || isEmpty(value))
+                return false;
+            if (filterStr.length())
+                filterStr.append(';');
+            filterStr.append(name).append("=").append(value);
+            return true;
+        }
 
-    StringBuffer filter;
-    addToQueryString(filter, "cluster", req.getCluster(), ';');
-    addToQueryString(filter, "owner", req.getOwner(), ';');
-    addToQueryString(filter, "jobName", req.getJobname(), ';');
-    addToQueryString(filter, "state", req.getState(), ';');
-    StringBuffer s;
-    if (!req.getLastNDays_isNull() && req.getLastNDays()>0)
-        addToQueryString(filter, "LastNDays", s.clear().append(req.getLastNDays()).str(), ';');
-    else
-    {
-        addToQueryString(filter, "wuTimeFrom", req.getStartDate(), ';');
-        addToQueryString(filter, "wuTimeTo", req.getEndDate(), ';');
-    }
-    addToQueryString(filter, "displayStart", s.append(displayStart).str(), ';');
-    addToQueryString(filter, "pageSize", s.clear().append(pageSize).str(), ';');
+        bool addToFilterString(const char *name, unsigned value)
+        {
+            if (isEmpty(name))
+                return false;
+            if (filterStr.length())
+                filterStr.append(';');
+            filterStr.append(name).append("=").append(value);
+            return true;
+        }
 
-    Owned<ArchivedWuCacheElement> found = archivedWuCache.lookup(context, filter, "AddWhenAvailable", cacheTime);
-    if (found)
-    {
-        hasNextPage = found->m_hasNextPage;
-        if (found->m_results.length())
+        void setFilterString()
         {
-            ForEachItemIn(ai, found->m_results)
+            addToFilterString("cluster", req.getCluster());
+            addToFilterString("owner", req.getOwner());
+            addToFilterString("jobName", req.getJobname());
+            addToFilterString("state", req.getState());
+            addToFilterString("timeFrom", req.getStartDate());
+            addToFilterString("timeTo", req.getEndDate());
+            addToFilterString("pageStart", pageFrom);
+            addToFilterString("pageSize", pageSize);
+            if (sashaServerIP && *sashaServerIP)
             {
-                Owned<IEspECLWorkunit> info= createECLWorkunit("","");
-                info->copy(found->m_results.item(ai));
-                results.append(*info.getClear());
+                addToFilterString("sashaServerIP", sashaServerIP.get());
+                addToFilterString("sashaServerPort", sashaServerPort);
             }
         }
-    }
-    else
-    {
-        IArrayOf<IEspECLWorkunit> resultList;
 
-        CDateTime timeTo = wuTimeTo;
-        __int64 totalWus = 0;
-        bool complete = false;
-        while (!complete)
-        {
-            CDateTime timeFrom = timeTo;
-            timeFrom.adjustTime(-1439); //one day earlier
-            if (dateLimit > 0 && wuTimeFrom > timeFrom)
-                timeFrom = wuTimeFrom;
+        void setSashaCommand(INode* sashaserver, ISashaCommand* cmd)
+        {
+            cmd->setAction(SCA_LIST);
+            cmd->setOutputFormat("owner,jobname,cluster,state");
+            cmd->setOnline(false);
+            cmd->setArchived(true);
+            cmd->setStart(pageFrom);
+            cmd->setLimit(pageSize+1); //read an extra WU to check hasMoreWU
+            if (notEmpty(req.getCluster()))
+                cmd->setCluster(req.getCluster());
+            if (notEmpty(req.getOwner()))
+                cmd->setOwner(req.getOwner());
+            if (notEmpty(req.getJobname()))
+                cmd->setJobName(req.getJobname());
+            if (notEmpty(req.getState()))
+                cmd->setState(req.getState());
+
+            StringBuffer timeFrom, timeTo;
+            readDateFilters(timeFrom, timeTo);
+            if (timeFrom.length())
+                cmd->setAfter(timeFrom.str());
+            if (timeTo.length())
+                cmd->setBefore(timeTo.str());
+
+            return;
+        }
+
+        void addArchivedWU(IArrayOf<IEspECLWorkunit>& archivedWUs, StringArray& wuDataArray, bool canAccess)
+        {
+            Owned<IEspECLWorkunit> info= createECLWorkunit("","");
+            const char* wuid = wuDataArray.item(0);
+            info->setWuid(wuid);
+            if (!canAccess)
+                info->setState("<Hidden>");
+            else
+            {
+                if (notEmpty(wuDataArray.item(1)))
+                    info->setOwner(wuDataArray.item(1));
+                if (notEmpty(wuDataArray.item(2)))
+                    info->setJobname(wuDataArray.item(2));
+                if (notEmpty(wuDataArray.item(3)))
+                    info->setCluster(wuDataArray.item(3));
+                if (notEmpty(wuDataArray.item(4)))
+                    info->setState(wuDataArray.item(4));
+            }
 
-            unsigned year0, month0, day0, hour0, minute0, second0, nano0;
-            timeFrom.getDate(year0, month0, day0, true);
-            timeFrom.getTime(hour0, minute0, second0, nano0, true);
-            VStringBuffer wuFrom("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0);
+            //Sort WUs by WUID
+            ForEachItemIn(i, archivedWUs)
+            {
+                IEspECLWorkunit& w = archivedWUs.item(i);
+                if (!isEmpty(w.getWuid()) && strcmp(wuid, w.getWuid())>0)
+                {
+                    archivedWUs.add(*info.getClear(), (aindex_t) i);
+                    return;
+                }
+            }
+            archivedWUs.append(*info.getClear());
+            return;
+        }
 
-            unsigned year, month, day, hour, minute, second, nano;
-            timeTo.getDate(year, month, day, true);
-            timeTo.getTime(hour, minute, second, nano, true);
-            VStringBuffer wuTo("%4d%02d%02d%02d%02d", year, month, day, hour, minute);
+    public:
+        IMPLEMENT_IINTERFACE_USING(CInterface);
 
-            __int64 begin = 0;
-            unsigned limit = 1000;
-            bool continueSashaLoop = true;
-            while (continueSashaLoop)
-            {
-                Owned<ISashaCommand> cmd = createSashaCommand();
+        CArchivedWUsReader(IEspContext& _context, const char* _sashaServerIP, unsigned _sashaServerPort, ArchivedWuCache& _archivedWuCache,
+            unsigned _cacheMinutes, unsigned _pageFrom, unsigned _pageSize, IEspWUQueryRequest& _req)
+            : context(_context), sashaServerIP(_sashaServerIP), sashaServerPort(_sashaServerPort),
+            archivedWuCache(_archivedWuCache), cacheMinutes(_cacheMinutes), pageFrom(_pageFrom), pageSize(_pageSize), req(_req)
+        {
+            hasMoreWU = false;
+            numberOfWUsReturned = 0;
+        };
 
-                cmd->setAction(SCA_LIST);
-                cmd->setOnline(false);
-                cmd->setArchived(true);
-                cmd->setAfter(wuFrom.str());
-                cmd->setBefore(wuTo.str());
-                cmd->setStart((unsigned)begin);
-                cmd->setLimit(limit);
-
-                if (notEmpty(req.getCluster()))
-                    cmd->setCluster(req.getCluster());
-                if (notEmpty(req.getOwner()))
-                    cmd->setOwner(req.getOwner());
-                if (notEmpty(req.getJobname()))
-                    cmd->setJobName(req.getJobname());
-                if (notEmpty(req.getState()))
-                    cmd->setState(req.getState());
-
-                cmd->setOutputFormat("owner,jobname,cluster,state");
+        void getArchivedWUs(IArrayOf<IEspECLWorkunit>& archivedWUs)
+        {
+            setFilterString();
+            Owned<ArchivedWuCacheElement> cachedResults = archivedWuCache.lookup(context, filterStr, "AddWhenAvailable", cacheMinutes);
+            if (cachedResults)
+            {
+                hasMoreWU = cachedResults->m_hasNextPage;
+                numberOfWUsReturned = cachedResults->numWUsReturned;
+                if (cachedResults->m_results.length())
+                {
+                    ForEachItemIn(ai, cachedResults->m_results)
+                        archivedWUs.append(*LINK(&cachedResults->m_results.item(ai)));
+                }
+            }
+            else
+            {
+                SocketEndpoint ep;
+                if (sashaServerIP && *sashaServerIP)
+                    ep.set(sashaServerIP, sashaServerPort);
+                else
+                    getSashaNode(ep);
+                Owned<INode> sashaserver = createINode(ep);
 
+                Owned<ISashaCommand> cmd = createSashaCommand();
+                setSashaCommand(sashaserver, cmd);
                 if (!cmd->send(sashaserver))
                 {
                     StringBuffer msg("Cannot connect to archive server at ");
@@ -2009,115 +2070,68 @@ void doWUQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsig
                     throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str());
                 }
 
-                unsigned actualCount = cmd->numIds();
-                if (actualCount < 1)
-                    break;
-
-                totalWus += actualCount;
-
-                if (actualCount < limit)
-                    continueSashaLoop = false;
+                numberOfWUsReturned = cmd->numIds();
+                hasMoreWU = (numberOfWUsReturned > pageSize);
+                if (hasMoreWU)
+                    numberOfWUsReturned--;
 
-                for (unsigned ii=0; ii<actualCount; ii++)
+                if (numberOfWUsReturned > 0)
                 {
-                    const char *csline = cmd->queryId(ii);
-                    if (!csline)
-                        continue;
+                    SecAccessFlags accessOwn, accessOthers;
+                    getUserWuAccessFlags(context, accessOwn, accessOthers, true);
 
-                    StringArray wuidArray;
-                    wuidArray.appendList(csline, ",");
-
-                    if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cmd->queryOwner(), accessOwn, accessOthers) < SecAccess_Read)
-                        continue;
-
-                    const char* wuid = wuidArray.item(0);
-                    if (isEmpty(wuid))
-                        continue;
-
-                    __int64 addToPos = -1;
-                    ForEachItemIn(ridx, resultList)
+                    for (unsigned i=0; i<numberOfWUsReturned; i++)
                     {
-                        IEspECLWorkunit& w = resultList.item(ridx);
-                        if (isEmpty(w.getWuid()))
+                        const char *csline = cmd->queryId(i);
+                        if (!csline || !*csline)
                             continue;
 
-                        if (strcmp(wuid, w.getWuid())>0)
+                        StringArray wuDataArray;
+                        wuDataArray.appendList(csline, ",");
+
+                        const char* wuid = wuDataArray.item(0);
+                        if (isEmpty(wuid))
                         {
-                            addToPos = ridx;
-                            break;
+                            WARNLOG("Empty WUID in SCA_LIST response");
+                            continue;
                         }
+
+                        addArchivedWU(archivedWUs, wuDataArray, chooseWuAccessFlagsByOwnership(context.queryUserId(), wuDataArray.item(1), accessOwn, accessOthers) >= SecAccess_Read);
                     }
 
-                    if (addToPos < 0 && (ridx > displayEnd))
-                        continue;
-
-                    Owned<IEspECLWorkunit> info= createECLWorkunit("","");
-                    info->setWuid(wuid);
-                    info->setArchived(true);
-                    if (notEmpty(wuidArray.item(1)))
-                          info->setOwner(wuidArray.item(1));
-                    if (notEmpty(wuidArray.item(2)))
-                        info->setJobname(wuidArray.item(2));
-                    if (notEmpty(wuidArray.item(3)))
-                          info->setCluster(wuidArray.item(3));
-                    if (notEmpty(wuidArray.item(4)))
-                          info->setState(wuidArray.item(4));
-
-                    if (addToPos < 0)
-                        resultList.append(*info.getClear());
-                    else
-                        resultList.add(*info.getClear(), (aindex_t) addToPos);
-                    if (resultList.length() > displayEnd)
-                        resultList.pop();
+                    archivedWuCache.add(filterStr, "AddWhenAvailable", hasMoreWU, numberOfWUsReturned, archivedWUs);
                 }
-
-                begin += limit;
-            }
-
-            timeTo.adjustTime(-1440);//one day earlier
-            if (dateLimit > 0 && wuTimeFrom > timeTo) //we reach the date limit
-            {
-                if (totalWus <= displayEnd)
-                    hasNextPage = false;
-                complete = true;
             }
-            else if ( resultList.length() >= displayEnd) //we have all we need
-                complete = true;
-        }
-
-        if (displayEnd > resultList.length())
-            displayEnd = resultList.length();
-
-        for (aindex_t i = (aindex_t)displayStart; i < (aindex_t)displayEnd; i++)
-        {
-            Owned<IEspECLWorkunit> info = createECLWorkunit("","");
-            info->copy(resultList.item(i));
-            results.append(*info.getClear());
-        }
+            return;
+        };
 
-        archivedWuCache.add(filter, "AddWhenAvailable", hasNextPage, results);
-    }
+        bool getHasMoreWU() { return hasMoreWU; };
+        unsigned getNumberOfWUsReturned() { return numberOfWUsReturned; };
+    };
 
-    resp.setPageStartFrom(displayStart+1);
-    resp.setPageEndAt(displayEnd);
+    unsigned pageStart = (unsigned) req.getPageStartFrom();
+    unsigned pageSize = (unsigned) req.getPageSize();
+    if(pageSize < 1)
+        pageSize=500;
+    IArrayOf<IEspECLWorkunit> archivedWUs;
+    Owned<IArchivedWUsReader> archiveWUsReader = new CArchivedWUsReader(context, sashaServerIP, sashaServerPort, archivedWuCache,
+        cacheMinutes, pageStart, pageSize, req);
+    archiveWUsReader->getArchivedWUs(archivedWUs);
 
-    if(dateLimit < 1 || hasNextPage)
-        resp.setNextPage(displayStart + pageSize);
-    else
-        resp.setNextPage(-1);
+    resp.setWorkunits(archivedWUs);
+    resp.setNumWUs(archiveWUsReader->getNumberOfWUsReturned());
 
-    if(displayStart > 0)
-    {
+    resp.setType("archived only");
+    resp.setPageSize(pageSize);
+    resp.setPageStartFrom(pageStart+1);
+    resp.setPageEndAt(pageStart + archiveWUsReader->getNumberOfWUsReturned());
+    if(pageStart > 0)
+    { //This is not the first page;
         resp.setFirst(false);
-        if (displayStart - pageSize > 0)
-            resp.setPrevPage(displayStart - pageSize);
-        else
-            resp.setPrevPage(0);
+        resp.setPrevPage((pageStart > pageSize) ? pageStart - pageSize: 0);
     }
-
-    resp.setPageSize(pageSize);
-    resp.setWorkunits(results);
-    resp.setType("archived only");
+    if (archiveWUsReader->getHasMoreWU())
+        resp.setNextPage(pageStart + pageSize);
     return;
 }