|
@@ -1886,122 +1886,183 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue
|
|
|
}
|
|
|
|
|
|
void doWUQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsigned sashaServerPort,
|
|
|
- ArchivedWuCache &archivedWuCache, int cacheTime, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
|
|
|
-{
|
|
|
- SecAccessFlags accessOwn;
|
|
|
- SecAccessFlags accessOthers;
|
|
|
- getUserWuAccessFlags(context, accessOwn, accessOthers, true);
|
|
|
+ ArchivedWuCache &archivedWuCache, unsigned cacheMinutes, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
|
|
|
+{
|
|
|
+ class CArchivedWUsReader : public CInterface, implements IArchivedWUsReader
|
|
|
+ {
|
|
|
+ IEspContext& context;
|
|
|
+ IEspWUQueryRequest& req;
|
|
|
+ unsigned pageFrom, pageSize;
|
|
|
+ StringAttr sashaServerIP;
|
|
|
+ unsigned sashaServerPort;
|
|
|
+ unsigned cacheMinutes;
|
|
|
+ StringBuffer filterStr;
|
|
|
+ ArchivedWuCache& archivedWuCache;
|
|
|
+ unsigned numberOfWUsReturned;
|
|
|
+ bool hasMoreWU;
|
|
|
+
|
|
|
+ void readDateFilters(StringBuffer& from, StringBuffer& to)
|
|
|
+ {
|
|
|
+ CDateTime timeFrom, timeTo;
|
|
|
+ if(notEmpty(req.getEndDate()))
|
|
|
+ timeTo.setString(req.getEndDate(), NULL, true);
|
|
|
+ else
|
|
|
+ timeTo.setNow();
|
|
|
|
|
|
- __int64 pageSize = req.getPageSize();
|
|
|
- if(pageSize < 1)
|
|
|
- pageSize=100;
|
|
|
- __int64 displayStart = req.getPageStartFrom();
|
|
|
- __int64 displayEnd = displayStart + pageSize;
|
|
|
- unsigned dateLimit = 0;
|
|
|
- bool hasNextPage = true;
|
|
|
+ unsigned year, month, day, hour, minute, second, nano;
|
|
|
+ timeTo.getDate(year, month, day, true);
|
|
|
+ timeTo.getTime(hour, minute, second, nano, true);
|
|
|
+ to.setf("%4d%02d%02d%02d%02d", year, month, day, hour, minute);
|
|
|
|
|
|
- SocketEndpoint ep;
|
|
|
- if (sashaServerIP && *sashaServerIP)
|
|
|
- ep.set(sashaServerIP, sashaServerPort);
|
|
|
- else
|
|
|
- getSashaNode(ep);
|
|
|
+ if(!notEmpty(req.getStartDate()))
|
|
|
+ return;
|
|
|
|
|
|
- Owned<INode> sashaserver = createINode(ep);
|
|
|
+ timeFrom.setString(req.getStartDate(), NULL, true);
|
|
|
+ if (timeFrom >= timeTo)
|
|
|
+ return;
|
|
|
|
|
|
- CDateTime wuTimeFrom, wuTimeTo;
|
|
|
- if(notEmpty(req.getEndDate()))
|
|
|
- wuTimeTo.setString(req.getEndDate(), NULL, true);
|
|
|
- else
|
|
|
- wuTimeTo.setNow();
|
|
|
+ unsigned year0, month0, day0, hour0, minute0, second0, nano0;
|
|
|
+ timeFrom.getDate(year0, month0, day0, true);
|
|
|
+ timeFrom.getTime(hour0, minute0, second0, nano0, true);
|
|
|
+ from.setf("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0);
|
|
|
|
|
|
- if(notEmpty(req.getStartDate()))
|
|
|
- {
|
|
|
- wuTimeFrom.setString(req.getStartDate(), NULL, true);
|
|
|
- dateLimit = 1;
|
|
|
- }
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- IArrayOf<IEspECLWorkunit> results;
|
|
|
+ bool addToFilterString(const char *name, const char *value)
|
|
|
+ {
|
|
|
+ if (isEmpty(name) || isEmpty(value))
|
|
|
+ return false;
|
|
|
+ if (filterStr.length())
|
|
|
+ filterStr.append(';');
|
|
|
+ filterStr.append(name).append("=").append(value);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
- StringBuffer filter;
|
|
|
- addToQueryString(filter, "cluster", req.getCluster(), ';');
|
|
|
- addToQueryString(filter, "owner", req.getOwner(), ';');
|
|
|
- addToQueryString(filter, "jobName", req.getJobname(), ';');
|
|
|
- addToQueryString(filter, "state", req.getState(), ';');
|
|
|
- StringBuffer s;
|
|
|
- if (!req.getLastNDays_isNull() && req.getLastNDays()>0)
|
|
|
- addToQueryString(filter, "LastNDays", s.clear().append(req.getLastNDays()).str(), ';');
|
|
|
- else
|
|
|
- {
|
|
|
- addToQueryString(filter, "wuTimeFrom", req.getStartDate(), ';');
|
|
|
- addToQueryString(filter, "wuTimeTo", req.getEndDate(), ';');
|
|
|
- }
|
|
|
- addToQueryString(filter, "displayStart", s.append(displayStart).str(), ';');
|
|
|
- addToQueryString(filter, "pageSize", s.clear().append(pageSize).str(), ';');
|
|
|
+ bool addToFilterString(const char *name, unsigned value)
|
|
|
+ {
|
|
|
+ if (isEmpty(name))
|
|
|
+ return false;
|
|
|
+ if (filterStr.length())
|
|
|
+ filterStr.append(';');
|
|
|
+ filterStr.append(name).append("=").append(value);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
- Owned<ArchivedWuCacheElement> found = archivedWuCache.lookup(context, filter, "AddWhenAvailable", cacheTime);
|
|
|
- if (found)
|
|
|
- {
|
|
|
- hasNextPage = found->m_hasNextPage;
|
|
|
- if (found->m_results.length())
|
|
|
+ void setFilterString()
|
|
|
{
|
|
|
- ForEachItemIn(ai, found->m_results)
|
|
|
+ addToFilterString("cluster", req.getCluster());
|
|
|
+ addToFilterString("owner", req.getOwner());
|
|
|
+ addToFilterString("jobName", req.getJobname());
|
|
|
+ addToFilterString("state", req.getState());
|
|
|
+ addToFilterString("timeFrom", req.getStartDate());
|
|
|
+ addToFilterString("timeTo", req.getEndDate());
|
|
|
+ addToFilterString("pageStart", pageFrom);
|
|
|
+ addToFilterString("pageSize", pageSize);
|
|
|
+ if (sashaServerIP && *sashaServerIP)
|
|
|
{
|
|
|
- Owned<IEspECLWorkunit> info= createECLWorkunit("","");
|
|
|
- info->copy(found->m_results.item(ai));
|
|
|
- results.append(*info.getClear());
|
|
|
+ addToFilterString("sashaServerIP", sashaServerIP.get());
|
|
|
+ addToFilterString("sashaServerPort", sashaServerPort);
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- IArrayOf<IEspECLWorkunit> resultList;
|
|
|
|
|
|
- CDateTime timeTo = wuTimeTo;
|
|
|
- __int64 totalWus = 0;
|
|
|
- bool complete = false;
|
|
|
- while (!complete)
|
|
|
- {
|
|
|
- CDateTime timeFrom = timeTo;
|
|
|
- timeFrom.adjustTime(-1439); //one day earlier
|
|
|
- if (dateLimit > 0 && wuTimeFrom > timeFrom)
|
|
|
- timeFrom = wuTimeFrom;
|
|
|
+ void setSashaCommand(INode* sashaserver, ISashaCommand* cmd)
|
|
|
+ {
|
|
|
+ cmd->setAction(SCA_LIST);
|
|
|
+ cmd->setOutputFormat("owner,jobname,cluster,state");
|
|
|
+ cmd->setOnline(false);
|
|
|
+ cmd->setArchived(true);
|
|
|
+ cmd->setStart(pageFrom);
|
|
|
+ cmd->setLimit(pageSize+1); //read an extra WU to check hasMoreWU
|
|
|
+ if (notEmpty(req.getCluster()))
|
|
|
+ cmd->setCluster(req.getCluster());
|
|
|
+ if (notEmpty(req.getOwner()))
|
|
|
+ cmd->setOwner(req.getOwner());
|
|
|
+ if (notEmpty(req.getJobname()))
|
|
|
+ cmd->setJobName(req.getJobname());
|
|
|
+ if (notEmpty(req.getState()))
|
|
|
+ cmd->setState(req.getState());
|
|
|
+
|
|
|
+ StringBuffer timeFrom, timeTo;
|
|
|
+ readDateFilters(timeFrom, timeTo);
|
|
|
+ if (timeFrom.length())
|
|
|
+ cmd->setAfter(timeFrom.str());
|
|
|
+ if (timeTo.length())
|
|
|
+ cmd->setBefore(timeTo.str());
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ void addArchivedWU(IArrayOf<IEspECLWorkunit>& archivedWUs, StringArray& wuDataArray, bool canAccess)
|
|
|
+ {
|
|
|
+ Owned<IEspECLWorkunit> info= createECLWorkunit("","");
|
|
|
+ const char* wuid = wuDataArray.item(0);
|
|
|
+ info->setWuid(wuid);
|
|
|
+ if (!canAccess)
|
|
|
+ info->setState("<Hidden>");
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (notEmpty(wuDataArray.item(1)))
|
|
|
+ info->setOwner(wuDataArray.item(1));
|
|
|
+ if (notEmpty(wuDataArray.item(2)))
|
|
|
+ info->setJobname(wuDataArray.item(2));
|
|
|
+ if (notEmpty(wuDataArray.item(3)))
|
|
|
+ info->setCluster(wuDataArray.item(3));
|
|
|
+ if (notEmpty(wuDataArray.item(4)))
|
|
|
+ info->setState(wuDataArray.item(4));
|
|
|
+ }
|
|
|
|
|
|
- unsigned year0, month0, day0, hour0, minute0, second0, nano0;
|
|
|
- timeFrom.getDate(year0, month0, day0, true);
|
|
|
- timeFrom.getTime(hour0, minute0, second0, nano0, true);
|
|
|
- VStringBuffer wuFrom("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0);
|
|
|
+ //Sort WUs by WUID
|
|
|
+ ForEachItemIn(i, archivedWUs)
|
|
|
+ {
|
|
|
+ IEspECLWorkunit& w = archivedWUs.item(i);
|
|
|
+ if (!isEmpty(w.getWuid()) && strcmp(wuid, w.getWuid())>0)
|
|
|
+ {
|
|
|
+ archivedWUs.add(*info.getClear(), (aindex_t) i);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ archivedWUs.append(*info.getClear());
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- unsigned year, month, day, hour, minute, second, nano;
|
|
|
- timeTo.getDate(year, month, day, true);
|
|
|
- timeTo.getTime(hour, minute, second, nano, true);
|
|
|
- VStringBuffer wuTo("%4d%02d%02d%02d%02d", year, month, day, hour, minute);
|
|
|
+ public:
|
|
|
+ IMPLEMENT_IINTERFACE_USING(CInterface);
|
|
|
|
|
|
- __int64 begin = 0;
|
|
|
- unsigned limit = 1000;
|
|
|
- bool continueSashaLoop = true;
|
|
|
- while (continueSashaLoop)
|
|
|
- {
|
|
|
- Owned<ISashaCommand> cmd = createSashaCommand();
|
|
|
+ CArchivedWUsReader(IEspContext& _context, const char* _sashaServerIP, unsigned _sashaServerPort, ArchivedWuCache& _archivedWuCache,
|
|
|
+ unsigned _cacheMinutes, unsigned _pageFrom, unsigned _pageSize, IEspWUQueryRequest& _req)
|
|
|
+ : context(_context), sashaServerIP(_sashaServerIP), sashaServerPort(_sashaServerPort),
|
|
|
+ archivedWuCache(_archivedWuCache), cacheMinutes(_cacheMinutes), pageFrom(_pageFrom), pageSize(_pageSize), req(_req)
|
|
|
+ {
|
|
|
+ hasMoreWU = false;
|
|
|
+ numberOfWUsReturned = 0;
|
|
|
+ };
|
|
|
|
|
|
- cmd->setAction(SCA_LIST);
|
|
|
- cmd->setOnline(false);
|
|
|
- cmd->setArchived(true);
|
|
|
- cmd->setAfter(wuFrom.str());
|
|
|
- cmd->setBefore(wuTo.str());
|
|
|
- cmd->setStart((unsigned)begin);
|
|
|
- cmd->setLimit(limit);
|
|
|
-
|
|
|
- if (notEmpty(req.getCluster()))
|
|
|
- cmd->setCluster(req.getCluster());
|
|
|
- if (notEmpty(req.getOwner()))
|
|
|
- cmd->setOwner(req.getOwner());
|
|
|
- if (notEmpty(req.getJobname()))
|
|
|
- cmd->setJobName(req.getJobname());
|
|
|
- if (notEmpty(req.getState()))
|
|
|
- cmd->setState(req.getState());
|
|
|
-
|
|
|
- cmd->setOutputFormat("owner,jobname,cluster,state");
|
|
|
+ void getArchivedWUs(IArrayOf<IEspECLWorkunit>& archivedWUs)
|
|
|
+ {
|
|
|
+ setFilterString();
|
|
|
+ Owned<ArchivedWuCacheElement> cachedResults = archivedWuCache.lookup(context, filterStr, "AddWhenAvailable", cacheMinutes);
|
|
|
+ if (cachedResults)
|
|
|
+ {
|
|
|
+ hasMoreWU = cachedResults->m_hasNextPage;
|
|
|
+ numberOfWUsReturned = cachedResults->numWUsReturned;
|
|
|
+ if (cachedResults->m_results.length())
|
|
|
+ {
|
|
|
+ ForEachItemIn(ai, cachedResults->m_results)
|
|
|
+ archivedWUs.append(*LINK(&cachedResults->m_results.item(ai)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ SocketEndpoint ep;
|
|
|
+ if (sashaServerIP && *sashaServerIP)
|
|
|
+ ep.set(sashaServerIP, sashaServerPort);
|
|
|
+ else
|
|
|
+ getSashaNode(ep);
|
|
|
+ Owned<INode> sashaserver = createINode(ep);
|
|
|
|
|
|
+ Owned<ISashaCommand> cmd = createSashaCommand();
|
|
|
+ setSashaCommand(sashaserver, cmd);
|
|
|
if (!cmd->send(sashaserver))
|
|
|
{
|
|
|
StringBuffer msg("Cannot connect to archive server at ");
|
|
@@ -2009,115 +2070,68 @@ void doWUQueryFromArchive(IEspContext &context, const char* sashaServerIP, unsig
|
|
|
throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str());
|
|
|
}
|
|
|
|
|
|
- unsigned actualCount = cmd->numIds();
|
|
|
- if (actualCount < 1)
|
|
|
- break;
|
|
|
-
|
|
|
- totalWus += actualCount;
|
|
|
-
|
|
|
- if (actualCount < limit)
|
|
|
- continueSashaLoop = false;
|
|
|
+ numberOfWUsReturned = cmd->numIds();
|
|
|
+ hasMoreWU = (numberOfWUsReturned > pageSize);
|
|
|
+ if (hasMoreWU)
|
|
|
+ numberOfWUsReturned--;
|
|
|
|
|
|
- for (unsigned ii=0; ii<actualCount; ii++)
|
|
|
+ if (numberOfWUsReturned > 0)
|
|
|
{
|
|
|
- const char *csline = cmd->queryId(ii);
|
|
|
- if (!csline)
|
|
|
- continue;
|
|
|
+ SecAccessFlags accessOwn, accessOthers;
|
|
|
+ getUserWuAccessFlags(context, accessOwn, accessOthers, true);
|
|
|
|
|
|
- StringArray wuidArray;
|
|
|
- wuidArray.appendList(csline, ",");
|
|
|
-
|
|
|
- if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cmd->queryOwner(), accessOwn, accessOthers) < SecAccess_Read)
|
|
|
- continue;
|
|
|
-
|
|
|
- const char* wuid = wuidArray.item(0);
|
|
|
- if (isEmpty(wuid))
|
|
|
- continue;
|
|
|
-
|
|
|
- __int64 addToPos = -1;
|
|
|
- ForEachItemIn(ridx, resultList)
|
|
|
+ for (unsigned i=0; i<numberOfWUsReturned; i++)
|
|
|
{
|
|
|
- IEspECLWorkunit& w = resultList.item(ridx);
|
|
|
- if (isEmpty(w.getWuid()))
|
|
|
+ const char *csline = cmd->queryId(i);
|
|
|
+ if (!csline || !*csline)
|
|
|
continue;
|
|
|
|
|
|
- if (strcmp(wuid, w.getWuid())>0)
|
|
|
+ StringArray wuDataArray;
|
|
|
+ wuDataArray.appendList(csline, ",");
|
|
|
+
|
|
|
+ const char* wuid = wuDataArray.item(0);
|
|
|
+ if (isEmpty(wuid))
|
|
|
{
|
|
|
- addToPos = ridx;
|
|
|
- break;
|
|
|
+ WARNLOG("Empty WUID in SCA_LIST response");
|
|
|
+ continue;
|
|
|
}
|
|
|
+
|
|
|
+ addArchivedWU(archivedWUs, wuDataArray, chooseWuAccessFlagsByOwnership(context.queryUserId(), wuDataArray.item(1), accessOwn, accessOthers) >= SecAccess_Read);
|
|
|
}
|
|
|
|
|
|
- if (addToPos < 0 && (ridx > displayEnd))
|
|
|
- continue;
|
|
|
-
|
|
|
- Owned<IEspECLWorkunit> info= createECLWorkunit("","");
|
|
|
- info->setWuid(wuid);
|
|
|
- info->setArchived(true);
|
|
|
- if (notEmpty(wuidArray.item(1)))
|
|
|
- info->setOwner(wuidArray.item(1));
|
|
|
- if (notEmpty(wuidArray.item(2)))
|
|
|
- info->setJobname(wuidArray.item(2));
|
|
|
- if (notEmpty(wuidArray.item(3)))
|
|
|
- info->setCluster(wuidArray.item(3));
|
|
|
- if (notEmpty(wuidArray.item(4)))
|
|
|
- info->setState(wuidArray.item(4));
|
|
|
-
|
|
|
- if (addToPos < 0)
|
|
|
- resultList.append(*info.getClear());
|
|
|
- else
|
|
|
- resultList.add(*info.getClear(), (aindex_t) addToPos);
|
|
|
- if (resultList.length() > displayEnd)
|
|
|
- resultList.pop();
|
|
|
+ archivedWuCache.add(filterStr, "AddWhenAvailable", hasMoreWU, numberOfWUsReturned, archivedWUs);
|
|
|
}
|
|
|
-
|
|
|
- begin += limit;
|
|
|
- }
|
|
|
-
|
|
|
- timeTo.adjustTime(-1440);//one day earlier
|
|
|
- if (dateLimit > 0 && wuTimeFrom > timeTo) //we reach the date limit
|
|
|
- {
|
|
|
- if (totalWus <= displayEnd)
|
|
|
- hasNextPage = false;
|
|
|
- complete = true;
|
|
|
}
|
|
|
- else if ( resultList.length() >= displayEnd) //we have all we need
|
|
|
- complete = true;
|
|
|
- }
|
|
|
-
|
|
|
- if (displayEnd > resultList.length())
|
|
|
- displayEnd = resultList.length();
|
|
|
-
|
|
|
- for (aindex_t i = (aindex_t)displayStart; i < (aindex_t)displayEnd; i++)
|
|
|
- {
|
|
|
- Owned<IEspECLWorkunit> info = createECLWorkunit("","");
|
|
|
- info->copy(resultList.item(i));
|
|
|
- results.append(*info.getClear());
|
|
|
- }
|
|
|
+ return;
|
|
|
+ };
|
|
|
|
|
|
- archivedWuCache.add(filter, "AddWhenAvailable", hasNextPage, results);
|
|
|
- }
|
|
|
+ bool getHasMoreWU() { return hasMoreWU; };
|
|
|
+ unsigned getNumberOfWUsReturned() { return numberOfWUsReturned; };
|
|
|
+ };
|
|
|
|
|
|
- resp.setPageStartFrom(displayStart+1);
|
|
|
- resp.setPageEndAt(displayEnd);
|
|
|
+ unsigned pageStart = (unsigned) req.getPageStartFrom();
|
|
|
+ unsigned pageSize = (unsigned) req.getPageSize();
|
|
|
+ if(pageSize < 1)
|
|
|
+ pageSize=500;
|
|
|
+ IArrayOf<IEspECLWorkunit> archivedWUs;
|
|
|
+ Owned<IArchivedWUsReader> archiveWUsReader = new CArchivedWUsReader(context, sashaServerIP, sashaServerPort, archivedWuCache,
|
|
|
+ cacheMinutes, pageStart, pageSize, req);
|
|
|
+ archiveWUsReader->getArchivedWUs(archivedWUs);
|
|
|
|
|
|
- if(dateLimit < 1 || hasNextPage)
|
|
|
- resp.setNextPage(displayStart + pageSize);
|
|
|
- else
|
|
|
- resp.setNextPage(-1);
|
|
|
+ resp.setWorkunits(archivedWUs);
|
|
|
+ resp.setNumWUs(archiveWUsReader->getNumberOfWUsReturned());
|
|
|
|
|
|
- if(displayStart > 0)
|
|
|
- {
|
|
|
+ resp.setType("archived only");
|
|
|
+ resp.setPageSize(pageSize);
|
|
|
+ resp.setPageStartFrom(pageStart+1);
|
|
|
+ resp.setPageEndAt(pageStart + archiveWUsReader->getNumberOfWUsReturned());
|
|
|
+ if(pageStart > 0)
|
|
|
+ { //This is not the first page;
|
|
|
resp.setFirst(false);
|
|
|
- if (displayStart - pageSize > 0)
|
|
|
- resp.setPrevPage(displayStart - pageSize);
|
|
|
- else
|
|
|
- resp.setPrevPage(0);
|
|
|
+ resp.setPrevPage((pageStart > pageSize) ? pageStart - pageSize: 0);
|
|
|
}
|
|
|
-
|
|
|
- resp.setPageSize(pageSize);
|
|
|
- resp.setWorkunits(results);
|
|
|
- resp.setType("archived only");
|
|
|
+ if (archiveWUsReader->getHasMoreWU())
|
|
|
+ resp.setNextPage(pageStart + pageSize);
|
|
|
return;
|
|
|
}
|
|
|
|