/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "ws_workunitsService.hpp" #include "ws_fs.hpp" #include "jlib.hpp" #include "daclient.hpp" #include "dalienv.hpp" #include "dadfs.hpp" #include "daaudit.hpp" #include "exception_util.hpp" #include "wujobq.hpp" #include "eventqueue.hpp" #include "fileview.hpp" #include "hqlerror.hpp" #include "sacmd.hpp" #include "wuwebview.hpp" #include "portlist.h" #include "dllserver.hpp" #include "schedulectrl.hpp" #include "scheduleread.hpp" #include "roxiemanager.hpp" #include "dadfs.hpp" #include "dfuwu.hpp" #include "thorplugin.hpp" #ifdef _USE_ZLIB #include "zcrypt.hpp" #endif #define ESP_WORKUNIT_DIR "workunits/" class NewWsWorkunit : public Owned { public: NewWsWorkunit(IWorkUnitFactory *factory, IEspContext &context) { create(factory, context); } NewWsWorkunit(IEspContext &context) { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); create(factory, context); } ~NewWsWorkunit() { if (get()) get()->commit(); } void create(IWorkUnitFactory *factory, IEspContext &context) { setown(factory->createWorkUnit(NULL, "ws_workunits", context.queryUserId())); if(!get()) throw MakeStringException(ECLWATCH_CANNOT_CREATE_WORKUNIT,"Could not create workunit."); get()->setUser(context.queryUserId()); } void associateDll(const char *dllpath, const char *dllname) { Owned query = get()->updateQuery(); StringBuffer dllurl; createUNCFilename(dllpath, dllurl); unsigned crc = crc_file(dllpath); associateLocalFile(query, FileTypeDll, dllpath, "Workunit DLL", crc); queryDllServer().registerDll(dllname, "Workunit DLL", dllurl.str()); } void setQueryText(const char *text) { if (!text || !*text) return; Owned query=get()->updateQuery(); query->setQueryText(text); } }; void setWsWuXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname=false) { if (!xml || !*xml) return; Owned tree = createPTreeFromXMLString(xml, ipt_none, (XmlReaderOptions)(xr_ignoreWhiteSpace | xr_ignoreNameSpaces)); IPropertyTree *root = tree.get(); if (strieq(root->queryName(), "Envelope")) root = root->queryPropTree("Body/*[1]"); if (!root) return; if (setJobname) { SCMStringBuffer name; wu->getJobName(name); if (!name.length()) wu->setJobName(root->queryName()); } wu->setXmlParams(LINK(root)); } void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL) { ensureWsWorkunitAccess(context, *cw, SecAccess_Write); switch(cw->getState()) { case WUStateRunning: case WUStateDebugPaused: case WUStateDebugRunning: case WUStateCompiling: case WUStateAborting: case WUStateBlocked: { SCMStringBuffer descr; throw MakeStringException(ECLWATCH_CANNOT_SUBMIT_WORKUNIT, "Cannot submit the workunit. Workunit state is '%s'.", cw->getStateDesc(descr).str()); } } SCMStringBuffer wuid; cw->getWuid(wuid); WorkunitUpdate wu(&cw->lock()); if(!wu.get()) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s.", wuid.str()); wu->clearExceptions(); if(notEmpty(cluster)) wu->setClusterName(cluster); if(notEmpty(snapshot)) wu->setSnapshot(snapshot); wu->setState(WUStateSubmitted); if (maxruntime) wu->setDebugValueInt("maxRunTime",maxruntime,true); if (resetWorkflow) { wu->resetWorkflow(); if (!compile) wu->schedule(); } setWsWuXmlParameters(wu, paramXml, (wu->getAction()==WUActionExecuteExisting)); wu->commit(); wu.clear(); if (!compile) runWorkUnit(wuid.str()); else if (context.querySecManager()) secSubmitWorkUnit(wuid.str(), *context.querySecManager(), *context.queryUser()); else submitWorkUnit(wuid.str(), context.queryUserId(), context.queryPassword()); AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str()); } void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL) { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid, false); return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, paramXml); } void copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid) { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned src(factory->openWorkUnit(srcWuid, false)); SCMStringBuffer wuid; wu.getWuid(wuid); queryExtendedWU(&wu)->copyWorkUnit(src); SCMStringBuffer token; wu.setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str()); wu.commit(); } void runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml=NULL) { StringBufferAdaptor isvWuid(wuid); NewWsWorkunit wu(context); wu->getWuid(isvWuid); copyWsWorkunit(context, *wu, srcWuid); wu.clear(); submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml); } void runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml=NULL) { WorkunitUpdate wu(&cw->lock()); copyWsWorkunit(context, *wu, srcWuid); wu.clear(); submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml); } IException *noteException(IWorkUnit *wu, IException *e, WUExceptionSeverity level=ExceptionSeverityError) { if (wu) { Owned we = wu->createException(); StringBuffer s; we->setExceptionMessage(e->errorMessage(s).str()); we->setExceptionSource("WsWorkunits"); we->setSeverity(level); if (level==ExceptionSeverityError) wu->setState(WUStateFailed); } return e; } StringBuffer &resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended=true, IWorkUnit *wu=NULL) { Owned qs = getQueryRegistry(queryset, true); if (!qs) throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset)); Owned q = resolveQueryAlias(qs, query); if (!q) throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query)); if (notSuspended && q->getPropBool("@suspended")) throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query)); return wuid.append(q->queryProp("@wuid")); } void runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL) { StringBuffer srcWuid; WorkunitUpdate wu(&cw->lock()); resolveQueryWuid(srcWuid, queryset, query, true, wu); copyWsWorkunit(context, *wu, srcWuid); wu.clear(); submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml); } void runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL) { StringBuffer srcWuid; StringBufferAdaptor isvWuid(wuid); NewWsWorkunit wu(context); wu->getWuid(isvWuid); resolveQueryWuid(srcWuid, queryset, query, true, wu); copyWsWorkunit(context, *wu, srcWuid); wu.clear(); submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml); } class ExecuteExistingQueryInfo { public: ExecuteExistingQueryInfo(IConstWorkUnit *cw) { SCMStringBuffer isv; cw->getJobName(isv); const char *name = isv.str(); const char *div = strchr(name, '.'); if (div) { queryset.set(name, div-name); query.set(div+1); } } public: StringAttr queryset; StringAttr query; }; typedef enum _WuActionType { ActionDelete=0, ActionProtect, ActionAbort, ActionRestore, ActionEventSchedule, ActionEventDeschedule, ActionChangeState, ActionPause, ActionPauseNow, ActionResume, ActionUnknown } WsWuActionType; bool doAction(IEspContext& context, StringArray& wuids, int action, IProperties* params, IArrayOf* results) { if (!wuids.length()) return true; Owned me = MakeMultiException(); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); bool bAllSuccess = true; for(aindex_t i=0; i cmd = createSashaCommand(); cmd->setAction(SCA_RESTORE); cmd->addId(wuid); Owned node = createINode(ep); if (!node) throw MakeStringException(ECLWATCH_INODE_NOT_FOUND,"INode not found."); StringBuffer s; if (!cmd->send(node, 1*60*1000)) throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to Archive server at %s.", ep.getUrlStr(s).str()); if (cmd->numIds()==0) { WARNLOG("Could not Archive/restore %s",wuid); me->append(*MakeStringException(0,"Cannot archive/restore workunit %s.", wuid)); } StringBuffer reply; cmd->getId(0,reply); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid); ensureWsWorkunitAccess(context, wuid, SecAccess_Write); if (results) { Owned res = createWUActionResult("", ""); res->setWuid(wuid); res->setAction(strAction.str()); res->setResult("Success"); results->append(*res.getClear()); } } else { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid, false); StringBuffer strAction; Owned res = createWUActionResult("", ""); res->setWuid(wuid); res->setResult("Success"); if ((action == ActionDelete) && (cw->getState() == WUStateWait)) throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Cannot delete a workunit which is in a 'Wait' status."); try { switch(action) { case ActionPause: { ensureWsWorkunitAccess(context, *cw, SecAccess_Full); WorkunitUpdate wu(&cw->lock()); strAction = "Pause"; wu->setAction(WUActionPause); break; } case ActionPauseNow: { ensureWsWorkunitAccess(context, *cw, SecAccess_Full); strAction = "PauseNow"; WorkunitUpdate wu(&cw->lock()); wu->setAction(WUActionPauseNow); break; } case ActionResume: { ensureWsWorkunitAccess(context, *cw, SecAccess_Full); strAction = "Resume"; WorkunitUpdate wu(&cw->lock()); wu->setAction(WUActionResume); break; } case ActionDelete: ensureWsWorkunitAccess(context, *cw, SecAccess_Full); strAction = "Delete"; { int state = cw->getState(); switch (state) { case WUStateWait: throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Cannot delete a workunit which is in a 'Wait' status."); case WUStateAborted: case WUStateCompleted: case WUStateFailed: case WUStateArchived: case WUStateCompiled: case WUStateUploadingFiles: break; default: { WorkunitUpdate wu(&cw->lock()); wu->setState(WUStateFailed); } } cw.clear(); factory->deleteWorkUnitEx(wuid); AuditSystemAccess(context.queryUserId(), true, "Deleted %s", wuid); } break; case ActionAbort: ensureWsWorkunitAccess(context, *cw, SecAccess_Full); strAction = "Abort"; { if (cw->getState() == WUStateWait) { WorkunitUpdate wu(&cw->lock()); wu->deschedule(); wu->setState(WUStateAborted); } else secAbortWorkUnit(wuid, *context.querySecManager(), *context.queryUser()); AuditSystemAccess(context.queryUserId(), true, "Aborted %s", wuid); } break; case ActionProtect: strAction = "Protect"; cw->protect(!params || params->getPropBool("Protect",true)); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid); break; case ActionChangeState: strAction = "ChangeState"; { if (params) { WUState state = (WUState) params->getPropInt("State"); if (state > WUStateUnknown && state < WUStateSize) { WorkunitUpdate wu(&cw->lock()); wu->setState(state); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid); } } } break; case ActionEventSchedule: strAction = "EventSchedule"; { WorkunitUpdate wu(&cw->lock()); wu->schedule(); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid); } break; case ActionEventDeschedule: strAction = "EventDeschedule"; { WorkunitUpdate wu(&cw->lock()); wu->deschedule(); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid); } break; } } catch (IException *e) { bAllSuccess = false; StringBuffer eMsg; StringBuffer failedMsg("Failed: "); res->setResult(failedMsg.append(e->errorMessage(eMsg)).str()); WARNLOG("Failed to %s workunit: %s, %s", strAction.str(), wuid, eMsg.str()); AuditSystemAccess(context.queryUserId(), false, "Failed to %s %s", strAction.str(), wuid); e->Release(); } if (results) { res->setAction(strAction.str()); results->append(*res.getClear()); } } } catch (IException *E) { me->append(*E); } catch (...) { me->append(*MakeStringException(0,"Unknown exception wuid=%s",wuid)); } } if(me->ordinality()) throw me.getLink(); int timeToWait = 0; if (params) timeToWait = params->getPropInt("BlockTillFinishTimer"); if (timeToWait != 0) { for(aindex_t i=0; i wuActionTable; void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *service) { if (!daliClientActive()) { ERRLOG("No Dali Connection Active."); throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file"); } setPasswordsFromSDS(); DBGLOG("Initializing %s service [process = %s]", service, process); wuActionTable.setValue("delete", ActionDelete); wuActionTable.setValue("abort", ActionAbort); wuActionTable.setValue("pausenow", ActionPauseNow); wuActionTable.setValue("pause", ActionPause); wuActionTable.setValue("resume", ActionResume); wuActionTable.setValue("protect", ActionProtect); wuActionTable.setValue("unprotect", ActionProtect); wuActionTable.setValue("restore", ActionRestore); wuActionTable.setValue("reschedule", ActionEventSchedule); wuActionTable.setValue("deschedule", ActionEventDeschedule); wuActionTable.setValue("settofailed", ActionChangeState); awusCacheMinutes = AWUS_CACHE_MIN_DEFAULT; VStringBuffer xpath("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/AWUsCacheMinutes", process, service); cfg->getPropInt(xpath.str(), awusCacheMinutes); const char *name = cfg->queryProp("Software/EspProcess/@name"); getConfigurationDirectory(cfg->queryPropTree("Software/Directories"), "query", "esp", name ? name : "esp", queryDirectory); recursiveCreateDirectory(queryDirectory.str()); dataCache.setown(new DataCache(DATA_SIZE)); archivedWuCache.setown(new ArchivedWuCache(AWUS_CACHE_SIZE)); //Create a folder for temporarily holding gzip files by WUResultBin() Owned tmpdir = createIFile(TEMPZIPDIR); if(!tmpdir->exists()) tmpdir->createDirectory(); recursiveCreateDirectory(ESP_WORKUNIT_DIR); m_sched.start(); } bool CWsWorkunitsEx::onWUCreate(IEspContext &context, IEspWUCreateRequest &req, IEspWUCreateResponse &resp) { try { if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied."); NewWsWorkunit wu(context); SCMStringBuffer wuid; resp.updateWorkunit().setWuid(wu->getWuid(wuid).str()); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } static bool origValueChanged(const char *newValue, const char *origValue, StringBuffer &s, bool nillable=true) { if (!nillable && isEmpty(newValue)) return false; if(newValue && origValue) { if (!streq(origValue, newValue)) { s.append(newValue).trim(); return true; } return false; } if (newValue) { s.append(newValue).trim(); return true; } return (origValue!=NULL); } bool CWsWorkunitsEx::onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp) { try { ensureWsWorkunitAccess(context, req.getWuid(), SecAccess_Write); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if(!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",req.getWuid()); if(req.getProtected() != req.getProtectedOrig()) { cw->protect(req.getProtected()); cw.clear(); cw.setown(factory->openWorkUnit(req.getWuid(), false)); if(!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",req.getWuid()); } if ((req.getState() == WUStateRunning)||(req.getState() == WUStateDebugPaused)||(req.getState() == WUStateDebugRunning)) { WsWuInfo winfo(context, cw); winfo.getInfo(resp.updateWorkunit(), WUINFO_All); resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(req.getWuid()).str()); AuditSystemAccess(context.queryUserId(), true, "Updated %s", req.getWuid()); return true; } WorkunitUpdate wu(&cw->lock()); if(!req.getState_isNull() && (req.getStateOrig_isNull() || req.getState() != req.getStateOrig())) { if (!req.getStateOrig_isNull() && cw->getState() != (WUState) req.getStateOrig()) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s because its state has been changed internally. Please refresh the page and try again.", req.getWuid()); WUState state = (WUState) req.getState(); if(state < WUStateSize) wu->setState(state); } StringBuffer s; if (origValueChanged(req.getJobname(), req.getJobnameOrig(), s)) wu->setJobName(s.trim().str()); if (origValueChanged(req.getDescription(), req.getDescriptionOrig(), s.clear())) wu->setDebugValue("description", (req.getDescription()) ? s.trim().str() : NULL, true); double version = context.getClientVersion(); if (version > 1.04) { if (origValueChanged(req.getClusterSelection(), req.getClusterOrig(), s.clear(), false)) { if (req.getState() == WUStateBlocked) switchWorkUnitQueue(wu.get(), s.str()); else if ((req.getState() != WUStateSubmitted) && (req.getState() != WUStateRunning) && (req.getState() != WUStateDebugPaused) && (req.getState() != WUStateDebugRunning)) wu->setClusterName(s.str()); } } setWsWuXmlParameters(wu, req.getXmlParams(), (req.getAction()==WUActionExecuteExisting)); if (notEmpty(req.getQueryText())) { Owned query=wu->updateQuery(); query->setQueryText(req.getQueryText()); } if (version > 1.34 && notEmpty(req.getQueryMainDefinition())) { Owned query=wu->updateQuery(); query->setQueryMainDefinition(req.getQueryMainDefinition()); } if (!req.getResultLimit_isNull()) wu->setResultLimit(req.getResultLimit()); if (!req.getAction_isNull()) { WUAction action = (WUAction) req.getAction(); if(action < WUActionSize) wu->setAction(action); } if (!req.getPriorityClass_isNull()) { WUPriorityClass priority = (WUPriorityClass) req.getPriorityClass(); if(prioritysetPriority(priority); } if (!req.getPriorityLevel_isNull()) wu->setPriorityLevel(req.getPriorityLevel()); if (origValueChanged(req.getScope(), req.getScopeOrig(), s.clear(), false)) wu->setWuScope(s.str()); ForEachItemIn(di, req.getDebugValues()) { IConstDebugValue& item = req.getDebugValues().item(di); if (notEmpty(item.getName())) wu->setDebugValue(item.getName(), item.getValue(), true); } ForEachItemIn(ai, req.getApplicationValues()) { IConstApplicationValue& item=req.getApplicationValues().item(ai); if(notEmpty(item.getApplication()) && notEmpty(item.getName())) wu->setApplicationValue(item.getApplication(), item.getName(), item.getValue(), true); } wu->commit(); wu.clear(); WsWuInfo winfo(context, cw); winfo.getInfo(resp.updateWorkunit(), WUINFO_All); StringBuffer thorSlaveIP; if (version > 1.24 && notEmpty(req.getThorSlaveIP())) thorSlaveIP = req.getThorSlaveIP(); if (thorSlaveIP.length() > 0) { StringBuffer url; url.appendf("/WsWorkunits/WUInfo?Wuid=%s&ThorSlaveIP=%s", req.getWuid(), thorSlaveIP.str()); resp.setRedirectUrl(url.str()); } else resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(req.getWuid()).str()); AuditSystemAccess(context.queryUserId(), true, "Updated %s", req.getWuid()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUCreateAndUpdate(IEspContext &context, IEspWUUpdateRequest &req, IEspWUUpdateResponse &resp) { try { if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied."); NewWsWorkunit wu(context); SCMStringBuffer wuid; wu->getWuid(wuid); req.setWuid(wuid.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return onWUUpdate(context, req, resp); } static inline StringBuffer &appendUrlParameter(StringBuffer &url, const char *name, const char *value, bool &first) { if (notEmpty(value)) { url.append(first ? '?' : '&').append(name).append('=').append(value); first=false; } return url; } bool CWsWorkunitsEx::onWUAction(IEspContext &context, IEspWUActionRequest &req, IEspWUActionResponse &resp) { try { StringBuffer sAction(req.getActionType()); if (!sAction.length()) throw MakeStringException(ECLWATCH_INVALID_INPUT,"Action not defined."); int *action=wuActionTable.getValue(sAction.toLowerCase().str()); if (!action) throw MakeStringException(ECLWATCH_INVALID_INPUT,"Invalid Action '%s'.", sAction.str()); Owned params = createProperties(true); params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer()); if (*action==ActionProtect) params->setProp("Protect", streq(sAction.str(), "protect")); if (*action==ActionChangeState && streq(sAction.str(), "settofailed")) params->setProp("State",4); IArrayOf results; if (doAction(context, req.getWuids(), *action, params, &results) && *action!=ActionDelete) { StringBuffer redirect; if(req.getPageFrom() && strieq(req.getPageFrom(), "wuid")) redirect.append("/WsWorkunits/WUInfo?Wuid=").append(req.getWuids().item(0)); else if (req.getPageFrom() && strieq(req.getPageFrom(), "scheduler")) { redirect.set("/WsWorkunits/WUShowScheduled"); bool first=true; appendUrlParameter(redirect, "Cluster", req.getEventServer(), first); appendUrlParameter(redirect, "EventName", req.getEventName(), first); } else { redirect.append("/WsWorkunits/WUQuery"); bool first=true; appendUrlParameter(redirect, "PageSize", req.getPageSize(), first); appendUrlParameter(redirect, "PageStartFrom", req.getCurrentPage(), first); appendUrlParameter(redirect, "Sortby", req.getSortby(), first); appendUrlParameter(redirect, "Descending", req.getDescending() ? "1" : "0", first); appendUrlParameter(redirect, "State", req.getState(), first); appendUrlParameter(redirect, "Cluster", req.getCluster(), first); appendUrlParameter(redirect, "Owner", req.getOwner(), first); appendUrlParameter(redirect, "StartDate", req.getStartDate(), first); appendUrlParameter(redirect, "EndDate", req.getEndDate(), first); appendUrlParameter(redirect, "ECL", req.getECL(), first); appendUrlParameter(redirect, "Jobname", req.getJobname(), first); } resp.setRedirectUrl(redirect.str()); } else resp.setActionResults(results); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUDelete(IEspContext &context, IEspWUDeleteRequest &req, IEspWUDeleteResponse &resp) { try { IArrayOf results; Owned params = createProperties(true); params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer()); if (!doAction(context,req.getWuids(), ActionDelete, params, &results)) resp.setActionResults(results); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUAbort(IEspContext &context, IEspWUAbortRequest &req, IEspWUAbortResponse &resp) { try { IArrayOf results; Owned params = createProperties(true); params->setProp("BlockTillFinishTimer", req.getBlockTillFinishTimer()); if (!doAction(context,req.getWuids(), ActionAbort, params, &results)) resp.setActionResults(results); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUProtect(IEspContext &context, IEspWUProtectRequest &req, IEspWUProtectResponse &resp)\ { try { IArrayOf results; Owned params(createProperties(true)); params->setProp("Protect", req.getProtect()); params->setProp("BlockTillFinishTimer", 0); if (!doAction(context,req.getWuids(), ActionProtect, params, &results)) resp.setActionResults(results); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUResubmit(IEspContext &context, IEspWUResubmitRequest &req, IEspWUResubmitResponse &resp) { try { Owned me = MakeMultiException(); SCMStringBuffer wuid; for(aindex_t i=0; i factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); if(req.getCloneWorkunit() || req.getRecompile()) { Owned src(factory->openWorkUnit(wuid.str(), false)); NewWsWorkunit wu(factory, context); wu->getWuid(wuid); queryExtendedWU(wu)->copyWorkUnit(src); SCMStringBuffer token; wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str()); } Owned cw(factory->openWorkUnit(wuid.str(), false)); submitWsWorkunit(context, cw, NULL, NULL, 0, req.getRecompile(), req.getResetWorkflow()); } catch (IException *E) { me->append(*E); } catch (...) { me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str())); } } if(me->ordinality()) throw me.getLink(); int timeToWait = req.getBlockTillFinishTimer(); if (timeToWait != 0) { for(aindex_t i=0; i factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); WorkunitUpdate wu(factory->updateWorkUnit(req.getWuid())); ensureWsWorkunitAccess(context, *wu.get(), SecAccess_Write); switch(wu->getState()) { case WUStateDebugPaused: case WUStateDebugRunning: case WUStateRunning: case WUStateAborting: case WUStateBlocked: { SCMStringBuffer descr; throw MakeStringException(ECLWATCH_CANNOT_SCHEDULE_WORKUNIT, "Cannot schedule the workunit. Workunit state is '%s'.", wu->getStateDesc(descr).str()); } } wu->clearExceptions(); wu->setClusterName(cluster); if (notEmpty(req.getWhen())) { WsWuDateTime dt; dt.setString(req.getWhen()); wu->setTimeScheduled(dt); } if(notEmpty(req.getSnapshot())) wu->setSnapshot(req.getSnapshot()); wu->setState(WUStateScheduled); if (req.getMaxRunTime()) wu->setDebugValueInt("maxRunTime", req.getMaxRunTime(), true); SCMStringBuffer token; wu->setSecurityToken(createToken(req.getWuid(), context.queryUserId(), context.queryPassword(), token).str()); AuditSystemAccess(context.queryUserId(), true, "Scheduled %s", req.getWuid()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req, IEspWUSubmitResponse &resp) { try { if (isEmpty(req.getWuid())) throw MakeStringException(ECLWATCH_INVALID_INPUT, "No workunit ID provided."); DBGLOG("Submit workunit: %s", req.getWuid()); if (isEmpty(req.getCluster())) throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined."); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if (cw->getAction()==WUActionExecuteExisting) { ExecuteExistingQueryInfo info(cw); if (info.queryset.isEmpty() || info.query.isEmpty()) { WorkunitUpdate wu(&cw->lock()); throw noteException(wu, MakeStringException(ECLWATCH_INVALID_INPUT,"Queryset and/or query not specified")); } runWsWuQuery(context, cw, info.queryset.sget(), info.query.sget(), req.getCluster(), NULL); } else submitWsWorkunit(context, cw, req.getCluster(), req.getSnapshot(), req.getMaxRunTime(), true, false); if (req.getBlockTillFinishTimer() != 0) waitForWorkUnitToComplete(req.getWuid(), req.getBlockTillFinishTimer()); resp.setRedirectUrl(StringBuffer("/WsWorkunits/WUInfo?Wuid=").append(req.getWuid()).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWURunResponse &resp) { try { const char *runWuid = req.getWuid(); StringBuffer wuid; if (runWuid && *runWuid) { if (req.getCloneWorkunit()) runWsWorkunit(context, wuid, runWuid, req.getCluster(), req.getInput()); else { submitWsWorkunit(context, runWuid, req.getCluster(), NULL, 0, false, true, req.getInput()); wuid.set(runWuid); } } else if (notEmpty(req.getQuerySet()) && notEmpty(req.getQuery())) runWsWuQuery(context, wuid, req.getQuerySet(), req.getQuery(), req.getCluster(), req.getInput()); else throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required"); int timeToWait = req.getWait(); if (timeToWait != 0) waitForWorkUnitToComplete(wuid.str(), timeToWait); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid.str(), false); if (!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str()); SCMStringBuffer stateDesc; resp.setState(cw->getStateDesc(stateDesc).str()); resp.setWuid(wuid.str()); switch (cw->getState()) { case WUStateCompleted: case WUStateFailed: case WUStateUnknown: { SCMStringBuffer result; getFullWorkUnitResultsXML(context.queryUserId(), context.queryPassword(), cw.get(), result, false, ExceptionSeverityInformation, req.getNoRootTag()); resp.setResults(result.str()); break; } default: break; } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp) { try { secWaitForWorkUnitToCompile(req.getWuid(), *context.querySecManager(), *context.queryUser(), req.getWait()); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); resp.setStateID(cw->getState()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp) { try { resp.setStateID(secWaitForWorkUnitToComplete(req.getWuid(), *context.querySecManager(), *context.queryUser(), req.getWait(), req.getReturnOnWait())); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUCDebug(IEspContext &context, IEspWUDebugRequest &req, IEspWUDebugResponse &resp) { try { StringBuffer result; secDebugWorkunit(req.getWuid(), *context.querySecManager(), *context.queryUser(), req.getCommand(), result); resp.setResult(result); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUSyntaxCheckECL(IEspContext &context, IEspWUSyntaxCheckRequest &req, IEspWUSyntaxCheckResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); NewWsWorkunit wu(factory, context); wu->setAction(WUActionCheck); if(notEmpty(req.getModuleName()) && notEmpty(req.getAttributeName())) { wu->setApplicationValue("SyntaxCheck", "ModuleName", req.getModuleName(), true); wu->setApplicationValue("SyntaxCheck", "AttributeName", req.getAttributeName(), true); } ForEachItemIn(di, req.getDebugValues()) { IConstDebugValue& item=req.getDebugValues().item(di); if(notEmpty(item.getName())) wu->setDebugValue(item.getName(), item.getValue(), true); } wu.setQueryText(req.getECL()); SCMStringBuffer wuid; wu->getWuid(wuid); wu->commit(); wu.clear(); submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false); waitForWorkUnitToComplete(wuid.str(), req.getTimeToWait()); Owned cw(factory->openWorkUnit(wuid.str(), false)); WsWUExceptions errors(*cw); resp.setErrors(errors); cw.clear(); factory->deleteWorkUnit(wuid.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUCompileECL(IEspContext &context, IEspWUCompileECLRequest &req, IEspWUCompileECLResponse &resp) { try { ensureWsCreateWorkunitAccess(context); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); NewWsWorkunit wu(factory, context); if(req.getIncludeComplexity()) { wu->setAction(WUActionCompile); wu->setDebugValueInt("calculateComplexity",1,true); } else wu->setAction(WUActionCheck); if(req.getModuleName() && req.getAttributeName()) { wu->setApplicationValue("SyntaxCheck","ModuleName",req.getModuleName(),true); wu->setApplicationValue("SyntaxCheck","AttributeName",req.getAttributeName(),true); } if(req.getIncludeDependencies()) wu->setApplicationValueInt("SyntaxCheck","IncludeDependencies",1,true); wu.setQueryText(req.getECL()); SCMStringBuffer wuid; wu->getWuid(wuid); wu.clear(); submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false); waitForWorkUnitToComplete(wuid.str(),req.getTimeToWait()); Owned cw = factory->openWorkUnit(wuid.str(), false); SCMStringBuffer s; cw->getDebugValue("__Calculated__Complexity__",s); if(s.length()) resp.setComplexity(s.str()); WsWUExceptions errors(*cw); resp.setErrors(errors); if(!errors.ErrCount()) { IArrayOf dependencies; for(unsigned count=1;;count++) { SCMStringBuffer xml; cw->getApplicationValue("SyntaxCheck",StringBuffer("Dependency").append(count).str(),xml); if(!xml.length()) break; Owned dep=createPTreeFromXMLString(xml.str(), ipt_caseInsensitive); if(!dep) continue; Owned att = createWUECLAttribute("",""); att->setModuleName(dep->queryProp("@module")); att->setAttributeName(dep->queryProp("@name")); int flags = dep->getPropInt("@flags",0); if(flags & ob_locked) { if(flags & ob_lockedself) att->setIsCheckedOut(true); else att->setIsLocked(true); } if(flags & ob_sandbox) att->setIsSandbox(true); if(flags & ob_orphaned) att->setIsOrphaned(true); dependencies.append(*att.getLink()); } resp.setDependencies(dependencies); } cw.clear(); factory->deleteWorkUnit(wuid.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUGetDependancyTrees(IEspContext& context, IEspWUGetDependancyTreesRequest& req, IEspWUGetDependancyTreesResponse& resp) { try { DBGLOG("WUGetDependancyTrees"); unsigned int timeMilliSec = 500; Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); NewWsWorkunit wu(factory, context); wu->setAction(WUActionCheck); if (notEmpty(req.getCluster())) wu->setClusterName(req.getCluster()); if (notEmpty(req.getSnapshot())) wu->setSnapshot(req.getSnapshot()); wu->setDebugValue("gatherDependenciesSelection",notEmpty(req.getItems()) ? req.getItems() : NULL,true); if (context.getClientVersion() > 1.12) { wu->setDebugValueInt("gatherDependencies", 1, true); const char *timeout = req.getTimeoutMilliSec(); if (notEmpty(timeout)) { const char *finger = timeout; while (*finger) { if (!isdigit(*finger++)) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Incorrect timeout value"); } timeMilliSec = atol(timeout); } } SCMStringBuffer wuid; wu->getWuid(wuid); wu->commit(); wu.clear(); ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Read); submitWsWorkunit(context, wuid.str(), req.getCluster(), req.getSnapshot(), 0, true, false); int state = waitForWorkUnitToComplete(wuid.str(), timeMilliSec); Owned cw = factory->openWorkUnit(wuid.str(), false); WsWUExceptions errors(*cw); resp.setErrors(errors); MemoryBuffer temp; MemoryBuffer2IDataVal xmlresult(temp); Owned result = wu->getResultBySequence(0); if (result) { result->getResultRaw(xmlresult, NULL, NULL); resp.setDependancyTrees(temp); } wu.setown(&cw->lock()); wu->setState(WUStateAborted); wu->commit(); wu.clear(); factory->deleteWorkUnit(wuid.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool getWsWuInfoFromSasha(IEspContext &context, SocketEndpoint &ep, const char* wuid, IEspECLWorkunit *info) { Owned node = createINode(ep); Owned cmd = createSashaCommand(); cmd->addId(wuid); cmd->setAction(SCA_GET); if (!cmd->send(node, 1*60*1000)) { StringBuffer url; DBGLOG("Could not connect to Sasha server at %s", ep.getUrlStr(url).str()); throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER,"Cannot connect to archive server at %s.", url.str()); } if (cmd->numIds()==0) { DBGLOG("Could not read archived workunit %s",wuid); throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot read workunit %s.",wuid); } unsigned num = cmd->numResults(); if (num < 1) return false; StringBuffer res; cmd->getResult(0, res); if(res.length() < 1) return false; Owned wpt = createPTreeFromXMLString(res.str()); if (!wpt) return false; const char * owner = wpt->queryProp("@submitID"); ensureWsWorkunitAccessByOwnerId(context, owner, SecAccess_Read); info->setWuid(wuid); info->setArchived(true); if (notEmpty(owner)) info->setOwner(owner); const char * state = wpt->queryProp("@state"); if (notEmpty(state)) info->setState(state); const char * cluster = wpt->queryProp("@clusterName"); if (notEmpty(cluster)) info->setCluster(cluster); const char * scope = wpt->queryProp("@scope"); if (notEmpty(scope)) info->setScope(scope); const char * jobName = wpt->queryProp("@jobName"); if (notEmpty(jobName)) info->setJobname(jobName); const char * description = wpt->queryProp("Debug/description"); if (notEmpty(description)) info->setDescription(description); const char * queryText = wpt->queryProp("Query/Text"); if (notEmpty(queryText)) info->updateQuery().setText(queryText); const char * protectedWU = wpt->queryProp("@protected"); info->setProtected((protectedWU && *protectedWU!='0')); return true; } #define WUDETAILS_REFRESH_MINS 1 void getArchivedWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp) { const char *wuid = req.getWuid(); if (isEmpty(req.getWuid())) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified."); Owned envFactory = getEnvironmentFactory(); Owned constEnv = envFactory->openEnvironmentByFile(); Owned root = &constEnv->getPTree(); if (!root) throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment info"); Owned instances = root->getElements("Software/SashaServerProcess/Instance"); ForEach(*instances) { IPropertyTree &instance = instances->query(); SocketEndpoint ep(instance.queryProp("@netAddress"), instance.getPropInt("@port", 8877)); if (getWsWuInfoFromSasha(context, ep, req.getWuid(), &resp.updateWorkunit())) { resp.setAutoRefresh(WUDETAILS_REFRESH_MINS); resp.setCanCompile(false); return; } } throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot find workunit %s.", wuid); return; } #define WUDETAILS_REFRESH_MINS 1 bool CWsWorkunitsEx::onWUInfo(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp) { try { if (req.getType() && strieq(req.getType(), "archived workunits")) getArchivedWUInfo(context, req, resp); else { try { unsigned flags=0; if (req.getTruncateEclTo64k()) flags|=WUINFO_TruncateEclTo64k; if (req.getIncludeExceptions()) flags|=WUINFO_IncludeExceptions; if (req.getIncludeGraphs()) flags|=WUINFO_IncludeGraphs; if (req.getIncludeSourceFiles()) flags|=WUINFO_IncludeSourceFiles; if (req.getIncludeResults()) flags|=WUINFO_IncludeResults; if (req.getIncludeVariables()) flags|=WUINFO_IncludeVariables; if (req.getIncludeTimers()) flags|=WUINFO_IncludeTimers; if (req.getIncludeDebugValues()) flags|=WUINFO_IncludeDebugValues; if (req.getIncludeApplicationValues()) flags|=WUINFO_IncludeApplicationValues; if (req.getIncludeWorkflows()) flags|=WUINFO_IncludeWorkflows; if (!req.getSuppressResultSchemas()) flags|=WUINFO_IncludeEclSchemas; WsWuInfo winfo(context, req.getWuid()); winfo.getInfo(resp.updateWorkunit(), flags); if (req.getIncludeResultsViewNames()) { StringArray views; winfo.getResultViews(views, WUINFO_IncludeResultsViewNames); resp.setResultViews(views); } switch (resp.getWorkunit().getStateID()) { case WUStateCompiling: case WUStateCompiled: case WUStateScheduled: case WUStateSubmitted: case WUStateRunning: case WUStateAborting: case WUStateWait: case WUStateUploadingFiles: case WUStateDebugPaused: case WUStateDebugRunning: resp.setAutoRefresh(WUDETAILS_REFRESH_MINS); break; case WUStateBlocked: resp.setAutoRefresh(WUDETAILS_REFRESH_MINS*5); break; } resp.setCanCompile(notEmpty(context.queryUserId())); if (context.getClientVersion() > 1.24 && notEmpty(req.getThorSlaveIP())) resp.setThorSlaveIP(req.getThorSlaveIP()); } catch (IException *e) { StringBuffer errMsg; if (strnicmp(e->errorMessage(errMsg), "Could not open workunit", 23)) throw e; getArchivedWUInfo(context, req, resp); } } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp) { return onWUInfo(context, req, resp); } bool CWsWorkunitsEx::onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp) { ensureWsWorkunitAccess(context, req.getWuid(), SecAccess_Read); Owned wv = createWuWebView(req.getWuid(), NULL, getCFD(), true); StringBuffer html; wv->renderSingleResult(req.getViewName(), req.getResultName(), html); resp.setResult(html.str()); resp.setResult_mimetype("text/html"); return true; } void doWUQueryBySingleWuid(IEspContext &context, const char *wuid, IEspWUQueryResponse &resp) { Owned info= createECLWorkunit("",""); WsWuInfo winfo(context, wuid); winfo.getCommon(*info, 0); IArrayOf results; results.append(*info.getClear()); resp.setWorkunits(results); resp.setPageSize(1); resp.setCount(1); } void doWUQueryByFile(IEspContext &context, const char *logicalFile, IEspWUQueryResponse &resp) { StringBuffer wuid; getWuidFromLogicalFileName(context, logicalFile, wuid); if (!wuid.length()) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw= factory->openWorkUnit(wuid.str(), false); if (!cw) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit for file %s.", logicalFile); if (getWsWorkunitAccess(context, *cw) < SecAccess_Read) throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED,"Cannot access the workunit for file %s.",logicalFile); SCMStringBuffer parent; if (!cw->getParentWuid(parent).length()) doWUQueryBySingleWuid(context, wuid.str(), resp); resp.setFirst(false); resp.setPageSize(1); resp.setCount(1); } void doWUQueryByXPath(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp) { IArrayOf results; WsWuSearch wlist(context,req.getOwner(),req.getState(),req.getCluster(),req.getStartDate(),req.getEndDate(),req.getECL(),req.getJobname(),req.getApplicationName(),req.getApplicationKey(),req.getApplicationData()); int count=(int)req.getPageSize(); if (!count) count=100; if (wlist.getSize() < 1) { resp.setNumWUs(0); return; } if (wlist.getSize() < count) count = (int) wlist.getSize() - 1; WsWuSearch::iterator begin, end; if(notEmpty(req.getAfter())) { begin=wlist.locate(req.getAfter()); end=min(begin+count,wlist.end()); } else if (notEmpty(req.getBefore())) { end=wlist.locate(req.getBefore()); begin=max(end-count,wlist.begin()); } else { begin=wlist.begin(); end=min(begin+count,wlist.end()); } if(begin>wlist.begin() && beginc_str()); if (context.getClientVersion() > 1.02) { resp.setPageStartFrom(begin - wlist.begin() + 1); resp.setNumWUs((int)wlist.getSize()); resp.setCount(end - begin); } if(endc_str()); for(;begin!=end;begin++) { Owned info = createECLWorkunit("",""); WsWuInfo winfo(context, begin->c_str()); winfo.getCommon(*info, 0); results.append(*info.getClear()); } resp.setPageSize(abs(count)); resp.setWorkunits(results); return; } bool addWUQueryFilter(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *name, WUSortField value) { if (isEmpty(name)) return false; filters[count++] = value; buff.append(name); return true; } bool addWUQueryFilterTime(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *stime, WUSortField value) { if (isEmpty(stime)) return false; CDateTime dt; dt.setString(stime, NULL, true); unsigned year, month, day, hour, minute, second, nano; dt.getDate(year, month, day, true); dt.getTime(hour, minute, second, nano, true); VStringBuffer wuid("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second); filters[count++] = value; buff.append(wuid.str()); return true; } void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp) { SecAccessFlags accessOwn; SecAccessFlags accessOthers; getUserWuAccessFlags(context, accessOwn, accessOthers, true); double version = context.getClientVersion(); IArrayOf results; int begin = 0; unsigned int count = 100; int pagesize = 100; if (version > 1.01) { pagesize = (int)req.getPageSize(); if (!req.getCount_isNull()) pagesize = req.getCount(); if(pagesize < 1) pagesize = 100; begin = (int)req.getPageStartFrom(); } else { count=(unsigned)req.getCount(); if(!count) count=100; if (notEmpty(req.getAfter())) begin=atoi(req.getAfter()); else if (notEmpty(req.getBefore())) begin=atoi(req.getBefore())-count; if (begin < 0) begin = 0; pagesize = count; } WUSortField sortorder[2] = {(WUSortField) (WUSFwuid | WUSFreverse), WUSFterm}; if(notEmpty(req.getSortby())) { const char *sortby = req.getSortby(); if (strieq(sortby, "Owner")) sortorder[0] = WUSFuser; else if (strieq(sortby, "JobName")) sortorder[0] = WUSFjob; else if (strieq(sortby, "Cluster")) sortorder[0] = WUSFcluster; else if (strieq(sortby, "RoxieCluster")) sortorder[0] = WUSFroxiecluster; else if (strieq(sortby, "Protected")) sortorder[0] = WUSFprotected; else if (strieq(sortby, "State")) sortorder[0] = WUSFstate; else if (strieq(sortby, "ThorTime")) sortorder[0] = (WUSortField) (WUSFtotalthortime+WUSFnumeric); else sortorder[0] = WUSFwuid; bool descending = req.getDescending(); if (descending) sortorder[0] = (WUSortField) (sortorder[0] | WUSFreverse); } WUSortField filters[10]; unsigned short filterCount = 0; MemoryBuffer filterbuf; bool bDoubleCheckState = false; if(req.getState()) { addWUQueryFilter(filters, filterCount, filterbuf, strieq(req.getState(), "unknown") ? "" : req.getState(), WUSFstate); if (strieq(req.getState(), "submitted")) bDoubleCheckState = true; } addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster); if(version > 1.07) addWUQueryFilter(filters, filterCount, filterbuf, req.getRoxieCluster(), WUSFroxiecluster); addWUQueryFilter(filters, filterCount, filterbuf, req.getLogicalFile(), WUSFfileread); addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase)); addWUQueryFilter(filters, filterCount, filterbuf, req.getJobname(), (WUSortField) (WUSFjob | WUSFnocase)); addWUQueryFilterTime(filters, filterCount, filterbuf, req.getStartDate(), WUSFwuid); addWUQueryFilterTime(filters, filterCount, filterbuf, req.getEndDate(), WUSFwuidhigh); filters[filterCount] = WUSFterm; Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); unsigned numWUs = factory->numWorkUnitsFiltered(filters, filterbuf.bufferBase()); Owned it = factory->getWorkUnitsSorted(sortorder, filters, filterbuf.bufferBase(), begin, pagesize+1, "", NULL); unsigned actualCount = 0; ForEach(*it) { actualCount++; IConstWorkUnit& cw = it->query(); if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read) continue; if (bDoubleCheckState && (cw.getState() != WUStateSubmitted)) continue; SCMStringBuffer parent; if (!cw.getParentWuid(parent).length()) { Owned info = createECLWorkunit("",""); WsWuInfo winfo(context, cw.getWuid(parent).str()); winfo.getCommon(*info, 0); results.append(*info.getClear()); } } if (version > 1.02) { resp.setPageStartFrom(begin+1); resp.setNumWUs(numWUs); if (results.length() > (aindex_t)pagesize) results.pop(); if(begin + pagesize < numWUs) { resp.setNextPage(begin + pagesize); resp.setPageEndAt(begin + pagesize); int last = begin + pagesize; while (numWUs > (unsigned) last + pagesize) last += pagesize; resp.setLastPage(last); } else { resp.setNextPage(-1); resp.setPageEndAt(numWUs); } if(begin > 0) { resp.setFirst(false); if (begin - pagesize > 0) resp.setPrevPage(begin - pagesize); else resp.setPrevPage(0); } resp.setPageSize(pagesize); } else { if(begin>0 && actualCount > 0) { char buf[10]; itoa(begin, buf, 10); resp.setCurrent(buf); } if(count count) results.pop(); } if(begin == 0 && actualCount <= count) resp.setFirst(false); resp.setCount(count); } resp.setWorkunits(results); return; } void doWUQueryFromArchive(IEspContext &context, ArchivedWuCache &archivedWuCache, int cacheTime, IEspWUQueryRequest & req, IEspWUQueryResponse & resp) { SecAccessFlags accessOwn; SecAccessFlags accessOthers; getUserWuAccessFlags(context, accessOwn, accessOthers, true); __int64 pageSize = req.getPageSize(); if(pageSize < 1) pageSize=100; __int64 displayStart = req.getPageStartFrom(); __int64 displayEnd = displayStart + pageSize; unsigned dateLimit = 0; bool hasNextPage = true; SocketEndpoint ep; getSashaNode(ep); Owned sashaserver = createINode(ep); CDateTime wuTimeFrom, wuTimeTo; if(notEmpty(req.getEndDate())) wuTimeTo.setString(req.getEndDate(), NULL, true); else wuTimeTo.setNow(); if(notEmpty(req.getStartDate())) { wuTimeFrom.setString(req.getStartDate(), NULL, true); dateLimit = 1; } IArrayOf results; StringBuffer filter; addToQueryString(filter, "cluster", req.getCluster(), ';'); addToQueryString(filter, "owner", req.getOwner(), ';'); addToQueryString(filter, "jobName", req.getJobname(), ';'); addToQueryString(filter, "state", req.getState(), ';'); StringBuffer s; if (!req.getLastNDays_isNull() && req.getLastNDays()>0) addToQueryString(filter, "LastNDays", s.clear().append(req.getLastNDays()).str(), ';'); else { addToQueryString(filter, "wuTimeFrom", req.getStartDate(), ';'); addToQueryString(filter, "wuTimeTo", req.getEndDate(), ';'); } addToQueryString(filter, "displayStart", s.append(displayStart).str(), ';'); addToQueryString(filter, "pageSize", s.clear().append(pageSize).str(), ';'); Owned found = archivedWuCache.lookup(context, filter, "AddWhenAvailable", cacheTime); if (found) { hasNextPage = found->m_hasNextPage; if (found->m_results.length()) { ForEachItemIn(ai, found->m_results) { Owned info= createECLWorkunit("",""); info->copy(found->m_results.item(ai)); results.append(*info.getClear()); } } } else { IArrayOf resultList; CDateTime timeTo = wuTimeTo; __int64 totalWus = 0; bool complete = false; while (!complete) { CDateTime timeFrom = timeTo; timeFrom.adjustTime(-1439); //one day earlier if (dateLimit > 0 && wuTimeFrom > timeFrom) timeFrom = wuTimeFrom; unsigned year0, month0, day0, hour0, minute0, second0, nano0; timeFrom.getDate(year0, month0, day0, true); timeFrom.getTime(hour0, minute0, second0, nano0, true); VStringBuffer wuFrom("%4d%02d%02d%02d%02d", year0, month0, day0, hour0, minute0); unsigned year, month, day, hour, minute, second, nano; timeTo.getDate(year, month, day, true); timeTo.getTime(hour, minute, second, nano, true); VStringBuffer wuTo("%4d%02d%02d%02d%02d", year, month, day, hour, minute); __int64 begin = 0; unsigned limit = 1000; bool continueSashaLoop = true; while (continueSashaLoop) { Owned cmd = createSashaCommand(); cmd->setAction(SCA_LIST); cmd->setOnline(false); cmd->setArchived(true); cmd->setAfter(wuFrom.str()); cmd->setBefore(wuTo.str()); cmd->setStart((unsigned)begin); cmd->setLimit(limit); if (notEmpty(req.getCluster())) cmd->setCluster(req.getCluster()); if (notEmpty(req.getOwner())) cmd->setOwner(req.getOwner()); if (notEmpty(req.getJobname())) cmd->setJobName(req.getJobname()); if (notEmpty(req.getState())) cmd->setState(req.getState()); cmd->setOutputFormat("owner,jobname,cluster,state"); if (!cmd->send(sashaserver)) { StringBuffer msg("Cannot connect to archive server at "); sashaserver->endpoint().getUrlStr(msg); throw MakeStringException(ECLWATCH_CANNOT_CONNECT_ARCHIVE_SERVER, "%s", msg.str()); } unsigned actualCount = cmd->numIds(); if (actualCount < 1) break; totalWus += actualCount; if (actualCount < limit) continueSashaLoop = false; for (unsigned ii=0; iiqueryId(ii); if (!csline) continue; StringArray wuidArray; CslToStringArray(csline, wuidArray, false); if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cmd->queryOwner(), accessOwn, accessOthers) < SecAccess_Read) continue; const char* wuid = wuidArray.item(0); if (isEmpty(wuid)) continue; __int64 addToPos = -1; ForEachItemIn(ridx, resultList) { IEspECLWorkunit& w = resultList.item(ridx); if (isEmpty(w.getWuid())) continue; if (strcmp(wuid, w.getWuid())>0) { addToPos = ridx; break; } } if (addToPos < 0 && (ridx > displayEnd)) continue; Owned info= createECLWorkunit("",""); info->setWuid(wuid); if (notEmpty(wuidArray.item(1))) info->setOwner(wuidArray.item(1)); if (notEmpty(wuidArray.item(2))) info->setJobname(wuidArray.item(2)); if (notEmpty(wuidArray.item(3))) info->setCluster(wuidArray.item(3)); if (notEmpty(wuidArray.item(4))) info->setState(wuidArray.item(4)); if (addToPos < 0) resultList.append(*info.getClear()); else resultList.add(*info.getClear(), (aindex_t) addToPos); if (resultList.length() > displayEnd) resultList.pop(); } begin += limit; } timeTo.adjustTime(-1440);//one day earlier if (dateLimit > 0 && wuTimeFrom > timeTo) //we reach the date limit { if (totalWus <= displayEnd) hasNextPage = false; complete = true; } else if ( resultList.length() >= displayEnd) //we have all we need complete = true; } if (displayEnd > resultList.length()) displayEnd = resultList.length(); for (aindex_t i = (aindex_t)displayStart; i < (aindex_t)displayEnd; i++) { Owned info = createECLWorkunit("",""); info->copy(resultList.item(i)); results.append(*info.getClear()); } archivedWuCache.add(filter, "AddWhenAvailable", hasNextPage, results); } resp.setPageStartFrom(displayStart+1); resp.setPageEndAt(displayEnd); if(dateLimit < 1 || hasNextPage) resp.setNextPage(displayStart + pageSize); else resp.setNextPage(-1); if(displayStart > 0) { resp.setFirst(false); if (displayStart - pageSize > 0) resp.setPrevPage(displayStart - pageSize); else resp.setPrevPage(0); } resp.setPageSize(pageSize); resp.setWorkunits(results); resp.setType("archived only"); return; } bool CWsWorkunitsEx::onWUQuery(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp) { try { DBGLOG("Started CWsWorkunitsEx::onWUQuery\n"); if (req.getType() && strieq(req.getType(), "archived workunits")) doWUQueryFromArchive(context, *archivedWuCache, awusCacheMinutes, req, resp); else if(notEmpty(req.getWuid())) doWUQueryBySingleWuid(context, req.getWuid(), resp); else if (notEmpty(req.getECL()) || notEmpty(req.getApplicationName()) || notEmpty(req.getApplicationKey()) || notEmpty(req.getApplicationData())) doWUQueryByXPath(context, req, resp); else if (notEmpty(req.getLogicalFile()) && req.getLogicalFileSearchType() && strieq(req.getLogicalFileSearchType(), "Created")) doWUQueryByFile(context, req.getLogicalFile(), resp); else doWUQueryWithSort(context, req, resp); resp.setState(req.getState()); resp.setCluster(req.getCluster()); resp.setRoxieCluster(req.getRoxieCluster()); resp.setOwner(req.getOwner()); resp.setStartDate(req.getStartDate()); resp.setEndDate(req.getEndDate()); double version = context.getClientVersion(); StringBuffer basicQuery; addToQueryString(basicQuery, "State", req.getState()); addToQueryString(basicQuery, "Cluster", req.getCluster()); if (version > 1.07) addToQueryString(basicQuery, "RoxieCluster", req.getRoxieCluster()); addToQueryString(basicQuery, "Owner", req.getOwner()); addToQueryString(basicQuery, "StartDate", req.getStartDate()); addToQueryString(basicQuery, "EndDate", req.getEndDate()); if (version > 1.25 && req.getLastNDays() > -1) addToQueryString(basicQuery, "LastNDays", StringBuffer().append(req.getLastNDays()).str()); addToQueryString(basicQuery, "ECL", req.getECL()); addToQueryString(basicQuery, "Jobname", req.getJobname()); addToQueryString(basicQuery, "Type", req.getType()); if (addToQueryString(basicQuery, "LogicalFile", req.getLogicalFile())) addToQueryString(basicQuery, "LogicalFileSearchType", req.getLogicalFileSearchType()); resp.setFilters(basicQuery.str()); if (notEmpty(req.getSortby()) && !strstr(basicQuery.str(), StringBuffer(req.getSortby()).append('=').str())) { resp.setSortby(req.getSortby()); addToQueryString(basicQuery, "Sortby", req.getSortby()); if (req.getDescending()) { resp.setDescending(req.getDescending()); addToQueryString(basicQuery, "Descending", "1"); } } resp.setBasicQuery(basicQuery.str()); StringBuffer s; if(notEmpty(req.getECL())) resp.setECL(Utils::url_encode(req.getECL(), s).str()); if(notEmpty(req.getJobname())) resp.setJobname(Utils::url_encode(req.getJobname(), s.clear()).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } void appendResultSet(MemoryBuffer& mb, INewResultSet* result, const char *name, __int64 start, unsigned& count, __int64& total, bool bin) { if (!result) return; const IResultSetMetaData &meta = result->getMetaData(); Owned cursor(result->createCursor()); total=result->getNumRows(); if(bin) count = getResultBin(mb, result, (unsigned)start, count); else { struct MemoryBuffer2IStringVal : public CInterface, implements IStringVal { MemoryBuffer2IStringVal(MemoryBuffer & _buffer) : buffer(_buffer) {} IMPLEMENT_IINTERFACE; virtual const char * str() const { UNIMPLEMENTED; } virtual void set(const char *val) { buffer.append(strlen(val),val); } virtual void clear() { } // support appending only virtual void setLen(const char *val, unsigned length) { buffer.append(length, val); } virtual unsigned length() const { return buffer.length(); }; MemoryBuffer & buffer; } adaptor(mb); count = getResultXml(adaptor, result, name, (unsigned) start, count, "myschema"); } } void getWsWuResult(IEspContext &context, const char* wuid, const char *name, const char *logical, unsigned index, __int64 start, unsigned& count, __int64& total, IStringVal& resname, bool bin, MemoryBuffer& mb) { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid, false); Owned result; if (notEmpty(name)) result.setown(cw->getResultByName(name)); else if (notEmpty(logical)) { Owned it = &cw->getResults(); ForEach(*it) { IConstWUResult &r = it->query(); SCMStringBuffer filename; if(strieq(r.getResultLogicalName(filename).str(), logical)) { result.setown(LINK(&r)); break; } } } else result.setown(cw->getResultBySequence(index)); if (!result) throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result."); if (!resname.length()) result->getResultName(resname); Owned resultSetFactory; if (context.querySecManager()) resultSetFactory.setown(getSecResultSetFactory(*context.querySecManager(), *context.queryUser())); else resultSetFactory.setown(getResultSetFactory(context.queryUserId(), context.queryPassword())); SCMStringBuffer logicalName; result->getResultLogicalName(logicalName); Owned rs; if (logicalName.length()) { SCMStringBuffer cluster; //MORE is this wrong cluster? rs.setown(resultSetFactory->createNewFileResultSet(logicalName.str(), cw->getClusterName(cluster).str())); } else rs.setown(resultSetFactory->createNewResultSet(result, wuid)); appendResultSet(mb, rs, name, start, count, total, bin); } void openSaveFile(IEspContext &context, int opt, const char* filename, const char* origMimeType, MemoryBuffer& buf, IEspWULogFileResponse &resp) { if (opt < 1) { resp.setThefile(buf); resp.setThefile_mimetype(origMimeType); } else if (opt < 2) { StringBuffer headerStr("attachment;"); if (filename && *filename) headerStr.appendf("filename=%s", filename); MemoryBuffer buf0; unsigned i = 0; char* p = (char*) buf.toByteArray(); while (i < buf.length()) { if (p[0] != 10) buf0.append(p[0]); else buf0.append(0x0d); p++; i++; } resp.setThefile(buf); resp.setThefile_mimetype(origMimeType); context.addCustomerHeader("Content-disposition", headerStr.str()); } else { #ifndef _USE_ZLIB throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed."); #else StringBuffer fileNameStr, headerStr("attachment;"); if (notEmpty(filename)) { fileNameStr.append(filename); headerStr.append("filename=").append(filename).append((opt>2) ? ".gz" : ".zip"); } else fileNameStr.append("file"); StringBuffer ifname; ifname.appendf("%s%sT%xAT%x", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick()).append((opt>2)? "" : ".zip"); IZZIPor* Zipor = createZZIPor(); int ret = 0; if (opt > 2) ret = Zipor->gzipToFile(buf.length(), (void*)buf.toByteArray(), ifname.str()); else ret = Zipor->zipToFile(buf.length(), (void*)buf.toByteArray(), fileNameStr.str(), ifname.str()); releaseIZ(Zipor); if (ret < 0) { Owned rFile = createIFile(ifname.str()); if (rFile->exists()) rFile->remove(); throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed."); } Owned rf = createIFile(ifname.str()); if (!rf->exists()) throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA,"The data cannot be compressed."); MemoryBuffer out; Owned fio = rf->open(IFOread); read(fio, 0, (size32_t) rf->size(), out); resp.setThefile(out); fio.clear(); rf->remove(); resp.setThefile_mimetype((opt > 2) ? "application/x-gzip" : "application/zip"); context.addCustomerHeader("Content-disposition", headerStr.str()); #endif } } bool CWsWorkunitsEx::onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp) { try { DBGLOG("CWsWorkunitsEx::onWUFile WUID=%s",req.getWuid()); ensureWsWorkunitAccess(context, req.getWuid(), SecAccess_Read); StringAttr wuid(req.getWuid()); if (wuid.isEmpty() && notEmpty(req.getQuerySet()) && notEmpty(req.getQuery())) { Owned registry = getQueryRegistry(req.getQuerySet(), false); if (!registry) throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySet()); Owned query = resolveQueryAlias(registry, req.getQuery()); if (!query) throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s not found", req.getQuery()); resp.setQuerySet(req.getQuerySet()); resp.setQueryName(query->queryProp("@name")); resp.setQueryId(query->queryProp("@id")); wuid.set(query->queryProp("@wuid")); } int opt = req.getOption(); if (!wuid.isEmpty()) { resp.setWuid(wuid.get()); MemoryBuffer mb; WsWuInfo winfo(context, wuid); if (strieq(File_ArchiveQuery, req.getType())) { winfo.getWorkunitArchiveQuery(mb); openSaveFile(context, opt, "ArchiveQuery.xml", HTTP_TYPE_TEXT_XML, mb, resp); } else if (strieq(File_Cpp,req.getType()) && notEmpty(req.getName())) { winfo.getWorkunitCpp(req.getName(), req.getDescription(), req.getIPAddress(),mb); openSaveFile(context, opt, req.getName(), HTTP_TYPE_TEXT_PLAIN, mb, resp); } else if (strieq(File_DLL,req.getType())) { StringBuffer name; winfo.getWorkunitDll(name, mb); resp.setFileName(name.str()); openSaveFile(context, opt, req.getName(), HTTP_TYPE_OCTET_STREAM, mb, resp); } else if (strieq(File_Res,req.getType())) { winfo.getWorkunitResTxt(mb); openSaveFile(context, opt, "res.txt", HTTP_TYPE_TEXT_PLAIN, mb, resp); } else if (strncmp(req.getType(), File_ThorLog, 7) == 0) { winfo.getWorkunitThorLog(mb); openSaveFile(context, opt, "thormaster.log", HTTP_TYPE_TEXT_PLAIN, mb, resp); } else if (strieq(File_ThorSlaveLog,req.getType())) { winfo.getWorkunitThorSlaveLog(req.getSlaveIP(), mb); openSaveFile(context, opt, "ThorSlave.log", HTTP_TYPE_TEXT_PLAIN, mb, resp); } else if (strieq(File_EclAgentLog,req.getType())) { winfo.getWorkunitEclAgentLog(mb); openSaveFile(context, opt, "eclagent.log", HTTP_TYPE_TEXT_PLAIN, mb, resp); } else if (strieq(File_XML,req.getType())) { winfo.getWorkunitXml(req.getPlainText(), mb); resp.setThefile(mb); const char* plainText = req.getPlainText(); if (plainText && (!stricmp(plainText, "yes"))) resp.setThefile_mimetype(HTTP_TYPE_TEXT_PLAIN); else resp.setThefile_mimetype(HTTP_TYPE_TEXT_XML); } } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUResultBin(IEspContext &context,IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp) { try { ensureWsWorkunitAccess(context, req.getWuid(), SecAccess_Read); MemoryBuffer mb; __int64 total=0; __int64 start = req.getStart() > 0 ? req.getStart() : 0; unsigned count = req.getCount(), requested=count; SCMStringBuffer name; bool bin = (req.getFormat() && strieq(req.getFormat(),"raw")); if (notEmpty(req.getWuid()) && notEmpty(req.getResultName())) getWsWuResult(context, req.getWuid(), req.getResultName(), NULL, 0, start, count, total, name, bin, mb); else if (notEmpty(req.getWuid()) && (req.getSequence() >= 0)) getWsWuResult(context, req.getWuid(), NULL, NULL, req.getSequence(), start, count, total, name, bin,mb); else if (notEmpty(req.getLogicalName())) { const char* logicalName = req.getLogicalName(); StringBuffer wuid; getWuidFromLogicalFileName(context, logicalName, wuid); if (!wuid.length()) throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT,"Cannot find the workunit for file %s.",logicalName); getWsWuResult(context, wuid.str(), NULL, logicalName, 0, start, count, total, name, bin, mb); } else throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT,"Cannot open the workunit result."); if(stricmp(req.getFormat(),"xls")==0) { Owned params(createProperties()); params->setProp("showCount",0); StringBuffer xml; xml.append("").append(mb.length(), mb.toByteArray()).append(""); if (xml.length() > MAXXLSTRANSFER) throw MakeStringException(ECLWATCH_TOO_BIG_DATA_SET, "The data set is too big to be converted to an Excel file. Please use the gzip link to download a compressed XML data file."); StringBuffer xls; xsltTransform(xml.str(), StringBuffer(getCFD()).append("./smc_xslt/result.xslt").str(), params, xls); MemoryBuffer out; out.setBuffer(xls.length(), (void*)xls.str()); resp.setResult(out); resp.setResult_mimetype("application/vnd.ms-excel"); } #ifdef _USE_ZLIB else if(strieq(req.getFormat(),"zip") || strieq(req.getFormat(),"gzip")) { bool gzip = strieq(req.getFormat(),"gzip"); StringBuffer xml(""); xml.append("").append(mb.length(),mb.toByteArray()).append(""); VStringBuffer ifname("%s%sT%xAT%x%s", TEMPZIPDIR, PATHSEPSTR, (unsigned)(memsize_t)GetCurrentThreadId(), msTick(), gzip ? "" : ".zip"); IZZIPor* Zipor = createZZIPor(); int ret = 0; if (gzip) ret = Zipor->gzipToFile(xml.length(), (void*)xml.str(), ifname.str()); else ret = Zipor->zipToFile(xml.length(), (void*)xml.str(), "WUResult.xml", ifname.str()); releaseIZ(Zipor); if (ret < 0) { Owned rFile = createIFile(ifname.str()); if (rFile->exists()) rFile->remove(); throw MakeStringException(ECLWATCH_CANNOT_COMPRESS_DATA, "The data cannot be compressed."); } MemoryBuffer out; Owned rf = createIFile(ifname.str()); if (rf->exists()) { Owned fio = rf->open(IFOread); read(fio, 0, (size32_t) rf->size(), out); resp.setResult(out); } if (gzip) { resp.setResult_mimetype("application/x-gzip"); context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.gz"); } else { resp.setResult_mimetype("application/zip"); context.addCustomerHeader("Content-disposition", "attachment;filename=WUResult.xml.zip"); } Owned rFile = createIFile(ifname.str()); if (rFile->exists()) rFile->remove(); } #endif else { resp.setResult(mb); } resp.setName(name.str()); resp.setWuid(req.getWuid()); resp.setSequence(req.getSequence()); resp.setStart(start); if (requested > total) requested = (unsigned)total; resp.setRequested(requested); resp.setCount(count); resp.setTotal(total); resp.setFormat(req.getFormat()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); ensureWsWorkunitAccess(context, *cw, SecAccess_Read); resp.setWuid(req.getWuid()); resp.setSequence(req.getSequence()); IArrayOf results; Owned r = cw->getResultBySequence(req.getSequence()); if (r) { WsWuInfo winfo(context, cw); winfo.getResult(*r, results, 0); resp.setFormat(r->getResultFormat()); resp.setResult(results.item(0)); } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } void getFileResults(IEspContext &context, const char* logicalName, const char* cluster,__int64 start, unsigned& count,__int64& total,IStringVal& resname,bool bin, MemoryBuffer& buf) { Owned resultSetFactory; if (context.querySecManager()) resultSetFactory.setown(getSecResultSetFactory(*context.querySecManager(), *context.queryUser())); else resultSetFactory.setown(getResultSetFactory(context.queryUserId(), context.queryPassword())); Owned result(resultSetFactory->createNewFileResultSet(logicalName, cluster)); appendResultSet(buf, result, resname.str(), start, count, total, bin); } void getWorkunitCluster(IEspContext &context, const char* wuid, SCMStringBuffer& cluster, bool checkArchiveWUs) { if (isEmpty(wuid)) return; Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid, false); if (cw) cw->getClusterName(cluster); else if (checkArchiveWUs) { Owned wuProps;// = getArchivedWorkUnitProperties(wuid); if (wuProps) cluster.set(wuProps->queryProp("@clusterName")); } } bool CWsWorkunitsEx::onWUResult(IEspContext &context, IEspWUResultRequest &req, IEspWUResultResponse &resp) { try { ensureWsWorkunitAccess(context, req.getWuid(), SecAccess_Read); MemoryBuffer mb; SCMStringBuffer name; __int64 total=0; __int64 start = req.getStart() > 0 ? req.getStart() : 0; unsigned count=req.getCount() ? req.getCount() : 100, requested=count; unsigned seq = req.getSequence(); VStringBuffer filter("start=%"I64F"d;count=%d", start, count); addToQueryString(filter, "clusterName", req.getCluster(), ';'); addToQueryString(filter, "logicalName", req.getLogicalName(), ';'); addToQueryString(filter, "wuid", req.getWuid(), ';'); addToQueryString(filter, "resultName", req.getResultName(), ';'); filter.appendf(";seq=%d;", seq); const char* wuid = req.getWuid(); const char* logicalName = req.getLogicalName(); const char* clusterName = req.getCluster(); const char* resultName = req.getResultName(); Owned data = dataCache->lookup(context, filter, awusCacheMinutes); if (data) { mb.append(data->m_data.c_str()); name.set(data->m_name.c_str()); logicalName = data->m_logicalName.c_str(); wuid = data->m_wuid.c_str(); resultName = data->m_resultName.c_str(); seq = data->m_seq; start = data->m_start; count = data->m_rowcount; requested = (unsigned)data->m_requested; total = data->m_total; if (notEmpty(logicalName)) resp.setLogicalName(logicalName); else { if (notEmpty(wuid)) resp.setWuid(wuid); resp.setSequence(seq); } } else { if(logicalName && *logicalName) { StringBuffer lwuid; getWuidFromLogicalFileName(context, logicalName, lwuid); SCMStringBuffer cluster; if (lwuid.length()) getWorkunitCluster(context, lwuid.str(), cluster, true); if (cluster.length()) { getFileResults(context, logicalName, cluster.str(), start, count, total, name, false, mb); resp.setLogicalName(logicalName); } else if (notEmpty(clusterName)) { getFileResults(context, logicalName, clusterName, start, count, total, name, false, mb); resp.setLogicalName(logicalName); } else throw MakeStringException(ECLWATCH_INVALID_INPUT,"Need valid target cluster to browse file %s.",logicalName); } else if (notEmpty(wuid) && notEmpty(resultName)) { name.set(resultName); getWsWuResult(context, wuid, resultName, NULL, 0, start, count, total, name, false, mb); resp.setWuid(wuid); resp.setSequence(seq); } else { getWsWuResult(context, wuid, NULL, NULL, seq, start, count, total, name, false, mb); resp.setWuid(wuid); resp.setSequence(seq); } mb.append(0); if (requested > total) requested = (unsigned)total; dataCache->add(filter, mb.toByteArray(), name.str(), logicalName, wuid, resultName, seq, start, count, requested, total); } resp.setName(name.str()); resp.setStart(start); if (clusterName && *clusterName) resp.setCluster(clusterName); resp.setRequested(requested); resp.setCount(count); resp.setTotal(total); resp.setResult(mb.toByteArray()); context.queryXslParameters()->setProp("escapeResults","1"); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } void getScheduledWUs(IEspContext &context, const char *serverName, const char *eventName, IArrayOf & results) { if (notEmpty(serverName)) { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned reader = getScheduleReader(serverName, eventName); Owned it(reader->getIterator()); while(it->isValidEventName()) { StringBuffer ieventName; it->getEventName(ieventName); while(it->isValidEventText()) { StringBuffer ieventText; it->getEventText(ieventText); while(it->isValidWuid()) { StringBuffer wuid; it->getWuid(wuid); if (wuid.length()) { Owned scheduledWU = createScheduledWU(""); scheduledWU->setWuid(wuid.str()); scheduledWU->setCluster(serverName); if (ieventName.length()) scheduledWU->setEventName(ieventName.str()); if (ieventText.str()) scheduledWU->setEventText(ieventText.str()); try { SCMStringBuffer s; Owned cw = factory->openWorkUnit(wuid.str(), false); if (cw) scheduledWU->setJobName(cw->getJobName(s).str()); } catch (IException *e) { e->Release(); } results.append(*scheduledWU.getLink()); } it->nextWuid(); } it->nextEventText(); } it->nextEventName(); } } return; } bool CWsWorkunitsEx::onWUShowScheduled(IEspContext &context, IEspWUShowScheduledRequest & req, IEspWUShowScheduledResponse & resp) { try { DBGLOG("WUShowScheduled"); const char *clusterName = req.getCluster(); const char *eventName = req.getEventName(); IArrayOf results; if(notEmpty(req.getPushEventName())) resp.setPushEventName(req.getPushEventName()); if(notEmpty(req.getPushEventText())) resp.setPushEventText(req.getPushEventText()); Owned factory = getEnvironmentFactory(); Owned environment = factory->openEnvironmentByFile(); Owned root = &environment->getPTree(); if (!root) throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information."); unsigned i = 0; Owned ic = root->getElements("Software/Topology/Cluster"); IArrayOf servers; ForEach(*ic) { IPropertyTree &cluster = ic->query(); const char *iclusterName = cluster.queryProp("@name"); if (isEmpty(iclusterName)) continue; if(isEmpty(clusterName)) getScheduledWUs(context, iclusterName, eventName, results); else if (strieq(clusterName, iclusterName)) { getScheduledWUs(context, clusterName, eventName, results); resp.setClusterSelected(i+1); } Owned server = createServerInfo(""); server->setName(iclusterName); servers.append(*server.getLink()); i++; } if (servers.length()) resp.setClusters(servers); if (results.length()) resp.setWorkunits(results); bool first=false; StringBuffer Query("PageFrom=Scheduler"); appendUrlParameter(Query, "EventName", eventName, first); appendUrlParameter(Query, "ECluster", clusterName, first); resp.setQuery(Query.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUExport(IEspContext &context, IEspWUExportRequest &req, IEspWUExportResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); WsWuSearch ws(context, req.getOwner(), req.getState(), req.getCluster(), req.getStartDate(), req.getEndDate(), req.getECL(), req.getJobname()); StringBuffer xml(""); for(WsWuSearch::iterator it=ws.begin(); it!=ws.end(); it++) { Owned cw = factory->openWorkUnit(it->c_str(), false); if (cw) exportWorkUnitToXML(cw, xml); } xml.append(""); MemoryBuffer mb; mb.setBuffer(xml.length(),(void*)xml.str()); resp.setExportData(mb); resp.setExportData_mimetype(HTTP_TYPE_TEXT_XML); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUListLocalFileRequired(IEspContext& context, IEspWUListLocalFileRequiredRequest& req, IEspWUListLocalFileRequiredResponse& resp) { try { const char* wuid = req.getWuid(); if (isEmpty(wuid)) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not defined."); ensureWsWorkunitAccess(context, wuid, SecAccess_Read); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(wuid, false); if (!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Workunit %s not found.", wuid); IArrayOf localFiles; Owned it = cw->getLocalFileUploads(); ForEach(*it) { Owned file = it->get(); if(!file) continue; Owned up = createLogicalFileUpload(); SCMStringBuffer s; up->setType(file->queryType()); up->setSource(file->getSource(s).str()); up->setDestination(file->getDestination(s).str()); up->setEventTag(file->getEventTag(s).str()); localFiles.append(*up.getLink()); } resp.setLocalFileUploads(localFiles); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } typedef enum wsEclTypes_ { wsEclTypeUnknown, xsdString, xsdBoolean, xsdDecimal, xsdFloat, xsdDouble, xsdDuration, xsdDateTime, xsdTime, xsdDate, xsdYearMonth, xsdYear, xsdMonthDay, xsdDay, xsdMonth, xsdHexBinary, xsdBase64Binary, xsdAnyURI, xsdQName, xsdNOTATION, xsdNormalizedString, xsdToken, xsdLanguage, xsdNMTOKEN, xsdNMTOKENS, xsdName, xsdNCName, xsdID, xsdIDREF, xsdIDREFS, xsdENTITY, xsdENTITIES, xsdInteger, xsdNonPositiveInteger, xsdNegativeInteger, xsdLong, xsdInt, xsdShort, xsdByte, xsdNonNegativeInteger, xsdUnsignedLong, xsdUnsignedInt, xsdUnsignedShort, xsdUnsignedByte, xsdPositiveInteger, tnsRawDataFile, tnsCsvDataFile, tnsEspStringArray, tnsEspIntArray, tnsXmlDataSet, maxWsEclType } wsEclType; bool CWsWorkunitsEx::onWUAddLocalFileToWorkunit(IEspContext& context, IEspWUAddLocalFileToWorkunitRequest& req, IEspWUAddLocalFileToWorkunitResponse& resp) { try { const char* wuid = req.getWuid(); if (isEmpty(wuid)) { resp.setResult("WUID is not defined!"); return true; } ensureWsWorkunitAccess(context, wuid, SecAccess_Write); resp.setWuid(wuid); const char* varname = req.getName(); if (isEmpty(varname)) { resp.setResult("Name is not defined!"); return true; } resp.setName(varname); wsEclType type = (wsEclType) req.getType(); const char *val = req.getVal(); unsigned len = req.getLength(); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); WorkunitUpdate wu(factory->updateWorkUnit(wuid)); if (!wu) { resp.setResult("Workunit not found!"); return true; } Owned wuRslt = wu->updateResultByName(varname); if (isEmpty(val)) val=req.getDefVal(); if (notEmpty(val)) { switch (type) { case xsdBoolean: wuRslt->setResultBool((strieq(val, "1") || strieq(val, "true") || strieq(val, "on"))); wuRslt->setResultStatus(ResultStatusSupplied); break; case xsdDecimal: case xsdFloat: case xsdDouble: wuRslt->setResultReal(atof(val)); wuRslt->setResultStatus(ResultStatusSupplied); break; case xsdInteger: case xsdNonPositiveInteger: case xsdNegativeInteger: case xsdLong: case xsdInt: case xsdShort: case xsdByte: case xsdNonNegativeInteger: case xsdUnsignedLong: case xsdUnsignedInt: case xsdUnsignedShort: case xsdUnsignedByte: case xsdPositiveInteger: wuRslt->setResultInt(_atoi64(val)); wuRslt->setResultStatus(ResultStatusSupplied); break; case tnsEspIntArray: case tnsEspStringArray: wuRslt->setResultRaw(len, val, ResultFormatXmlSet); break; case tnsRawDataFile: wuRslt->setResultRaw(len, val, ResultFormatRaw); break; case tnsXmlDataSet: wuRslt->setResultRaw(len, val, ResultFormatXml); break; case tnsCsvDataFile: case xsdBase64Binary: //tbd case xsdHexBinary: break; default: wuRslt->setResultString(val, len); wuRslt->setResultStatus(ResultStatusSupplied); break; } } resp.setResult("Result has been set as required!"); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } void getClusterConfig(char const * clusterType, char const * clusterName, char const * processName, StringBuffer& netAddress) { Owned factory = getEnvironmentFactory(); Owned environment = factory->openEnvironmentByFile(); Owned pRoot = &environment->getPTree(); VStringBuffer xpath("Software/%s[@name='%s']", clusterType, clusterName); IPropertyTree* pCluster = pRoot->queryPropTree(xpath.str()); if (!pCluster) throw MakeStringException(ECLWATCH_CLUSTER_NOT_IN_ENV_INFO, "'%s %s' is not defined.", clusterType, clusterName); const char* port = pCluster->queryProp(xpath.set(processName).append("@port").str()); const char* computer = pCluster->queryProp(xpath.set(processName).append("@computer").str()); if (isEmpty(computer)) throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "'%s %s: %s' is not defined.", clusterType, clusterName, processName); Owned pMachine = environment->getMachine(computer); if (pMachine) { StringBufferAdaptor s(netAddress); pMachine->getNetAddress(s); #ifdef MACHINE_IP if (streq(netAddress.str(), ".")) netAddress = MACHINE_IP; #endif netAddress.append(':').append(port); } return; } bool CWsWorkunitsEx::onWUProcessGraph(IEspContext &context,IEspWUProcessGraphRequest &req, IEspWUProcessGraphResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); ensureWsWorkunitAccess(context, *cw, SecAccess_Read); Owned graph = cw->getGraph(req.getName()); Owned xgmml = graph->getXGMMLTree(true); // merge in graph progress information StringBuffer xml; resp.setTheGraph(toXML(xgmml.get(), xml).str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool isRunning(IConstWorkUnit &cw) { // MORE - move into workunit interface switch (cw.getState()) { case WUStateFailed: case WUStateAborted: case WUStateCompleted: return false; default: return true; } } bool CWsWorkunitsEx::onWUGetGraph(IEspContext& context, IEspWUGetGraphRequest& req, IEspWUGetGraphResponse& resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); ensureWsWorkunitAccess(context, *cw, SecAccess_Read); WUGraphIDType id; SCMStringBuffer runningGraph; bool running= (isRunning(*cw) && cw->getRunningGraph(runningGraph,id)); IArrayOf graphs; Owned it = &cw->getGraphs(GraphTypeAny); ForEach(*it) { IConstWUGraph &graph = it->query(); if(!graph.isValid()) continue; SCMStringBuffer name, label, type; graph.getName(name); graph.getLabel(label); graph.getTypeName(type); if(isEmpty(req.getGraphName()) || strieq(name.str(), req.getGraphName())) { Owned g = createECLGraphEx("",""); g->setName(name.str()); g->setLabel(label.str()); g->setType(type.str()); if(running && streq(name.str(), runningGraph.str())) { g->setRunning(true); g->setRunningId(id); } Owned xgmml = graph.getXGMMLTree(true); // New functionality, if a subgraph id is specified and we only want to load the xgmml for that subgraph // then we need to conditionally pull a propertytree from the xgmml graph one and use that for the xgmml. StringBuffer xml; if (notEmpty(req.getSubGraphId())) { VStringBuffer xpath("//node[@id='%s']", req.getSubGraphId()); toXML(xgmml->queryPropTree(xpath.str()), xml); } else toXML(xgmml, xml); g->setGraph(xml.str()); if (context.getClientVersion() > 1.20) { Owned progress = cw->getGraphProgress(name.str()); if (progress) { WUGraphState graphstate= progress->queryGraphState(); if (graphstate == WUGraphComplete) g->setComplete(true); else if (graphstate == WUGraphFailed) g->setFailed(true); } } graphs.append(*g.getClear()); } } resp.setGraphs(graphs); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onGVCAjaxGraph(IEspContext &context, IEspGVCAjaxGraphRequest &req, IEspGVCAjaxGraphResponse &resp) { try { resp.setName(req.getName()); resp.setGraphName(req.getGraphName()); resp.setGraphType("eclwatch"); double version = context.getClientVersion(); if (version > 1.19) resp.setSubGraphId(req.getSubGraphId()); if (version > 1.20) resp.setSubGraphOnly(req.getSubGraphOnly()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUGraphInfo(IEspContext &context,IEspWUGraphInfoRequest &req, IEspWUGraphInfoResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if(!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",req.getWuid()); ensureWsWorkunitAccess(context, *cw, SecAccess_Write); resp.setWuid(req.getWuid()); resp.setName(req.getName()); resp.setRunning(isRunning(*cw)); if (notEmpty(req.getGID())) resp.setGID(req.getGID()); if(!req.getBatchWU_isNull()) resp.setBatchWU(req.getBatchWU()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUGVCGraphInfo(IEspContext &context,IEspWUGVCGraphInfoRequest &req, IEspWUGVCGraphInfoResponse &resp) { try { Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if(!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",req.getWuid()); ensureWsWorkunitAccess(context, *cw, SecAccess_Read); resp.setWuid(req.getWuid()); resp.setName(req.getName()); resp.setRunning(isRunning(*cw)); if (notEmpty(req.getGID())) resp.setGID(req.getGID()); if(!req.getBatchWU_isNull()) resp.setBatchWU(req.getBatchWU()); StringBuffer xml(""); xml.appendf("", req.getName()); if (context.getClientVersion() > 1.17) { xml.append(""); xml.append(req.getSubgraphId_isNull() ? 0 : req.getSubgraphId()); xml.append(""); } xml.append(""); resp.setTheGraph(xml.str()); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } bool CWsWorkunitsEx::onWUGraphTiming(IEspContext &context, IEspWUGraphTimingRequest &req, IEspWUGraphTimingResponse &resp) { try { DBGLOG("WUGraphTiming WUID=%s", req.getWuid()); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if(!cw) throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",req.getWuid()); ensureWsWorkunitAccess(context, *cw, SecAccess_Read); resp.updateWorkunit().setWuid(req.getWuid()); WsWuInfo winfo(context, cw); IArrayOf timingData; winfo.getGraphTimingData(timingData, 0); resp.updateWorkunit().setTimingData(timingData); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; } int CWsWorkunitsSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method) { try { StringBuffer xml; StringBuffer xslt; if(strieq(method,"WUQuery") || strieq(method,"WUJobList")) { Owned factory = getEnvironmentFactory(); Owned environment = factory->openEnvironmentByFile(); Owned root = &environment->getPTree(); if (!root) throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information."); if(strieq(method,"WUQuery")) { SecAccessFlags accessOwn; SecAccessFlags accessOthers; getUserWuAccessFlags(context, accessOwn, accessOthers, false); xml.append(""); if ((accessOwn == SecAccess_None) && (accessOthers == SecAccess_None)) xml.appendf("Access to workunit is denied."); else { MapStringTo added; Owned it = root->getElements("Software/Topology/Cluster"); ForEach(*it) { const char *name = it->query().queryProp("@name"); if (notEmpty(name) && !added.getValue(name)) { added.setValue(name, true); appendXMLTag(xml, "Cluster", name); } } } xml.append(""); xslt.append(getCFD()).append("./smc_xslt/wuid_search.xslt"); } else if (strieq(method,"WUJobList")) { StringBuffer cluster; request->getParameter("Cluster", cluster); StringBuffer range; request->getParameter("Range",range); Owned clusterInfo = getTargetClusterInfo(cluster); xml.append(""); if (range.length()) appendXMLTag(xml, "Range", range.str()); if (clusterInfo) { const StringArray &thorInstances = clusterInfo->getThorProcesses(); ForEachItemIn(i, thorInstances) { xml.append("').append(thorInstances.item(i)).append(""); } } xml.append("").append(cluster).append(""); xml.append(""); xslt.append(getCFD()).append("./smc_xslt/jobs_search.xslt"); response->addHeader("Expires", "0"); } } if (xslt.length() && xml.length()) { StringBuffer html; xsltTransform(xml.str(), xslt.str(), NULL, html); response->setContent(html.str()); response->setContentType(HTTP_TYPE_TEXT_HTML_UTF8); response->send(); return 0; } } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return onGetNotFound(context, request, response, service); } void deployEclOrArchive(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp) { NewWsWorkunit wu(context); SCMStringBuffer wuid; wu->getWuid(wuid); wu->setAction(WUActionCompile); StringBuffer name(req.getName()); if (!name.trim().length() && notEmpty(req.getFileName())) splitFilename(req.getFileName(), NULL, NULL, &name, NULL); if (name.length()) wu->setJobName(name.str()); if (req.getObject().length()) { StringBuffer text(req.getObject().length(), req.getObject().toByteArray()); wu.setQueryText(text.str()); } if (!req.getResultLimit_isNull()) wu->setResultLimit(req.getResultLimit()); wu->commit(); wu.clear(); submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false); waitForWorkUnitToCompile(wuid.str(), req.getWait()); WsWuInfo winfo(context, wuid.str()); winfo.getCommon(resp.updateWorkunit(), WUINFO_All); winfo.getExceptions(resp.updateWorkunit(), WUINFO_All); name.clear(); if (notEmpty(resp.updateWorkunit().getJobname())) origValueChanged(req.getName(), resp.updateWorkunit().getJobname(), name, false); if (name.length()) //non generated user specified name, so override #Workunit('name') { WorkunitUpdate wx(&winfo.cw->lock()); wx->setJobName(name.str()); resp.updateWorkunit().setJobname(name.str()); } AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str()); } StringBuffer &sharedObjectFileName(StringBuffer &filename, const char *name, const char *ext, unsigned copy) { filename.append((name && *name) ? name : "workunit"); if (copy) filename.append('-').append(copy); if (notEmpty(ext)) filename.append(ext); return filename; } inline StringBuffer &buildFullDllPath(StringBuffer &dllpath, StringBuffer &dllname, const char *dir, const char *name, const char *ext, unsigned copy) { return addPathSepChar(dllpath.set(dir)).append(sharedObjectFileName(dllname, name, ext, copy)); } void writeSharedObject(const char *srcpath, const MemoryBuffer &obj, const char *dir, StringBuffer &dllpath, StringBuffer &dllname) { StringBuffer name, ext; if (srcpath && *srcpath) splitFilename(srcpath, NULL, NULL, &name, &ext); unsigned copy=0; buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), copy); while (checkFileExists(dllpath.str())) buildFullDllPath(dllpath.clear(), dllname.clear(), dir, name.str(), ext.str(), ++copy); DBGLOG("Writing workunit dll: %s", dllpath.str()); Owned f = createIFile(dllpath.str()); Owned io = f->open(IFOcreate); io->write(0, obj.length(), obj.toByteArray()); } void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *filename, const char *cluster, const char *name, const MemoryBuffer &obj, const char *dir, const char *xml) { StringBuffer dllpath, dllname; StringBuffer srcname(filename); if (!srcname.length()) srcname.append(name).append(SharedObjectExtension); writeSharedObject(srcname.str(), obj, dir, dllpath, dllname); NewWsWorkunit wu(context); StringBufferAdaptor isvWuid(wuid); wu->getWuid(isvWuid); wu->setClusterName(cluster); wu->commit(); StringBuffer dllXML; if (getWorkunitXMLFromFile(dllpath.str(), dllXML)) { Owned embeddedWU = createLocalWorkUnit(); embeddedWU->loadXML(dllXML.str()); queryExtendedWU(wu)->copyWorkUnit(embeddedWU); } wu.associateDll(dllpath.str(), dllname.str()); if (name && *name) wu->setJobName(name); //clean slate, copy only select items from processed workunit xml if (xml && *xml) { Owned srcxml = createPTreeFromXMLString(xml); if (srcxml->hasProp("@jobName")) wu->setJobName(srcxml->queryProp("@jobName")); if (srcxml->hasProp("@token")) wu->setSecurityToken(srcxml->queryProp("@token")); if (srcxml->hasProp("Query/Text")) wu.setQueryText(srcxml->queryProp("Query/Text")); } wu->setState(WUStateCompiled); wu->commit(); wu.clear(); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str()); } void deploySharedObject(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp, const char *dir, const char *xml=NULL) { if (isEmpty(req.getFileName())) throw MakeStringException(ECLWATCH_INVALID_INPUT, "File name required when deploying a shared object."); if (isEmpty(req.getCluster())) throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster name required when deploying a shared object."); StringBuffer wuid; deploySharedObject(context, wuid, req.getFileName(), req.getCluster(), req.getName(), req.getObject(), dir, xml); WsWuInfo winfo(context, wuid.str()); winfo.getCommon(resp.updateWorkunit(), WUINFO_All); AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str()); } bool CWsWorkunitsEx::onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp) { const char *type = req.getObjType(); try { if (!context.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false)) throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied."); if (strieq(type, "archive")|| strieq(type, "ecl_text")) deployEclOrArchive(context, req, resp); else if (strieq(type, "shared_object")) deploySharedObject(context, req, resp, queryDirectory.str()); else throw MakeStringException(ECLWATCH_INVALID_INPUT, "WUDeployWorkunit '%s' unkown object type.", type); } catch(IException* e) { FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR); } return true; }