|
@@ -85,295 +85,507 @@ static void mkDateCompare(bool dfu,const char *dt,StringBuffer &out,char fill)
|
|
|
|
|
|
void WUiterate(ISashaCommand *cmd, const char *mask)
|
|
|
{
|
|
|
- bool dfu = cmd->getDFU();
|
|
|
- const char *beforedt = cmd->queryBefore();
|
|
|
- const char *afterdt = cmd->queryAfter();
|
|
|
- const char *owner = cmd->queryOwner();
|
|
|
- const char *state = cmd->queryState();
|
|
|
- const char *cluster = cmd->queryCluster();
|
|
|
- const char *jobname = cmd->queryJobName();
|
|
|
- const char *outputformat = cmd->queryOutputFormat();
|
|
|
- const char *priority = cmd->queryPriority();
|
|
|
- const char *fileread = cmd->queryFileRead();
|
|
|
- const char *filewritten = cmd->queryFileWritten();
|
|
|
- const char *eclcontains = cmd->queryEclContains();
|
|
|
- const char *cmdname = cmd->queryDfuCmdName();
|
|
|
- unsigned start = cmd->getStart();
|
|
|
- unsigned num = cmd->getLimit();
|
|
|
- StringBuffer before;
|
|
|
- StringBuffer after;
|
|
|
- StringBuffer tmppath;
|
|
|
- mkDateCompare(dfu,afterdt,after,'0');
|
|
|
- mkDateCompare(dfu,beforedt,before,'9');
|
|
|
- bool haswusoutput = cmd->getAction()==SCA_WORKUNIT_SERVICES_GET;
|
|
|
- bool hasdtoutput = cmd->getAction()==SCA_LISTDT;
|
|
|
- MemoryBuffer WUSbuf;
|
|
|
- if (haswusoutput)
|
|
|
- cmd->setWUSresult(WUSbuf); // swap in/out (in case ever do multiple)
|
|
|
-
|
|
|
- StringBuffer baseXPath = dfu ? "DFU/WorkUnits" : "WorkUnits";
|
|
|
- Owned<IRemoteConnection> conn;
|
|
|
- bool isWild = !mask || !*mask || isWildString(mask);
|
|
|
- bool unfiltered = !mask || !*mask || (0==strcmp(mask,"*"));
|
|
|
- if (cmd->getArchived()) {
|
|
|
- // used to check not online
|
|
|
- StringBuffer path;
|
|
|
- if (dfu)
|
|
|
- getLdsPath("Archive/DFUWorkUnits",path);
|
|
|
- else
|
|
|
- getLdsPath("Archive/WorkUnits",path);
|
|
|
- Owned<IFile> dir = createIFile(path.str());
|
|
|
- StringBuffer masktmp;
|
|
|
- if (unfiltered&&after.length()&&before.length()) {
|
|
|
- const char *lo = after.str();
|
|
|
- const char *hi = before.str();
|
|
|
- while (*lo&&(toupper(*lo)==toupper(*hi))) {
|
|
|
- masktmp.append((char)toupper(*lo));
|
|
|
- lo++;
|
|
|
- hi++;
|
|
|
+ class CWUData : public CInterface
|
|
|
+ {
|
|
|
+ public:
|
|
|
+ CDateTime modifiedTime;
|
|
|
+ Owned<IPropertyTree> wuTree;
|
|
|
+ };
|
|
|
+
|
|
|
+ class CArchivedWUReader
|
|
|
+ {
|
|
|
+ StringBuffer mask, fromDT, toDT, fromDir, toDir, beforeWU, afterWU, baseXPath;
|
|
|
+ StringAttr owner, cluster, jobName, state, priority, eclContains, fileRead, fileWritten, cmdName;
|
|
|
+ bool isWild, descendingReq, countBackward, outputXML;
|
|
|
+ unsigned maxNumWUs = 0;
|
|
|
+ StringArray outputFields;
|
|
|
+ ISashaCommand* cmd;
|
|
|
+ const char *fromDTDefinedByBeforeOrAfterWU = nullptr;
|
|
|
+ const char *toDTDefinedByBeforeOrAfterWU = nullptr;
|
|
|
+
|
|
|
+ void getFileMasks(StringBuffer &dirMask, StringBuffer &fileMask)
|
|
|
+ {
|
|
|
+ //The fromDT and toDT are the filters which defines a search range.
|
|
|
+ //The afterWU (beforeWU) defines paging within the search range.
|
|
|
+ //For the first page, a user may define the fromDT and/or toDT without afterWU or beforeWU.
|
|
|
+ StringBuffer from, to;
|
|
|
+ if (!isEmptyString(fromDTDefinedByBeforeOrAfterWU) && (fromDT.isEmpty() || (strcmp(fromDTDefinedByBeforeOrAfterWU, fromDT.str()) > 0)))
|
|
|
+ from.set(fromDTDefinedByBeforeOrAfterWU);
|
|
|
+ else if (!fromDT.isEmpty())
|
|
|
+ from.set(fromDT.str());
|
|
|
+ if (!isEmptyString(toDTDefinedByBeforeOrAfterWU) && (toDT.isEmpty() || (strcmp(toDTDefinedByBeforeOrAfterWU, toDT.str()) < 0)))
|
|
|
+ to.set(toDTDefinedByBeforeOrAfterWU);
|
|
|
+ else if (!toDT.isEmpty())
|
|
|
+ to.set(toDT.str());
|
|
|
+
|
|
|
+ if (!isEmptyString(from))
|
|
|
+ splitWUID(from, fromDir);
|
|
|
+ if (!isEmptyString(to))
|
|
|
+ splitWUID(to, toDir);
|
|
|
+
|
|
|
+ bool unfiltered = mask.isEmpty() || streq(mask.str(), "*");
|
|
|
+
|
|
|
+ //The dirMask is used to filter out the directories (ex: W20171010) which store archived WU files.
|
|
|
+ //The fileMask is used to filter out the WU files (ex: W20171010-101010*.xml).
|
|
|
+ StringBuffer masktmp;
|
|
|
+ if (unfiltered && !from.isEmpty() && !to.isEmpty())
|
|
|
+ {
|
|
|
+ const char *lo = from.str();
|
|
|
+ const char *hi = to.str();
|
|
|
+ while (*lo && (toupper(*lo) == toupper(*hi)))
|
|
|
+ {
|
|
|
+ masktmp.append((char)toupper(*lo));
|
|
|
+ lo++;
|
|
|
+ hi++;
|
|
|
+ }
|
|
|
+ masktmp.append("*");
|
|
|
+ mask = masktmp.str();
|
|
|
}
|
|
|
- masktmp.append("*");
|
|
|
- mask = masktmp.str();
|
|
|
+
|
|
|
+ StringBuffer head;
|
|
|
+ if (!mask.isEmpty())
|
|
|
+ {
|
|
|
+ splitWUID(mask.str(), head);
|
|
|
+ if (!head.isEmpty())
|
|
|
+ dirMask = head.str();
|
|
|
+ fileMask.set(mask.str()).toUpperCase();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ fileMask.append("*");
|
|
|
+ fileMask.append(".xml");
|
|
|
}
|
|
|
- StringBuffer head, tmask;
|
|
|
- const char *hmask = NULL;
|
|
|
- if (mask&&*mask) {
|
|
|
- splitWUID(mask,head);
|
|
|
- if (head.length())
|
|
|
- hmask = head.str();
|
|
|
- tmask.clear().append(mask).toUpperCase();
|
|
|
+ bool checkDirs(const char *dir)
|
|
|
+ {
|
|
|
+ if (!fromDir.isEmpty() && (strcmp(dir, fromDir.str()) < 0))
|
|
|
+ return false;
|
|
|
+ if (!toDir.isEmpty() && (strcmp(dir, toDir.str()) > 0))
|
|
|
+ return false;
|
|
|
+ return true;
|
|
|
}
|
|
|
- else
|
|
|
- tmask.append("*");
|
|
|
- tmask.append(".xml");
|
|
|
- Owned<IDirectoryIterator> di = dir->directoryFiles(hmask,false,true);
|
|
|
- StringBuffer name;
|
|
|
- unsigned index = 0;
|
|
|
- StringBuffer xb;
|
|
|
- bool overflowed = false;
|
|
|
- ForEach(*di) {
|
|
|
- if (overflowed||(index>start+num))
|
|
|
- break;
|
|
|
- if (di->isDir()) {
|
|
|
- Owned<IDirectoryIterator> di2 = di->query().directoryFiles(tmask.str(),false);
|
|
|
+ bool checkArchivedWUID(const char *wuid)
|
|
|
+ {
|
|
|
+ if (isEmptyString(wuid))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (!mask.isEmpty() && !WildMatch(wuid, mask.str(),true))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (!isEmptyString(fromDTDefinedByBeforeOrAfterWU) && (stricmp(wuid, fromDTDefinedByBeforeOrAfterWU) <= 0))
|
|
|
+ return false;
|
|
|
+ if (!isEmptyString(toDTDefinedByBeforeOrAfterWU) && (stricmp(wuid, toDTDefinedByBeforeOrAfterWU) >= 0))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (!fromDT.isEmpty() && (stricmp(wuid, fromDT.str()) < 0))
|
|
|
+ return false;
|
|
|
+ if (!toDT.isEmpty() && (stricmp(wuid, toDT.str()) > 0))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ bool checkOnlineWUID(const char *wuid)
|
|
|
+ {
|
|
|
+ if (isEmptyString(wuid))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (!isEmptyString(fromDTDefinedByBeforeOrAfterWU) && (stricmp(wuid, fromDTDefinedByBeforeOrAfterWU) <= 0))
|
|
|
+ return false;
|
|
|
+ if (!isEmptyString(toDTDefinedByBeforeOrAfterWU) && (stricmp(wuid, toDTDefinedByBeforeOrAfterWU) >= 0))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (!fromDT.isEmpty() && (stricmp(wuid, fromDT.str()) < 0))
|
|
|
+ return false;
|
|
|
+ if (!toDT.isEmpty() && (stricmp(wuid, toDT.str()) > 0))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ const char *readWUIDFromFileName(StringBuffer &name)
|
|
|
+ {
|
|
|
+ if (name.length() < 5)
|
|
|
+ return nullptr;
|
|
|
+
|
|
|
+ name.setLength(name.length()-4);
|
|
|
+ name.toUpperCase();
|
|
|
+ const char *wuid = name.str();
|
|
|
+ if ((name.length()>6) && strieq(wuid+name.length()-6, "_HINTS"))
|
|
|
+ return nullptr;
|
|
|
+ return name.str();
|
|
|
+ }
|
|
|
+ IRemoteConnection *getSDSConnection(const char *mask)
|
|
|
+ {
|
|
|
+ if (!mask)
|
|
|
+ return querySDS().connect(baseXPath.str(), myProcessSession(), 0, 5*60*1000);
|
|
|
+
|
|
|
+ VStringBuffer xpath("%s/%s", baseXPath.str(), mask);
|
|
|
+ return querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000);
|
|
|
+ }
|
|
|
+ bool isOnline(Owned<IRemoteConnection> &conn, const char *wuid)
|
|
|
+ {
|
|
|
+ return (isWild && conn) ? conn->queryRoot()->hasProp(wuid) : conn != nullptr;
|
|
|
+ }
|
|
|
+ bool checkFilters(IPropertyTree *t, bool onlineWU)
|
|
|
+ {
|
|
|
+ StringBuffer path, val;
|
|
|
+ if (!owner.isEmpty() && (!t->getProp("@submitID", val.clear()) || !WildMatch(val.str(), owner.get(), true)))
|
|
|
+ return false;
|
|
|
+ if (!state.isEmpty() && (!t->getProp((!onlineWU && cmd->getDFU()) ? "Progress/@state" : "@state", val.clear()) || !WildMatch(val.str(), state.get(), true)))
|
|
|
+ return false;
|
|
|
+ if (!cluster.isEmpty() && (!t->getProp("@clusterName", val.clear()) || !WildMatch(val.str(), cluster.get(), true)))
|
|
|
+ return false;
|
|
|
+ if (!jobName.isEmpty() && (!t->getProp("@jobName", val.clear()) || !WildMatch(val.str(), jobName.get(), true)))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ if (onlineWU)
|
|
|
+ return true;
|
|
|
+
|
|
|
+ if (!cmdName.isEmpty() && (!t->getProp("@command", val.clear()) || !WildMatch(val.str(), cmdName.get(), true)))
|
|
|
+ return false;
|
|
|
+ if (!priority.isEmpty() && (!t->getProp("@priorityClass", val.clear()) || !WildMatch(val.str(), priority.get(), true)))
|
|
|
+ return false;
|
|
|
+ if (!fileRead.isEmpty() && !t->hasProp(path.setf("FilesRead/File[@name=~?\"%s\"]", fileRead.get()).str()))
|
|
|
+ return false;
|
|
|
+ if (!fileWritten.isEmpty() && !t->hasProp(path.setf("Files/File[@name=~?\"%s\"]", fileWritten.get()).str()))
|
|
|
+ return false;
|
|
|
+ if (!eclContains.isEmpty() && !t->hasProp(path.setf("Query[Text=~?\"*%s*\"]", eclContains.get()).str()))
|
|
|
+ return false;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ bool addOutput(bool outputModifiedTime, bool hasWUSOutput, IPropertyTree *wuTree,
|
|
|
+ CDateTime &modifiedTime, MemoryBuffer &WUSbuf)
|
|
|
+ {
|
|
|
+ if (hasWUSOutput)
|
|
|
+ { //cmd->getAction()==SCA_WORKUNIT_SERVICES_GET
|
|
|
+ if (!serializeWUSrow(*wuTree, WUSbuf, WORKUNIT_SERVICES_BUFFER_MAX, false))
|
|
|
+ return false; //Log overflowed?
|
|
|
+ }
|
|
|
+ else if (!addOutputFromWUTree(wuTree))
|
|
|
+ return false; //Log overflowed?
|
|
|
+ else if (outputModifiedTime)
|
|
|
+ cmd->addDT(modifiedTime);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ bool addOutputFromWUTree(IPropertyTree *wuTree)
|
|
|
+ {
|
|
|
+ if (outputXML)
|
|
|
+ { //cmd->getAction()==SCA_GET
|
|
|
+ StringBuffer xml;
|
|
|
+ toXML(wuTree, xml);
|
|
|
+ if (!cmd->addResult(xml.str()))
|
|
|
+ return false; //Log overflowed?
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (outputFields.length() == 0)
|
|
|
+ cmd->addId(wuTree->queryName());
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringBuffer output = wuTree->queryName();
|
|
|
+ //Append more into the output.
|
|
|
+ setWUDataTree(wuTree, output);
|
|
|
+ cmd->addId(output.str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ void setWUDataTree(IPropertyTree *wu, StringBuffer &output)
|
|
|
+ {
|
|
|
+ ForEachItemIn(i, outputFields)
|
|
|
+ {
|
|
|
+ const char* outputField = outputFields.item(i);
|
|
|
+
|
|
|
StringBuffer val;
|
|
|
- ForEach(*di2) {
|
|
|
- di2->getName(name.clear());
|
|
|
- if (!di2->isDir()&&(name.length()>4)) {
|
|
|
- name.setLength(name.length()-4);
|
|
|
- name.toUpperCase();
|
|
|
- const char *wuid = name.str();
|
|
|
- if ((name.length()>6)&&(stricmp(wuid+name.length()-6,"_HINTS")==0))
|
|
|
- continue;
|
|
|
- if ((!mask||!*mask||WildMatch(wuid,mask,true)) &&
|
|
|
- ((before.length()==0)||(stricmp(wuid,before.str())<=0)) &&
|
|
|
- ((after.length()==0)||(stricmp(wuid,after.str())>=0))) {
|
|
|
- if (isWild) {
|
|
|
- if (!conn)
|
|
|
- conn.setown(querySDS().connect(baseXPath.str(), myProcessSession(), 0, 5*60*1000)); // connection to all
|
|
|
- }
|
|
|
- else {
|
|
|
- VStringBuffer xpath("%s/%s", baseXPath.str(), mask);
|
|
|
- conn.setown(querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000));
|
|
|
- }
|
|
|
- if ((isWild && !conn->queryRoot()->hasProp(wuid)) || (!isWild && !conn)) { // check not online
|
|
|
- Owned<IPropertyTree> t;
|
|
|
- bool hasowner = owner&&*owner;
|
|
|
- bool hascluster = cluster&&*cluster;
|
|
|
- bool hasstate = state&&*state;
|
|
|
- bool hasjobname = jobname&&*jobname;
|
|
|
- bool hasoutput = outputformat&&*outputformat;
|
|
|
- bool inrange = (index>=start)&&(index<start+num);
|
|
|
- bool hascommand = cmdname&&*cmdname;
|
|
|
- bool haspriority = priority&&*priority;
|
|
|
- bool hasfileread = fileread&&*fileread;
|
|
|
- bool hasfilewritten = filewritten&&*filewritten;
|
|
|
- bool haseclcontains = eclcontains&&*eclcontains;
|
|
|
- if ((cmd->getAction()==SCA_GET)||haswusoutput||hasowner||hasstate||hascluster||hasjobname||hascommand||(hasoutput&&inrange)||haspriority||hasfileread||hasfilewritten||haseclcontains) {
|
|
|
- try {
|
|
|
- t.setown(createPTree(di2->query()));
|
|
|
- if (!t)
|
|
|
- continue;
|
|
|
- if (hasowner&&(!t->getProp("@submitID",val.clear())||!WildMatch(val.str(),owner,true)))
|
|
|
- continue;
|
|
|
- if (hasstate&&(!t->getProp(dfu?"Progress/@state":"@state",val.clear())||!WildMatch(val.str(),state,true)))
|
|
|
- continue;
|
|
|
- if (hascluster&&(!t->getProp("@clusterName",val.clear())||!WildMatch(val.str(),cluster,true)))
|
|
|
- continue;
|
|
|
- if (hasjobname&&(!t->getProp("@jobName",val.clear())||!WildMatch(val.str(),jobname,true)))
|
|
|
- continue;
|
|
|
- if (hascommand&&(!t->getProp("@command",val.clear())||!WildMatch(val.str(),cmdname,true)))
|
|
|
- continue;
|
|
|
- if (haspriority&&(!t->getProp("@priorityClass",val.clear())||!WildMatch(val.str(),priority,true)))
|
|
|
- continue;
|
|
|
- if (hasfileread&&!t->hasProp(tmppath.clear().appendf("FilesRead/File[@name=~?\"%s\"]",fileread).str()))
|
|
|
- continue;
|
|
|
- if (hasfilewritten&&!t->hasProp(tmppath.clear().appendf("Files/File[@name=~?\"%s\"]",filewritten).str()))
|
|
|
- continue;
|
|
|
- if (haseclcontains&&!t->hasProp(tmppath.clear().appendf("Query[Text=~?\"*%s*\"]",eclcontains).str()))
|
|
|
- continue;
|
|
|
- }
|
|
|
- catch (IException *e) {
|
|
|
- StringBuffer msg;
|
|
|
- msg.appendf("WUiterate: Workunit %s failed to load", wuid);
|
|
|
- EXCLOG(e,msg.str());
|
|
|
- e->Release();
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- index++;
|
|
|
- if (!inrange)
|
|
|
- continue;
|
|
|
- if (hasoutput) {
|
|
|
- char *saveptr;
|
|
|
- char *parse = strdup(outputformat);
|
|
|
- char *tok = strtok_r(parse, "|,",&saveptr);
|
|
|
- while (tok) {
|
|
|
- val.clear();
|
|
|
- bool found = true;
|
|
|
- if (stricmp(tok,"owner")==0)
|
|
|
- t->getProp("@submitID",val);
|
|
|
- else if (stricmp(tok,"cluster")==0)
|
|
|
- t->getProp("@clusterName",val);
|
|
|
- else if (stricmp(tok,"jobname")==0)
|
|
|
- t->getProp("@jobName",val);
|
|
|
- else if (stricmp(tok,"state")==0)
|
|
|
- t->getProp(dfu?"Progress/@state":"@state",val);
|
|
|
- else if (stricmp(tok,"command")==0)
|
|
|
- t->getProp("@command",val);
|
|
|
- else if (stricmp(tok,"wuid")==0)
|
|
|
- t->getName(val);
|
|
|
- else
|
|
|
- found = false;
|
|
|
- if (found) {
|
|
|
- // remove commas TBD
|
|
|
- name.append(',').append(val);
|
|
|
- }
|
|
|
- tok = strtok_r(NULL, "|,",&saveptr);
|
|
|
- }
|
|
|
- free(parse);
|
|
|
- }
|
|
|
- if (haswusoutput) {
|
|
|
- if (!serializeWUSrow(*t,WUSbuf,false)) {
|
|
|
- overflowed = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- cmd->addId(name.str());
|
|
|
- if (hasdtoutput) {
|
|
|
- CDateTime dt;
|
|
|
- di2->getModifiedTime(dt);
|
|
|
- cmd->addDT(dt);
|
|
|
- }
|
|
|
- }
|
|
|
- if (cmd->getAction()==SCA_GET) {
|
|
|
- StringBuffer xml;
|
|
|
- toXML(t,xml);
|
|
|
- if (!cmd->addResult(xml.str()))
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (index>start+num)
|
|
|
+ bool found = true;
|
|
|
+ if (strieq(outputField, "owner"))
|
|
|
+ wu->getProp("@submitID", val);
|
|
|
+ else if (strieq(outputField, "cluster"))
|
|
|
+ wu->getProp("@clusterName", val);
|
|
|
+ else if (strieq(outputField, "jobname"))
|
|
|
+ wu->getProp("@jobName",val);
|
|
|
+ else if (strieq(outputField,"state"))
|
|
|
+ wu->getProp("@state", val);
|
|
|
+ else
|
|
|
+ found = false;
|
|
|
+ if (found)
|
|
|
+ output.append(',').append(val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ bool checkOnlineWU(IPropertyTree *wuTree)
|
|
|
+ {
|
|
|
+ const char *wuid = wuTree->queryName();
|
|
|
+ if (!checkOnlineWUID(wuid))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!checkFilters(wuTree, true))
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("WUiterate: Workunit %s failed to load", wuid);
|
|
|
+ EXCLOG(e,msg.str());
|
|
|
+ e->Release();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ void getOnlineWUsAscending(IRemoteConnection *conn)
|
|
|
+ {
|
|
|
+ IArrayOf<IPropertyTree> wus;
|
|
|
+ unsigned wuCount = 0;
|
|
|
+ Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(isWild ? (mask.isEmpty() ? "*" : mask.str()) : nullptr, iptiter_sort);
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ IPropertyTree *pt = &iter->query();
|
|
|
+ if (!checkOnlineWU(pt))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (countBackward)
|
|
|
+ wus.append(*LINK(pt));
|
|
|
+ else if (!addOutputFromWUTree(pt))
|
|
|
+ break;
|
|
|
+
|
|
|
+ wuCount++;
|
|
|
+ if (wuCount == maxNumWUs)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (countBackward)
|
|
|
+ {
|
|
|
+ ForEachItemInRev(i, wus)
|
|
|
+ {
|
|
|
+ if (!addOutputFromWUTree(&wus.item(i)))
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- if (cmd->getOnline()) {
|
|
|
- if (haswusoutput)
|
|
|
- throw MakeStringException(-1,"SCA_WORKUNIT_SERVICES_GET not implemented for online workunits!");
|
|
|
- StringBuffer xpath(baseXPath);
|
|
|
- if (!conn)
|
|
|
+
|
|
|
+ void getOnlineWUsDescending(IRemoteConnection *conn)
|
|
|
+ {
|
|
|
+ IArrayOf<IPropertyTree> wus;
|
|
|
+ Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(isWild ? (mask.isEmpty() ? "*" : mask.str()) : nullptr);
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ IPropertyTree *pt = &iter->query();
|
|
|
+ if (!checkOnlineWU(pt))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ wus.append(*LINK(pt));
|
|
|
+ }
|
|
|
+ if (!wus.empty())
|
|
|
+ {
|
|
|
+ wus.sort(comparePropTrees);
|
|
|
+
|
|
|
+ if (countBackward)
|
|
|
+ {
|
|
|
+ unsigned i = wus.ordinality();
|
|
|
+ if (i > maxNumWUs)
|
|
|
+ i = maxNumWUs;
|
|
|
+ for (; i--;)
|
|
|
+ {
|
|
|
+ if (!addOutputFromWUTree(&wus.item(i)))
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ unsigned wuCount = 0;
|
|
|
+ ForEachItemIn(i, wus)
|
|
|
+ {
|
|
|
+ if (!addOutputFromWUTree(&wus.item(i)))
|
|
|
+ break;
|
|
|
+ wuCount++;
|
|
|
+ if (wuCount == maxNumWUs)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ static int comparePropTrees(IInterface * const *ll, IInterface * const *rr)
|
|
|
{
|
|
|
- if (!isWild)
|
|
|
- xpath.append("/").append(mask);
|
|
|
- conn.setown(querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000));
|
|
|
+ IPropertyTree *l = (IPropertyTree *) *ll;
|
|
|
+ IPropertyTree *r = (IPropertyTree *) *rr;
|
|
|
+ return stricmp(r->queryName(), l->queryName());
|
|
|
}
|
|
|
- if (conn)
|
|
|
+
|
|
|
+ public:
|
|
|
+
|
|
|
+ CArchivedWUReader(ISashaCommand *_cmd, const char *_mask)
|
|
|
{
|
|
|
- Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(isWild ? "*" : NULL);
|
|
|
- unsigned index = 0;
|
|
|
- StringBuffer val;
|
|
|
- ForEach(*iter) {
|
|
|
- IPropertyTree &pt=iter->query();
|
|
|
- const char *wuid = pt.queryName();
|
|
|
- if (index>start+num)
|
|
|
- break;
|
|
|
- // PROGLOG("match before=%s after=%s wuid=%s",before.str(),after.str(),wuid);
|
|
|
- if ((!mask||!*mask||!isWild||WildMatch(wuid,mask,true)) &&
|
|
|
- ((before.length()==0)||(stricmp(wuid,before)<0)) &&
|
|
|
- ((after.length()==0)||(stricmp(wuid,after)>=0))) {
|
|
|
- // PROGLOG("matched before=%s after=%s wuid=%s",before.str(),after.str(),wuid);
|
|
|
- bool hasowner = owner&&*owner;
|
|
|
- bool hascluster = cluster&&*cluster;
|
|
|
- bool hasstate = state&&*state;
|
|
|
- bool hasjobname = jobname&&*jobname;
|
|
|
- bool hasoutput = outputformat&&*outputformat;
|
|
|
- bool inrange = (index>=start)&&(index<start+num);
|
|
|
- if (hasowner||hasstate||hascluster||hasjobname||(hasoutput&&inrange)) {
|
|
|
- try {
|
|
|
- if (hasowner&&(!pt.getProp("@submitID",val.clear())||!WildMatch(val.str(),owner,true)))
|
|
|
- continue;
|
|
|
- if (hasstate&&(!pt.getProp("@state",val.clear())||!WildMatch(val.str(),state,true)))
|
|
|
- continue;
|
|
|
- if (hascluster&&(!pt.getProp("@clusterName",val.clear())||!WildMatch(val.str(),cluster,true)))
|
|
|
- continue;
|
|
|
- if (hasjobname&&(!pt.getProp("@jobName",val.clear())||!WildMatch(val.str(),jobname,true)))
|
|
|
+ cmd = _cmd;
|
|
|
+ mask.set(_mask);
|
|
|
+ isWild = mask.isEmpty() || isWildString(mask.str());
|
|
|
+
|
|
|
+ bool dfu = cmd->getDFU();
|
|
|
+ baseXPath.set(dfu ? "DFU/WorkUnits" : "WorkUnits");
|
|
|
+ owner.set(cmd->queryOwner());
|
|
|
+ cluster.set(cmd->queryCluster());
|
|
|
+ jobName.set(cmd->queryJobName());
|
|
|
+ state.set(cmd->queryState());
|
|
|
+ priority.set(cmd->queryPriority());
|
|
|
+ eclContains.set(cmd->queryEclContains());
|
|
|
+ fileRead.set(cmd->queryFileRead());
|
|
|
+ fileWritten.set(cmd->queryFileWritten());
|
|
|
+ cmdName.set(cmd->queryDfuCmdName());
|
|
|
+
|
|
|
+ //In WUiterate(), the queryAfter() is used as fromDT and the queryBefore() is used as toDT.
|
|
|
+ mkDateCompare(dfu, cmd->queryAfter(), fromDT, '0');
|
|
|
+ mkDateCompare(dfu, cmd->queryBefore(), toDT, '9');
|
|
|
+
|
|
|
+ beforeWU.set(cmd->queryBeforeWU());
|
|
|
+ afterWU.set(cmd->queryAfterWU());
|
|
|
+ descendingReq = cmd->querySortDescending();
|
|
|
+ if (descendingReq)
|
|
|
+ {
|
|
|
+ fromDTDefinedByBeforeOrAfterWU = beforeWU.str();
|
|
|
+ toDTDefinedByBeforeOrAfterWU = afterWU.str();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ fromDTDefinedByBeforeOrAfterWU = afterWU.str();
|
|
|
+ toDTDefinedByBeforeOrAfterWU = beforeWU.str();
|
|
|
+ }
|
|
|
+
|
|
|
+ //The countBackward below is used to support paging. For example, a client retrieves
|
|
|
+ //archived WUs with the order new->old (Descending = true) and the client is on page 50.
|
|
|
+ //If a client wants to go back to the previous page by specifying the beforeWU,
|
|
|
+ //the countBackward is set to true (old->new). We call the getSortedDirectoryIterator()
|
|
|
+ //with the order old->new, find the WU before the 'beforeWU', count a page of WUs, and
|
|
|
+ //reorder them to new->old.
|
|
|
+ countBackward = !beforeWU.isEmpty() && afterWU.isEmpty();
|
|
|
+
|
|
|
+ maxNumWUs = cmd->getLimit();
|
|
|
+ outputXML = cmd->getAction()==SCA_GET;
|
|
|
+ const char *outputFormat = cmd->queryOutputFormat();
|
|
|
+ outputFields.appendList(outputFormat, "|,");
|
|
|
+ }
|
|
|
+ void getArchivedWUs()
|
|
|
+ {
|
|
|
+ bool outputModifiedTime = cmd->getAction()==SCA_LISTDT;
|
|
|
+ StringBuffer dirMask, fileMask, path, name;
|
|
|
+ getFileMasks(dirMask, fileMask);
|
|
|
+
|
|
|
+ if (cmd->getDFU())
|
|
|
+ getLdsPath("Archive/DFUWorkUnits", path);
|
|
|
+ else
|
|
|
+ getLdsPath("Archive/WorkUnits", path);
|
|
|
+
|
|
|
+ MemoryBuffer WUSbuf;
|
|
|
+ bool hasWUSOutput = cmd->getAction()==SCA_WORKUNIT_SERVICES_GET;
|
|
|
+ if (hasWUSOutput)
|
|
|
+ cmd->setWUSresult(WUSbuf); // swap in/out (in case ever do multiple)
|
|
|
+
|
|
|
+ CIArrayOf<CWUData> wus;
|
|
|
+ unsigned wuCount = 0;
|
|
|
+ bool overflowed = false;
|
|
|
+ Owned<IRemoteConnection> conn;
|
|
|
+ Owned<IDirectoryIterator> dirIterator = getSortedDirectoryIterator(path.str(), SD_bynameNC,
|
|
|
+ countBackward ? !descendingReq : descendingReq, dirMask.isEmpty() ? nullptr : dirMask.str(), false, true);
|
|
|
+ ForEach(*dirIterator)
|
|
|
+ {
|
|
|
+ dirIterator->getName(name.clear());
|
|
|
+ if (!dirIterator->isDir() || !checkDirs(name.str()))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IDirectoryIterator> fileIterator = getSortedDirectoryIterator(&dirIterator->query(),
|
|
|
+ SD_bynameNC, countBackward ? !descendingReq : descendingReq, fileMask.str(), false);
|
|
|
+ ForEach(*fileIterator)
|
|
|
+ {
|
|
|
+ fileIterator->getName(name.clear());
|
|
|
+ if (!checkArchivedWUID(readWUIDFromFileName(name)))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ const char *wuid = name.str();
|
|
|
+ if (!isWild)
|
|
|
+ conn.setown(getSDSConnection(mask.str()));
|
|
|
+ else if (!conn)
|
|
|
+ conn.setown(getSDSConnection(nullptr));
|
|
|
+ if (isOnline(conn, wuid))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IPropertyTree> wuTree;
|
|
|
+ if (outputXML || hasWUSOutput || (outputFields.length() > 0) || !owner.isEmpty()
|
|
|
+ || !state.isEmpty() || !cluster.isEmpty() || !jobName.isEmpty()
|
|
|
+ || !cmdName.isEmpty() || !priority.isEmpty() || !fileRead.isEmpty()
|
|
|
+ || !fileWritten.isEmpty() || !eclContains.isEmpty())
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ wuTree.setown(createPTree(fileIterator->query()));
|
|
|
+ if (!wuTree || isEmptyString(wuTree->queryName()) || !checkFilters(wuTree, false))
|
|
|
continue;
|
|
|
}
|
|
|
- catch (IException *e) {
|
|
|
- StringBuffer msg;
|
|
|
- msg.appendf("WUiterate: Workunit %s failed", wuid);
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("WUiterate: Workunit %s failed to load", wuid);
|
|
|
EXCLOG(e,msg.str());
|
|
|
e->Release();
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
|
- index++;
|
|
|
- if (!inrange)
|
|
|
- continue;
|
|
|
- StringBuffer name(wuid);
|
|
|
- if (hasoutput) {
|
|
|
- char *saveptr;
|
|
|
- char *parse = strdup(outputformat);
|
|
|
- char *tok = strtok_r(parse, "|,",&saveptr);
|
|
|
- while (tok) {
|
|
|
- val.clear();
|
|
|
- bool found = true;
|
|
|
- if (stricmp(tok,"owner")==0)
|
|
|
- pt.getProp("@submitID",val);
|
|
|
- else if (stricmp(tok,"cluster")==0)
|
|
|
- pt.getProp("@clusterName",val);
|
|
|
- else if (stricmp(tok,"jobname")==0)
|
|
|
- pt.getProp("@jobName",val);
|
|
|
- else if (stricmp(tok,"state")==0)
|
|
|
- pt.getProp("@state",val);
|
|
|
- else
|
|
|
- found = false;
|
|
|
- if (found)
|
|
|
- name.append(',').append(val);
|
|
|
- tok = strtok_r(NULL, "|,",&saveptr);
|
|
|
- }
|
|
|
- free(parse);
|
|
|
+
|
|
|
+ if (countBackward)
|
|
|
+ { //The output should contain WUs in an order specified by the descendingReq.
|
|
|
+ //Since the getSortedDirectoryIterator() returns WUs in an order of the
|
|
|
+ //!descendingReq, we store them into the wus for now.
|
|
|
+ Owned<CWUData> wu = new CWUData();
|
|
|
+ wu->wuTree.setown(wuTree.getClear());
|
|
|
+ if (outputModifiedTime)
|
|
|
+ fileIterator->getModifiedTime(wu->modifiedTime);
|
|
|
+ wus.append(*wu.getClear());
|
|
|
}
|
|
|
- cmd->addId(name.str());
|
|
|
- if (cmd->getAction()==SCA_GET) {
|
|
|
- StringBuffer xml;
|
|
|
- toXML(&pt,xml);
|
|
|
- if (!cmd->addResult(xml.str()))
|
|
|
+ else
|
|
|
+ {
|
|
|
+ CDateTime dt;
|
|
|
+ if (outputModifiedTime)
|
|
|
+ fileIterator->getModifiedTime(dt);
|
|
|
+
|
|
|
+ if (!addOutput(outputModifiedTime, hasWUSOutput, wuTree, dt, WUSbuf))
|
|
|
+ {
|
|
|
+ overflowed = true;
|
|
|
break;
|
|
|
+ }
|
|
|
}
|
|
|
+ wuCount++;
|
|
|
+ if (wuCount == maxNumWUs)
|
|
|
+ break;
|
|
|
}
|
|
|
- if (index>start+num)
|
|
|
+ if (overflowed || (wuCount == maxNumWUs))
|
|
|
break;
|
|
|
}
|
|
|
+ if (countBackward)
|
|
|
+ { //now, we output the wus in reverse order.
|
|
|
+ ForEachItemInRev(i, wus)
|
|
|
+ {
|
|
|
+ CWUData &wuData = wus.item(i);
|
|
|
+ if (!addOutput(outputModifiedTime, hasWUSOutput, wuData.wuTree, wuData.modifiedTime, WUSbuf)) //Log an error?
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (hasWUSOutput)
|
|
|
+ cmd->setWUSresult(WUSbuf);
|
|
|
}
|
|
|
- }
|
|
|
- if (haswusoutput)
|
|
|
- cmd->setWUSresult(WUSbuf);
|
|
|
+ void getOnlineWUs()
|
|
|
+ {
|
|
|
+ if (cmd->getAction()==SCA_WORKUNIT_SERVICES_GET)
|
|
|
+ throw MakeStringException(-1,"SCA_WORKUNIT_SERVICES_GET not implemented for online workunits!");
|
|
|
+
|
|
|
+ Owned<IRemoteConnection> conn = getSDSConnection(isWild ? nullptr : mask.str());
|
|
|
+ if (!conn)
|
|
|
+ return;
|
|
|
+
|
|
|
+ if (countBackward ? !descendingReq : descendingReq)
|
|
|
+ getOnlineWUsDescending(conn);
|
|
|
+ else
|
|
|
+ getOnlineWUsAscending(conn);
|
|
|
+ }
|
|
|
+ } reader(cmd, mask);
|
|
|
+ if (cmd->getArchived())
|
|
|
+ reader.getArchivedWUs();
|
|
|
+ if (cmd->getOnline())
|
|
|
+ reader.getOnlineWUs();
|
|
|
}
|
|
|
|
|
|
|