/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "jlib.hpp" #include "workunit.hpp" #include "jprop.hpp" #include "jmisc.hpp" #include "jexcept.hpp" #include "jiter.ipp" #include "jptree.hpp" #include "jtime.ipp" #include "jencrypt.hpp" #include "eclrtl.hpp" #include "deftype.hpp" #include #include "mpbase.hpp" #include "daclient.hpp" #include "dadfs.hpp" #include "dafdesc.hpp" #include "dasds.hpp" #include "danqs.hpp" #include "dautils.hpp" #include "dllserver.hpp" #include "thorhelper.hpp" #include "workflow.hpp" #include "nbcd.hpp" #include "seclib.hpp" #include "wuerror.hpp" #include "wujobq.hpp" #define GLOBAL_WORKUNIT "global" #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short static int workUnitTraceLevel = 1; static StringBuffer &getXPath(StringBuffer &wuRoot, const char *wuid) { // MORE - can fold in the date return wuRoot.append("/WorkUnits/").append(wuid); } //To be called by eclserver, but esp etc. won't know, so we need to store it. static StringBuffer & appendLibrarySuffix(StringBuffer & suffix) { #ifdef _WIN32 suffix.append("W"); #else suffix.append("L"); #endif #ifdef __64BIT__ suffix.append("64"); #else suffix.append("32"); #endif return suffix; } typedef MapStringTo UniqueScopes; static void wuAccessError(const char *username, const char *action, const char *wuscope, const char *wuid, bool excpt, bool log) { StringBuffer err; err.append("Workunit Access Denied - action: ").append(action).append(" user:").append(username ? username : ""); if (wuid) err.append(" workunit:").append(wuid); if (wuscope) err.append(" scope:").append(wuscope); //MORE - we would need more information passed in from outside if we want to make the audit message format the same as from higher level ESP calls SYSLOG(AUDIT_TYPE_ACCESS_FAILURE, err.str()); if (log) LOG(MCuserError, "%s", err.str()); if (excpt) throw MakeStringException(WUERR_AccessError, "%s", err.str()); } static bool checkWuScopeSecAccess(const char *wuscope, ISecManager &secmgr, ISecUser *secuser, int required, const char *action, bool excpt, bool log) { bool ret=(!secuser) ? true : (secmgr.authorizeEx(RT_WORKUNIT_SCOPE, *secuser, wuscope)>=required); if (!ret && (log || excpt)) wuAccessError(secuser ? secuser->getName() : NULL, action, wuscope, NULL, excpt, log); return ret; } static bool checkWuScopeListSecAccess(const char *wuscope, ISecResourceList *scopes, int required, const char *action, bool excpt, bool log) { if (!scopes) return true; bool ret=true; if (wuscope) { Owned res=scopes->getResource(wuscope); if (!res || res->getAccessFlags()count(); seq++) { ISecResource *res=scopes->queryResource(seq); if (res && res->getAccessFlags()=required); if (!ret && (log || excpt)) { SCMStringBuffer wuid; wuAccessError(secuser ? secuser->getName() : NULL, action, wuscope.str(), cw.getWuid(wuid).str(), excpt, log); } return ret; } static bool checkWuSecAccess(const char *wuid, ISecManager &secmgr, ISecUser *secuser, int required, const char *action, bool excpt, bool log) { StringBuffer wuRoot; Owned conn = querySDS().connect(getXPath(wuRoot, wuid).str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT); if (conn) { Owned ptree=conn->getRoot(); return checkWuScopeSecAccess(ptree->queryProp("@scope"), secmgr, secuser, required, action, excpt, log); } return false; } #define PROGRESS_FORMAT_V 2 class CConstGraphProgress : public CInterface, implements IConstWUGraphProgress { class CGraphProgress : public CInterface, implements IWUGraphProgress { CConstGraphProgress &parent; public: IMPLEMENT_IINTERFACE; CGraphProgress(CConstGraphProgress &_parent) : parent(_parent) { parent.lockWrite(); } ~CGraphProgress() { parent.unlock(); } virtual IPropertyTree * queryProgressTree() { return parent.queryProgressTree(); } virtual WUGraphState queryGraphState() { return parent.queryGraphState(); } virtual WUGraphState queryNodeState(WUGraphIDType nodeId) { return parent.queryNodeState(nodeId); } virtual IWUGraphProgress * update() { return parent.update(); } virtual unsigned queryFormatVersion() { return parent.queryFormatVersion(); } virtual IPropertyTree & updateEdge(WUGraphIDType nodeId, const char *edgeId) { return parent.updateEdge(nodeId, edgeId); } virtual IPropertyTree & updateNode(WUGraphIDType nodeId, WUNodeIDType id) { return parent.updateNode(nodeId, id); } virtual void setGraphState(WUGraphState state) { parent.setGraphState(state); } virtual void setNodeState(WUGraphIDType nodeId, WUGraphState state) { parent.setNodeState(nodeId, state); } }; IPropertyTree &updateElement(WUGraphIDType nodeId, const char *elemName, const char *id) { IPropertyTree *elem = NULL; if (!connectedWrite) lockWrite(); StringBuffer path; path.append("node[@id=\"").append(nodeId).append("\"]"); IPropertyTree *node = progress->queryPropTree(path.str()); if (!node) { node = progress->addPropTree("node", createPTree()); node->setPropInt("@id", (int)nodeId); elem = node->addPropTree(elemName, createPTree()); elem->setProp("@id", id); } else { path.clear().append(elemName).append("[@id=\"").append(id).append("\"]"); elem = node->queryPropTree(path.str()); if (!elem) { elem = node->addPropTree(elemName, createPTree()); elem->setProp("@id", id); } } return *elem; } public: IMPLEMENT_IINTERFACE; static void deleteWuidProgress(const char *wuid) { StringBuffer path("/GraphProgress/"); path.append(wuid); Owned conn = querySDS().connect(path.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if (conn) conn->close(true); } CConstGraphProgress(const char *_wuid, const char *_graphName) : wuid(_wuid), graphName(_graphName) { rootPath.append("/GraphProgress/").append(wuid).append('/').append(graphName).append('/'); connected = connectedWrite = false; formatVersion = 0; } void connect() { conn.clear(); packProgress(wuid,false); conn.setown(querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_READ|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT)); progress = conn->queryRoot(); formatVersion = progress->getPropInt("@format"); connected = true; } void lockWrite() { if (connectedWrite) return; // JCSMORE - look at using changeMode here. if (conn) conn.clear(); else packProgress(wuid,false); conn.setown(querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT)); progress = conn->queryRoot(); if (!progress->hasChildren()) // i.e. blank. { formatVersion = PROGRESS_FORMAT_V; progress->setPropInt("@format", PROGRESS_FORMAT_V); } else formatVersion = progress->getPropInt("@format"); connected = connectedWrite = true; } void unlock() { connected = false; connectedWrite = false; conn.clear(); } IPropertyTree &updateNode(WUGraphIDType nodeId, WUNodeIDType id) { StringBuffer s; return updateElement(nodeId, "node", s.append(id).str()); } IPropertyTree &updateEdge(WUGraphIDType nodeId, const char *edgeId) { return updateElement(nodeId, "edge", edgeId); } static bool getRunningGraph(const char *wuid, IStringVal &graphName, WUGraphIDType &subId) { StringBuffer path; Owned conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!conn) return false; const char *name = conn->queryRoot()->queryProp("Running/@graph"); if (name) { graphName.set(name); subId = conn->queryRoot()->getPropInt64("Running/@subId"); return true; } else return false; } void setGraphState(WUGraphState state) { progress->setPropInt("@_state", (unsigned)state); } void setNodeState(WUGraphIDType nodeId, WUGraphState state) { if (!connectedWrite) lockWrite(); StringBuffer path; path.append("node[@id=\"").append(nodeId).append("\"]"); IPropertyTree *node = progress->queryPropTree(path.str()); if (!node) { node = progress->addPropTree("node", createPTree()); node->setPropInt("@id", (int)nodeId); } node->setPropInt("@_state", (unsigned)state); switch (state) { case WUGraphRunning: { StringBuffer path; Owned conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); IPropertyTree *running = conn->queryRoot()->setPropTree("Running", createPTree()); running->setProp("@graph", graphName); running->setPropInt64("@subId", nodeId); break; } case WUGraphComplete: { StringBuffer path; Owned conn = querySDS().connect(path.append("/GraphProgress/").append(wuid).str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); conn->queryRoot()->removeProp("Running"); // only one thing running at any given time and one thing with lockWrite access break; } } } virtual IPropertyTree * queryProgressTree() { if (!connected) connect(); return progress; } virtual WUGraphState queryGraphState() { return (WUGraphState)queryProgressTree()->getPropInt("@_state", (unsigned)WUGraphUnknown); } virtual WUGraphState queryNodeState(WUGraphIDType nodeId) { StringBuffer path; path.append("node[@id=\"").append(nodeId).append("\"]/@_state"); return (WUGraphState)queryProgressTree()->getPropInt(path.str(), (unsigned)WUGraphUnknown); } virtual IWUGraphProgress * update() { return new CGraphProgress(*this); } virtual unsigned queryFormatVersion() { if (!connected) connect(); return formatVersion; } static bool packProgress(const char *wuid,bool pack) { StringBuffer path; path.append("/GraphProgress/").append(wuid); Owned conn(querySDS().connect(path.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT)); if (!conn) return false; Owned newt; MemoryBuffer buf; IPropertyTree *root = conn->queryRoot(); if (root->getPropBin("Packed",buf)) { if (pack) return true; newt.setown(createPTree(buf)); IPropertyTree *running = root->queryPropTree("Running"); if (running) newt->setPropTree("Running",createPTreeFromIPT(running)); } else { if (!pack) return true; newt.setown(createPTree(wuid)); IPropertyTree *running = root->queryPropTree("Running"); if (running) { newt->setPropTree("Running",createPTreeFromIPT(running)); root->removeTree(running); } root->serialize(buf); newt->setPropBin("Packed",buf.length(),buf.bufferBase()); } root->setPropTree(NULL,newt.getClear()); return true; } private: Owned conn; IPropertyTree* progress; StringAttr wuid, graphName; StringBuffer rootPath; bool connected, connectedWrite; unsigned formatVersion; }; class CLocalWUTimeStamp : public CInterface, implements IConstWUTimeStamp { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWUTimeStamp(IPropertyTree *p); virtual IStringVal & getApplication(IStringVal & str) const; virtual IStringVal & getEvent(IStringVal & str) const; virtual IStringVal & getDate(IStringVal & dt) const; }; class CLocalWUAppValue : public CInterface, implements IConstWUAppValue { Owned p; StringBuffer prop; public: IMPLEMENT_IINTERFACE; CLocalWUAppValue(IPropertyTree *p,unsigned child); virtual IStringVal & getApplication(IStringVal & str) const; virtual IStringVal & getName(IStringVal & str) const; virtual IStringVal & getValue(IStringVal & dt) const; }; template struct CachedTags { CachedTags(): cached(false) {} void load(IPropertyTree* p,const char* xpath) { if (!cached) { assertex(tags.length() == 0); Owned r = p->getElements(xpath); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); tags.append(*new T(rp)); } cached = true; } } operator IArrayOf&() { return tags; } void kill() { cached = false; tags.kill(); } bool cached; IArrayOf tags; }; template <> struct CachedTags { CachedTags(): cached(false) {} void load(IPropertyTree* p,const char* xpath) { if (!cached) { assertex(tags.length() == 0); Owned r = p->getElements(xpath); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); Owned v = rp->getElements("*"); unsigned pos = 1; for (v->first(); v->isValid(); v->next()) { rp->Link(); tags.append(*new CLocalWUAppValue(rp,pos++)); } } cached = true; } } operator IArrayOf&() { return tags; } void kill() { cached = false; tags.kill(); } bool cached; IArrayOf tags; }; class CLocalWorkUnit : public CInterface, implements IConstWorkUnit , implements ISDSSubscription, implements IExtendedWUInterface { friend StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str); friend void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags); // NOTE - order is important - we need to construct connection before p and (especially) destruct after p Owned connection; Owned p; bool dirty; bool connectAtRoot; mutable bool abortDirty; mutable bool abortState; mutable CriticalSection crit; mutable Owned query; mutable Owned webServicesInfo; mutable Owned roxieQueryInfo; mutable Owned workflowIterator; mutable bool workflowIteratorCached; mutable bool resultsCached; mutable bool temporariesCached; mutable bool variablesCached; mutable bool exceptionsCached; mutable bool pluginsCached; mutable bool librariesCached; mutable bool activitiesCached; mutable bool webServicesInfoCached; mutable bool roxieQueryInfoCached; mutable IArrayOf activities; mutable IArrayOf plugins; mutable IArrayOf libraries; mutable IArrayOf exceptions; mutable IArrayOf graphs; mutable IArrayOf results; mutable IArrayOf temporaries; mutable IArrayOf variables; mutable CachedTags timestamps; mutable CachedTags appvalues; mutable Owned userDesc; Mutex locked; Owned secMgr; Owned secUser; mutable Owned cachedGraphs; public: IMPLEMENT_IINTERFACE; CLocalWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser, const char *parentWuid = NULL); CLocalWorkUnit(IRemoteConnection *_conn, IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser); ~CLocalWorkUnit(); CLocalWorkUnit(const char *dummyWuid, const char *parentWuid, ISecManager *secmgr, ISecUser *secuser); ISecManager *querySecMgr(){return secMgr.get();} ISecUser *querySecUser(){return secUser.get();} void setSecIfcs(ISecManager *mgr, ISecUser*usr){secMgr.set(mgr); secUser.set(usr);} virtual bool aborting() const; virtual void forceReload(); virtual WUAction getAction() const; virtual IStringVal& getActionEx(IStringVal & str) const; virtual IStringVal & getApplicationValue(const char * application, const char * propname, IStringVal & str) const; virtual int getApplicationValueInt(const char * application, const char * propname, int defVal) const; virtual IConstWUAppValueIterator & getApplicationValues() const; virtual bool hasWorkflow() const; virtual unsigned queryEventScheduledCount() const; virtual IPropertyTree * queryWorkflowTree() const; virtual IConstWorkflowItemIterator * getWorkflowItems() const; virtual IWorkflowItemArray * getWorkflowClone() const; virtual IConstLocalFileUploadIterator * getLocalFileUploads() const; virtual bool requiresLocalFileUpload() const; virtual bool getIsQueryService() const; virtual IStringVal & getClusterName(IStringVal & str) const; virtual unsigned getCombineQueries() const; virtual WUCompareMode getCompareMode() const; virtual IStringVal & getCustomerId(IStringVal & str) const; virtual bool hasDebugValue(const char * propname) const; virtual IStringVal & getDebugValue(const char * propname, IStringVal & str) const; virtual IStringIterator & getDebugValues() const; virtual IStringIterator & getDebugValues(const char *prop) const; virtual int getDebugValueInt(const char * propname, int defVal) const; virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const; virtual bool getDebugValueBool(const char * propname, bool defVal) const; virtual unsigned getExceptionCount() const; virtual IConstWUExceptionIterator & getExceptions() const; virtual IConstWUResult * getGlobalByName(const char * name) const; virtual unsigned getGraphCount() const; virtual unsigned getSourceFileCount() const; virtual unsigned getResultCount() const; virtual unsigned getVariableCount() const; virtual unsigned getTimerCount() const; virtual unsigned getApplicationValueCount() const; virtual IConstWUGraphIterator & getGraphs(WUGraphType type) const; virtual IConstWUGraph * getGraph(const char *name) const; virtual IConstWUGraphProgress * getGraphProgress(const char * name) const; virtual IStringVal & getJobName(IStringVal & str) const; virtual IStringVal & getParentWuid(IStringVal & str) const; virtual IConstWUPlugin * getPluginByName(const char * name) const; virtual IConstWUPluginIterator & getPlugins() const; virtual IConstWULibraryIterator & getLibraries() const; virtual WUPriorityClass getPriority() const; virtual int getPriorityLevel() const; virtual int getPriorityValue() const; virtual IConstWUQuery * getQuery() const; virtual bool getRescheduleFlag() const; virtual IConstWUResult * getResultByName(const char * name) const; virtual IConstWUResult * getResultBySequence(unsigned seq) const; virtual unsigned getResultLimit() const; virtual IConstWUResultIterator & getResults() const; virtual IConstWUActivityIterator& getActivities() const; virtual IConstWUActivity* getActivity(__int64 id) const; virtual IStringVal & getScope(IStringVal & str) const; virtual IStringVal & getSecurityToken(IStringVal & str) const; virtual WUState getState() const; virtual IStringVal & getStateEx(IStringVal & str) const; virtual __int64 getAgentSession() const; virtual unsigned getAgentPID() const; virtual IStringVal & getStateDesc(IStringVal & str) const; virtual IConstWUResult * getTemporaryByName(const char * name) const; virtual IConstWUResultIterator & getTemporaries() const; virtual unsigned getTimerCount(const char * timerName, const char * instance) const; virtual unsigned getTimerDuration(const char * timerName, const char * instance) const; virtual IStringIterator & getTimers() const; virtual IConstWUTimeStampIterator & getTimeStamps() const; virtual IConstWUWebServicesInfo * getWebServicesInfo() const; virtual IConstWURoxieQueryInfo * getRoxieQueryInfo() const; virtual IStringVal & getXmlParams(IStringVal & params) const; virtual const IPropertyTree *getXmlParams() const; virtual unsigned __int64 getHash() const; virtual bool getWuDate(unsigned & year, unsigned & month, unsigned& day); virtual IStringVal & getSnapshot(IStringVal & str) const; virtual IStringVal & getTimeStamp(const char * name, const char * instance, IStringVal & str) const; virtual IStringVal & getUser(IStringVal & str) const; virtual IStringVal & getWuScope(IStringVal & str) const; virtual IConstWUResult * getVariableByName(const char * name) const; virtual IConstWUResultIterator & getVariables() const; virtual IStringVal & getWuid(IStringVal & str) const; virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const; virtual bool isProtected() const; virtual bool isPausing() const; virtual bool isBilled() const; virtual IWorkUnit& lock(); virtual bool reload(); virtual void requestAbort(); virtual void subscribe(WUSubscribeOptions options); virtual unsigned calculateHash(unsigned prevHash); virtual void copyWorkUnit(IConstWorkUnit *cached); virtual unsigned queryFileUsage(const char *filename) const; virtual bool getCloneable() const; virtual IUserDescriptor * getUserDescriptor() const; virtual unsigned getCodeVersion() const; virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const; virtual IPropertyTree * getDiskUsageStats(); virtual IPropertyTreeIterator & getFileIterator() const; virtual bool archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned); virtual void packWorkUnit(bool pack=true); virtual IDateTime & getTimeScheduled(IDateTime &val) const; virtual IPropertyTreeIterator & getFilesReadIterator() const; virtual void protect(bool protectMode); virtual IConstWULibrary * getLibraryByName(const char * name) const; virtual unsigned getDebugAgentListenerPort() const; virtual IStringVal & getDebugAgentListenerIP(IStringVal &ip) const; void clearExceptions(); void commit(); IWUException *createException(); void setTimeStamp(const char *name, const char *instance, const char *event); void addTimeStamp(const char * name, const char * instance, const char *event); void setAction(WUAction action); void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite); void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite); void incEventScheduledCount(); void setIsQueryService(bool value); void setCloneable(bool value); void setIsClone(bool value); void setClusterName(const char * value); void setCodeVersion(unsigned version, const char * buildVersion, const char * eclVersion); void setCombineQueries(unsigned combine); void setCompareMode(WUCompareMode value); void setCustomerId(const char * value); void setDebugValue(const char * propname, const char * value, bool overwrite); void setDebugValueInt(const char * propname, int value, bool overwrite); void setJobName(const char * value); void setPriority(WUPriorityClass cls); void setPriorityLevel(int level); void setRescheduleFlag(bool value); void setResultLimit(unsigned value); void setState(WUState state); void setStateEx(const char * text); void setAgentSession(__int64 sessionId); void setAgentPID(unsigned pid); void setSecurityToken(const char *value); void setTimerInfo(const char * name, const char * instance, unsigned ms, unsigned count, unsigned max); void setTracingValue(const char * propname, const char * value); void setTracingValueInt(const char * propname, int value); void setUser(const char * value); void setWuScope(const char * value); void setBilled(bool billed); void setSnapshot(const char * value); void setTimeStamp(const char *application, const char *instance, const char *event, bool add); void setDebugAgentListenerPort(unsigned port); void setDebugAgentListenerIP(const char * ip); void setXmlParams(const char *params); void setXmlParams(IPropertyTree *tree); void setHash(unsigned __int64 hash); IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor); IWorkflowItemIterator * updateWorkflowItems(); void syncRuntimeWorkflow(IWorkflowItemArray * array); void resetWorkflow(); void schedule(); void deschedule(); unsigned addLocalFileUpload(LocalFileUploadType type, char const * source, char const * destination, char const * eventTag); IWUResult * updateGlobalByName(const char * name); IWUGraph * updateGraph(const char * name); IWUQuery * updateQuery(); IWUWebServicesInfo* updateWebServicesInfo(bool create); IWURoxieQueryInfo* updateRoxieQueryInfo(const char *wuid, const char *roxieClusterName); IWUActivity* updateActivity(__int64 id); IWUPlugin * updatePluginByName(const char * name); IWULibrary * updateLibraryByName(const char * name); IWUResult * updateResultByName(const char * name); IWUResult * updateResultBySequence(unsigned seq); IWUResult * updateTemporaryByName(const char * name); IWUResult * updateVariableByName(const char * name); void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner); void noteFileRead(IDistributedFile *file); void releaseFile(const char *fileName); void clearGraphProgress(); void resetBeforeGeneration(); void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned); void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId); void setTimeScheduled(const IDateTime &val); // ILocalWorkUnit - used for debugging etc void loadXML(const char *xml); void serialize(MemoryBuffer &tgt); void deserialize(MemoryBuffer &src); IWorkUnit &lockRemote(bool commit); void unlockRemote(bool closing); void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL); void abort(); void cleanupAndDelete(bool deldll,bool deleteOwned); bool switchThorQueue(const char *cluster, IQueueSwitcher *qs); void setAllowedClusters(const char *value); IStringVal & getAllowedClusters(IStringVal & str) const; void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const; void setAllowAutoQueueSwitch(bool val); bool getAllowAutoQueueSwitch() const; void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash); private: void init(); IWUGraph *createGraph(); IWUResult *createResult(); void loadGraphs() const; void loadResults() const; void loadTemporaries() const; void loadVariables() const; void loadExceptions() const; void loadPlugins() const; void loadLibraries() const; void loadClusters() const; void loadActivities() const; void unsubscribe(); void checkAgentRunning(WUState & state); // MORE - the two could be a bit more similar... class CWorkUnitWatcher : public CInterface, implements ISDSSubscription { ISDSSubscription *parent; // not linked - it links me SubscriptionId change; bool sub; public: IMPLEMENT_IINTERFACE; CWorkUnitWatcher(ISDSSubscription *_parent, const char *wuid, bool _sub) : parent(_parent), sub(_sub) { StringBuffer wuRoot; getXPath(wuRoot, wuid); change = querySDS().subscribe(wuRoot.str(), *this, sub); } ~CWorkUnitWatcher() { assertex(change==0); } bool watchingChildren() { return sub; } void unsubscribe() { querySDS().unsubscribe(change); change = 0; } void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { parent->notify(id, xpath, flags, valueLen, valueData); } }; class CWorkUnitAbortWatcher : public CInterface, implements ISDSSubscription { CLocalWorkUnit *parent; // not linked - it links me SubscriptionId abort; public: IMPLEMENT_IINTERFACE; CWorkUnitAbortWatcher(CLocalWorkUnit *_parent, const char *wuid) : parent(_parent) { StringBuffer wuRoot; wuRoot.append("/WorkUnitAborts/").append(wuid); abort = querySDS().subscribe(wuRoot.str(), *this); } ~CWorkUnitAbortWatcher() { assertex(abort==0); } void unsubscribe() { querySDS().unsubscribe(abort); abort = 0; } void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { parent->abort(); } }; Owned abortWatcher; Owned changeWatcher; void ensureGraphsUnpacked () { IPropertyTree *t = p->queryPropTree("PackedGraphs"); MemoryBuffer buf; if (t&&t->getPropBin(NULL,buf)) { cachedGraphs.clear(); IPropertyTree *st = createPTree(buf); if (st) { p->setPropTree("Graphs",st); p->removeTree(t); } } } }; class CLockedWorkUnit : public CInterface, implements ILocalWorkUnit, implements IExtendedWUInterface { public: Owned c; IMPLEMENT_IINTERFACE; CLockedWorkUnit(CLocalWorkUnit *_c) : c(_c) {} ~CLockedWorkUnit() { if (workUnitTraceLevel > 1) { StringAttr x; StringAttrAdaptor strval(x); getWuid(strval); PrintLog("Releasing locked workunit %s", x.get()); } if (c) c->unlockRemote(c->IsShared()); } void setSecIfcs(ISecManager *mgr, ISecUser*usr){c->setSecIfcs(mgr, usr);} virtual IConstWorkUnit * unlock() { c->unlockRemote(c->IsShared()); return c.getClear(); } virtual bool aborting() const { return c->aborting(); } virtual void forceReload() { UNIMPLEMENTED; } virtual WUAction getAction() const { return c->getAction(); } virtual IStringVal& getActionEx(IStringVal & str) const { return c->getActionEx(str); } virtual IStringVal & getApplicationValue(const char * application, const char * propname, IStringVal & str) const { return c->getApplicationValue(application, propname, str); } virtual int getApplicationValueInt(const char * application, const char * propname, int defVal) const { return c->getApplicationValueInt(application, propname, defVal); } virtual IConstWUAppValueIterator & getApplicationValues() const { return c->getApplicationValues(); } virtual bool hasWorkflow() const { return c->hasWorkflow(); } virtual unsigned queryEventScheduledCount() const { return c->queryEventScheduledCount(); } virtual IPropertyTree * queryWorkflowTree() const { return c->queryWorkflowTree(); } virtual IConstWorkflowItemIterator * getWorkflowItems() const { return c->getWorkflowItems(); } virtual IWorkflowItemArray * getWorkflowClone() const { return c->getWorkflowClone(); } virtual bool requiresLocalFileUpload() const { return c->requiresLocalFileUpload(); } virtual IConstLocalFileUploadIterator * getLocalFileUploads() const { return c->getLocalFileUploads(); } virtual bool getIsQueryService() const { return c->getIsQueryService(); } virtual bool getCloneable() const { return c->getCloneable(); } virtual IUserDescriptor * getUserDescriptor() const { return c->getUserDescriptor(); } virtual IStringVal & getClusterName(IStringVal & str) const { return c->getClusterName(str); } virtual unsigned getCodeVersion() const { return c->getCodeVersion(); } virtual void getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const { c->getBuildVersion(buildVersion, eclVersion); } virtual unsigned getCombineQueries() const { return c->getCombineQueries(); } virtual WUCompareMode getCompareMode() const { return c->getCompareMode(); } virtual IStringVal & getCustomerId(IStringVal & str) const { return c->getCustomerId(str); } virtual bool hasDebugValue(const char * propname) const { return c->hasDebugValue(propname); } virtual IStringVal & getDebugValue(const char * propname, IStringVal & str) const { return c->getDebugValue(propname, str); } virtual int getDebugValueInt(const char * propname, int defVal) const { return c->getDebugValueInt(propname, defVal); } virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const { return c->getDebugValueInt64(propname, defVal); } virtual bool getDebugValueBool(const char * propname, bool defVal) const { return c->getDebugValueBool(propname, defVal); } virtual IStringIterator & getDebugValues() const { return c->getDebugValues(NULL); } virtual IStringIterator & getDebugValues(const char *prop) const { return c->getDebugValues(prop); } virtual unsigned getExceptionCount() const { return c->getExceptionCount(); } virtual IConstWUExceptionIterator & getExceptions() const { return c->getExceptions(); } virtual unsigned getGraphCount() const { return c->getGraphCount(); } virtual unsigned getSourceFileCount() const { return c->getSourceFileCount(); } virtual unsigned getResultCount() const { return c->getResultCount(); } virtual unsigned getVariableCount() const { return c->getVariableCount(); } virtual unsigned getTimerCount() const { return c->getTimerCount(); } virtual unsigned getApplicationValueCount() const { return c->getApplicationValueCount(); } virtual IConstWUGraphIterator & getGraphs(WUGraphType type) const { return c->getGraphs(type); } virtual IConstWUGraph * getGraph(const char *name) const { return c->getGraph(name); } virtual IConstWUGraphProgress * getGraphProgress(const char * name) const { return c->getGraphProgress(name); } virtual IStringVal & getJobName(IStringVal & str) const { return c->getJobName(str); } virtual IStringVal & getParentWuid(IStringVal & str) const { return c->getParentWuid(str); } virtual IConstWUPlugin * getPluginByName(const char * name) const { return c->getPluginByName(name); } virtual IConstWUPluginIterator & getPlugins() const { return c->getPlugins(); } virtual IConstWULibrary* getLibraryByName(const char *name) const { return c->getLibraryByName(name); } virtual IConstWULibraryIterator & getLibraries() const { return c->getLibraries(); } virtual WUPriorityClass getPriority() const { return c->getPriority(); } virtual int getPriorityLevel() const { return c->getPriorityLevel(); } virtual int getPriorityValue() const { return c->getPriorityValue(); } virtual IConstWUQuery * getQuery() const { return c->getQuery(); } virtual IConstWUWebServicesInfo * getWebServicesInfo() const { return c->getWebServicesInfo(); } virtual IConstWURoxieQueryInfo* getRoxieQueryInfo() const { return c->getRoxieQueryInfo(); } virtual bool getRescheduleFlag() const { return c->getRescheduleFlag(); } virtual IConstWUResult * getResultByName(const char * name) const { return c->getResultByName(name); } virtual IConstWUResult * getResultBySequence(unsigned seq) const { return c->getResultBySequence(seq); } virtual unsigned getResultLimit() const { return c->getResultLimit(); } virtual IConstWUResultIterator & getResults() const { return c->getResults(); } virtual IConstWUActivityIterator & getActivities() const { return c->getActivities(); } virtual IConstWUActivity * getActivity(__int64 id) const { return c->getActivity(id); } virtual IStringVal & getScope(IStringVal & str) const { return c->getScope(str); } virtual IStringVal & getSecurityToken(IStringVal & str) const { return c->getSecurityToken(str); } virtual WUState getState() const { return c->getState(); } virtual IStringVal & getStateEx(IStringVal & str) const { return c->getStateEx(str); } virtual __int64 getAgentSession() const { return c->getAgentSession(); } virtual unsigned getAgentPID() const { return c->getAgentPID(); } virtual IStringVal & getStateDesc(IStringVal & str) const { return c->getStateDesc(str); } virtual bool getRunningGraph(IStringVal & graphName, WUGraphIDType & subId) const { return c->getRunningGraph(graphName, subId); } virtual unsigned getTimerCount(const char * timerName, const char * instance) const { return c->getTimerCount(timerName, instance); } virtual unsigned getTimerDuration(const char * timerName, const char * instance) const { return c->getTimerDuration(timerName, instance); } virtual IStringVal & getTimeStamp(const char * name, const char * instance, IStringVal & str) const { return c->getTimeStamp(name, instance, str); } virtual IStringIterator & getTimers() const { return c->getTimers(); } virtual IConstWUTimeStampIterator & getTimeStamps() const { return c->getTimeStamps(); } virtual bool getWuDate(unsigned & year, unsigned & month, unsigned& day) { return c->getWuDate(year,month,day);} virtual IStringVal & getSnapshot(IStringVal & str) const { return c->getSnapshot(str); } virtual IStringVal & getUser(IStringVal & str) const { return c->getUser(str); } virtual IStringVal & getWuScope(IStringVal & str) const { return c->getWuScope(str); } virtual IStringVal & getWuid(IStringVal & str) const { return c->getWuid(str); } virtual IConstWUResult * getGlobalByName(const char * name) const { return c->getGlobalByName(name); } virtual IConstWUResult * getTemporaryByName(const char * name) const { return c->getTemporaryByName(name); } virtual IConstWUResultIterator & getTemporaries() const { return c->getTemporaries(); } virtual IConstWUResult * getVariableByName(const char * name) const { return c->getVariableByName(name); } virtual IConstWUResultIterator & getVariables() const { return c->getVariables(); } virtual bool isProtected() const { return c->isProtected(); } virtual bool isPausing() const { return c->isPausing(); } virtual bool isBilled() const { return c->isBilled(); } virtual IWorkUnit & lock() { ((CInterface *)this)->Link(); return (IWorkUnit &) *this; } virtual bool reload() { UNIMPLEMENTED; } virtual void subscribe(WUSubscribeOptions options) { c->subscribe(options); } virtual void requestAbort() { c->requestAbort(); } virtual unsigned calculateHash(unsigned prevHash) { return c->calculateHash(prevHash); } virtual void copyWorkUnit(IConstWorkUnit *cached) { c->copyWorkUnit(cached); } virtual bool archiveWorkUnit(const char *base,bool del,bool deldll,bool deleteOwned) { return c->archiveWorkUnit(base,del,deldll,deleteOwned); } virtual void packWorkUnit(bool pack) { c->packWorkUnit(pack); } virtual unsigned queryFileUsage(const char *filename) const { return c->queryFileUsage(filename); } virtual IDateTime & getTimeScheduled(IDateTime &val) const { return c->getTimeScheduled(val); } virtual unsigned getDebugAgentListenerPort() const { return c->getDebugAgentListenerPort(); } virtual IStringVal & getDebugAgentListenerIP(IStringVal &ip) const { return c->getDebugAgentListenerIP(ip); } virtual IStringVal & getXmlParams(IStringVal & params) const { return c->getXmlParams(params); } virtual const IPropertyTree *getXmlParams() const { return c->getXmlParams(); } virtual unsigned __int64 getHash() const { return c->getHash(); } virtual void clearExceptions() { c->clearExceptions(); } virtual void commit() { c->commit(); } virtual IWUException * createException() { return c->createException(); } virtual void setTimeStamp(const char * name, const char * instance, const char *event) { c->setTimeStamp(name, instance, event); } virtual void addTimeStamp(const char * name, const char * instance, const char *event) { c->addTimeStamp(name, instance, event); } virtual void protect(bool protectMode) { c->protect(protectMode); } virtual void setBilled(bool billed) { c->setBilled(billed); } virtual void setAction(WUAction action) { c->setAction(action); } virtual void setApplicationValue(const char * application, const char * propname, const char * value, bool overwrite) { c->setApplicationValue(application, propname, value, overwrite); } virtual void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite) { c->setApplicationValueInt(application, propname, value, overwrite); } virtual void incEventScheduledCount() { c->incEventScheduledCount(); } virtual void setIsQueryService(bool value) { c->setIsQueryService(value); } virtual void setCloneable(bool value) { c->setCloneable(value); } virtual void setIsClone(bool value) { c->setIsClone(value); } virtual void setClusterName(const char * value) { c->setClusterName(value); } virtual void setCodeVersion(unsigned version, const char * buildVersion, const char * eclVersion) { c->setCodeVersion(version, buildVersion, eclVersion); } virtual void setCombineQueries(unsigned combine) { c->setCombineQueries(combine); } virtual void setCompareMode(WUCompareMode value) { c->setCompareMode(value); } virtual void setCustomerId(const char * value) { c->setCustomerId(value); } virtual void setDebugValue(const char * propname, const char * value, bool overwrite) { c->setDebugValue(propname, value, overwrite); } virtual void setDebugValueInt(const char * propname, int value, bool overwrite) { c->setDebugValueInt(propname, value, overwrite); } virtual void setJobName(const char * value) { c->setJobName(value); } virtual void setPriority(WUPriorityClass cls) { c->setPriority(cls); } virtual void setPriorityLevel(int level) { c->setPriorityLevel(level); } virtual void setRescheduleFlag(bool value) { c->setRescheduleFlag(value); } virtual void setResultLimit(unsigned value) { c->setResultLimit(value); } virtual void setSecurityToken(const char *value) { c->setSecurityToken(value); } virtual void setState(WUState state) { c->setState(state); } virtual void setStateEx(const char * text) { c->setStateEx(text); } virtual void setAgentSession(__int64 sessionId) { c->setAgentSession(sessionId); } virtual void setAgentPID(unsigned pid) { c->setAgentPID(pid); } virtual void setTimerInfo(const char * name, const char * instance, unsigned ms, unsigned count, unsigned max) { c->setTimerInfo(name, instance, ms, count, max); } virtual void setTracingValue(const char * propname, const char * value) { c->setTracingValue(propname, value); } virtual void setTracingValueInt(const char * propname, int value) { c->setTracingValueInt(propname, value); } virtual void setUser(const char * value) { c->setUser(value); } virtual void setWuScope(const char * value) { if (value && *value) { ISecManager *secmgr=c->querySecMgr(); ISecUser *secusr=c->querySecUser(); if (!secmgr || !secusr) throw MakeStringException(WUERR_SecurityNotAvailable, "Trying to change workunit scope without security interfaces available"); if (checkWuScopeSecAccess(value, *secmgr, secusr, SecAccess_Write, "Change Scope", true, true)) c->setWuScope(value); } } virtual IWorkflowItem* addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor) { return c->addWorkflowItem(wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor); } virtual void syncRuntimeWorkflow(IWorkflowItemArray * array) { c->syncRuntimeWorkflow(array); } virtual IWorkflowItemIterator * updateWorkflowItems() { return c->updateWorkflowItems(); } virtual void resetWorkflow() { c->resetWorkflow(); } virtual void schedule() { c->schedule(); } virtual void deschedule() { c->deschedule(); } virtual unsigned addLocalFileUpload(LocalFileUploadType type, char const * source, char const * destination, char const * eventTag) { return c->addLocalFileUpload(type, source, destination, eventTag); } virtual IWUResult * updateGlobalByName(const char * name) { return c->updateGlobalByName(name); } virtual IWUGraph * updateGraph(const char * name) { return c->updateGraph(name); } virtual IWUQuery * updateQuery() { return c->updateQuery(); } virtual IWUWebServicesInfo * updateWebServicesInfo(bool create) { return c->updateWebServicesInfo(create); } virtual IWURoxieQueryInfo * updateRoxieQueryInfo(const char *wuid, const char *roxieClusterName) { return c->updateRoxieQueryInfo(wuid, roxieClusterName); } virtual IWUActivity * updateActivity(__int64 id) { return c->updateActivity(id); } virtual IWUPlugin * updatePluginByName(const char * name) { return c->updatePluginByName(name); } virtual IWULibrary * updateLibraryByName(const char * name) { return c->updateLibraryByName(name); } virtual IWUResult * updateResultByName(const char * name) { return c->updateResultByName(name); } virtual IWUResult * updateResultBySequence(unsigned seq) { return c->updateResultBySequence(seq); } virtual IWUResult * updateTemporaryByName(const char * name) { return c->updateTemporaryByName(name); } virtual IWUResult * updateVariableByName(const char * name) { return c->updateVariableByName(name); } virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner) { c->addFile(fileName, clusters, usageCount, fileKind, graphOwner); } virtual void noteFileRead(IDistributedFile *file) { c->noteFileRead(file); } virtual void releaseFile(const char *fileName) { c->releaseFile(fileName); } virtual void clearGraphProgress() { c->clearGraphProgress(); } virtual void resetBeforeGeneration() { c->resetBeforeGeneration(); } virtual void deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned) { c->deleteTempFiles(graph, deleteOwned, deleteJobOwned); } virtual void addDiskUsageStats(__int64 avgNodeUsage, unsigned minNode, __int64 minNodeUsage, unsigned maxNode, __int64 maxNodeUsage, __int64 graphId) { c->addDiskUsageStats(avgNodeUsage, minNode, minNodeUsage, maxNode, maxNodeUsage, graphId); } virtual IPropertyTree * getDiskUsageStats() { return c->getDiskUsageStats(); } virtual IPropertyTreeIterator & getFileIterator() const { return c->getFileIterator(); } virtual IPropertyTreeIterator & getFilesReadIterator() const { return c->getFilesReadIterator(); } virtual void setSnapshot(const char * value) { c->setSnapshot(value); } virtual void setTimeScheduled(const IDateTime &val) { c->setTimeScheduled(val); } virtual void setDebugAgentListenerPort(unsigned port) { c->setDebugAgentListenerPort(port); } virtual void setDebugAgentListenerIP(const char * ip) { c->setDebugAgentListenerIP(ip); } virtual void setXmlParams(const char *params) { c->setXmlParams(params); } virtual void setXmlParams(IPropertyTree *tree) { c->setXmlParams(tree); } virtual void setHash(unsigned __int64 hash) { c->setHash(hash); } // ILocalWorkUnit - used for debugging etc virtual void loadXML(const char *xml) { c->loadXML(xml); } virtual void serialize(MemoryBuffer &tgt) { c->serialize(tgt); } virtual void deserialize(MemoryBuffer &src) { c->deserialize(src); } virtual bool switchThorQueue(const char *cluster, IQueueSwitcher *qs) { return c->switchThorQueue(cluster,qs); } virtual void setAllowedClusters(const char *value) { c->setAllowedClusters(value); } virtual IStringVal& getAllowedClusters(IStringVal &str) const { return c->getAllowedClusters(str); } virtual void remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const { c->remoteCheckAccess(user,writeaccess); } virtual void setAllowAutoQueueSwitch(bool val) { c->setAllowAutoQueueSwitch(val); } virtual bool getAllowAutoQueueSwitch() const { return c->getAllowAutoQueueSwitch(); } virtual void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash) { c->setLibraryInformation(name, interfaceHash, definitionHash); } virtual void setResultInt(const char * name, unsigned sequence, __int64 val) { Owned r = updateResult(name, sequence); if (r) { r->setResultInt(val); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultUInt(const char * name, unsigned sequence, unsigned __int64 val) { Owned r = updateResult(name, sequence); if (r) { r->setResultUInt(val); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultReal(const char *name, unsigned sequence, double val) { Owned r = updateResult(name, sequence); if (r) { r->setResultReal(val); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultVarString(const char * stepname, unsigned sequence, const char *val) { setResultString(stepname, sequence, strlen(val), val); } virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val) { setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val); } virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val) { doSetResultString(type_string, stepname, sequence, len, val); } virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val) { doSetResultString(type_data, stepname, sequence, len, (const char *)val); } virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val) { Owned r = updateResult(name, sequence); if (r) { r->setResultRaw(len, val, ResultFormatRaw); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *) { Owned r = updateResult(name, sequence); if (r) { r->setResultIsAll(isAll); r->setResultRaw(len, val, ResultFormatRaw); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val) { Owned r = updateResult(name, sequence); if (r) { r->setResultUnicode((char const *)val, len); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultBool(const char *name, unsigned sequence, bool val) { Owned r = updateResult(name, sequence); if (r) { r->setResultBool(val); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val) { Owned r = updateResult(name, sequence); if (r) { r->setResultDecimal(val, len); r->setResultStatus(ResultStatusCalculated); } } virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend) { Owned r = updateResult(name, sequence); if (r) { __int64 totalRows = numRows; if (extend) { totalRows += r->getResultRowCount(); r->addResultRaw(len, val, ResultFormatRaw); } else r->setResultRaw(len, val, ResultFormatRaw); r->setResultStatus(ResultStatusCalculated); r->setResultRowCount(totalRows); r->setResultTotalRowCount(totalRows); } } protected: IWUResult *updateResult(const char *name, unsigned sequence) { Owned result = updateWorkUnitResult(this, name, sequence); if (result) { SCMStringBuffer rname; if (!result->getResultName(rname).length()) result->setResultName(name); } return result.getClear(); } void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val) { Owned r = updateResult(name, sequence); if (r) { r->setResultString(val, len); r->setResultStatus(ResultStatusCalculated); } } }; class CLocalWUAssociated : public CInterface, implements IConstWUAssociatedFile { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWUAssociated(IPropertyTree *p); virtual WUFileType getType() const; virtual IStringVal & getDescription(IStringVal & ret) const; virtual IStringVal & getIp(IStringVal & ret) const; virtual IStringVal & getName(IStringVal & ret) const; virtual IStringVal & getNameTail(IStringVal & ret) const; virtual unsigned getCrc() const; }; class CLocalWUQuery : public CInterface, implements IWUQuery { Owned p; mutable IArrayOf associated; mutable CriticalSection crit; mutable bool associatedCached; private: void addSpecialCaseAssociated(WUFileType type, const char * propname, unsigned crc) const; void loadAssociated() const; public: IMPLEMENT_IINTERFACE; CLocalWUQuery(IPropertyTree *p); virtual WUQueryType getQueryType() const; virtual IStringVal& getQueryText(IStringVal &str) const; virtual IStringVal& getQueryShortText(IStringVal &str) const; virtual IStringVal& getQueryName(IStringVal &str) const; virtual IStringVal& getQueryDllName(IStringVal &str) const; virtual unsigned getQueryDllCrc() const; virtual IStringVal& getQueryCppName(IStringVal &str) const; virtual IStringVal& getQueryResTxtName(IStringVal &str) const; virtual IConstWUAssociatedFile * getAssociatedFile(WUFileType type, unsigned index) const; virtual IConstWUAssociatedFileIterator& getAssociatedFiles() const; virtual void setQueryType(WUQueryType qt); virtual void setQueryText(const char *pstr); virtual void setQueryName(const char *); virtual void addAssociatedFile(WUFileType type, const char * name, const char * ip, const char * desc, unsigned crc); virtual void removeAssociatedFiles(); }; class CLocalWUWebServicesInfo : public CInterface, implements IWUWebServicesInfo { Owned p; mutable CriticalSection crit; private: public: IMPLEMENT_IINTERFACE; CLocalWUWebServicesInfo(IPropertyTree *p); virtual IStringVal& getModuleName(IStringVal &str) const; virtual IStringVal& getAttributeName(IStringVal &str) const; virtual IStringVal& getDefaultName(IStringVal &str) const; virtual IStringVal& getInfo(const char *name, IStringVal &str) const; virtual unsigned getWebServicesCRC() const; virtual void setModuleName(const char *); virtual void setAttributeName(const char *); virtual void setDefaultName(const char *); virtual void setInfo(const char *name, const char *info); virtual void setWebServicesCRC(unsigned); }; class CLocalWURoxieQueryInfo : public CInterface, implements IWURoxieQueryInfo { Owned p; mutable CriticalSection crit; private: public: IMPLEMENT_IINTERFACE; CLocalWURoxieQueryInfo(IPropertyTree *p); virtual IStringVal& getQueryInfo(IStringVal &str) const; virtual IStringVal& getDefaultPackageInfo(IStringVal &str) const; virtual IStringVal& getRoxieClusterName(IStringVal &str) const; virtual IStringVal& getWuid(IStringVal &str) const; virtual void setQueryInfo(const char *info); virtual void setDefaultPackageInfo(const char *, int len); virtual void setRoxieClusterName(const char *str); virtual void setWuid(const char *str); }; class CLocalWUResult : public CInterface, implements IWUResult { friend class CLocalWorkUnit; Owned p; void getSchema(TypeInfoArray &types, StringAttrArray &names, IStringVal * ecl=NULL) const; public: IMPLEMENT_IINTERFACE; CLocalWUResult(IPropertyTree *props); ~CLocalWUResult() { try { p.clear(); } catch (IException *E) {E->Release();}} virtual WUResultStatus getResultStatus() const; virtual IStringVal& getResultName(IStringVal &str) const; virtual int getResultSequence() const; virtual bool isResultScalar() const; virtual IStringVal& getResultXml(IStringVal &str) const; virtual unsigned getResultFetchSize() const; virtual __int64 getResultTotalRowCount() const; virtual __int64 getResultRowCount() const; virtual void getResultDataset(IStringVal & ecl, IStringVal & defs) const; virtual IStringVal& getResultLogicalName(IStringVal &ecl) const; virtual IStringVal& getResultKeyField(IStringVal& ecl) const; virtual unsigned getResultRequestedRows() const; virtual __int64 getResultInt() const; virtual bool getResultBool() const; virtual double getResultReal() const; virtual IStringVal& getResultString(IStringVal & str) const; virtual IDataVal& getResultRaw(IDataVal & data, IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const; virtual IDataVal& getResultUnicode(IDataVal & data) const; virtual void getResultDecimal(void * val, unsigned length, unsigned precision, bool isSigned) const; virtual IStringVal& getResultEclSchema(IStringVal & str) const; virtual __int64 getResultRawSize(IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const; virtual IDataVal& getResultRaw(IDataVal & data, __int64 from, __int64 length, IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const; virtual IStringVal& getResultRecordSizeEntry(IStringVal & str) const; virtual IStringVal& getResultTransformerEntry(IStringVal & str) const; virtual __int64 getResultRowLimit() const; virtual IStringVal& getResultFilename(IStringVal & str) const; virtual WUResultFormat getResultFormat() const; virtual unsigned getResultHash() const; virtual bool getResultIsAll() const; // interface IWUResult virtual void setResultStatus(WUResultStatus status); virtual void setResultName(const char *name); virtual void setResultSequence(unsigned seq); virtual void setResultSchemaRaw(unsigned len, const void *schema); virtual void setResultScalar(bool isScalar); virtual void setResultRaw(unsigned len, const void *xml, WUResultFormat format); virtual void setResultFetchSize(unsigned rows); // 0 means file-loaded virtual void setResultTotalRowCount(__int64 rows); // -1 means unknown virtual void setResultRowCount(__int64 rows); virtual void setResultDataset(const char *ecl, const char *defs); virtual void setResultLogicalName(const char *logicalName); virtual void setResultKeyField(const char * name); virtual void setResultRequestedRows(unsigned req); virtual void setResultRecordSizeEntry(const char * val); virtual void setResultTransformerEntry(const char * val); virtual void setResultInt(__int64 val); virtual void setResultReal(double val); virtual void setResultBool(bool val); virtual void setResultString(const char * val, unsigned length); virtual void setResultUnicode(const void * val, unsigned length); virtual void setResultData(const void * val, unsigned length); virtual void setResultDecimal(const void * val, unsigned length); virtual void addResultRaw(unsigned len, const void * data, WUResultFormat format); virtual void setResultRowLimit(__int64 value); virtual void setResultFilename(const char * name); virtual void setResultUInt(unsigned __int64 val); virtual void setResultIsAll(bool value); virtual void setResultFormat(WUResultFormat format); virtual void setResultXML(const char *val); }; class CLocalWUPlugin : public CInterface, implements IWUPlugin { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWUPlugin(IPropertyTree *p); virtual IStringVal& getPluginName(IStringVal &str) const; virtual IStringVal& getPluginVersion(IStringVal &str) const; virtual bool getPluginThor() const; virtual bool getPluginHole() const; virtual void setPluginName(const char *str); virtual void setPluginVersion(const char *str); virtual void setPluginThor(bool on); virtual void setPluginHole(bool on); }; class CLocalWULibrary : public CInterface, implements IWULibrary { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWULibrary(IPropertyTree *p); virtual IStringVal & getName(IStringVal & str) const; virtual IConstWULibraryActivityIterator * getActivities() const; virtual void setName(const char * str); virtual void addActivity(unsigned id); }; class CLocalWUGraph : public CInterface, implements IWUGraph { Owned p; mutable Linked progress; StringAttr wuid; void mergeProgress(IPropertyTree &tree, IPropertyTree &progressTree, const unsigned &progressV) const; public: IMPLEMENT_IINTERFACE; CLocalWUGraph(IPropertyTree *p, const char *wuid); virtual IStringVal & getXGMML(IStringVal & ret, bool mergeProgress) const; virtual IStringVal & getDOT(IStringVal & ret) const; virtual IStringVal & getName(IStringVal & ret) const; virtual IStringVal & getLabel(IStringVal & ret) const; virtual IStringVal & getTypeName(IStringVal & ret) const; virtual WUGraphType getType() const; virtual IPropertyTree * getXGMMLTree(bool mergeProgress) const; virtual bool isValid() const; virtual void setName(const char *str); virtual void setType(WUGraphType type); virtual void setXGMML(const char *str); virtual void setXGMMLTree(IPropertyTree * tree); }; class CLocalWUActivity : public CInterface, implements IWUActivity { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWUActivity(IPropertyTree *p, __int64 id = 0); virtual __int64 getId() const; virtual unsigned getKind() const; virtual IStringVal & getHelper(IStringVal & ret) const; virtual void setKind(unsigned id); virtual void setHelper(const char * str); }; class CLocalWUException : public CInterface, implements IWUException { Owned p; public: IMPLEMENT_IINTERFACE; CLocalWUException(IPropertyTree *p); virtual IStringVal& getExceptionSource(IStringVal &str) const; virtual IStringVal& getExceptionMessage(IStringVal &str) const; virtual unsigned getExceptionCode() const; virtual WUExceptionSeverity getSeverity() const; virtual IStringVal & getTimeStamp(IStringVal & dt) const; virtual IStringVal & getExceptionFileName(IStringVal & str) const; virtual unsigned getExceptionLineNo() const; virtual unsigned getExceptionColumn() const; virtual void setExceptionSource(const char *str); virtual void setExceptionMessage(const char *str); virtual void setExceptionCode(unsigned code); virtual void setSeverity(WUExceptionSeverity level); virtual void setTimeStamp(const char * dt); virtual void setExceptionFileName(const char *str); virtual void setExceptionLineNo(unsigned r); virtual void setExceptionColumn(unsigned c); }; //========================================================================================== struct mapEnums { int val; const char *str; }; const char *getEnumText(int value, mapEnums *map) { const char *defval = map->str; while (map->str) { if (value==map->val) return map->str; map++; } assertex(!"Unexpected value in setEnum"); return defval; } void setEnum(IPropertyTree *p, const char *propname, int value, mapEnums *map) { const char *defval = map->str; while (map->str) { if (value==map->val) { p->setProp(propname, map->str); return; } map++; } assertex(!"Unexpected value in setEnum"); p->setProp(propname, defval); } static int getEnum(const char *v, mapEnums *map) { if (v) { while (map->str) { if (stricmp(v, map->str)==0) return map->val; map++; } assertex(!"Unexpected value in getEnum"); } return 0; } static int getEnum(IPropertyTree *p, const char *propname, mapEnums *map) { return getEnum(p->queryProp(propname),map); } //========================================================================================== class CConstWUArrayIterator : public CInterface, implements IConstWorkUnitIterator { IArrayOf w; CArrayIteratorOf it; public: IMPLEMENT_IINTERFACE; CConstWUArrayIterator(IRemoteConnection *conn, IArrayOf &trees, ISecManager *secmgr=NULL, ISecUser *secuser=NULL) : it(w) { ForEachItemIn(i,trees) { IPropertyTree &tree = trees.item(i); tree.Link(); w.append(*(IConstWorkUnit *) new CLocalWorkUnit(LINK(conn), &tree, secmgr, secuser)); } } bool first() { return it.first(); } bool isValid() { return it.isValid(); } bool next() { return it.next(); } IConstWorkUnit & query() { return it.query();} }; //========================================================================================== class CStringArrayIterator : public CInterface, implements IStringIterator { StringArray strings; unsigned idx; public: IMPLEMENT_IINTERFACE; CStringArrayIterator() { idx = 0; }; void append(const char *str) { strings.append(str); } virtual bool first() { idx = 0; return strings.isItem(idx); } virtual bool next() { idx ++; return strings.isItem(idx); } virtual bool isValid() { return strings.isItem(idx); } virtual IStringVal & str(IStringVal &s) { s.set(strings.item(idx)); return s; } }; class CCachedJobNameIterator : public CInterface, implements IStringIterator { Owned it; public: IMPLEMENT_IINTERFACE; CCachedJobNameIterator(IPropertyTreeIterator *p) : it(p) {}; virtual bool first() { return it->first(); } virtual bool next() { return it->next(); } virtual bool isValid() { return it->isValid(); } virtual IStringVal & str(IStringVal &s) { s.set(it->query().queryName()+1); return s; } }; class CEmptyStringIterator : public CInterface, implements IStringIterator { public: IMPLEMENT_IINTERFACE; virtual bool first() { return false; } virtual bool next() { return false; } virtual bool isValid() { return false; } virtual IStringVal & str(IStringVal &s) { s.clear(); return s; } }; mapEnums sortFields[] = { { WUSFuser, "@submitID" }, { WUSFcluster, "@clusterName" }, { WUSFjob, "@jobName" }, { WUSFstate, "@state" }, { WUSFpriority, "@priorityClass" }, { WUSFprotected, "@protected" }, { WUSFwuid, "@" }, { WUSFfileread, "FilesRead/File/@name" }, { WUSFroxiecluster, "RoxieQueryInfo/@roxieClusterName" }, { WUSFbatchloginid, "Application/Dispatcher/FTPUserID" }, { WUSFbatchcustomername, "Application/Dispatcher/CustomerName" }, { WUSFbatchpriority, "Application/Dispatcher/JobPriority" }, { WUSFbatchinputreccount, "Application/Dispatcher/InputRecords" }, { WUSFbatchtimeuploaded, "Application/Dispatcher/TimeUploaded" }, { WUSFbatchtimecompleted, "Application/Dispatcher/TimeCompleted" }, { WUSFbatchmachine, "Application/Dispatcher/Machine" }, { WUSFbatchinputfile, "Application/Dispatcher/InputFileName" }, { WUSFbatchoutputfile, "Application/Dispatcher/OutputFileName" }, { WUSFtotalthortime, "Timings/Timing[@name='Total thor time']/@duration" }, { WUSFterm, NULL } }; class asyncRemoveDllWorkItem: public CInterface, implements IWorkQueueItem // class only used in asyncRemoveDll { StringAttr name; unsigned version; bool removeDlls; bool removeDirectory; public: IMPLEMENT_IINTERFACE; asyncRemoveDllWorkItem(const char * _name, bool _removeDlls, bool _removeDirectory) : name(_name) { removeDlls = _removeDlls; removeDirectory = _removeDirectory; } void execute() { PROGLOG("WU removeDll %s",name.get()); queryDllServer().removeDll(name,removeDlls,removeDirectory); } }; class CWorkUnitFactory : public CInterface, implements IWorkUnitFactory, implements IDaliClientShutdown { Owned deletedllworkq; public: IMPLEMENT_IINTERFACE; CWorkUnitFactory() { // Assumes dali client configuration has already been done sdsManager = &querySDS(); session = myProcessSession(); deletedllworkq.setown(createWorkQueueThread()); addShutdownHook(*this); } ~CWorkUnitFactory() { removeShutdownHook(*this); // deletepool->joinAll(); } void clientShutdown(); SessionId startSession() { // Temporary placeholder until startSession is implemented #ifdef _WIN32 return GetTickCount(); #else struct timeval tm; gettimeofday(&tm,NULL); return tm.tv_usec; #endif } IWorkUnit* ensureNamedWorkUnit(const char *name) { if (workUnitTraceLevel > 1) PrintLog("ensureNamedWorkUnit created %s", name); StringBuffer wuRoot; getXPath(wuRoot, name); IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance"); Owned cw = new CLocalWorkUnit(conn, (ISecManager *)NULL, NULL, (const char *)NULL); return &cw->lockRemote(false); } virtual IWorkUnit* createNamedWorkUnit(const char *wuid,const char *parentWuid, const char *app, const char *user) { StringBuffer wuRoot; getXPath(wuRoot, wuid); IRemoteConnection *conn; if (queryDaliServerVersion().compare("2.0") >= 0) conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_UNIQUE, SDS_LOCK_TIMEOUT); else conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT); conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance"); Owned cw = new CLocalWorkUnit(conn, (ISecManager*)NULL, NULL, parentWuid); IWorkUnit* ret = &cw->lockRemote(false); ret->setDebugValue("CREATED_BY", app, true); ret->setDebugValue("CREATED_FOR", user, true); if (user) cw->setWuScope(user); return ret; } virtual IWorkUnit* createWorkUnit(const char *parentWuid, const char *app, const char *user) { StringBuffer wuid("W"); char result[32]; time_t ltime; time( <ime ); tm *today = localtime( <ime ); // MORE - this is not threadsafe. But I probably don't care that much! strftime(result, sizeof(result), "%Y%m%d-%H%M%S", today); wuid.append(result); if (queryDaliServerVersion().compare("2.0") < 0) wuid.append('-').append(startSession()); if (workUnitTraceLevel > 1) PrintLog("createWorkUnit created %s", wuid.str()); IWorkUnit* ret = createNamedWorkUnit(wuid.str(),parentWuid, app, user); if (workUnitTraceLevel > 1) { SCMStringBuffer wuidName; ret->getWuid(wuidName); PrintLog("createWorkUnit created %s", wuidName.str()); } ret->addTimeStamp("workunit", NULL, "Created"); return ret; } bool secDeleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser, bool raiseexceptions) { if (workUnitTraceLevel > 1) PrintLog("deleteWorkUnit %s", wuid); StringBuffer wuRoot; getXPath(wuRoot, wuid); IRemoteConnection *conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT); if (!conn) { if (workUnitTraceLevel > 0) PrintLog("deleteWorkUnit %s not found", wuid); return false; } Owned cw = new CLocalWorkUnit(conn, secmgr, secuser); // takes ownership of conn if (secmgr && !checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Full, "delete", true, true)) { if (raiseexceptions) { // perhaps raise exception here? } return false; } if (raiseexceptions) { try { cw->cleanupAndDelete(true,true); } catch (IException *E) { StringBuffer s; LOG(MCexception(E, MSGCLS_warning), E, s.append("Exception during deleteWorkUnit: ").append(wuid).str()); E->Release(); return false; } } else cw->cleanupAndDelete(true,true); return true; } virtual bool deleteWorkUnitEx(const char * wuid) { return secDeleteWorkUnit(wuid,NULL,NULL,true); } virtual bool deleteWorkUnit(const char * wuid) { return secDeleteWorkUnit(wuid,NULL,NULL,false); } virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner) { // Not sure what to do about customerID vs user etc StringBuffer path("*"); if (owner && *owner) path.append("[@submitID=\"").append(owner).append("\"]"); return getWorkUnitsByXPath(path.str()); } virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state); virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char* ecl) { StringBuffer path("*"); if (ecl && *ecl) path.append("[Query/Text=~\"*").append(ecl).append("*\"]"); return getWorkUnitsByXPath(path.str()); } virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char* cluster) { StringBuffer path("*"); if (cluster && *cluster) path.append("[@clusterName=\"").append(cluster).append("\"]"); return getWorkUnitsByXPath(path.str()); } virtual IConstWorkUnitIterator * getChildWorkUnits(const char *parent) { StringBuffer path("*[@parent=\""); path.append(parent).append("\"]"); return getWorkUnitsByXPath(path.str()); } virtual IConstWorkUnit* secOpenWorkUnit(const char *wuid, bool lock, ISecManager *secmgr=NULL, ISecUser *secuser=NULL) { if (workUnitTraceLevel > 1) PrintLog("openWorkUnit %s", wuid); StringBuffer wuRoot; getXPath(wuRoot, wuid); IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, lock ? RTM_LOCK_READ|RTM_LOCK_SUB : 0, SDS_LOCK_TIMEOUT); if (conn) { CLocalWorkUnit *wu = new CLocalWorkUnit(conn, secmgr, secuser); if (secmgr && wu) { if (!checkWuSecAccess(*wu, *secmgr, secuser, SecAccess_Read, "opening", true, true)) { delete wu; return NULL; } } return wu; } else { if (workUnitTraceLevel > 0) PrintLog("openWorkUnit %s not found", wuid); return NULL; } } virtual IConstWorkUnit* openWorkUnit(const char *wuid, bool lock) { return secOpenWorkUnit(wuid, lock); } virtual IWorkUnit* secUpdateWorkUnit(const char *wuid, ISecManager *secmgr=NULL, ISecUser *secuser=NULL) { if (workUnitTraceLevel > 1) PrintLog("updateWorkUnit %s", wuid); StringBuffer wuRoot; getXPath(wuRoot, wuid); IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT); if (conn) { Owned cw = new CLocalWorkUnit(conn, secmgr, secuser); if (secmgr && cw) { if (!checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Write, "updating", true, true)) return NULL; } return &cw->lockRemote(false); } else { if (workUnitTraceLevel > 0) PrintLog("updateWorkUnit %s not found", wuid); return NULL; } } virtual IWorkUnit* updateWorkUnit(const char *wuid) { return secUpdateWorkUnit(wuid); } virtual int setTracingLevel(int newLevel) { if (newLevel) PrintLog("Setting workunit trace level to %d", newLevel); int level = workUnitTraceLevel; workUnitTraceLevel = newLevel; return level; } IConstWorkUnitIterator * getWorkUnitsByXPath(const char *xpath) { return getWorkUnitsByXPath(xpath,NULL,NULL); } void descheduleAllWorkUnits() { Owned conn = querySDS().connect("/Schedule", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if(!conn) return; Owned root(conn->queryRoot()->getBranch(".")); KeptAtomTable entries; Owned iter(root->getElements("*/*/*/*")); StringBuffer wuid; for(iter->first(); iter->isValid(); iter->next()) { char const * entry = iter->query().queryName(); if(!entries.find(entry)) { entries.addAtom(entry); ncnameUnescape(entry, wuid.clear()); Owned wu = updateWorkUnit(wuid); if(wu && (wu->getState() == WUStateWait)) wu->setState(WUStateCompleted); } } bool more; do more = root->removeProp("*"); while(more); } IConstWorkUnitIterator * getWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser) { Owned conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT); if (conn) { CDaliVersion serverVersionNeeded("3.2"); Owned iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ? conn->queryRoot()->getElements(xpath) : conn->getElements(xpath)); return new CConstWUIterator(conn, NULL, iter, secmgr, secuser); } else return NULL; } IConstWorkUnitIterator* getWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm) WUSortField *filters, // NULL or list of fields to folteron (terminated by WUSFterm) const void *filterbuf, // (appended) string values for filters unsigned startoffset, unsigned maxnum, const char *queryowner, __int64 *cachehint, ISecManager *secmgr, ISecUser *secuser) { class cScopeChecker: implements ISortedElementsTreeFilter { UniqueScopes done; ISecManager *secmgr; ISecUser *secuser; public: cScopeChecker(ISecManager *_secmgr,ISecUser *_secuser) { secmgr = _secmgr; secuser = _secuser; } bool isOK(IPropertyTree &tree) { const char *scopename = tree.queryProp("@scope"); if (!scopename||!*scopename) return true; const bool *b = done.getValue(scopename); if (b) return *b; bool ret = checkWuScopeSecAccess(scopename,*secmgr,secuser,SecAccess_Read,"iterating",false,false); done.setValue(scopename,ret); return ret; } } sc(secmgr,secuser); StringBuffer query("*"); StringBuffer so; StringAttr namefilterlo; StringAttr namefilterhi; if (filters) { const char *fv = (const char *)filterbuf; for (unsigned i=0;filters[i]!=WUSFterm;i++) { int fmt = filters[i]; int subfmt = (fmt&0xff); if (subfmt==WUSFwuid) namefilterlo.set(fv); else if (subfmt==WUSFwuidhigh) namefilterhi.set(fv); else { query.append('[').append(getEnumText(subfmt,sortFields)).append('='); if (fmt&WUSFnocase) query.append('?'); if (fmt&WUSFwild) query.append('~'); query.append('"').append(fv).append("\"]"); } fv = fv + strlen(fv)+1; } } if (sortorder) { for (unsigned i=0;sortorder[i]!=WUSFterm;i++) { if (so.length()) so.append(','); int fmt = sortorder[i]; if (fmt&WUSFreverse) so.append('-'); if (fmt&WUSFnocase) so.append('~'); if (fmt&WUSFnumeric) so.append('#'); so.append(getEnumText(fmt&0xff,sortFields)); } } IArrayOf results; Owned conn=getElementsPaged( "WorkUnits", query.str(), so.length()?so.str():NULL,startoffset,maxnum, secmgr?&sc:NULL,queryowner,cachehint,namefilterlo.get(),namefilterhi.get(),results); return new CConstWUArrayIterator(conn, results, secmgr, secuser); } IConstWorkUnitIterator* getWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm) WUSortField *filters, // NULL or list of fields to filter on (terminated by WUSFterm) const void *filterbuf, // (appended) string values for filters unsigned startoffset, unsigned maxnum, const char *queryowner, __int64 *cachehint) { return getWorkUnitsSorted(sortorder,filters,filterbuf,startoffset,maxnum,queryowner,cachehint, NULL, NULL); } virtual unsigned numWorkUnits() { Owned conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT); if (!conn) return 0; IPropertyTree *root = conn->queryRoot(); return root->numChildren(); } virtual unsigned numWorkUnitsFiltered(WUSortField *filters, const void *filterbuf, ISecManager *secmgr, ISecUser *secuser) { Owned iter = getWorkUnitsSorted( NULL,filters,filterbuf,0,0x7fffffff,NULL,NULL,secmgr,secuser); // this is rather slow but necessarily so (for security check) unsigned ret = 0; ForEach(*iter) ret++; return ret; } virtual unsigned numWorkUnitsFiltered(WUSortField *filters,const void *filterbuf) { if (!filters) return numWorkUnits(); return numWorkUnitsFiltered(filters,filterbuf,NULL,NULL); } void asyncRemoveDll(const char * name, bool removeDlls, bool removeDirectory) { deletedllworkq->post(new asyncRemoveDllWorkItem(name,removeDlls,removeDirectory)); } ISDSManager *sdsManager; SessionId session; ISecManager *secMgr; private: void deleteChildren(IPropertyTree *root, const char *wuid) { StringBuffer kids("*[@parent=\""); kids.append(wuid).append("\"]"); Owned it = root->getElements(kids.str()); ForEach (*it) { deleteChildren(root, it->query().queryName()); } root->removeProp(wuid); } class CConstWUIterator : public CInterface, implements IConstWorkUnitIterator { IArrayOf w; CArrayIteratorOf it; public: IMPLEMENT_IINTERFACE; CConstWUIterator() : it(w) { } CConstWUIterator(IRemoteConnection *conn, IPropertyTree *, IPropertyTreeIterator *_it, ISecManager *secmgr=NULL, ISecUser *secuser=NULL) : it(w) { UniqueScopes us; Owned scopes; if (secmgr /* && secmgr->authTypeRequired(RT_WORKUNIT_SCOPE) tbd */) { scopes.setown(secmgr->createResourceList("wuscopes")); for (_it->first(); _it->isValid(); _it->next()) { const char *scopename = _it->query().queryProp("@scope"); if (scopename && *scopename && !us.getValue(scopename)) { scopes->addResource(scopename); us.setValue(scopename, true); } } if (scopes->count()) { secmgr->authorizeEx(RT_WORKUNIT_SCOPE, *secuser, scopes); if (checkWuScopeListSecAccess(NULL, scopes, SecAccess_Read, "iterating", false, false)) scopes.clear(); //if no scopes restricted, no need to check later } else scopes.clear(); } for (_it->first(); _it->isValid(); _it->next()) { IPropertyTree *rp = &_it->query(); const char *scopename=rp->queryProp("@scope"); if (!scopename || !*scopename || !scopes || checkWuScopeListSecAccess(rp->queryProp("@scope"), scopes, SecAccess_Read, "iterating", false, false)) w.append(*(IConstWorkUnit *) new CLocalWorkUnit(LINK(conn), LINK(rp), secmgr, secuser)); } } bool first() { return it.first(); } bool isValid() { return it.isValid(); } bool next() { return it.next(); } IConstWorkUnit & query() { return it.query();} }; IRemoteConnection* connect(const char *xpath, unsigned flags) { return sdsManager->connect(xpath, session, flags, SDS_LOCK_TIMEOUT); } }; static Owned factory; void CWorkUnitFactory::clientShutdown() { factory.clear(); } void clientShutdownWorkUnit() { factory.clear(); } extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory() { if (!factory) factory.setown(new CWorkUnitFactory()); return factory.getLink(); } class CSecureWorkUnitFactory : public CInterface, implements IWorkUnitFactory { public: IMPLEMENT_IINTERFACE; CSecureWorkUnitFactory(ISecManager &secmgr, ISecUser &secuser) { if (!factory) factory.setown(new CWorkUnitFactory()); secMgr.set(&secmgr); secUser.set(&secuser); } virtual IWorkUnit* createNamedWorkUnit(const char *wuid,const char *parentWuid, const char *app, const char *user) { checkWuScopeSecAccess(user, *secMgr.get(), secUser.get(), SecAccess_Write, "Create", true, true); IWorkUnit *wu=factory->createNamedWorkUnit(wuid, parentWuid, app, user); if (wu) { CLockedWorkUnit* lw = dynamic_cast(wu); if (lw) lw->setSecIfcs(secMgr.get(), secUser.get()); } return wu; } virtual IWorkUnit* createWorkUnit(const char *parentWuid, const char *app, const char *user) { checkWuScopeSecAccess(user, *secMgr.get(), secUser.get(), SecAccess_Write, "Create", true, true); IWorkUnit *wu=factory->createWorkUnit(parentWuid, app, user); if (wu) { CLockedWorkUnit* lw = dynamic_cast(wu); if (lw) lw->setSecIfcs(secMgr.get(), secUser.get()); } return wu; } virtual bool deleteWorkUnitEx(const char * wuid) { return factory->secDeleteWorkUnit(wuid, secMgr.get(), secUser.get(), true); } virtual bool deleteWorkUnit(const char * wuid) { return factory->secDeleteWorkUnit(wuid, secMgr.get(), secUser.get(), false); } virtual IConstWorkUnit* openWorkUnit(const char *wuid, bool lock) { return factory->secOpenWorkUnit(wuid, lock, secMgr.get(), secUser.get()); } virtual IWorkUnit* updateWorkUnit(const char *wuid) { return factory->secUpdateWorkUnit(wuid, secMgr.get(), secUser.get()); } //make cached workunits a non secure pass through for now. virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner) { // Not sure what to do about customerID vs user etc StringBuffer path("*"); if (owner && *owner) path.append("[@submitID=\"").append(owner).append("\"]"); return factory->getWorkUnitsByXPath(path.str(), secMgr.get(), secUser.get()); } virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state); virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char* ecl) { return factory->getWorkUnitsByECL(ecl); } virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char* cluster) { return factory->getWorkUnitsByCluster(cluster); } virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath) { return factory->getWorkUnitsByXPath(xpath, secMgr.get(), secUser.get()); } virtual void descheduleAllWorkUnits() { factory->descheduleAllWorkUnits(); } virtual IConstWorkUnitIterator * getChildWorkUnits(const char *parent) { StringBuffer path("*[@parent=\""); path.append(parent).append("\"]"); return factory->getWorkUnitsByXPath(path.str(), secMgr.get(), secUser.get()); } virtual int setTracingLevel(int newLevel) { return factory->setTracingLevel(newLevel); } virtual IConstWorkUnitIterator* getWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm) WUSortField *filters, // NULL or list of fields to filter on (terminated by WUSFterm) const void *filterbuf, // (appended) string values for filters unsigned startoffset, unsigned maxnum, const char *queryowner, __int64 *cachehint) { return factory->getWorkUnitsSorted(sortorder,filters,filterbuf,startoffset,maxnum,queryowner,cachehint, secMgr.get(), secUser.get()); } virtual unsigned numWorkUnits() { return factory->numWorkUnits(); } virtual unsigned numWorkUnitsFiltered(WUSortField *filters, const void *filterbuf) { return factory->numWorkUnitsFiltered(filters,filterbuf,secMgr.get(), secUser.get()); } private: Owned base_factory; Owned secMgr; Owned secUser; }; extern WORKUNIT_API IWorkUnitFactory * getSecWorkUnitFactory(ISecManager &secmgr, ISecUser &secuser) { return new CSecureWorkUnitFactory(secmgr, secuser); } //========================================================================================== class CStringPTreeIterator : public CInterface, implements IStringIterator { Owned it; public: IMPLEMENT_IINTERFACE; CStringPTreeIterator(IPropertyTreeIterator *p) : it(p) {}; virtual bool first() { return it->first(); } virtual bool next() { return it->next(); } virtual bool isValid() { return it->isValid(); } virtual IStringVal & str(IStringVal &s) { s.set(it->query().queryProp(NULL)); return s; } }; class CStringPTreeTagIterator : public CInterface, implements IStringIterator { Owned it; public: IMPLEMENT_IINTERFACE; CStringPTreeTagIterator(IPropertyTreeIterator *p) : it(p) {}; virtual bool first() { return it->first(); } virtual bool next() { return it->next(); } virtual bool isValid() { return it->isValid(); } virtual IStringVal & str(IStringVal &s) { s.set(it->query().queryName()); return s; } }; class CStringPTreeAttrIterator : public CInterface, implements IStringIterator { Owned it; StringAttr name; public: IMPLEMENT_IINTERFACE; CStringPTreeAttrIterator(IPropertyTreeIterator *p, const char *_name) : it(p), name(_name) {}; virtual bool first() { return it->first(); } virtual bool next() { return it->next(); } virtual bool isValid() { return it->isValid(); } virtual IStringVal & str(IStringVal &s) { s.set(it->query().queryProp(name)); return s; } }; //========================================================================================== CLocalWorkUnit::CLocalWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser, const char *parentWuid) : connection(_conn) { connectAtRoot = true; init(); p.setown(connection->getRoot()); if (parentWuid) p->setProp("@parent", parentWuid); secMgr.set(secmgr); secUser.set(secuser); } CLocalWorkUnit::CLocalWorkUnit(IRemoteConnection *_conn, IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser) : connection(_conn) { connectAtRoot = false; init(); p.setown(root); secMgr.set(secmgr); secUser.set(secuser); } void CLocalWorkUnit::init() { p.clear(); cachedGraphs.clear(); workflowIterator.clear(); query.clear(); graphs.kill(); results.kill(); variables.kill(); plugins.kill(); libraries.kill(); activities.kill(); exceptions.kill(); temporaries.kill(); roxieQueryInfo.clear(); webServicesInfo.clear(); workflowIteratorCached = false; resultsCached = false; temporariesCached = false; variablesCached = false; exceptionsCached = false; pluginsCached = false; librariesCached = false; activitiesCached = false; webServicesInfoCached = false; roxieQueryInfoCached = false; dirty = false; abortDirty = true; abortState = false; } // Dummy workunit support CLocalWorkUnit::CLocalWorkUnit(const char *_wuid, const char *parentWuid, ISecManager *secmgr, ISecUser *secuser) { connectAtRoot = true; init(); p.setown(createPTree(_wuid)); p->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance"); if (parentWuid) p->setProp("@parentWuid", parentWuid); secMgr.set(secmgr); secUser.set(secuser); } CLocalWorkUnit::~CLocalWorkUnit() { if (workUnitTraceLevel > 1) { PrintLog("Releasing workunit %s mode %x", p->queryName(), connection ? connection->queryMode() :0); } try { unsubscribe(); query.clear(); webServicesInfo.clear(); roxieQueryInfo.clear(); workflowIterator.clear(); activities.kill(); plugins.kill(); libraries.kill(); exceptions.kill(); graphs.kill(); results.kill(); temporaries.kill(); variables.kill(); timestamps.kill(); appvalues.kill(); userDesc.clear(); secMgr.clear(); secUser.clear(); cachedGraphs.clear(); p.clear(); connection.clear(); } catch (IException *E) { LOG(MCexception(E, MSGCLS_warning), E, "Exception during ~CLocalWorkUnit"); E->Release(); } } void CLocalWorkUnit::cleanupAndDelete(bool deldll,bool deleteOwned) { TIME_SECTION("WUDELETE cleanupAndDelete total"); // Delete any related things in SDS etc that might otherwise be forgotten assertex(connectAtRoot); // make sure we don't delete entire workunit tree! if (p->getPropBool("@protected", false)) throw MakeStringException(WUERR_WorkunitProtected, "%s: Workunit is protected",p->queryName()); switch (getState()) { case WUStateAborted: case WUStateCompleted: case WUStateFailed: case WUStateArchived: break; case WUStateCompiled: if (getAction()==WUActionRun || getAction()==WUActionUnknown) throw MakeStringException(WUERR_WorkunitActive, "%s: Workunit is active",p->queryName()); break; case WUStateWait: throw MakeStringException(WUERR_WorkunitScheduled, "%s: Workunit is scheduled, not deleting",p->queryName()); default: throw MakeStringException(WUERR_WorkunitActive, "%s: Workunit is active",p->queryName()); break; } try { //Move any service aliases if (getIsQueryService()) { Owned registry = getQueryRegistry(p->queryProp("@clusterName"), false); if (registry) removeWuidFromNamedQueries(registry, p->queryName()); } if (deldll && !p->getPropBool("@isClone", false)) { Owned q = getQuery(); if (q) { Owned iter = &q->getAssociatedFiles(); SCMStringBuffer name; ForEach(*iter) { IConstWUAssociatedFile & cur = iter->query(); cur.getName(name); bool removeDir = (cur.getType() == FileTypeDll); // this is to keep the code the same as before, but I don't know why it only does it for the dll. factory->asyncRemoveDll(name.str(), true, removeDir); } } } StringBuffer apath; // PROGLOG("wuid dll files removed"); { apath.append("/WorkUnitAborts/").append(p->queryName()); Owned acon = factory->sdsManager->connect(apath.str(), factory->session, RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, SDS_LOCK_TIMEOUT); acon.clear(); } // PROGLOG("wuid WorkUnitAborts entry removed"); deleteTempFiles(NULL, deleteOwned, true); // all, any remaining. } catch(IException *E) { StringBuffer s; LOG(MCexception(E, MSGCLS_warning), E, s.append("Exception during cleanupAndDelete: ").append(p->queryName()).str()); E->Release(); } catch (...) { WARNLOG("Unknown exception during cleanupAndDelete: %s", p->queryName()); } CConstGraphProgress::deleteWuidProgress(p->queryName()); connection->close(true); PROGLOG("WUID %s removed",p->queryName()); connection.clear(); } void CLocalWorkUnit::setTimeScheduled(const IDateTime &val) { SCMStringBuffer strval; val.getGmtString(strval); p->setProp("@timescheduled",strval.str()); } IDateTime & CLocalWorkUnit::getTimeScheduled(IDateTime &val) const { StringBuffer str; p->getProp("@timescheduled",str); if(str.length()) val.setGmtString(str.str()); return val; } bool modifyAndWriteWorkUnitXML(char const * wuid, StringBuffer & buf, StringBuffer & extra, IFileIO * fileio) { // kludge in extra chunks of XML such as GraphProgress and GeneratedDlls if(extra.length()) { size32_t l = (size32_t)strlen(wuid); size32_t p = buf.length()-l-4; // bit of a kludge assertex(memcmp(buf.str()+p+2,wuid,l)==0); StringAttr tail(buf.str()+p); buf.setLength(p); buf.append(extra); buf.append(tail); } return (fileio->write(0,buf.length(),buf.str()) == buf.length()); } bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerrors,bool deleteOwned) { CriticalBlock block(crit); StringBuffer path(base); if (!p) return false; const char *wuid = p->queryName(); if (!wuid||!*wuid) return false; addPathSepChar(path).append(wuid).append(".xml"); Owned file = createIFile(path.str()); if (!file) return false; Owned fileio = file->open(IFOcreate); if (!fileio) return false; StringBuffer buf; exportWorkUnitToXML(this, buf); StringBuffer extraWorkUnitXML; StringBuffer xpath("/GraphProgress/"); xpath.append(wuid); Owned conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if (conn) { Owned tmp = createPTree("GraphProgress"); mergePTree(tmp,conn->queryRoot()); toXML(tmp,extraWorkUnitXML,1,XML_Format); conn->close(); } Owned q = getQuery(); if (!q) { if(!modifyAndWriteWorkUnitXML(wuid, buf, extraWorkUnitXML, fileio)) return false; if (del) { if (getState()==WUStateUnknown) setState(WUStateArchived); // to allow delete cleanupAndDelete(false,deleteOwned); // no query, may as well delete } return false; } Owned iter = &q->getAssociatedFiles(); SCMStringBuffer name; Owned exception; Owned loc; StringBuffer dst, locpath; Owned generatedDlls = createPTree("GeneratedDlls"); ForEach(*iter) { IConstWUAssociatedFile & cur = iter->query(); cur.getName(name); if (name.length()) { Owned entry = queryDllServer().getEntry(name.str()); if (entry.get()) { Owned generatedDllBranch = createPTree(); generatedDllBranch->setProp("@name", entry->queryName()); generatedDllBranch->setProp("@kind", entry->queryKind()); bool removeDllFiles = true; exception.clear(); try { loc.setown(entry->getBestLocation()); //throws exception if no readable locations } catch(IException * e) { exception.setown(e); loc.setown(entry->getBestLocationCandidate()); //this will be closest of the unreadable locations } RemoteFilename filename; loc->getDllFilename(filename); if(!exception) { Owned srcfile = createIFile(filename); addPathSepChar(dst.clear().append(base)); filename.getTail(dst); Owned dstfile = createIFile(dst.str()); try { copyFile(dstfile,srcfile); makeAbsolutePath(dstfile->queryFilename(), locpath.clear()); } catch(IException * e) { exception.setown(e); } } if(exception) { if(ignoredllerrors) { EXCLOG(exception.get(), "archiveWorkUnit (copying associated file)"); //copy failed, so store original (best) location and don't delete the files filename.getRemotePath(locpath.clear()); removeDllFiles = false; } else { throw exception.getLink(); } } generatedDllBranch->setProp("@location", locpath.str()); generatedDlls->addPropTree("GeneratedDll", generatedDllBranch.getClear()); if (del) { bool removeDir = (cur.getType() == FileTypeDll); // copied from cleanupAndDelete code, above if (!p->getPropBool("@isClone", false)) // Leak to protect against cloned WUs entry->remove(removeDllFiles, removeDir); } } } } iter.clear(); if(generatedDlls->numChildren()) toXML(generatedDlls, extraWorkUnitXML, 1, XML_Format); if(!modifyAndWriteWorkUnitXML(wuid, buf, extraWorkUnitXML, fileio)) return false; if (del) { //setState(WUStateArchived); // this isn't useful as about to delete it! q.clear(); //deldll false as should have deleted all those we successfully copied, and archived and removed SDS entries, above cleanupAndDelete(false, deleteOwned); } return true; } void CLocalWorkUnit::packWorkUnit(bool pack) { // only packs Graph info currently CriticalBlock block(crit); if (!p) return; const char *wuid = p->queryName(); if (!wuid||!*wuid) return; if (pack) { if (!p->hasProp("PackedGraphs")) { cachedGraphs.clear(); IPropertyTree *t = p->queryPropTree("Graphs"); if (t) { MemoryBuffer buf; t->serialize(buf); p->setPropBin("PackedGraphs",buf.length(),buf.bufferBase()); p->removeTree(t); } } } else { ensureGraphsUnpacked(); } CConstGraphProgress::packProgress(wuid,pack); } IPropertyTree * pruneBranch(IPropertyTree * from, char const * xpath) { Owned ret; IPropertyTree * branch = from->queryPropTree(xpath); if(branch) { ret.setown(createPTreeFromIPT(branch)); from->removeTree(branch); } return ret.getClear(); } bool restoreWorkUnit(const char *base,const char *wuid) { StringBuffer path(base); if (!wuid||!*wuid) return false; addPathSepChar(path).append(wuid).append(".xml"); Owned file = createIFile(path.str()); if (!file) return false; Owned fileio = file->open(IFOread); if (!fileio) return false; Owned pt = createPTree(*fileio); if (!pt) return false; CDateTime dt; dt.setNow(); StringBuffer dts; dt.getString(dts); pt->setProp("@restoredDate", dts.str()); Owned conn = querySDS().connect("/WorkUnits", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if (!conn) { ERRLOG("restoreWorkUnit could not connect to /WorkUnits"); return false; } IPropertyTree *root = conn->queryRoot(); if (root->hasProp(wuid)) { ERRLOG("restoreWorkUnit WUID %s already exists",wuid); return false; } Owned gprogress = pruneBranch(pt, "GraphProgress[1]"); Owned generatedDlls = pruneBranch(pt, "GeneratedDlls[1]"); root->setPropTree(wuid,pt.getClear()); conn.clear(); // now kludge back GraphProgress and GeneratedDlls if (gprogress) { StringBuffer xpath("/GraphProgress/"); conn.setown(querySDS().connect("/GraphProgress", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT)); if (conn) { IPropertyTree *groot = conn->queryRoot(); if (groot->hasProp(wuid)) { ERRLOG("restoreWorkUnit WUID %s graphprogress already exists, removing",wuid); groot->removeProp(wuid); } groot->setPropTree(wuid,gprogress.getClear()); } } if(generatedDlls) { Owned dlls = generatedDlls->getElements("GeneratedDll"); for(dlls->first(); dlls->isValid(); dlls->next()) { IPropertyTree & dll = dlls->query(); char const * name = dll.queryProp("@name"); char const * kind = dll.queryProp("@kind"); char const * location = dll.queryProp("@location"); Owned got = queryDllServer().getEntry(name); if(!got) queryDllServer().registerDll(name, kind, location); } } return true; } void CLocalWorkUnit::loadXML(const char *xml) { CriticalBlock block(crit); init(); assertex(xml); p.setown(createPTreeFromXMLString(xml)); } void CLocalWorkUnit::serialize(MemoryBuffer &tgt) { CriticalBlock block(crit); StringBuffer x; tgt.append(exportWorkUnitToXML(this, x).str()); } void CLocalWorkUnit::deserialize(MemoryBuffer &src) { CriticalBlock block(crit); StringAttr value; src.read(value); loadXML(value); } void CLocalWorkUnit::notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { dirty = true; } void CLocalWorkUnit::abort() { abortDirty = true; } void CLocalWorkUnit::requestAbort() { CriticalBlock block(crit); abortWorkUnit(p->queryName()); } void CLocalWorkUnit::subscribe(WUSubscribeOptions options) { CriticalBlock block(crit); bool subscribeAbort = false; bool subscribeChange = false; bool needChildren = true; switch (options) { case SubscribeOptionAbort: subscribeAbort = true; break; case SubscribeOptionRunningState: needChildren = false; case SubscribeOptionAnyState: subscribeAbort = true; subscribeChange = true; break; case SubscribeOptionProgress: case SubscribeOptionAll: subscribeChange = true; break; } if (subscribeChange) { if (changeWatcher && changeWatcher->watchingChildren() != needChildren) { changeWatcher->unsubscribe(); changeWatcher.clear(); } if (!changeWatcher) { changeWatcher.setown(new CWorkUnitWatcher(this, p->queryName(), needChildren)); dirty = true; } } if (subscribeAbort && !abortWatcher) { abortWatcher.setown(new CWorkUnitAbortWatcher(this, p->queryName())); abortDirty = true; } } #if 0 // I don't think this is used (I grepped the source), am leaving here just in case I've missed somewhere (PG) WUState CLocalWorkUnit::waitComplete(int timeout, bool returnOnWaitState) { class WorkUnitWaiter : public CInterface, implements ISDSSubscription, implements IAbortHandler { Semaphore changed; CLocalWorkUnit *parent; public: IMPLEMENT_IINTERFACE; WorkUnitWaiter(CLocalWorkUnit *_parent) : parent(_parent) { aborted = false; }; void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { parent->notify(id, xpath, flags, valueLen, valueData); changed.signal(); } bool wait(unsigned timeout) { return changed.wait(timeout) && !aborted; } bool onAbort() { aborted = true; changed.signal(); return false; } bool aborted; } waiter(this); Owned w = new CWorkUnitWatcher(&waiter, p->queryName(), false); LocalIAbortHandler abortHandler(waiter); forceReload(); // or may miss changes that already happened, between load of wu and now. unsigned start = msTick(); WUState ret; loop { ret = getState(); switch (ret) { case WUStateWait: if(!returnOnWaitState) break; //fall thru case WUStateCompleted: case WUStateFailed: case WUStateAborted: w->unsubscribe(); return ret; } unsigned waited = msTick() - start; if (timeout==-1) { waiter.wait(20000); if (waiter.aborted) { ret = WUStateUnknown; // MORE - throw an exception? break; } } else if (waited > timeout || !waiter.wait(timeout-waited)) { ret = WUStateUnknown; // MORE - throw an exception? break; } reload(); } w->unsubscribe(); return ret; } #endif void CLocalWorkUnit::forceReload() { dirty = true; reload(); } bool CLocalWorkUnit::reload() { CriticalBlock block(crit); if (dirty) { if (!connectAtRoot) { StringBuffer wuRoot; getXPath(wuRoot, p->queryName()); IRemoteConnection *newconn = factory->sdsManager->connect(wuRoot.str(), factory->session, 0, SDS_LOCK_TIMEOUT); if (!newconn) throw MakeStringException(WUERR_ConnectFailed, "Could not connect to workunit %s (deleted?)",p->queryName()); connection.setown(newconn); connectAtRoot = true; } else connection->reload(); init(); p.setown(connection->getRoot()); return true; } return false; } void CLocalWorkUnit::unsubscribe() { CriticalBlock block(crit); if (abortWatcher) { abortWatcher->unsubscribe(); abortWatcher.clear(); } if (changeWatcher) { changeWatcher->unsubscribe(); changeWatcher.clear(); } } void CLocalWorkUnit::unlockRemote(bool commit) { CriticalBlock block(crit); MTIME_SECTION(timer, "WorkUnit_unlockRemote"); locked.unlock(); if (commit) { try { assertex(connectAtRoot); setTimeStamp("workunit", NULL, "Modified",false); try { connection->commit(); } catch (IException *e) { EXCLOG(e, "Error during workunit commit"); connection->rollback(); connection->changeMode(0, SDS_LOCK_TIMEOUT); throw; } connection->changeMode(0, SDS_LOCK_TIMEOUT); } catch (IException *E) { StringBuffer s; PrintLog("Failed to release write lock on workunit: %s", E->errorMessage(s).str()); throw; } } } IWorkUnit &CLocalWorkUnit::lockRemote(bool commit) { if (secMgr) checkWuSecAccess(*this, *secMgr.get(), secUser.get(), SecAccess_Write, "write lock", true, true); locked.lock(); CriticalBlock block(crit); MTIME_SECTION(timer, "WorkUnit_lockRemote"); if (commit) { try { StringBuffer wuRoot; getXPath(wuRoot, p->queryName()); if (connection&&connectAtRoot) connection->changeMode(RTM_LOCK_WRITE,SDS_LOCK_TIMEOUT); else connection.setown(factory->sdsManager->connect(wuRoot.str(), factory->session, RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT)); if (!connection) throw MakeStringException(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str()); connectAtRoot = true; init(); p.setown(connection->getRoot()); } catch (IException *E) { StringBuffer s; PrintLog("Failed to get write lock on workunit: %s", E->errorMessage(s).str()); locked.unlock(); throw; } } return *new CLockedWorkUnit(LINK(this)); } void CLocalWorkUnit::commit() { CriticalBlock block(crit); assertex(connectAtRoot); if (connection) connection->commit(); } IWorkUnit& CLocalWorkUnit::lock() { return lockRemote(true); } IStringVal& CLocalWorkUnit::getWuid(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryName()); return str; } unsigned CLocalWorkUnit::getDebugAgentListenerPort() const { CriticalBlock block(crit); return p->getPropInt("@DebugListenerPort", 0); } void CLocalWorkUnit::setDebugAgentListenerPort(unsigned port) { CriticalBlock block(crit); p->setPropInt("@DebugListenerPort", port); } IStringVal& CLocalWorkUnit::getDebugAgentListenerIP(IStringVal &ip) const { CriticalBlock block(crit); ip.set(p->queryProp("@DebugListenerIP")); return ip; } void CLocalWorkUnit::setDebugAgentListenerIP(const char * ip) { CriticalBlock block(crit); p->setProp("@DebugListenerIP", ip); } IStringVal& CLocalWorkUnit::getSecurityToken(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@token")); return str; } void CLocalWorkUnit::setSecurityToken(const char *value) { CriticalBlock block(crit); p->setProp("@token", value); } bool CLocalWorkUnit::getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const { return CConstGraphProgress::getRunningGraph(p->queryName(), graphName, subId); } void CLocalWorkUnit::setJobName(const char *value) { CriticalBlock block(crit); p->setProp("@jobName", value); } IStringVal& CLocalWorkUnit::getJobName(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@jobName")); return str; } void CLocalWorkUnit::setClusterName(const char *value) { CriticalBlock block(crit); p->setProp("@clusterName", value); } IStringVal& CLocalWorkUnit::getClusterName(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@clusterName")); return str; } void CLocalWorkUnit::setAllowedClusters(const char *value) { setDebugValue("allowedclusters",value, true); } IStringVal& CLocalWorkUnit::getAllowedClusters(IStringVal &str) const { CriticalBlock block(crit); getDebugValue("allowedclusters",str); if (str.length()!=0) return str; str.set(p->queryProp("@clusterName")); return str; } void CLocalWorkUnit::setAllowAutoQueueSwitch(bool val) { setDebugValueInt("allowautoqueueswitch",val?1:0,true); } bool CLocalWorkUnit::getAllowAutoQueueSwitch() const { CriticalBlock block(crit); return getDebugValueBool("allowautoqueueswitch",false); } void CLocalWorkUnit::setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash) { StringBuffer suffix; if (name && *name) setApplicationValue("LibraryModule", "name", name, true); setApplicationValueInt("LibraryModule", "interfaceHash", interfaceHash, true); setApplicationValueInt("LibraryModule", "definitionHash", definitionHash, true); setApplicationValue("LibraryModule", "platform", appendLibrarySuffix(suffix).str(), true); } void CLocalWorkUnit::remoteCheckAccess(IUserDescriptor *user, bool writeaccess) const { unsigned auditflags = DALI_LDAP_AUDIT_REPORT|DALI_LDAP_READ_WANTED; if (writeaccess) auditflags |= DALI_LDAP_WRITE_WANTED; int perm = 255; const char *scopename = p->queryProp("@scope"); if (scopename&&*scopename) { Owned tmpuser; if (!user) { tmpuser.setown(getUserDescriptor()); // probably not that useful as presumably owner has access user = tmpuser.get(); } perm = querySessionManager().getPermissionsLDAP("workunit",scopename,user,auditflags); if (perm<0) { if (perm==-1) perm = 255; else perm = 0; } } IDFS_Exception *e = NULL; if (!HASREADPERMISSION(perm)) { SCMStringBuffer wuid; getWuid(wuid); throw MakeStringException(WUERR_WorkunitAccessDenied, "Read access denied for workunit %s",wuid.s.str()); } if (writeaccess&&!HASWRITEPERMISSION(perm)) { SCMStringBuffer wuid; getWuid(wuid); throw MakeStringException(WUERR_WorkunitAccessDenied, "Write access denied for workunit %s",wuid.s.str()); } } IStringVal& CLocalWorkUnit::getParentWuid(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@parent")); return str; } void CLocalWorkUnit::setUser(const char * value) { CriticalBlock block(crit); p->setProp("@submitID", value); } IStringVal& CLocalWorkUnit::getUser(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@submitID")); return str; } void CLocalWorkUnit::setWuScope(const char * value) { CriticalBlock block(crit); p->setProp("@scope", value); } IStringVal& CLocalWorkUnit::getWuScope(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("@scope")); return str; } void CLocalWorkUnit::setCustomerId(const char * value) { CriticalBlock block(crit); p->setProp("CustomerID", value); } IStringVal& CLocalWorkUnit::getCustomerId(IStringVal &str) const { CriticalBlock block(crit); str.set(p->queryProp("CustomerID")); return str; } mapEnums priorityClasses[] = { { PriorityClassUnknown, "unknown" }, { PriorityClassLow, "low" }, { PriorityClassNormal, "normal" }, { PriorityClassHigh, "high" }, { PriorityClassSize, NULL }, }; void CLocalWorkUnit::setPriority(WUPriorityClass cls) { CriticalBlock block(crit); setEnum(p, "@priorityClass", cls, priorityClasses); } WUPriorityClass CLocalWorkUnit::getPriority() const { CriticalBlock block(crit); return (WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses); } mapEnums states[] = { { WUStateUnknown, "unknown" }, { WUStateCompiled, "compiled" }, { WUStateRunning, "running" }, { WUStateCompleted, "completed" }, { WUStateFailed, "failed" }, { WUStateArchived, "archived" }, { WUStateAborting, "aborting" }, { WUStateAborted, "aborted" }, { WUStateBlocked, "blocked" }, { WUStateSubmitted, "submitted" }, { WUStateScheduled, "scheduled" }, { WUStateCompiling, "compiling" }, { WUStateWait, "wait" }, { WUStateUploadingFiles, "uploading_files" }, { WUStateDebugPaused, "debugging" }, { WUStateDebugRunning, "debug_running" }, { WUStatePaused, "paused" }, { WUStateSize, NULL } }; IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByState(WUState state) { StringBuffer path("*"); path.append("[@state=\"").append(getEnumText(state, states)).append("\"]"); return getWorkUnitsByXPath(path.str()); } IConstWorkUnitIterator * CSecureWorkUnitFactory::getWorkUnitsByState(WUState state) { StringBuffer path("*"); path.append("[@state=\"").append(getEnumText(state, states)).append("\"]"); return factory->getWorkUnitsByXPath(path.str(), secMgr.get(), secUser.get()); } void CLocalWorkUnit::setState(WUState value) { CriticalBlock block(crit); if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait) { if (abortWatcher) { abortWatcher->unsubscribe(); abortWatcher.clear(); } StringBuffer apath; apath.append("/WorkUnitAborts/").append(p->queryName()); if(factory) { Owned acon = factory->sdsManager->connect(apath.str(), factory->session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT); if (acon) acon->close(true); } } setEnum(p, "@state", value, states); if (getDebugValueBool("monitorWorkunit", false)) { switch(value) { case WUStateAborted: FLLOG(MCoperatorWarning, "Workunit %s aborted", p->queryName()); break; case WUStateCompleted: FLLOG(MCoperatorProgress, "Workunit %s completed", p->queryName()); break; case WUStateFailed: FLLOG(MCoperatorProgress, "Workunit %s failed", p->queryName()); break; } } p->removeProp("@stateEx"); } void CLocalWorkUnit::setStateEx(const char * text) { CriticalBlock block(crit); p->setProp("@stateEx", text); } void CLocalWorkUnit::setAgentSession(__int64 sessionId) { CriticalBlock block(crit); p->setPropInt64("@agentSession", sessionId); } void CLocalWorkUnit::setAgentPID(unsigned pid) { CriticalBlock block(crit); p->setPropInt("@agentPID", pid); } bool CLocalWorkUnit::aborting() const { CriticalBlock block(crit); if (abortDirty) { if (factory) { StringBuffer apath; apath.append("/WorkUnitAborts/").append(p->queryName()); Owned acon = factory->sdsManager->connect(apath.str(), factory->session, 0, SDS_LOCK_TIMEOUT); if (acon) abortState = acon->queryRoot()->getPropInt(NULL)!=0; else abortState = false; } abortDirty = false; } return abortState; } bool CLocalWorkUnit::getIsQueryService() const { CriticalBlock block(crit); return p->getPropBool("@isQueryService", false); } void CLocalWorkUnit::setIsQueryService(bool value) { CriticalBlock block(crit); p->setPropBool("@isQueryService", value); } void CLocalWorkUnit::checkAgentRunning(WUState & state) { if (queryDaliServerVersion().compare("2.1")<0) return; switch(state) { case WUStateRunning: case WUStateDebugPaused: case WUStateDebugRunning: case WUStateBlocked: case WUStateAborting: case WUStateCompiling: case WUStatePaused: { SessionId agent = getAgentSession(); if((agent>0) && querySessionManager().sessionStopped(agent, 0)) { forceReload(); state = (WUState) getEnum(p, "@state", states); bool isecl=state==WUStateCompiling; if (aborting()) state = WUStateAborted; else if (state==WUStateRunning || state==WUStatePaused || state==WUStateDebugPaused || state==WUStateDebugRunning || state==WUStateBlocked || state==WUStateCompiling) state = WUStateFailed; else return; WARNLOG("checkAgentRunning terminated: %"I64F"d state = %d",(__int64)agent,(int)state); Owned w = &lock(); w->setState(state); Owned e = w->createException(); WUAction action = w->getAction(); switch (action) { case WUActionPause: case WUActionPauseNow: case WUActionResume: w->setAction(WUActionUnknown); } if(isecl) { e->setExceptionCode(1001); e->setExceptionMessage("EclServer terminated unexpectedly"); } else { e->setExceptionCode(1000); e->setExceptionMessage("Workunit terminated unexpectedly"); } } } } } WUState CLocalWorkUnit::getState() const { CriticalBlock block(crit); WUState state = (WUState) getEnum(p, "@state", states); switch (state) { case WUStateRunning: case WUStateDebugPaused: case WUStateDebugRunning: case WUStateBlocked: case WUStateCompiling: if (aborting()) state = WUStateAborting; break; case WUStateSubmitted: if (aborting()) state = WUStateAborted; break; } const_cast(this)->checkAgentRunning(state); //need const_cast as will change state if agent has died return state; } IStringVal& CLocalWorkUnit::getStateEx(IStringVal & str) const { CriticalBlock block(crit); str.set(p->queryProp("@stateEx")); return str; } __int64 CLocalWorkUnit::getAgentSession() const { CriticalBlock block(crit); return p->getPropInt64("@agentSession", -1); } unsigned CLocalWorkUnit::getAgentPID() const { CriticalBlock block(crit); return p->getPropInt("@agentPID", -1); } IStringVal& CLocalWorkUnit::getStateDesc(IStringVal &str) const { // MORE - not sure about this - may prefer a separate interface CriticalBlock block(crit); try { str.set(getEnumText(getState(), states)); } catch (...) { str.set("???"); } return str; } mapEnums actions[] = { { WUActionUnknown, "unknown" }, { WUActionCompile, "compile" }, { WUActionCheck, "check" }, { WUActionRun, "run" }, { WUActionExecuteExisting, "execute" }, { WUActionPause, "pause" }, { WUActionPauseNow, "pausenow" }, { WUActionResume, "resume" }, { WUActionSize, NULL }, }; void CLocalWorkUnit::setAction(WUAction value) { CriticalBlock block(crit); setEnum(p, "Action", value, actions); } WUAction CLocalWorkUnit::getAction() const { CriticalBlock block(crit); return (WUAction) getEnum(p, "Action", actions); } IStringVal& CLocalWorkUnit::getActionEx(IStringVal & str) const { CriticalBlock block(crit); str.set(p->queryProp("Action")); return str; } IStringVal& CLocalWorkUnit::getApplicationValue(const char *app, const char *propname, IStringVal &str) const { CriticalBlock block(crit); StringBuffer prop("Application/"); prop.append(app).append('/').append(propname); str.set(p->queryProp(prop.str())); return str; } int CLocalWorkUnit::getApplicationValueInt(const char *app, const char *propname, int defVal) const { CriticalBlock block(crit); StringBuffer prop("Application/"); prop.append(app).append('/').append(propname); return p->getPropInt(prop.str(), defVal); } IConstWUAppValueIterator& CLocalWorkUnit::getApplicationValues() const { CriticalBlock block(crit); appvalues.load(p,"Application/*"); return *new CArrayIteratorOf (appvalues, 0, (IConstWorkUnit *) this); } void CLocalWorkUnit::setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite) { CriticalBlock block(crit); StringBuffer prop("Application/"); prop.append(app).append('/').append(propname); if (overwrite || !p->hasProp(prop.str())) { // MORE - not sure these lines should be needed.... StringBuffer sp; p->setProp(sp.append("Application").str(), ""); p->setProp(sp.append('/').append(app).str(), ""); p->setProp(prop.str(), value); } } void CLocalWorkUnit::setApplicationValueInt(const char *app, const char *propname, int value, bool overwrite) { CriticalBlock block(crit); StringBuffer prop("Application/"); prop.append(app).append('/').append(propname); if (overwrite || !p->hasProp(prop.str())) { // MORE - not sure these lines should be needed.... StringBuffer sp; p->setProp(sp.append("Application").str(), ""); p->setProp(sp.append('/').append(app).str(), ""); p->setPropInt(prop.str(), value); } } void CLocalWorkUnit::setPriorityLevel(int level) { CriticalBlock block(crit); p->setPropInt("PriorityFlag", level); } int CLocalWorkUnit::getPriorityLevel() const { CriticalBlock block(crit); return p->getPropInt("PriorityFlag"); } int CLocalWorkUnit::getPriorityValue() const { CriticalBlock block(crit); int priority = p->getPropInt("PriorityFlag"); switch((WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses)) { case PriorityClassLow: priority -= 100; break; case PriorityClassHigh: priority += 100; break; } return priority; } void CLocalWorkUnit::setRescheduleFlag(bool value) { CriticalBlock block(crit); p->setPropInt("RescheduleFlag", (int) value); } bool CLocalWorkUnit::getRescheduleFlag() const { CriticalBlock block(crit); return p->getPropInt("RescheduleFlag") != 0; } class NullIStringIterator : public CInterface, extends IStringIterator { public: IMPLEMENT_IINTERFACE; bool first() { return false; } bool next() { return false; } bool isValid() { return false; } IStringVal & str(IStringVal & str) { return str; } }; class CEnvironmentClusterInfo: public CInterface, implements IConstWUClusterInfo { StringAttr name; StringAttr serverQueue; StringAttr agentQueue; StringAttr thorQueue; StringAttr prefix; StringAttr platform; StringAttr querySetName; unsigned clusterWidth; public: IMPLEMENT_IINTERFACE; CEnvironmentClusterInfo(const char *_name, const char *_prefix, const char *_querySetName, IPropertyTree *agent, IArrayOf &thors, IPropertyTree *roxie) : name(_name), prefix(_prefix), querySetName(_querySetName) { StringBuffer queue; if (thors.ordinality()) { thorQueue.set(queue.clear().append(name).append(".thor")); clusterWidth = 0; bool lcr = false; ForEachItemIn(i,thors) { IPropertyTree &thor = thors.item(i); unsigned ts = thor.getPropInt("@slaves"); if (clusterWidth && (ts!=clusterWidth)) throw MakeStringException(WUERR_MismatchClusterSize,"CEnvironmentClusterInfo: mismatched thor sizes in cluster"); clusterWidth = ts; bool islcr = thor.getPropBool("@LCR"); if (i==0) lcr = islcr; else if (lcr!=islcr) throw MakeStringException(WUERR_MismatchThorType,"CEnvironmentClusterInfo: mismatched thor LCR in cluster"); } platform.set(lcr ? "thorlcr" : "thor"); } else if (roxie) { clusterWidth = roxie->getPropInt("@numChannels", 1); platform.set("roxie"); } else { clusterWidth = 1; platform.set("hthor"); } if (agent) agentQueue.set(queue.clear().append(name).append(".agent")); // MORE - does this need to be conditional? serverQueue.set(queue.clear().append(name).append(".eclserver")); } IStringVal & getName(IStringVal & str) const { str.set(name.get()); return str; } IStringVal & getScope(IStringVal & str) const { str.set(prefix.get()); return str; } IStringVal & getAgentQueue(IStringVal & str) const { str.set(agentQueue); return str; } virtual IStringVal & getServerQueue(IStringVal & str) const { str.set(serverQueue); return str; } IStringVal & getThorQueue(IStringVal & str) const { str.set(thorQueue); return str; } unsigned getSize() const { return clusterWidth; } virtual IStringVal & getPlatform(IStringVal & str) const { str.set(platform); return str; } IStringVal & getQuerySetName(IStringVal & str) const { str.set(querySetName.get()); return str; } }; IStringVal &getProcessQueueNames(IStringVal &ret, const char *process, const char *type, const char *suffix) { if (process) { Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (conn) { StringBuffer queueNames; StringBuffer xpath; xpath.appendf("%s[@process=\"%s\"]", type, process); Owned targets = conn->queryRoot()->getElements("Software/Topology/Cluster"); ForEach(*targets) { IPropertyTree &target = targets->query(); if (target.hasProp(xpath)) { if (queueNames.length()) queueNames.append(','); queueNames.append(target.queryProp("@name")).append(suffix); } } ret.set(queueNames); } } return ret; } extern WORKUNIT_API IStringVal &getEclCCServerQueueNames(IStringVal &ret, const char *process) { return getProcessQueueNames(ret, process, "EclCCServerProcess", ".eclserver"); } extern WORKUNIT_API IStringVal &getEclServerQueueNames(IStringVal &ret, const char *process) { return getProcessQueueNames(ret, process, "EclServerProcess", ".eclserver"); // shares queue name with EclCCServer } extern WORKUNIT_API IStringVal &getEclSchedulerQueueNames(IStringVal &ret, const char *process) { return getProcessQueueNames(ret, process, "EclCCServerProcess", ".eclscheduler"); // Shares deployment/config with EclCCServer } extern WORKUNIT_API IStringVal &getAgentQueueNames(IStringVal &ret, const char *process) { return getProcessQueueNames(ret, process, "EclAgentProcess", ".agent"); } extern WORKUNIT_API IStringVal &getThorQueueNames(IStringVal &ret, const char *process) { return getProcessQueueNames(ret, process, "ThorCluster", ".thor"); } extern WORKUNIT_API IStringIterator *getTargetClusters(const char *processType, const char *processName) { Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); Owned ret = new CStringArrayIterator; if (conn) { StringBuffer xpath; xpath.appendf("%s", processType ? processType : "*"); if (processName) xpath.appendf("[@process=\"%s\"]", processName); Owned targets = conn->queryRoot()->getElements("Software/Topology/Cluster"); ForEach(*targets) { IPropertyTree &target = targets->query(); if (target.hasProp(xpath)) { ret->append(target.queryProp("@name")); } } } return ret.getClear(); } IConstWUClusterInfo* getTargetClusterInfo(const char *clustname) { if (!clustname) return NULL; Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!conn) return NULL; StringBuffer xpath; // MORE - at the moment configenf specifies eclagent and thor queues by (in effect) placing an 'example' thor or eclagent in the topology // that uses the queue that will be used. // We should and I hope will change that, at which point the code below gets simpler xpath.appendf("Software/Topology/Cluster[@name=\"%s\"]", clustname); Owned cluster = conn->queryRoot()->getPropTree(xpath.str()); if (!cluster) return NULL; StringBuffer prefix(cluster->queryProp("@prefix")); prefix.toLowerCase(); StringBuffer querySetName; IPropertyTree *agent = NULL; const char *agentName = cluster->queryProp("EclAgentProcess/@process"); if (agentName) { xpath.clear().appendf("Software/EclAgentProcess[@name=\"%s\"]", agentName); agent = conn->queryRoot()->queryPropTree(xpath.str()); } Owned ti = cluster->getElements("ThorCluster"); IArrayOf thors; ForEach(*ti) { const char *thorName = ti->query().queryProp("@process"); if (thorName) { xpath.clear().appendf("Software/ThorCluster[@name=\"%s\"]", thorName); thors.append(*conn->queryRoot()->getPropTree(xpath.str())); } } IPropertyTree *roxie = NULL; const char *roxieName = cluster->queryProp("RoxieCluster/@process"); if (roxieName) { xpath.clear().appendf("Software/RoxieCluster[@name=\"%s\"]", roxieName); roxie = conn->queryRoot()->queryPropTree(xpath.str()); querySetName.clear().append(roxieName); } if (querySetName.length() == 0) querySetName.append(clustname); return new CEnvironmentClusterInfo(clustname, prefix, querySetName, agent, thors, roxie); } const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name) { if (!clustname) return NULL; Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!conn) return NULL; StringBuffer xpath; xpath.appendf("Software/Topology/Cluster[@name=\"%s\"]", clustname); Owned cluster = conn->queryRoot()->getPropTree(xpath.str()); if (!cluster) return NULL; StringBuffer xpath1; xpath1.appendf("%s/@process", processType); name.append(cluster->queryProp(xpath1.str())); return name.str(); } unsigned getEnvironmentThorClusterNames(StringArray &clusternames, StringArray &groupnames, StringArray &qnames) { Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!conn) return 0; Owned iter = conn->queryRoot()->getElements("Software/Topology/EclServerProcess/Cluster"); ForEach(*iter) { IPropertyTree &tc = iter->query(); // I think we want the name here but it isn't exactly clear const char *cname = tc.queryProp("@name"); if (cname&&*cname) { Owned iter2 = tc.getElements("ThorCluster"); ForEach(*iter2) { IPropertyTree &tc2 = iter2->query(); StringBuffer query; const char *pname = tc2.queryProp("@process"); query.appendf("Software/ThorCluster[@name=\"%s\"]",pname); IPropertyTree *t = conn->queryRoot()->queryPropTree(query.str()); if (t) { const char *qname = t->queryProp("@queueName"); if (!qname||!*qname) qname = pname; const char *gname = t->queryProp("@nodeGroup"); if (!gname||!*gname) gname = pname; clusternames.append(cname); groupnames.append(gname); qnames.append(qname); } } } } return clusternames.ordinality(); } IStringVal& CLocalWorkUnit::getScope(IStringVal &str) const { CriticalBlock block(crit); if (p->hasProp("Debug/ForceScope")) { StringBuffer prefix(p->queryProp("Debug/ForceScope")); str.set(prefix.toLowerCase().str()); } else { Owned ci = getTargetClusterInfo(p->queryProp("@clusterName")); if (ci) ci->getScope(str); else str.clear(); } return str; } //Queries void CLocalWorkUnit::setCodeVersion(unsigned codeVersion, const char * buildVersion, const char * eclVersion) { CriticalBlock block(crit); p->setPropInt("@codeVersion", codeVersion); p->setProp("@buildVersion", buildVersion); p->setProp("@eclVersion", eclVersion); } unsigned CLocalWorkUnit::getCodeVersion() const { CriticalBlock block(crit); return p->getPropInt("@codeVersion"); } void CLocalWorkUnit::getBuildVersion(IStringVal & buildVersion, IStringVal & eclVersion) const { CriticalBlock block(crit); buildVersion.set(p->queryProp("@buildVersion")); eclVersion.set(p->queryProp("@eclVersion")); } void CLocalWorkUnit::setCloneable(bool value) { CriticalBlock block(crit); p->setPropInt("@cloneable", value); } void CLocalWorkUnit::setIsClone(bool value) { CriticalBlock block(crit); p->setPropInt("@isClone", value); } bool CLocalWorkUnit::getCloneable() const { CriticalBlock block(crit); return p->getPropBool("@cloneable", false); } IUserDescriptor *CLocalWorkUnit::getUserDescriptor() const { CriticalBlock block(crit); if (!userDesc) { SCMStringBuffer token, user, password; getSecurityToken(token); SCMStringBuffer wuid; getWuid(wuid); extractToken(token.str(), wuid.str(), user, password); userDesc.setown(createUserDescriptor()); userDesc->set(user.str(), password.str()); } return LINK(userDesc); } void CLocalWorkUnit::setCombineQueries(unsigned combine) { CriticalBlock block(crit); p->setPropInt("COMBINE_QUERIES", combine); } unsigned CLocalWorkUnit::getCombineQueries() const { CriticalBlock block(crit); return p->getPropInt("COMBINE_QUERIES"); } bool CLocalWorkUnit::isProtected() const { CriticalBlock block(crit); return p->getPropBool("@protected", false); } bool CLocalWorkUnit::isPausing() const { CriticalBlock block(crit); if (WUActionPause == getAction()) { switch (getState()) { case WUStateRunning: case WUStateAborting: return true; } } return false; } void CLocalWorkUnit::protect(bool protectMode) { CriticalBlock block(crit); p->setPropBool("@protected", protectMode); } bool CLocalWorkUnit::isBilled() const { CriticalBlock block(crit); return p->getPropBool("@billed", false); } void CLocalWorkUnit::setBilled(bool value) { CriticalBlock block(crit); p->setPropBool("@billed", value); } void CLocalWorkUnit::setResultLimit(unsigned value) { CriticalBlock block(crit); p->setPropInt("resultLimit", value); } unsigned CLocalWorkUnit::getResultLimit() const { CriticalBlock block(crit); return p->getPropInt("resultLimit"); } void CLocalWorkUnit::setCompareMode(WUCompareMode value) { CriticalBlock block(crit); p->setPropInt("comparemode", (int)value); } WUCompareMode CLocalWorkUnit::getCompareMode() const { CriticalBlock block(crit); return (WUCompareMode) p->getPropInt("comparemode"); } IStringVal & CLocalWorkUnit::getSnapshot(IStringVal & str) const { CriticalBlock block(crit); str.set(p->queryProp("SNAPSHOT")); return str; } void CLocalWorkUnit::setSnapshot(const char * val) { CriticalBlock block(crit); p->setProp("SNAPSHOT", val); } static int comparePropTrees(IInterface **ll, IInterface **rr) { IPropertyTree *l = (IPropertyTree *) *ll; IPropertyTree *r = (IPropertyTree *) *rr; return stricmp(l->queryName(), r->queryName()); }; unsigned CLocalWorkUnit::calculateHash(unsigned crc) { // Any other values in the WU that could affect generated code should be crc'ed here IPropertyTree *tree = p->queryBranch("Debug"); if (tree) { Owned sub = tree->getElements("*"); ICopyArrayOf subs; for(sub->first(); sub->isValid(); sub->next()) subs.append(sub->query()); subs.sort(comparePropTrees); ForEachItemIn(idx, subs) { const char *name = subs.item(idx).queryName(); const char *val = subs.item(idx).queryProp(NULL); crc = crc32(name, (size32_t)strlen(name), crc); if (val) crc = crc32(val, (size32_t)strlen(val), crc); } } Owned plugins = &getPlugins(); for (plugins->first();plugins->isValid();plugins->next()) { IConstWUPlugin &thisplugin = plugins->query(); SCMStringBuffer version; thisplugin.getPluginVersion(version); crc = crc32(version.str(), version.length(), crc); } return crc; } static void updateProp(IPropertyTree * to, const IPropertyTree * from, const char * xpath) { if (!to->hasProp(xpath) && from->hasProp(xpath)) to->setProp(xpath, from->queryProp(xpath)); } static void copyTree(IPropertyTree * to, const IPropertyTree * from, const char * xpath) { IPropertyTree * match = from->getBranch(xpath); if (match) to->setPropTree(xpath, match); } void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached) { CLocalWorkUnit *from = QUERYINTERFACE(cached, CLocalWorkUnit); if (!from) { CLockedWorkUnit *fl = QUERYINTERFACE(cached, CLockedWorkUnit); if (!fl) throw MakeStringException(WUERR_InternalUnknownImplementation, "Cached workunit not created using workunit dll"); from = fl->c; } // Need to copy the query, the results, and the graphs from the cached query. // The cache is made before the query is executed so there is no need to clear them. if (!cached->getCloneable()) throw MakeStringException(WUERR_CannotCloneWorkunit, "Source work unit not marked as clonable"); const IPropertyTree * fromP = from->p; IPropertyTree *pt; CriticalBlock block(crit); query.clear(); updateProp(p, fromP, "@jobName"); copyTree(p, fromP, "Query"); pt = fromP->getBranch("Application/LibraryModule"); if (pt) { ensurePTree(p, "Application"); p->setPropTree("Application/LibraryModule", pt); } pt = fromP->queryBranch("Debug"); if (pt) { IPropertyTree *curDebug = p->queryPropTree("Debug"); if (curDebug) { Owned elems = pt->getElements("*"); ForEach(*elems) { IPropertyTree *elem = &elems->query(); if (!curDebug->hasProp(elem->queryName())) curDebug->setPropTree(elem->queryName(),LINK(elem)); } } else p->setPropTree("Debug", LINK(pt)); } copyTree(p, fromP, "Plugins"); copyTree(p, fromP, "Libraries"); copyTree(p, fromP, "Results"); copyTree(p, fromP, "Graphs"); copyTree(p, fromP, "Workflow"); updateProp(p, fromP, "@clusterName"); updateProp(p, fromP, "allowedclusters"); updateProp(p, fromP, "@submitID"); updateProp(p, fromP, "CustomerID"); //Variables may have been set up as parameters to the query - so need to preserve any values that were supplied. pt = fromP->getBranch("Variables"); if (pt) { IPropertyTree *ptTgtVariables = ensurePTree(p, "Variables"); Owned ptiVariable = pt->getElements("Variable"); for (ptiVariable->first(); ptiVariable->isValid(); ptiVariable->next()) { IPropertyTree *ptSrcVariable = &ptiVariable->query(); const char *name = ptSrcVariable->queryProp("@name"); assertex(name); StringBuffer xpath; xpath.append("Variable[@name='").append(name).append("']"); IPropertyTree *ptTgtVariable = ptTgtVariables->queryPropTree(xpath.str()); IPropertyTree *merged = createPTreeFromIPT(ptSrcVariable); // clone entire source info... merged->removeProp("Value"); // except value and status merged->setProp("@status", "undefined"); if (!merged->getPropBool("@isScalar")) merged->removeProp("totalRowCount"); merged->removeProp("rowCount"); // If there are any other fields that get set ONLY by eclagent, strip them out here... if (ptTgtVariable) { // copy status and Value from what is already set in target merged->setProp("@status", ptTgtVariable->queryProp("@status")); MemoryBuffer value; if (ptTgtVariable->getPropBin("Value", value)) merged->setPropBin("Value", value.length(), value.toByteArray()); ptTgtVariable->removeProp(xpath.str()); // If there are any other fields in a variable that get set by ws_ecl before submitting, copy them across here... } ptTgtVariables->addPropTree("Variable", merged); } pt->Release(); } p->setProp("@codeVersion", fromP->queryProp("@codeVersion")); p->setPropBool("@cloneable", true); p->setPropBool("@isClone", true); resetWorkflow(); // the source Workflow section may have had some parts already executed... // resetResults(); // probably should be resetting the results as well... rather than waiting for the rerun to overwrite them } bool CLocalWorkUnit::hasDebugValue(const char *propname) const { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); return p->hasProp(prop.append(lower)); } IStringVal& CLocalWorkUnit::getDebugValue(const char *propname, IStringVal &str) const { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); str.set(p->queryProp(prop.append(lower).str())); return str; } IStringIterator& CLocalWorkUnit::getDebugValues() const { return getDebugValues(NULL); } IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const { CriticalBlock block(crit); StringBuffer path("Debug/"); if (prop) { StringBuffer lower; lower.append(prop).toLowerCase(); path.append(lower); } else path.append("*"); return *new CStringPTreeTagIterator(p->getElements(path.str())); } int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); return p->getPropInt(prop.str(), defVal); } __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal) const { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); return p->getPropInt64(prop.str(), defVal); } bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); return p->getPropBool(prop.str(), defVal); } void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite) { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); if (overwrite || !p->hasProp(prop.str())) { // MORE - not sure this line should be needed.... p->setProp("Debug", ""); p->setProp(prop.str(), value); } } void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite) { StringBuffer lower; lower.append(propname).toLowerCase(); CriticalBlock block(crit); StringBuffer prop("Debug/"); prop.append(lower); if (overwrite || !p->hasProp(prop.str())) { // MORE - not sure this line should be needed.... p->setProp("Debug", ""); p->setPropInt(prop.str(), value); } } void CLocalWorkUnit::setTracingValue(const char *propname, const char *value) { CriticalBlock block(crit); // MORE - not sure this line should be needed.... p->setProp("Tracing", ""); StringBuffer prop("Tracing/"); p->setProp(prop.append(propname).str(), value); } void CLocalWorkUnit::setTracingValueInt(const char *propname, int value) { CriticalBlock block(crit); StringBuffer prop("Tracing/"); p->setPropInt(prop.append(propname).str(), value); } IConstWUQuery* CLocalWorkUnit::getQuery() const { // For this to be legally called, we must have the read-able interface. So we are already locked for (at least) read. CriticalBlock block(crit); if (!query) { IPropertyTree *s = p->getPropTree("Query"); if (s) query.setown(new CLocalWUQuery(s)); } return query.getLink(); } IWUQuery* CLocalWorkUnit::updateQuery() { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); if (!query) { IPropertyTree *s = p->queryPropTree("Query"); if (!s) s = p->addPropTree("Query", createPTreeFromXMLString("")); s->Link(); query.setown(new CLocalWUQuery(s)); } return query.getLink(); } void CLocalWorkUnit::loadPlugins() const { CriticalBlock block(crit); if (!pluginsCached) { assertex(plugins.length() == 0); Owned r = p->getElements("Plugins/Plugin"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); plugins.append(*new CLocalWUPlugin(rp)); } pluginsCached = true; } } IConstWUPluginIterator& CLocalWorkUnit::getPlugins() const { CriticalBlock block(crit); loadPlugins(); return *new CArrayIteratorOf (plugins, 0, (IConstWorkUnit *) this); } void CLocalWorkUnit::loadLibraries() const { CriticalBlock block(crit); if (!librariesCached) { assertex(libraries.length() == 0); Owned r = p->getElements("Libraries/Library"); ForEach(*r) { IPropertyTree *rp = &r->query(); rp->Link(); libraries.append(*new CLocalWULibrary(rp)); } librariesCached = true; } } IConstWULibraryIterator& CLocalWorkUnit::getLibraries() const { CriticalBlock block(crit); loadLibraries(); return *new CArrayIteratorOf (libraries, 0, (IConstWorkUnit *) this); } IConstWULibrary * CLocalWorkUnit::getLibraryByName(const char * search) const { CriticalBlock block(crit); loadLibraries(); ForEachItemIn(idx, libraries) { SCMStringBuffer name; IConstWULibrary &cur = libraries.item(idx); cur.getName(name); if (stricmp(name.str(), search)==0) return &OLINK(cur); } return NULL; } unsigned CLocalWorkUnit::getTimerDuration(const char *name, const char *_unused) const { CriticalBlock block(crit); StringBuffer pname; pname.appendf("Timings/Timing[@name=\"%s\"]/@duration", name); return p->getPropInt(pname.str(), 0); } unsigned CLocalWorkUnit::getTimerCount(const char *name, const char *_unused) const { CriticalBlock block(crit); StringBuffer pname; pname.appendf("Timings/Timing[@name=\"%s\"]/@count", name); return p->getPropInt(pname.str(), 0); } IStringIterator& CLocalWorkUnit::getTimers() const { CriticalBlock block(crit); return *new CStringPTreeAttrIterator(p->getElements("Timings/Timing"), "@name"); } StringBuffer &formatGraphTimerLabel(StringBuffer &str, const char *graphName, unsigned subGraphNum, unsigned __int64 subId) { str.append("Graph ").append(graphName); if (subGraphNum) str.append(" - ").append(subGraphNum).append(" (").append(subId).append(")"); else if (subId) str.append(" - id(").append(subId).append(")"); return str; } bool parseGraphTimerLabel(const char *label, StringBuffer &graphName, unsigned &subGraphNum, unsigned __int64 &subId) { // expects format: "Graph [ - ()]" unsigned len = (size32_t)strlen(label); if (len < 6 || (0 != memcmp(label, "Graph ", 6))) return false; subGraphNum = 0; subId = 0; const char *finger = label+6; const char *finger2 = strchr(finger, '-'); if (NULL == finger2) // just graphName graphName.append(finger); else { graphName.append((size32_t)((finger2-1)-finger), finger); finger = finger2+2; // skip '-' and space finger2 = strchr(finger, ' '); if (finger2) { subGraphNum = atoi_l(finger, (size32_t)(finger2-finger)); finger = finger2+2; // skip space and '(' finger2 = strchr(finger, ')'); if (finger2) subId = atoi64_l(finger, (size32_t)(finger2-finger)); } else if (((len-(finger-label))>3) && 0 == memcmp(finger, "id(", 3)) // subgraph id only, new format. { finger += 3; finger2 = strchr(finger, ')'); if (finger2) subId = atoi64_l(finger, (size32_t)(finger2-finger)); } } return true; } void CLocalWorkUnit::setTimerInfo(const char *name, const char *subname, unsigned ms, unsigned count, unsigned max) { CriticalBlock block(crit); IPropertyTree *timings = p->queryPropTree("Timings"); if (!timings) timings = p->addPropTree("Timings", createPTree("Timings")); StringBuffer fullname; if (subname) fullname.append('.').append(subname); fullname.append(name); StringBuffer xpath; xpath.append("Timing[@name=\"").append(fullname).append("\"]"); IPropertyTree *timing = timings->queryPropTree(xpath.str()); if (!timing) { timing = timings->addPropTree("Timing", createPTree("Timing")); timing->setProp("@name", fullname.str()); } timing->setPropInt("@count", count); timing->setPropInt("@duration", ms); if (!max && 1==count) max = ms; if (max) timing->setPropInt("@max", max); } void CLocalWorkUnit::setTimeStamp(const char *application, const char *instance, const char *event, bool add) { CriticalBlock block(crit); char timeStamp[64]; time_t tNow; time(&tNow); #ifdef _WIN32 struct tm *gmtNow; gmtNow = gmtime(&tNow); strftime(timeStamp, 64, "%Y-%m-%dT%H:%M:%SZ", gmtNow); #else struct tm gmtNow; gmtime_r(&tNow, &gmtNow); strftime(timeStamp, 64, "%Y-%m-%dT%H:%M:%SZ", &gmtNow); #endif //_WIN32 IPropertyTree *ts = p->queryPropTree("TimeStamps"); if (!ts) { ts = p->addPropTree("TimeStamps", createPTree("TimeStamps")); add = true; } IPropertyTree *t=NULL; if (!add) { StringBuffer path; path.appendf("TimeStamp[@application=\"%s\"]",application); t = ts->queryBranch(path.str()); } if (!t) { t = createPTree("TimeStamp"); t->setProp("@application", application); add = true; } if (instance) t->setProp("@instance", instance); t->setProp(event, timeStamp); IPropertyTree *et = t->queryPropTree(event); if(et) et->setPropInt("@ts",(int)tNow); if (add) ts->addPropTree("TimeStamp", t); } void CLocalWorkUnit::setTimeStamp(const char *application, const char *instance, const char *event) { setTimeStamp(application,instance,event,false); } void CLocalWorkUnit::addTimeStamp(const char *application, const char *instance, const char *event) { setTimeStamp(application,instance,event,true); } IStringVal &CLocalWorkUnit::getTimeStamp(const char *name, const char *application, IStringVal &str) const { CriticalBlock block(crit); str.clear(); StringBuffer pname("TimeStamps/TimeStamp"); if (application) pname.appendf("[@application=\"%s\"]", application); pname.appendf("/%s", name); Owned stamps = p->getElements(pname.str()); if (stamps && stamps->first()) str.set(stamps->query().queryProp(NULL)); return str; } IConstWUTimeStampIterator& CLocalWorkUnit::getTimeStamps() const { CriticalBlock block(crit); timestamps.load(p,"TimeStamps/*"); return *new CArrayIteratorOf (timestamps, 0, (IConstWorkUnit *) this); } bool CLocalWorkUnit::getWuDate(unsigned & year, unsigned & month, unsigned& day) { CriticalBlock block(crit); SCMStringBuffer wuidstr; const char *wuid = getWuid(wuidstr).str(); if (sscanf(wuid, "W%4u%2u%2u", &year, &month, &day)==3) { } return false; } IWUPlugin* CLocalWorkUnit::updatePluginByName(const char *qname) { CriticalBlock block(crit); IConstWUPlugin *existing = getPluginByName(qname); if (existing) return (IWUPlugin *) existing; if (!plugins.length()) p->addPropTree("Plugins", createPTree("Plugins")); IPropertyTree *pl = p->queryPropTree("Plugins"); IPropertyTree *s = pl->addPropTree("Plugin", createPTree("Plugin")); s->Link(); IWUPlugin* q = new CLocalWUPlugin(s); q->Link(); plugins.append(*q); q->setPluginName(qname); return q; } IConstWUPlugin* CLocalWorkUnit::getPluginByName(const char *qname) const { CriticalBlock block(crit); loadPlugins(); ForEachItemIn(idx, plugins) { SCMStringBuffer name; IConstWUPlugin &cur = plugins.item(idx); cur.getPluginName(name); if (stricmp(name.str(), qname)==0) { cur.Link(); return &cur; } } return NULL; } IWULibrary* CLocalWorkUnit::updateLibraryByName(const char *qname) { CriticalBlock block(crit); IConstWULibrary *existing = getLibraryByName(qname); if (existing) return (IWULibrary *) existing; if (!libraries.length()) p->addPropTree("Libraries", createPTree("Libraries")); IPropertyTree *pl = p->queryPropTree("Libraries"); IPropertyTree *s = pl->addPropTree("Library", createPTree("Library")); s->Link(); IWULibrary* q = new CLocalWULibrary(s); q->Link(); libraries.append(*q); q->setName(qname); return q; } void CLocalWorkUnit::loadExceptions() const { CriticalBlock block(crit); if (!exceptionsCached) { assertex(exceptions.length() == 0); Owned r = p->getElements("Exceptions/Exception"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); exceptions.append(*new CLocalWUException(rp)); } exceptionsCached = true; } } IConstWUExceptionIterator& CLocalWorkUnit::getExceptions() const { CriticalBlock block(crit); loadExceptions(); return *new CArrayIteratorOf (exceptions, 0, (IConstWorkUnit *) this); } unsigned CLocalWorkUnit::getExceptionCount() const { CriticalBlock block(crit); loadExceptions(); return exceptions.length(); } void CLocalWorkUnit::clearExceptions() { CriticalBlock block(crit); // For this to be legally called, we must have the write-able interface. So we are already locked for write. exceptions.kill(); exceptionsCached = true; p->removeProp("Exceptions"); } IWUException* CLocalWorkUnit::createException() { CriticalBlock block(crit); // For this to be legally called, we must have the write-able interface. So we are already locked for write. loadExceptions(); if (!exceptions.length()) p->addPropTree("Exceptions", createPTree("Exceptions")); IPropertyTree *r = p->queryPropTree("Exceptions"); IPropertyTree *s = r->addPropTree("Exception", createPTree("Exception")); IWUException* q = new CLocalWUException(LINK(s)); exceptions.append(*LINK(q)); Owned now = createDateTimeNow(); SCMStringBuffer temp; now->getString(temp); q->setTimeStamp(temp.str()); return q; } IConstWUWebServicesInfo* CLocalWorkUnit::getWebServicesInfo() const { // For this to be legally called, we must have the read-able interface. So we are already locked for (at least) read. CriticalBlock block(crit); if (!webServicesInfoCached) { assertex(!webServicesInfo); IPropertyTree *s = p->getPropTree("WebServicesInfo"); if (s) webServicesInfo.setown(new CLocalWUWebServicesInfo(s)); webServicesInfoCached = true; } return webServicesInfo.getLink(); } IWUWebServicesInfo* CLocalWorkUnit::updateWebServicesInfo(bool create) { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); if (!webServicesInfoCached) { IPropertyTree *s = p->queryPropTree("WebServicesInfo"); if (!s) { if (create) s = p->addPropTree("WebServicesInfo", createPTreeFromXMLString("")); else return NULL; } s->Link(); webServicesInfo.setown(new CLocalWUWebServicesInfo(s)); webServicesInfoCached = true; } return webServicesInfo.getLink(); } IConstWURoxieQueryInfo* CLocalWorkUnit::getRoxieQueryInfo() const { // For this to be legally called, we must have the read-able interface. So we are already locked for (at least) read. CriticalBlock block(crit); if (!roxieQueryInfoCached) { assertex(!roxieQueryInfo); IPropertyTree *s = p->getPropTree("RoxieQueryInfo"); if (s) roxieQueryInfo.setown(new CLocalWURoxieQueryInfo(s)); roxieQueryInfoCached = true; } return roxieQueryInfo.getLink(); } IWURoxieQueryInfo* CLocalWorkUnit::updateRoxieQueryInfo(const char *wuid, const char *roxieClusterName) { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); if (!roxieQueryInfoCached) { IPropertyTree *s = p->queryPropTree("RoxieQueryInfo"); if (!s) s = p->addPropTree("RoxieQueryInfo", createPTreeFromXMLString("")); if (wuid && *wuid) s->addProp("@wuid", wuid); if (roxieClusterName && *roxieClusterName) s->addProp("@roxieClusterName", roxieClusterName); s->Link(); roxieQueryInfo.setown(new CLocalWURoxieQueryInfo(s)); roxieQueryInfoCached = true; } return roxieQueryInfo.getLink(); } static int compareResults(IInterface **ll, IInterface **rr) { CLocalWUResult *l = (CLocalWUResult *) *ll; CLocalWUResult *r = (CLocalWUResult *) *rr; return l->getResultSequence() - r->getResultSequence(); } void CLocalWorkUnit::loadResults() const { CriticalBlock block(crit); if (!resultsCached) { assertex(results.length() == 0); Owned r = p->getElements("Results/Result"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); results.append(*new CLocalWUResult(rp)); } results.sort(compareResults); resultsCached = true; } } void CLocalWorkUnit::loadVariables() const { CriticalBlock block(crit); if (!variablesCached) { assertex(variables.length() == 0); Owned r = p->getElements("Variables/Variable"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); variables.append(*new CLocalWUResult(rp)); } variablesCached = true; } } void CLocalWorkUnit::loadTemporaries() const { CriticalBlock block(crit); if (!temporariesCached) { assertex(temporaries.length() == 0); Owned r = p->getElements("Temporaries/Variable"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); temporaries.append(*new CLocalWUResult(rp)); } temporariesCached = true; } } IWUResult* CLocalWorkUnit::createResult() { CriticalBlock block(crit); // For this to be legally called, we must have the write-able interface. So we are already locked for write. loadResults(); if (!results.length()) p->addPropTree("Results", createPTree("Results")); IPropertyTree *r = p->queryPropTree("Results"); IPropertyTree *s = r->addPropTree("Result", createPTreeFromXMLString("")); s->Link(); IWUResult* q = new CLocalWUResult(s); q->Link(); results.append(*q); return q; } IWUResult* CLocalWorkUnit::updateResultByName(const char *qname) { CriticalBlock block(crit); IConstWUResult *existing = getResultByName(qname); if (existing) return (IWUResult *) existing; IWUResult* q = createResult(); q->setResultName(qname); return q; } IWUResult* CLocalWorkUnit::updateResultBySequence(unsigned seq) { CriticalBlock block(crit); IConstWUResult *existing = getResultBySequence(seq); if (existing) return (IWUResult *) existing; IWUResult* q = createResult(); q->setResultSequence(seq); return q; } IConstWUResultIterator& CLocalWorkUnit::getResults() const { CriticalBlock block(crit); loadResults(); return *new CArrayIteratorOf (results, 0, (IConstWorkUnit *) this); } IConstWUResult* CLocalWorkUnit::getResultByName(const char *qname) const { CriticalBlock block(crit); loadResults(); ForEachItemIn(idx, results) { SCMStringBuffer name; IConstWUResult &cur = results.item(idx); cur.getResultName(name); if (stricmp(name.str(), qname)==0) { cur.Link(); return &cur; } } return NULL; } IConstWUResult* CLocalWorkUnit::getResultBySequence(unsigned seq) const { CriticalBlock block(crit); loadResults(); ForEachItemIn(idx, results) { IConstWUResult &cur = results.item(idx); if (cur.getResultSequence() == seq) { cur.Link(); return &cur; } } return NULL; } IConstWUResultIterator& CLocalWorkUnit::getVariables() const { CriticalBlock block(crit); loadVariables(); return *new CArrayIteratorOf (variables, 0, (IConstWorkUnit *) this); } IConstWUResult* CLocalWorkUnit::getGlobalByName(const char *qname) const { CriticalBlock block(crit); if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0) return getVariableByName(qname); Owned global = factory->openWorkUnit(GLOBAL_WORKUNIT, false); if (!global) global.setown(factory->createWorkUnit(NULL, NULL, NULL)); return global->getVariableByName(qname); } IWUResult* CLocalWorkUnit::updateGlobalByName(const char *qname) { CriticalBlock block(crit); if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0) return updateVariableByName(qname); Owned global = factory->ensureNamedWorkUnit(GLOBAL_WORKUNIT); return global->updateVariableByName(qname); } IConstWUResult* CLocalWorkUnit::getVariableByName(const char *qname) const { CriticalBlock block(crit); loadVariables(); ForEachItemIn(idx, variables) { SCMStringBuffer name; IConstWUResult &cur = variables.item(idx); cur.getResultName(name); if (stricmp(name.str(), qname)==0) { cur.Link(); return &cur; } } return NULL; } IConstWUResult* CLocalWorkUnit::getTemporaryByName(const char *qname) const { CriticalBlock block(crit); loadTemporaries(); ForEachItemIn(idx, temporaries) { SCMStringBuffer name; IConstWUResult &cur = temporaries.item(idx); cur.getResultName(name); if (stricmp(name.str(), qname)==0) { cur.Link(); return &cur; } } return NULL; } IConstWUResultIterator& CLocalWorkUnit::getTemporaries() const { CriticalBlock block(crit); loadTemporaries(); return *new CArrayIteratorOf (temporaries, 0, (IConstWorkUnit *) this); } IWUResult* CLocalWorkUnit::updateTemporaryByName(const char *qname) { CriticalBlock block(crit); IConstWUResult *existing = getTemporaryByName(qname); if (existing) return (IWUResult *) existing; if (!temporaries.length()) p->addPropTree("Temporaries", createPTree("Temporaries")); IPropertyTree *vars = p->queryPropTree("Temporaries"); IPropertyTree *s = vars->addPropTree("Variable", createPTree("Variable")); s->Link(); IWUResult* q = new CLocalWUResult(s); q->Link(); temporaries.append(*q); q->setResultName(qname); return q; } IWUResult* CLocalWorkUnit::updateVariableByName(const char *qname) { CriticalBlock block(crit); IConstWUResult *existing = getVariableByName(qname); if (existing) return (IWUResult *) existing; if (!variables.length()) p->addPropTree("Variables", createPTree("Variables")); IPropertyTree *vars = p->queryPropTree("Variables"); IPropertyTree *s = vars->addPropTree("Variable", createPTree("Variable")); s->Link(); IWUResult* q = new CLocalWUResult(s); q->Link(); variables.append(*q); q->setResultName(qname); return q; } void CLocalWorkUnit::deleteTempFiles(const char *graph, bool deleteOwned, bool deleteJobOwned) { CriticalBlock block(crit); IPropertyTree *files = p->queryPropTree("Files"); if (!files) return; Owned iter = files->getElements("File"); ICopyArrayOf toRemove; ForEach (*iter) { IPropertyTree &file = iter->query(); WUFileKind fileKind = (WUFileKind) file.getPropInt("@kind", WUFileStandard); if(file.getPropBool("@temporary")) fileKind = WUFileTemporary; // @temporary, legacy check bool needDelete; switch(fileKind) { case WUFileTemporary: if(graph==NULL) needDelete = true; else { const char *graphOwner = file.queryProp("@graph"); needDelete = ((graphOwner==NULL) || (strcmp(graph, graphOwner)==0)); } break; case WUFileJobOwned: needDelete = ((graph==NULL) && deleteJobOwned); break; case WUFileOwned: needDelete = ((graph==NULL) && deleteOwned); break; default: needDelete = false; } if(needDelete) { const char *name = file.queryProp("@name"); LOG(MCdebugProgress, unknownJob, "Removing workunit file %s from DFS", name); Owned userDesc = getUserDescriptor(); queryDistributedFileDirectory().removePhysical(name, 0, NULL, NULL, userDesc); toRemove.append(file); } } ForEachItemIn(r, toRemove) files->removeTree(&toRemove.item(r)); } static void _noteFileRead(IDistributedFile *file, IPropertyTree *filesRead) { IDistributedSuperFile *super = file->querySuperFile(); StringBuffer fname; file->getLogicalName(fname); StringBuffer path("File[@name=\""); path.append(fname).append("\"]"); IPropertyTree *fileTree = filesRead->queryPropTree(path.str()); if (fileTree) fileTree->setPropInt("@useCount", fileTree->getPropInt("@useCount")+1); else { StringBuffer cluster; file->getClusterName(0,cluster); fileTree = createPTree(); fileTree->setProp("@name", fname.str()); fileTree->setProp("@cluster", cluster.str()); fileTree->setPropInt("@useCount", 1); fileTree = filesRead->addPropTree("File", fileTree); } if (super) { Owned iter = super->getSubFileIterator(false); ForEach (*iter) { IDistributedFile &file = iter->query(); StringBuffer fname; file.getLogicalName(fname); Owned subfile = createPTree(); subfile->setProp("@name", fname.str()); fileTree->addPropTree("Subfile", subfile.getClear()); _noteFileRead(&file, filesRead); } } } void CLocalWorkUnit::noteFileRead(IDistributedFile *file) { CriticalBlock block(crit); IPropertyTree *files = p->queryPropTree("FilesRead"); if (!files) files = p->addPropTree("FilesRead", createPTree()); _noteFileRead(file, files); } static void addFile(IPropertyTree *files, const char *fileName, const char *cluster, unsigned usageCount, WUFileKind fileKind, const char *graphOwner) { StringBuffer path("File[@name=\""); path.append(fileName).append("\"]"); if (cluster) path.append("[@cluster=\"").append(cluster).append("\"]"); IPropertyTree *file = files->queryPropTree(path.str()); if (file) files->removeTree(file); file = createPTree(); file->setProp("@name", fileName); if (cluster) file->setProp("@cluster", cluster); if (graphOwner) file->setProp("@graph", graphOwner); file->setPropInt("@kind", (unsigned)fileKind); if (WUFileTemporary == fileKind) file->setPropInt("@usageCount", usageCount); files->addPropTree("File", file); } void CLocalWorkUnit::addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner) { CriticalBlock block(crit); IPropertyTree *files = p->queryPropTree("Files"); if (!files) files = p->addPropTree("Files", createPTree()); if (!clusters) addFile(fileName, NULL, usageCount, fileKind, graphOwner); else { ForEachItemIn(c, *clusters) ::addFile(files, fileName, clusters->item(c), usageCount, fileKind, graphOwner); } } void CLocalWorkUnit::releaseFile(const char *fileName) { StringBuffer path("File[@name=\""); path.append(fileName).append("\"]"); CriticalBlock block(crit); IPropertyTree *files = p->queryPropTree("Files"); if (!files) return; Owned fiter = files->getElements(path.str()); ForEach (*fiter) { IPropertyTree *file = &fiter->query(); unsigned usageCount = file->getPropInt("@usageCount"); if (usageCount > 1) file->setPropInt("@usageCount", usageCount-1); else { StringAttr name(file->queryProp("@name")); files->removeTree(file); if (!name.isEmpty()&&(1 == usageCount)) { Owned userDesc = getUserDescriptor(); if (queryDistributedFileDirectory().removePhysical(fileName, 0, NULL, NULL, userDesc)) LOG(MCdebugProgress, unknownJob, "Removed (released) file %s from DFS", name.get()); } } } } void CLocalWorkUnit::clearGraphProgress() { CConstGraphProgress::deleteWuidProgress(p->queryName()); } void CLocalWorkUnit::resetBeforeGeneration() { CriticalBlock block(crit); //Remove all associated files Owned q = updateQuery(); q->removeAssociatedFiles(); //Remove any pre-existing workflow information workflowIterator.clear(); p->removeProp("Workflow"); } unsigned CLocalWorkUnit::queryFileUsage(const char *fileName) const { StringBuffer path("Files/File[@name=\""); path.append(fileName).append("\"]/@usageCount"); CriticalBlock block(crit); return p->getPropInt(path.str()); } IPropertyTree *CLocalWorkUnit::getDiskUsageStats() { return p->getPropTree("DiskUsageStats"); } void CLocalWorkUnit::addDiskUsageStats(__int64 _avgNodeUsage, unsigned _minNode, __int64 _minNodeUsage, unsigned _maxNode, __int64 _maxNodeUsage, __int64 _graphId) { IPropertyTree *stats = p->queryPropTree("DiskUsageStats"); offset_t maxNodeUsage; if (stats) maxNodeUsage = stats->getPropInt64("@maxNodeUsage"); else { stats = p->addPropTree("DiskUsageStats", createPTree()); maxNodeUsage = 0; } if ((offset_t)_maxNodeUsage > maxNodeUsage) { // record all details at time of max node usage. stats->setPropInt("@minNode", _minNode); stats->setPropInt("@maxNode", _maxNode); stats->setPropInt64("@minNodeUsage", _minNodeUsage); stats->setPropInt64("@maxNodeUsage", _maxNodeUsage); stats->setPropInt64("@graphId", _graphId); if (_avgNodeUsage) { unsigned _skewHi = (unsigned)((100 * (_maxNodeUsage-_avgNodeUsage))/_avgNodeUsage); unsigned _skewLo = (unsigned)((100 * (_avgNodeUsage-_minNodeUsage))/_avgNodeUsage); stats->setPropInt("@skewHi", _skewHi); stats->setPropInt("@skewLo", _skewLo); } } } IPropertyTreeIterator & CLocalWorkUnit::getFileIterator() const { CriticalBlock block(crit); return * p->getElements("Files/File"); } IPropertyTreeIterator & CLocalWorkUnit::getFilesReadIterator() const { CriticalBlock block(crit); return * p->getElements("FilesRead/File"); } //================================================================================================= IWUActivity * CLocalWorkUnit::updateActivity(__int64 id) { CriticalBlock block(crit); IConstWUActivity *existing = getActivity(id); if (existing) return (IWUActivity *) existing; if (!activities.length()) p->addPropTree("Activities", createPTree("Activities")); IPropertyTree *pl = p->queryPropTree("Activities"); IPropertyTree *s = pl->addPropTree("Activity", createPTree("Activity")); IWUActivity * q = new CLocalWUActivity(LINK(s), id); activities.append(*LINK(q)); return q; } IConstWUActivity * CLocalWorkUnit::getActivity(__int64 id) const { CriticalBlock block(crit); loadActivities(); ForEachItemIn(idx, activities) { IConstWUActivity &cur = activities.item(idx); if (cur.getId() == id) return &OLINK(cur); } return NULL; } void CLocalWorkUnit::loadActivities() const { CriticalBlock block(crit); if (!activitiesCached) { assertex(activities.length() == 0); Owned r = p->getElements("Activities/Activity"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); activities.append(*new CLocalWUActivity(rp)); } activitiesCached = true; } } IConstWUActivityIterator& CLocalWorkUnit::getActivities() const { CriticalBlock block(crit); loadActivities(); return *new CArrayIteratorOf (activities, 0, (IConstWorkUnit *) this); } //================================================================================================= bool CLocalWorkUnit::switchThorQueue(const char *cluster, IQueueSwitcher *qs) { CriticalBlock block(crit); if (qs->isAuto()&&!getAllowAutoQueueSwitch()) return false; Owned newci = getTargetClusterInfo(cluster); if (!newci) return false; StringBuffer currentcluster; if (!p->getProp("@clusterName",currentcluster)) return false; Owned curci = getTargetClusterInfo(currentcluster.str()); if (!curci) return false; SCMStringBuffer curqname; curci->getThorQueue(curqname); const char *wuid = p->queryName(); void *qi = qs->getQ(curqname.str(),wuid); if (!qi) return false; setClusterName(cluster); SCMStringBuffer newqname; newci->getThorQueue(newqname); qs->putQ(newqname.str(),wuid,qi); return true; } //================================================================================================= void CLocalWorkUnit::loadGraphs() const { CriticalBlock block(crit); if (!cachedGraphs.get()) { MemoryBuffer buf; IPropertyTree *t = p->queryPropTree("PackedGraphs"); if (t&&t->getPropBin(NULL,buf)) { cachedGraphs.setown(createPTree(buf)); } else cachedGraphs.set(p->queryPropTree("Graphs")); if (cachedGraphs.get()) { Owned r = cachedGraphs->getElements("Graph"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); graphs.append(*new CLocalWUGraph(rp, p->queryName())); } } } } mapEnums graphTypes[] = { { GraphTypeAny, "unknown" }, { GraphTypeProgress, "progress" }, { GraphTypeEcl, "ECL" }, { GraphTypeActivities, "activities" }, { GraphTypeSubProgress, "subgraph" }, { GraphTypeSize, NULL }, }; CLocalWUGraph::CLocalWUGraph(IPropertyTree *props, const char *_wuid) : p(props), wuid(_wuid) { } IStringVal& CLocalWUGraph::getName(IStringVal &str) const { str.set(p->queryProp("@name")); return str; } IStringVal& CLocalWUGraph::getLabel(IStringVal &str) const { Owned xgmml = getXGMMLTree(false); str.set(xgmml->queryProp("@label")); return str; } IStringVal& CLocalWUGraph::getXGMML(IStringVal &str, bool mergeProgress) const { Owned xgmml = getXGMMLTree(mergeProgress); if (xgmml) { StringBuffer x; toXML(xgmml, x); str.set(x.str()); } return str; } unsigned CLocalWorkUnit::getGraphCount() const { CriticalBlock block(crit); if (p->hasProp("Graphs")) { return p->queryPropTree("Graphs")->numChildren(); } return 0; } unsigned CLocalWorkUnit::getSourceFileCount() const { CriticalBlock block(crit); if (p->hasProp("FilesRead")) { return p->queryPropTree("FilesRead")->numChildren(); } return 0; } unsigned CLocalWorkUnit::getResultCount() const { CriticalBlock block(crit); if (p->hasProp("Results")) { return p->queryPropTree("Results")->numChildren(); } return 0; } unsigned CLocalWorkUnit::getVariableCount() const { CriticalBlock block(crit); if (p->hasProp("Variables")) { return p->queryPropTree("Variables")->numChildren(); } return 0; } unsigned CLocalWorkUnit::getTimerCount() const { CriticalBlock block(crit); if (p->hasProp("Timings")) { return p->queryPropTree("Timings")->numChildren(); } return 0; } unsigned CLocalWorkUnit::getApplicationValueCount() const { CriticalBlock block(crit); if (p->hasProp("Application")) { return p->queryPropTree("Application")->numChildren(); } return 0; } IStringVal &CLocalWorkUnit::getXmlParams(IStringVal &str) const { CriticalBlock block(crit); IPropertyTree *paramTree = p->queryPropTree("Parameters"); if (paramTree) { StringBuffer temp; toXML(paramTree, temp); str.set(temp.str()); } return str; } const IPropertyTree *CLocalWorkUnit::getXmlParams() const { CriticalBlock block(crit); return p->getPropTree("Parameters"); } void CLocalWorkUnit::setXmlParams(const char *params) { CriticalBlock block(crit); p->setPropTree("Parameters", createPTreeFromXMLString(params)); } void CLocalWorkUnit::setXmlParams(IPropertyTree *tree) { CriticalBlock block(crit); p->setPropTree("Parameters", tree); } unsigned __int64 CLocalWorkUnit::getHash() const { CriticalBlock block(crit); return p->getPropInt64("@hash"); } void CLocalWorkUnit::setHash(unsigned __int64 hash) { CriticalBlock block(crit); p->setPropInt64("@hash", hash); } IConstWUGraphIterator& CLocalWorkUnit::getGraphs(WUGraphType type) const { CriticalBlock block(crit); loadGraphs(); IConstWUGraphIterator *giter = new CArrayIteratorOf (graphs, 0, (IConstWorkUnit *) this); if (type!=GraphTypeAny) { class CConstWUGraphIterator: public CInterface, implements IConstWUGraphIterator { WUGraphType type; Owned base; bool match() { return base->query().getType()==type; } public: IMPLEMENT_IINTERFACE; CConstWUGraphIterator(IConstWUGraphIterator *_base,WUGraphType _type) : base(_base) { type = _type; } bool first() { if (!base->first()) return false; if (match()) return true; return next(); } bool next() { while (base->next()) if (match()) return true; return false; } virtual bool isValid() { return base->isValid(); } IConstWUGraph & query() { return base->query(); } }; giter = new CConstWUGraphIterator(giter,type); } return *giter; } IConstWUGraph* CLocalWorkUnit::getGraph(const char *qname) const { CriticalBlock block(crit); loadGraphs(); ForEachItemIn(idx, graphs) { SCMStringBuffer name; IConstWUGraph &cur = graphs.item(idx); cur.getName(name); if (stricmp(name.str(), qname)==0) { cur.Link(); return &cur; } } return NULL; } IWUGraph* CLocalWorkUnit::createGraph() { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); ensureGraphsUnpacked(); loadGraphs(); if (!graphs.length()) p->addPropTree("Graphs", createPTree("Graphs")); IPropertyTree *r = p->queryPropTree("Graphs"); IPropertyTree *s = r->addPropTree("Graph", createPTreeFromXMLString("")); s->Link(); IWUGraph* q = new CLocalWUGraph(s, p->queryName()); q->Link(); graphs.append(*q); return q; } IWUGraph * CLocalWorkUnit::updateGraph(const char * name) { CriticalBlock block(crit); ensureGraphsUnpacked(); IConstWUGraph *existing = getGraph(name); if (existing) return (IWUGraph *) existing; IWUGraph * q = createGraph(); q->setName(name); return q; } IConstWUGraphProgress *CLocalWorkUnit::getGraphProgress(const char *name) const { CriticalBlock block(crit); return new CConstGraphProgress(p->queryName(), name); } IStringVal& CLocalWUGraph::getDOT(IStringVal &str) const { UNIMPLEMENTED; } void CLocalWUGraph::setName(const char *str) { p->setProp("@name", str); progress.clear(); progress.setown(new CConstGraphProgress(wuid, str)); } void CLocalWUGraph::setXGMML(const char *str) { setXGMMLTree(createPTreeFromXMLString(str)); } void CLocalWUGraph::setXGMMLTree(IPropertyTree *graph) { assertex(strcmp(graph->queryName(), "graph")==0); IPropertyTree *xgmml = createPTree("xgmml"); xgmml->setPropTree("graph", graph); p->setPropTree("xgmml", xgmml); } void CLocalWUGraph::mergeProgress(IPropertyTree &rootNode, IPropertyTree &progressTree, const unsigned &progressV) const { IPropertyTree *graphNode = rootNode.queryPropTree("att/graph"); if (!graphNode) return; unsigned nodeId = rootNode.getPropInt("@id"); StringBuffer progressNodePath("node[@id=\""); progressNodePath.append(nodeId).append("\"]"); IPropertyTree *progressNode = progressTree.queryPropTree(progressNodePath.str()); if (progressNode) { Owned edges = progressNode->getElements("edge"); ForEach (*edges) { IPropertyTree &edge = edges->query(); StringBuffer edgePath("edge[@id=\""); edgePath.append(edge.queryProp("@id")).append("\"]"); IPropertyTree *graphEdge = graphNode->queryPropTree(edgePath.str()); if (graphEdge) { if (progressV < 1) mergePTree(graphEdge, &edge); else { // must translate to XGMML format Owned aIter = edge.getAttributes(); ForEach (*aIter) { const char *aName = aIter->queryName()+1; if (0 != stricmp("id", aName)) // "id" reserved. { IPropertyTree *att = graphEdge->addPropTree("att", createPTree()); att->setProp("@name", aName); att->setProp("@value", aIter->queryValue()); } } // This is really only here, so that our progress format can use non-attribute values, which have different efficiency qualifies (e.g. can be external by dali) Owned iter = edge.getElements("*"); ForEach (*iter) { IPropertyTree &t = iter->query(); IPropertyTree *att = graphEdge->addPropTree("att", createPTree()); att->setProp("@name", t.queryName()); att->setProp("@value", t.queryProp(NULL)); } } } } Owned nodes = progressNode->getElements("node"); ForEach (*nodes) { IPropertyTree &node = nodes->query(); StringBuffer nodePath("node[@id=\""); nodePath.append(node.queryProp("@id")).append("\"]"); IPropertyTree *_node = graphNode->queryPropTree(nodePath.str()); if (_node) { if (progressV < 1) mergePTree(_node, &node); else { // must translate to XGMML format Owned aIter = node.getAttributes(); ForEach (*aIter) { const char *aName = aIter->queryName()+1; if (0 != stricmp("id", aName)) // "id" reserved. { IPropertyTree *att = _node->addPropTree("att", createPTree()); att->setProp("@name", aName); att->setProp("@value", aIter->queryValue()); } } } } } } Owned iter = graphNode->getElements("node"); ForEach (*iter) mergeProgress(iter->query(), progressTree, progressV); } IPropertyTree * CLocalWUGraph::getXGMMLTree(bool doMergeProgress) const { if (!doMergeProgress) return p->getPropTree("xgmml/graph"); else { IPropertyTree *src = p->queryPropTree("xgmml/graph"); if (!src) return NULL; Owned copy = createPTreeFromIPT(src); Owned _progress; if (progress) _progress.set(progress); else _progress.setown(new CConstGraphProgress(wuid, p->queryProp("@name"))); unsigned progressV = _progress->queryFormatVersion(); IPropertyTree *progressTree = _progress->queryProgressTree(); Owned nodeIterator = copy->getElements("node"); ForEach (*nodeIterator) mergeProgress(nodeIterator->query(), *progressTree, progressV); return LINK(copy); } } bool CLocalWUGraph::isValid() const { return p->hasProp("xgmml/graph/node"); } WUGraphType CLocalWUGraph::getType() const { return (WUGraphType) getEnum(p, "@type", graphTypes); } IStringVal & CLocalWUGraph::getTypeName(IStringVal &str) const { str.set(p->queryProp("@type")); if (!str.length()) str.set("unknown"); return str; } void CLocalWUGraph::setType(WUGraphType _type) { setEnum(p, "@type", _type, graphTypes); } //================================================================================================= mapEnums queryFileTypes[] = { { FileTypeCpp, "cpp" }, { FileTypeDll, "dll" }, { FileTypeResText, "res" }, { FileTypeHintXml, "hint" }, { FileTypeSize, NULL }, }; CLocalWUAssociated::CLocalWUAssociated(IPropertyTree *props) : p(props) { } WUFileType CLocalWUAssociated::getType() const { return (WUFileType)getEnum(p, "@type", queryFileTypes); } IStringVal & CLocalWUAssociated::getDescription(IStringVal & str) const { str.set(p->queryProp("@desc")); return str; } IStringVal & CLocalWUAssociated::getIp(IStringVal & str) const { str.set(p->queryProp("@ip")); return str; } IStringVal & CLocalWUAssociated::getName(IStringVal & str) const { str.set(p->queryProp("@filename")); return str; } IStringVal & CLocalWUAssociated::getNameTail(IStringVal & str) const { str.set(pathTail(p->queryProp("@filename"))); return str; } unsigned CLocalWUAssociated::getCrc() const { return p->getPropInt("@crc", 0); } //================================================================================================= CLocalWUQuery::CLocalWUQuery(IPropertyTree *props) : p(props) { associatedCached = false; } mapEnums queryTypes[] = { { QueryTypeUnknown, "unknown" }, { QueryTypeEcl, "ECL" }, { QueryTypeSql, "SQL" }, { QueryTypeXml, "XML" }, { QueryTypeAttribute, "Attribute" }, { QueryTypeSize, NULL }, }; WUQueryType CLocalWUQuery::getQueryType() const { return (WUQueryType) getEnum(p, "@type", queryTypes); } void CLocalWUQuery::setQueryType(WUQueryType qt) { setEnum(p, "@type", qt, queryTypes); } IStringVal& CLocalWUQuery::getQueryText(IStringVal &str) const { str.set(p->queryProp("Text")); return str; } IStringVal& CLocalWUQuery::getQueryShortText(IStringVal &str) const { const char * text = p->queryProp("Text"); if (isArchiveQuery(text)) { Owned xml = createPTreeFromXMLString(text, ipt_caseInsensitive); const char * path = xml->queryProp("Query/@attributePath"); if (path) { IPropertyTree * resolved = resolveDefinitionInArchive(xml, path); if (resolved) str.set(resolved->queryProp(NULL)); } else str.set(xml->queryProp("Query")); } else str.set(text); return str; } IStringVal& CLocalWUQuery::getQueryName(IStringVal &str) const { str.set(p->queryProp("@name")); return str; } IStringVal& CLocalWUQuery::getQueryDllName(IStringVal &str) const { Owned entry = getAssociatedFile(FileTypeDll, 0); if (entry) entry->getNameTail(str); return str; } IStringVal& CLocalWUQuery::getQueryCppName(IStringVal &str) const { Owned entry = getAssociatedFile(FileTypeCpp, 0); if (entry) entry->getName(str); return str; } IStringVal& CLocalWUQuery::getQueryResTxtName(IStringVal &str) const { Owned entry = getAssociatedFile(FileTypeResText, 0); if (entry) entry->getName(str); return str; } unsigned CLocalWUQuery::getQueryDllCrc() const { Owned entry = getAssociatedFile(FileTypeDll, 0); if (entry) return entry->getCrc(); return 0; } void CLocalWUQuery::setQueryText(const char *text) { p->setProp("Text", text); } void CLocalWUQuery::setQueryName(const char *qname) { p->setProp("@name", qname); } void CLocalWUQuery::addAssociatedFile(WUFileType type, const char * name, const char * ip, const char * desc, unsigned crc) { CriticalBlock block(crit); loadAssociated(); if (!associated.length()) p->addPropTree("Associated", createPTree("Associated")); IPropertyTree *pl = p->queryPropTree("Associated"); IPropertyTree *s = pl->addPropTree("File", createPTree("File")); setEnum(s, "@type", type, queryFileTypes); s->setProp("@filename", name); s->setProp("@ip", ip); s->setProp("@desc", desc); if (crc) s->setPropInt("@crc", crc); IConstWUAssociatedFile * q = new CLocalWUAssociated(LINK(s)); associated.append(*q); } void CLocalWUQuery::removeAssociatedFiles() { associatedCached = false; associated.kill(); p->removeProp("Associated"); } IConstWUAssociatedFile * CLocalWUQuery::getAssociatedFile(WUFileType type, unsigned index) const { CriticalBlock block(crit); loadAssociated(); ForEachItemIn(idx, associated) { CLocalWUAssociated &cur = static_cast(associated.item(idx)); if (cur.getType() == type) { if (index-- == 0) return &OLINK(cur); } } return NULL; } void CLocalWUQuery::addSpecialCaseAssociated(WUFileType type, const char * propname, unsigned crc) const { const char * name = p->queryProp(propname); if (name) { IPropertyTree *s = createPTree("File"); setEnum(s, "@type", type, queryFileTypes); s->setProp("@filename", name); if (crc) s->setPropInt("@crc", crc); associated.append(*new CLocalWUAssociated(s)); } } void CLocalWUQuery::loadAssociated() const { CriticalBlock block(crit); if (!associatedCached) { assertex(associated.length() == 0); addSpecialCaseAssociated(FileTypeDll, "DllName", p->getPropInt("DllCrc", 0)); addSpecialCaseAssociated(FileTypeCpp, "CppName", 0); addSpecialCaseAssociated(FileTypeResText, "ResTxtName", 0); Owned r = p->getElements("Associated/File"); for (r->first(); r->isValid(); r->next()) { IPropertyTree *rp = &r->query(); rp->Link(); associated.append(*new CLocalWUAssociated(rp)); } associatedCached = true; } } IConstWUAssociatedFileIterator& CLocalWUQuery::getAssociatedFiles() const { CriticalBlock block(crit); loadAssociated(); return *new CArrayIteratorOf (associated, 0, (IConstWUQuery *) this); } //======================================================================================== CLocalWUWebServicesInfo::CLocalWUWebServicesInfo(IPropertyTree *props) : p(props) { } IStringVal& CLocalWUWebServicesInfo::getModuleName(IStringVal &str) const { str.set(p->queryProp("@module")); return str; } IStringVal& CLocalWUWebServicesInfo::getAttributeName(IStringVal &str) const { str.set(p->queryProp("@attribute")); return str; } IStringVal& CLocalWUWebServicesInfo::getDefaultName(IStringVal &str) const { str.set(p->queryProp("@defaultName")); return str; } unsigned CLocalWUWebServicesInfo::getWebServicesCRC() const { return (unsigned) p->getPropInt("@crc"); } IStringVal& CLocalWUWebServicesInfo::getInfo(const char *name, IStringVal &str) const { if (!name) { StringBuffer ws_info; ws_info.appendf("<%s ", p->queryName()); Owned attrs = p->getAttributes(); for(attrs->first(); attrs->isValid(); attrs->next()) { const char *name = attrs->queryName()+1; const char *value = attrs->queryValue(); ws_info.appendf("%s='%s' ", name, value); } ws_info.append("> \n"); Owned info = p->getElements("*"); ForEach(*info) { IPropertyTree &item = info->query(); const char *name = item.queryName(); if (name) { MemoryBuffer mb; bool isbin = p->isBinary(name); if (isbin) { p->getPropBin(name,mb); if (mb.length()) { unsigned len = 0; mb.read(len); StringBuffer encodedString; StringBuffer val(len, (const char *) mb.readDirect(len)); encodeXML(val, encodedString); ws_info.appendf("<%s>%s", name, encodedString.str(), name); } } else { StringBuffer tmp; toXML(&item, tmp); ws_info.append(tmp.str()); } } } ws_info.appendf("", p->queryName()); str.setLen(ws_info.str(), ws_info.length()); } else { MemoryBuffer mb; p->getPropBin(name,mb); if (mb.length()) { unsigned len; mb.read(len); str.setLen((const char *) mb.readDirect(len), len); } } return str; } void CLocalWUWebServicesInfo::setModuleName(const char *mname) { p->setProp("@module", mname); } void CLocalWUWebServicesInfo::setAttributeName(const char *aname) { p->setProp("@attribute", aname); } void CLocalWUWebServicesInfo::setDefaultName(const char *dname) { p->setProp("@defaultName", dname); } void CLocalWUWebServicesInfo::setWebServicesCRC(unsigned crc) { p->setPropInt("@crc", crc); } void CLocalWUWebServicesInfo::setInfo(const char *name, const char *info) { MemoryBuffer m; unsigned len = (size32_t)strlen(info); serializeLPString(len, info, m); p->setPropBin(name, m.length(), m.toByteArray()); } //======================================================================================== CLocalWURoxieQueryInfo::CLocalWURoxieQueryInfo(IPropertyTree *props) : p(props) { } IStringVal& CLocalWURoxieQueryInfo::getQueryInfo(IStringVal &str) const { IPropertyTree *queryTree = p->queryPropTree("query"); if (queryTree) { StringBuffer temp; toXML(queryTree, temp); str.set(temp.str()); } return str; } IStringVal& CLocalWURoxieQueryInfo::getDefaultPackageInfo(IStringVal &str) const { MemoryBuffer mb; p->getPropBin("RoxiePackages",mb); if (mb.length()) { unsigned len; mb.read(len); str.setLen((const char *) mb.readDirect(len), len); } return str; } IStringVal& CLocalWURoxieQueryInfo::getRoxieClusterName(IStringVal &str) const { const char *val = p->queryProp("@roxieClusterName"); if (val) str.set(val); return str; } IStringVal& CLocalWURoxieQueryInfo::getWuid(IStringVal &str) const { const char *val = p->queryProp("@wuid"); if (val) str.set(val); return str; } void CLocalWURoxieQueryInfo::setQueryInfo(const char *info) { IPropertyTree *queryTree = p->queryPropTree("query"); if (queryTree) p->removeTree(queryTree); IPropertyTree * tempTree = p->addPropTree("query", createPTreeFromXMLString(info)); if (!p->hasProp("@roxieClusterName")) { const char *roxieClusterName = tempTree->queryProp("@roxieName"); if (roxieClusterName && *roxieClusterName) p->addProp("@roxieClusterName", roxieClusterName); } if (!p->hasProp("@wuid")) { const char *wuid = tempTree->queryProp("Query/@wuid"); if (wuid && *wuid) p->addProp("@wuid", wuid); } } void CLocalWURoxieQueryInfo::setDefaultPackageInfo(const char *info, int len) { MemoryBuffer m; serializeLPString(len, info, m); p->setPropBin("RoxiePackages", m.length(), m.toByteArray()); } void CLocalWURoxieQueryInfo::setRoxieClusterName(const char *info) { p->setProp("@roxieClusterName", info); } void CLocalWURoxieQueryInfo::setWuid(const char *info) { p->setProp("@wuid", info); } //======================================================================================== CLocalWUResult::CLocalWUResult(IPropertyTree *props) : p(props) { } mapEnums resultStatuses[] = { { ResultStatusUndefined, "undefined" }, { ResultStatusCalculated, "calculated" }, { ResultStatusSupplied, "supplied" }, { ResultStatusFailed, "failed" }, { ResultStatusPartial, "partial" }, { ResultStatusSize, NULL } }; WUResultStatus CLocalWUResult::getResultStatus() const { return (WUResultStatus ) getEnum(p, "@status", resultStatuses); } IStringVal& CLocalWUResult::getResultName(IStringVal &str) const { str.set(p->queryProp("@name")); return str; } int CLocalWUResult::getResultSequence() const { return p->getPropInt("@sequence", -1); } bool CLocalWUResult::isResultScalar() const { return p->getPropInt("@isScalar", 1) != 0; } bool findSize(int size, IntArray &sizes) { ForEachItemIn(idx, sizes) { if (sizes.item(idx)==size) return true; } return false; } void CLocalWUResult::getSchema(TypeInfoArray &types, StringAttrArray &names, IStringVal * eclText) const { MemoryBuffer schema; p->getPropBin("SchemaRaw", schema); if (schema.length()) { for (;;) { StringAttr name; schema.read(name); if (*schema.readDirect(0)==type_void) break; names.append(*new StringAttrItem(name)); types.append(*deserializeType(schema)); // MORE - nested records! } schema.skip(1); if (schema.length() != schema.getPos()) { unsigned eclLen; schema.read(eclLen); const char * schemaData = (const char *)schema.readDirect(eclLen); if (eclText) { eclText->setLen(schemaData, eclLen); if ((eclLen == 0) && names.ordinality()) { const char * firstName = names.item(0).text; StringBuffer temp; temp.append("RECORD "); types.item(0).getECLType(temp); temp.append(" value{NAMED('").append(firstName).append("')}").append("; END;"); eclText->set(temp.str()); } } } } } void readRow(StringBuffer &out, MemoryBuffer &in, TypeInfoArray &types, StringAttrArray &names) { ForEachItemIn(idx, types) { StringAttrItem &name = names.item(idx); ITypeInfo &type = types.item(idx); unsigned size = type.getSize(); switch(type.getTypeCode()) { case type_data: if (size==UNKNOWN_LENGTH) { if (in.remaining() < sizeof(int)) throw MakeStringException(WUERR_CorruptResult, "corrupt workunit information"); in.read(size); } outputXmlData(size, in.readDirect(size), name.text, out); break; case type_string: if (size==UNKNOWN_LENGTH) { if (in.remaining() < sizeof(int)) throw MakeStringException(WUERR_CorruptResult, "corrupt workunit information"); in.read(size); } outputXmlString(size, (const char *) in.readDirect(size), name.text, out); break; case type_varstring: { if (size == UNKNOWN_LENGTH) size = (size32_t)strlen((const char *) in.readDirect(0))+1; const char * text = (const char *) in.readDirect(size); outputXmlString((size32_t)strlen(text), text, name.text, out); break; } case type_unicode: { unsigned len = type.getStringLen(); if (size==UNKNOWN_LENGTH) in.read(len); outputXmlUnicode(len, (UChar const *) in.readDirect(len*2), name.text, out); } break; case type_utf8: { unsigned len = type.getStringLen(); if (size==UNKNOWN_LENGTH) { in.read(len); size = rtlUtf8Size(len, in.readDirect(0)); } outputXmlUtf8(len, (const char *) in.readDirect(size), name.text, out); } break; case type_qstring: { unsigned len = type.getStringLen(); if (size==UNKNOWN_LENGTH) in.read(len); unsigned outlen; char *outstr; rtlQStrToStrX(outlen, outstr, len, (const char *) in.readDirect(rtlQStrSize(len))); outputXmlString(outlen, outstr, name.text, out); free(outstr); break; } case type_int: case type_swapint: if (type.isSigned()) { const unsigned char *raw = (const unsigned char *) in.readDirect(size); unsigned __int64 cval8 = 0; //MORE: I think this is wrong - swapped doesn't mean little/big/ if (type.isSwappedEndian()) { unsigned idx = 0; if (raw[idx] & 0x80) cval8 = (__int64)-1; while (size--) cval8 = (cval8 << 8) | raw[idx++]; } else { if (raw[size-1] & 0x80) cval8 = (__int64)-1; while (size--) cval8 = (cval8 << 8) | raw[size]; } outputXmlInt((__int64) cval8, name.text, out); } else { const unsigned char *raw = (const unsigned char *) in.readDirect(size); unsigned __int64 cval8 = 0; if (type.isSwappedEndian()) { unsigned idx = 0; while (size--) cval8 = (cval8 << 8) | raw[idx++]; } else { while (size--) cval8 = (cval8 << 8) | raw[size]; } outputXmlUInt(cval8, name.text, out); } break; case type_boolean: bool cvalb; in.read(cvalb); outputXmlBool(cvalb, name.text, out); break; case type_decimal: if (type.isSigned()) outputXmlDecimal(in.readDirect(size), size, type.getPrecision(), name.text, out); else outputXmlUDecimal(in.readDirect(size), size, type.getPrecision(), name.text, out); break; case type_real: double cvald; switch(size) { case 4: float cvalf; in.read(cvalf); cvald = cvalf; break; case 8: in.read(cvald); break; } outputXmlReal(cvald, name.text, out); break; default: assertex(!"unexpected type in raw record"); break; } } } IStringVal& CLocalWUResult::getResultXml(IStringVal &str) const { TypeInfoArray types; StringAttrArray names; getSchema(types, names); StringBuffer xml; MemoryBuffer raw; p->getPropBin("Value", raw); const char * name = p->queryProp("@name"); if (name) xml.appendf("", name); else xml.append(""); unsigned __int64 numrows = getResultRowCount(); while (numrows--) { xml.append(""); readRow(xml, raw, types, names); xml.append(""); } xml.append(""); str.set(xml.str()); return str; } unsigned CLocalWUResult::getResultFetchSize() const { return p->getPropInt("fetchSize", 100); } __int64 CLocalWUResult::getResultTotalRowCount() const { return p->getPropInt64("totalRowCount", -1); } __int64 CLocalWUResult::getResultRowCount() const { return p->getPropInt64("rowCount", 0); } void CLocalWUResult::getResultDataset(IStringVal & ecl, IStringVal & defs) const { ecl.set(p->queryProp("datasetEcl")); defs.set(p->queryProp("datasetEclDefs")); } IStringVal& CLocalWUResult::getResultLogicalName(IStringVal & val) const { val.set(p->queryProp("logicalName")); return val; } IStringVal& CLocalWUResult::getResultKeyField(IStringVal & ecl) const { ecl.set(p->queryProp("keyField")); return ecl; } unsigned CLocalWUResult::getResultRequestedRows() const { return p->getPropInt("requestedRows", 1); } IStringVal& CLocalWUResult::getResultEclSchema(IStringVal & str) const { TypeInfoArray types; StringAttrArray names; getSchema(types, names, &str); return str; } IStringVal& CLocalWUResult::getResultRecordSizeEntry(IStringVal & str) const { str.set(p->queryProp("@recordSizeEntry")); return str; } IStringVal& CLocalWUResult::getResultTransformerEntry(IStringVal & str) const { str.set(p->queryProp("@transformerEntry")); return str; } __int64 CLocalWUResult::getResultRowLimit() const { return p->getPropInt64("@rowLimit"); } IStringVal& CLocalWUResult::getResultFilename(IStringVal & str) const { str.set(p->queryProp("@tempFilename")); return str; } void CLocalWUResult::setResultStatus(WUResultStatus status) { setEnum(p, "@status", status, resultStatuses); if (status==ResultStatusUndefined) p->removeProp("Value"); } void CLocalWUResult::setResultName(const char *s) { p->setProp("@name", s); } void CLocalWUResult::setResultSequence(unsigned seq) { p->setPropInt("@sequence", seq); } void CLocalWUResult::setResultSchemaRaw(unsigned size, const void *schema) { p->setPropBin("SchemaRaw", size, schema); } void CLocalWUResult::setResultScalar(bool isScalar) { p->setPropInt("@isScalar", (int) isScalar); if (isScalar) setResultTotalRowCount(1); } void CLocalWUResult::setResultRaw(unsigned len, const void *data, WUResultFormat format) { p->setPropBin("Value", len, data); setResultStatus(ResultStatusSupplied); setResultFormat(format); } void CLocalWUResult::setResultFormat(WUResultFormat format) { switch (format) { case ResultFormatXml: p->setProp("@format","xml"); break; case ResultFormatXmlSet: p->setProp("@format","xmlSet"); break; case ResultFormatCsv: p->setProp("@format","csv"); break; default: p->removeProp("@format"); break; } } void CLocalWUResult::setResultXML(const char *val) { p->setProp("xmlValue", val); } void CLocalWUResult::addResultRaw(unsigned len, const void *data, WUResultFormat format) { p->appendPropBin("Value", len, data); setResultStatus(ResultStatusPartial); const char *existingFormat = p->queryProp("@format"); const char *formatStr = NULL; switch (format) { case ResultFormatXml: formatStr = "xml"; break; case ResultFormatXmlSet: formatStr = "xmlSet"; break; case ResultFormatCsv: formatStr = "csv"; break; default: p->removeProp("@format"); break; } if (format) { if (existingFormat) { if (0 != stricmp(formatStr, existingFormat)) throw MakeStringException(WUERR_ResultFormatMismatch, "addResult format %s, does not match existing format %s", formatStr, existingFormat); } else p->setProp("@format", formatStr); } } void CLocalWUResult::setResultFetchSize(unsigned rows) { p->setPropInt("fetchSize", rows); } void CLocalWUResult::setResultTotalRowCount(__int64 rows) { p->setPropInt64("totalRowCount", rows); } void CLocalWUResult::setResultRowCount(__int64 rows) { p->setPropInt64("rowCount", rows); } void CLocalWUResult::setResultDataset(const char *ecl, const char *defs) { p->setProp("datasetEcl", ecl); p->setProp("datasetEclDefs", defs); } void CLocalWUResult::setResultLogicalName(const char *logicalName) { p->setProp("logicalName", logicalName); } void CLocalWUResult::setResultKeyField(const char *ecl) { p->setProp("keyField", ecl); } void CLocalWUResult::setResultRequestedRows(unsigned rows) { p->setPropInt("requestedRows", rows); } void CLocalWUResult::setResultRecordSizeEntry(const char * entry) { p->setProp("@recordSizeEntry", entry); } void CLocalWUResult::setResultTransformerEntry(const char * entry) { p->setProp("@transformerEntry", entry); } void CLocalWUResult::setResultRowLimit(__int64 value) { p->setPropInt64("@rowLimit", value); } void CLocalWUResult::setResultFilename(const char * name) { p->setProp("@tempFilename", name); } // MORE - it's an undetected error if we call getResult... of a type that does not match schema __int64 CLocalWUResult::getResultInt() const { __int64 result = 0; MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) s.read(result); else result = p->getPropInt64("xmlValue"); return result; } bool CLocalWUResult::getResultBool() const { bool result = false; MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) s.read(result); else result = p->getPropBool("xmlValue"); return result; } double CLocalWUResult::getResultReal() const { double result = 0; MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) s.read(result); else { const char *xmlVal = p->queryProp("xmlValue"); if (xmlVal) result = atof(xmlVal); } return result; } void CLocalWUResult::getResultDecimal(void * val, unsigned len, unsigned precision, bool isSigned) const { MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) { assertex(s.length() == len); s.read(len, val); } else { const char *xmlVal = p->queryProp("xmlValue"); if (xmlVal) { TempDecimal d; d.setString(strlen(xmlVal), xmlVal); if (isSigned) d.getDecimal(len, precision, val); else d.getUDecimal(len, precision, val); } else memset(val, 0, len); } } IStringVal& CLocalWUResult::getResultString(IStringVal & str) const { MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) { unsigned len; s.read(len); str.setLen((const char *) s.readDirect(len), len); } else { p->getPropBin("xmlValue", s); if (p->isBinary("xmlValue")) str.setLen(s.toByteArray(), s.length()); else { char *ascii = rtlUtf8ToVStr(rtlUtf8Length(s.length(), s.toByteArray()), s.toByteArray()); str.set(ascii); rtlFree(ascii); } } return str; } WUResultFormat CLocalWUResult::getResultFormat() const { const char * format = p->queryProp("@format"); if (!format) return ResultFormatRaw; else if (strcmp(format, "xml") == 0) return ResultFormatXml; else if (strcmp(format, "xmlSet") == 0) return ResultFormatXmlSet; else if (strcmp(format, "csv") == 0) return ResultFormatCsv; else throw MakeStringException(WUERR_InvalidResultFormat, "Unrecognised result format %s", format); } IDataVal& CLocalWUResult::getResultRaw(IDataVal & data, IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const { MemoryBuffer s; p->getPropBin("Value", s); unsigned len = s.length(); if (len) { WUResultFormat format = getResultFormat(); if (format == ResultFormatXml || format == ResultFormatXmlSet) { if (!xmlTransformer) throw MakeStringException(WUERR_MissingFormatTranslator, "No transformer supplied to translate XML format result"); xmlTransformer->transform(data, len, s.readDirect(len), format == ResultFormatXml); } else if (format == ResultFormatCsv) { if (!csvTransformer) throw MakeStringException(WUERR_MissingFormatTranslator, "No transformer supplied to translate Csv format result"); csvTransformer->transform(data, len, s.readDirect(len), true); } else data.setLen(s.readDirect(len), len); } else data.clear(); return data; } unsigned CLocalWUResult::getResultHash() const { MemoryBuffer s; p->getPropBin("Value", s); unsigned len = s.length(); const byte * data = (const byte *)s.toByteArray(); return ~hashc(data, len, ~0); } IDataVal& CLocalWUResult::getResultUnicode(IDataVal & data) const { MemoryBuffer s; p->getPropBin("Value", s); if (s.length()) { unsigned len; s.read(len); data.setLen(s.readDirect(len*2), len*2); } else { StringBuffer utf8; if (p->getProp("xmlValue", utf8)) { unsigned outlen; UChar *out; rtlUtf8ToUnicodeX(outlen, out, utf8.length(), utf8.str()); data.setLen(out, outlen*2); rtlFree(out); } else data.clear(); } return data; } __int64 CLocalWUResult::getResultRawSize(IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const { WUResultFormat format = getResultFormat(); if (format == ResultFormatRaw) { //MORE: This should not load the whole property... MemoryBuffer s; p->getPropBin("Value", s); return s.length(); } else { MemoryBuffer temp; MemoryBuffer2IDataVal adaptor(temp); getResultRaw(adaptor, xmlTransformer, csvTransformer); return temp.length(); } } IDataVal& CLocalWUResult::getResultRaw(IDataVal & data, __int64 from, __int64 length, IXmlToRawTransformer * xmlTransformer, ICsvToRawTransformer * csvTransformer) const { WUResultFormat format = getResultFormat(); if (format != ResultFormatRaw) { MemoryBuffer temp; MemoryBuffer2IDataVal adaptor(temp); getResultRaw(adaptor, xmlTransformer, csvTransformer); unsigned len = temp.length(); if (from > len) from = len; if (from + length > len) length = len - from; data.setLen(temp.readDirect(len) + from, (size32_t)length); return data; } else { //MORE: This should not load the whole property, and should be different from the code above... MemoryBuffer s; p->getPropBin("Value", s); unsigned len = s.length(); if (from > len) from = len; if (from + length > len) length = len - from; data.setLen(s.readDirect(len) + from, (size32_t)length); return data; } } bool CLocalWUResult::getResultIsAll() const { return p->getPropBool("@isAll", false); } // MORE - it's an undetected error if we call setResult... of a type that does not match schema void CLocalWUResult::setResultInt(__int64 val) { // Note: we always serialize scalar integer results as int8, and schema must reflect this MemoryBuffer m; serializeInt8(val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultUInt(unsigned __int64 val) { setResultInt((__int64) val); } void CLocalWUResult::setResultReal(double val) { // Note: we always serialize scalar real results as real8, and schema must reflect this MemoryBuffer m; serializeReal8(val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultBool(bool val) { MemoryBuffer m; serializeBool(val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultString(const char *val, unsigned len) { // Note: we always serialize scalar strings with length prefix, and schema must reflect this MemoryBuffer m; serializeLPString(len, val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultUnicode(const void *val, unsigned len) { // Note: we always serialize scalar strings with length prefix, and schema must reflect this MemoryBuffer m; m.append(len).append(len*2, val); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultData(const void *val, unsigned len) { // Note: we always serialize scalar data with length prefix, and schema must reflect this MemoryBuffer m; serializeLPString(len, (const char *)val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultDecimal(const void *val, unsigned len) { // Note: serialized as data but with length known from schema MemoryBuffer m; serializeFixedData(len, val, m); p->setPropBin("Value", m.length(), m.toByteArray()); setResultRowCount(1); setResultTotalRowCount(1); } void CLocalWUResult::setResultIsAll(bool value) { p->setPropBool("@isAll", value); } //========================================================================================== CLocalWUPlugin::CLocalWUPlugin(IPropertyTree *props) : p(props) { } IStringVal& CLocalWUPlugin::getPluginName(IStringVal &str) const { str.set(p->queryProp("@dllname")); return str; } IStringVal& CLocalWUPlugin::getPluginVersion(IStringVal &str) const { str.set(p->queryProp("@version")); return str; } bool CLocalWUPlugin::getPluginThor() const { return p->getPropInt("@thor") != 0; } bool CLocalWUPlugin::getPluginHole() const { return p->getPropInt("@hole") != 0; } void CLocalWUPlugin::setPluginName(const char *str) { p->setProp("@dllname", str); } void CLocalWUPlugin::setPluginVersion(const char *str) { p->setProp("@version", str); } void CLocalWUPlugin::setPluginThor(bool on) { p->setPropInt("@thor", (int) on); } void CLocalWUPlugin::setPluginHole(bool on) { p->setPropInt("@hole", (int) on); } //========================================================================================== class WULibraryActivityIterator : public CInterface, implements IConstWULibraryActivityIterator { public: WULibraryActivityIterator(IPropertyTree * tree) { iter.setown(tree->getElements("activity")); } IMPLEMENT_IINTERFACE; bool first() { return iter->first(); } bool isValid() { return iter->isValid(); } bool next() { return iter->next(); } unsigned query() const { return iter->query().getPropInt("@id"); } private: Owned iter; }; CLocalWULibrary::CLocalWULibrary(IPropertyTree *props) : p(props) { } IStringVal& CLocalWULibrary::getName(IStringVal &str) const { str.set(p->queryProp("@name")); return str; } IConstWULibraryActivityIterator * CLocalWULibrary::getActivities() const { return new WULibraryActivityIterator(p); } void CLocalWULibrary::setName(const char *str) { p->setProp("@name", str); } void CLocalWULibrary::addActivity(unsigned id) { StringBuffer s; s.append("activity[@id=\"").append(id).append("\"]"); if (!p->hasProp(s.str())) p->addPropTree("activity", createPTree())->setPropInt("@id", id); } //========================================================================================== CLocalWUActivity::CLocalWUActivity(IPropertyTree *props, __int64 id) : p(props) { if (id) p->setPropInt64("@id", id); } __int64 CLocalWUActivity::getId() const { return p->getPropInt64("@id"); } unsigned CLocalWUActivity::getKind() const { return (unsigned)p->getPropInt("@kind"); } IStringVal & CLocalWUActivity::getHelper(IStringVal & str) const { str.set(p->queryProp("@helper")); return str; } void CLocalWUActivity::setKind(unsigned kind) { p->setPropInt64("@kind", kind); } void CLocalWUActivity::setHelper(const char * str) { p->setProp("@helper", str); } //========================================================================================== CLocalWUException::CLocalWUException(IPropertyTree *props) : p(props) { } IStringVal& CLocalWUException::getExceptionSource(IStringVal &str) const { str.set(p->queryProp("@source")); return str; } IStringVal& CLocalWUException::getExceptionMessage(IStringVal &str) const { str.set(p->queryProp(NULL)); return str; } unsigned CLocalWUException::getExceptionCode() const { return p->getPropInt("@code", 0); } WUExceptionSeverity CLocalWUException::getSeverity() const { return (WUExceptionSeverity)p->getPropInt("@severity", ExceptionSeverityError); } IStringVal & CLocalWUException::getTimeStamp(IStringVal & dt) const { dt.set(p->queryProp("@time")); return dt; } IStringVal & CLocalWUException::getExceptionFileName(IStringVal & str) const { str.set(p->queryProp("@filename")); return str; } unsigned CLocalWUException::getExceptionLineNo() const { return p->getPropInt("@row", 0); } unsigned CLocalWUException::getExceptionColumn() const { return p->getPropInt("@col", 0); } void CLocalWUException::setExceptionSource(const char *str) { p->setProp("@source", str); } void CLocalWUException::setExceptionMessage(const char *str) { p->setProp(NULL, str); } void CLocalWUException::setExceptionCode(unsigned code) { p->setPropInt("@code", code); } void CLocalWUException::setSeverity(WUExceptionSeverity level) { p->setPropInt("@severity", level); } void CLocalWUException::setTimeStamp(const char *str) { p->setProp("@time", str); } void CLocalWUException::setExceptionFileName(const char *str) { p->setProp("@filename", str); } void CLocalWUException::setExceptionLineNo(unsigned r) { p->setPropInt("@row", r); } void CLocalWUException::setExceptionColumn(unsigned c) { p->setPropInt("@col", c); } CLocalWUTimeStamp::CLocalWUTimeStamp(IPropertyTree *props) : p(props) { } IStringVal & CLocalWUTimeStamp::getApplication(IStringVal & str) const { str.set(p->queryProp("@application")); return str; } IStringVal & CLocalWUTimeStamp::getEvent(IStringVal & str) const { IPropertyTree* evt=p->queryPropTree("*[1]"); if(evt) str.set(evt->queryName()); return str; } IStringVal & CLocalWUTimeStamp::getDate(IStringVal & str) const { str.set(p->queryProp("*[1]")); return str; } CLocalWUAppValue::CLocalWUAppValue(IPropertyTree *props,unsigned child): p(props) { prop.append("*[").append(child).append("]"); } IStringVal & CLocalWUAppValue::getApplication(IStringVal & str) const { str.set(p->queryName()); return str; } IStringVal & CLocalWUAppValue::getName(IStringVal & str) const { IPropertyTree* val=p->queryPropTree(prop.str()); if(val) str.set(val->queryName()); return str; } IStringVal & CLocalWUAppValue::getValue(IStringVal & str) const { str.set(p->queryProp(prop.str())); return str; } extern WORKUNIT_API ILocalWorkUnit * createLocalWorkUnit() { Owned cw = new CLocalWorkUnit("W_LOCAL", NULL, (ISecManager*)NULL, NULL); ILocalWorkUnit* ret = QUERYINTERFACE(&cw->lockRemote(false), ILocalWorkUnit); return ret; } StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str) { const CLocalWorkUnit *w = QUERYINTERFACE(wu, const CLocalWorkUnit); if (!w) { const CLockedWorkUnit *wl = QUERYINTERFACE(wu, const CLockedWorkUnit); if (wl) w = wl->c; } if (w) toXML(w->p, str, 0, XML_Format|XML_SortTags); else str.append("Unrecognized workunit format"); return str; } extern WORKUNIT_API IStringVal& exportWorkUnitToXML(const IConstWorkUnit *wu, IStringVal &str) { StringBuffer x; str.set(exportWorkUnitToXML(wu,x).str()); return str; } extern WORKUNIT_API void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags) { const CLocalWorkUnit *w = QUERYINTERFACE(wu, const CLocalWorkUnit); if (!w) { const CLockedWorkUnit *wl = QUERYINTERFACE(wu, const CLockedWorkUnit); if (wl) w = wl->c; } if (w) saveXML(filename, w->p, 0, XML_Format|XML_SortTags|extraXmlFlags); } extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username, const char *password) { MemoryBuffer buffer; Owned conn = createNamedQueueConnection(0); // MORE - security token? Owned factory = getWorkUnitFactory(); Owned workunit = factory->updateWorkUnit(wuid); assertex(workunit); SCMStringBuffer token; createToken(wuid, username, password, token); workunit->setSecurityToken(token.str()); SCMStringBuffer clusterName; workunit->getClusterName(clusterName); if (!clusterName.length()) throw MakeStringException(WUERR_InvalidCluster, "No target cluster specified"); workunit->commit(); workunit.clear(); Owned clusterInfo = getTargetClusterInfo(clusterName.str()); if (!clusterInfo) throw MakeStringException(WUERR_InvalidCluster, "Unknown cluster %s", clusterName.str()); SCMStringBuffer serverQueue; clusterInfo->getServerQueue(serverQueue); assertex(serverQueue.length()); Owned queue = createJobQueue(serverQueue.str()); if (!queue.get()) throw MakeStringException(WUERR_InvalidQueue, "Could not create workunit queue"); IJobQueueItem *item = createJobQueueItem(wuid); queue->enqueue(item); } extern WORKUNIT_API void abortWorkUnit(const char *wuid) { StringBuffer xpath("/WorkUnitAborts/"); xpath.append(wuid); Owned acon = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT); acon->queryRoot()->setPropInt(NULL, 1); } extern WORKUNIT_API void secSubmitWorkUnit(const char *wuid, ISecManager &secmgr, ISecUser &secuser) { if (checkWuSecAccess(wuid, secmgr, &secuser, SecAccess_Write, "Submit", true, true)) submitWorkUnit(wuid, secuser.getName(), secuser.credentials().getPassword()); } extern WORKUNIT_API void secAbortWorkUnit(const char *wuid, ISecManager &secmgr, ISecUser &secuser) { if (checkWuSecAccess(wuid, secmgr, &secuser, SecAccess_Write, "Submit", true, true)) abortWorkUnit(wuid); } bool CLocalWorkUnit::hasWorkflow() const { return p->hasProp("Workflow"); } unsigned CLocalWorkUnit::queryEventScheduledCount() const { CriticalBlock block(crit); return p->getPropInt("Workflow/@eventScheduledCount", 0); } void CLocalWorkUnit::incEventScheduledCount() { CriticalBlock block(crit); p->setPropInt("Workflow/@eventScheduledCount", p->getPropInt("Workflow/@eventScheduledCount", 0)+1); } IPropertyTree * CLocalWorkUnit::queryWorkflowTree() const { CriticalBlock block(crit); return p->queryPropTree("Workflow"); } IConstWorkflowItemIterator* CLocalWorkUnit::getWorkflowItems() const { // For this to be legally called, we must have the read-able interface. So we are already locked for (at least) read. CriticalBlock block(crit); if(!workflowIteratorCached) { assertex(!workflowIterator); Owned s = p->getPropTree("Workflow"); if(s) workflowIterator.setown(createWorkflowItemIterator(s)); workflowIteratorCached = true; } return workflowIterator.getLink(); } IWorkflowItemArray * CLocalWorkUnit::getWorkflowClone() const { unsigned count = 0; Owned iter = getWorkflowItems(); for(iter->first(); iter->isValid(); iter->next()) count++; Owned array = createWorkflowItemArray(count); for(iter->first(); iter->isValid(); iter->next()) array->addClone(iter->query()); return array.getLink(); } IWorkflowItem * CLocalWorkUnit::addWorkflowItem(unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor) { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); workflowIterator.clear(); workflowIteratorCached = false; IPropertyTree * s = p->queryPropTree("Workflow"); if(!s) s = p->addPropTree("Workflow", createPTree("Workflow")); return createWorkflowItem(s, wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor); } IWorkflowItemIterator * CLocalWorkUnit::updateWorkflowItems() { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); if(!workflowIterator) { IPropertyTree * s = p->queryPropTree("Workflow"); if(!s) s = p->addPropTree("Workflow", createPTree("Workflow")); workflowIterator.setown(createWorkflowItemIterator(s)); workflowIteratorCached = true; } return workflowIterator.getLink(); } void CLocalWorkUnit::syncRuntimeWorkflow(IWorkflowItemArray * array) { Owned iter = updateWorkflowItems(); Owned item; for(iter->first(); iter->isValid(); iter->next()) { item.setown(iter->get()); item->syncRuntimeData(array->queryWfid(item->queryWfid())); } workflowIterator.clear(); workflowIteratorCached = false; } void CLocalWorkUnit::resetWorkflow() { if (hasWorkflow()) { Owned iter = updateWorkflowItems(); Owned wf; for(iter->first(); iter->isValid(); iter->next()) { wf.setown(iter->get()); wf->reset(); } workflowIterator.clear(); workflowIteratorCached = false; } } void CLocalWorkUnit::schedule() { CriticalBlock block(crit); if(queryEventScheduledCount() == 0) return; switch(getState()) { case WUStateCompleted: setState(WUStateWait); break; case WUStateFailed: case WUStateArchived: case WUStateAborting: case WUStateAborted: case WUStateScheduled: throw MakeStringException(WUERR_CannotSchedule, "Cannot schedule workunit in this state"); } StringBuffer rootPath; SCMStringBuffer clusterName; getClusterName(clusterName); rootPath.append("/Schedule/").append(clusterName.str()); Owned conn = querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_WRITE | RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); Owned root = conn->getRoot(); if(!root->hasChildren()) { StringBuffer addPath; addPath.append("/Schedulers/").append(clusterName.str()); Owned addConn = querySDS().connect(addPath.str(), myProcessSession(), RTM_LOCK_WRITE | RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); } char const * wuid = p->queryName(); StringBuffer xpath("*/*/"); ncnameEscape(wuid, xpath); bool more; do more = root->removeProp(xpath.str()); while(more); Owned iter = getWorkflowItems(); Owned event; Owned branch1, branch2; for(iter->first(); iter->isValid(); iter->next()) { event.setown(iter->query()->getScheduleEvent()); if(!event) continue; ncnameEscape(event->queryName(), xpath.clear()); ensurePTree(root, xpath.str()); branch1.setown(root->getPropTree(xpath.str())); ncnameEscape(event->queryText(), xpath.clear()); ensurePTree(branch1, xpath.str()); branch2.setown(branch1->getPropTree(xpath.str())); ncnameEscape(wuid, xpath.clear()); ensurePTree(branch2, xpath.str()); } } void CLocalWorkUnit::deschedule() { if(queryEventScheduledCount() == 0) return; if(getState() == WUStateWait) setState(WUStateCompleted); char const * wuid = p->queryName(); StringBuffer xpath; xpath.append("*/*/*/"); ncnameEscape(wuid, xpath); Owned conn = querySDS().connect("/Schedule", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if(!conn) return; Owned root = conn->getRoot(); bool more; do more = root->removeProp(xpath.str()); while(more); } mapEnums localFileUploadTypes[] = { { UploadTypeFileSpray, "FileSpray" }, { UploadTypeWUResult, "WUResult" }, { UploadTypeWUResultCsv, "WUResultCsv" }, { UploadTypeWUResultXml, "WUResultXml" }, { UploadTypeSize, NULL } }; class CLocalFileUpload : public CInterface, implements IConstLocalFileUpload { public: CLocalFileUpload(IPropertyTree * _tree) : tree(_tree) {} CLocalFileUpload(unsigned id, LocalFileUploadType type, char const * source, char const * destination, char const * eventTag) { tree.setown(createPTree()); tree->setPropInt("@id", id); setEnum(tree, "@type", type, localFileUploadTypes); tree->setProp("@source", source); tree->setProp("@destination", destination); if (eventTag) tree->setProp("@eventTag", eventTag); } IMPLEMENT_IINTERFACE; IPropertyTree * getTree() { return tree.getLink(); } virtual unsigned queryID() const { return tree->getPropInt("@id"); } virtual LocalFileUploadType queryType() const { return (LocalFileUploadType)getEnum(tree, "@type", localFileUploadTypes); } virtual IStringVal & getSource(IStringVal & ret) const { ret.set(tree->queryProp("@source")); return ret; } virtual IStringVal & getDestination(IStringVal & ret) const { ret.set(tree->queryProp("@destination")); return ret; } virtual IStringVal & getEventTag(IStringVal & ret) const { if(tree->hasProp("@eventTag")) ret.set(tree->queryProp("@eventTag")); else ret.clear(); return ret; } private: Owned tree; }; class CLocalFileUploadIterator : public CInterface, implements IConstLocalFileUploadIterator { public: CLocalFileUploadIterator(IPropertyTree * _tree) : tree(_tree), iter(tree->getElements("LocalFileUpload")) {} IMPLEMENT_IINTERFACE; bool first() { return iter->first(); } bool isValid() { return iter->isValid(); } bool next() { return iter->next(); } IConstLocalFileUpload * get() { return new CLocalFileUpload(&iter->get()); } private: Owned tree; Owned iter; }; IConstLocalFileUploadIterator * CLocalWorkUnit::getLocalFileUploads() const { // For this to be legally called, we must have the read-able interface. So we are already locked for (at least) read. CriticalBlock block(crit); Owned s = p->getPropTree("LocalFileUploads"); if(s) return new CLocalFileUploadIterator(s.getClear()); else return NULL; } bool CLocalWorkUnit::requiresLocalFileUpload() const { SCMStringBuffer dest; Owned result; Owned iter(getLocalFileUploads()); if(!iter) return false; for(iter->first(); iter->isValid(); iter->next()) { Owned upload(iter->get()); switch(upload->queryType()) { case UploadTypeWUResult: case UploadTypeWUResultCsv: case UploadTypeWUResultXml: upload->getDestination(dest); result.setown(getResultByName(dest.str())); if(!result) return true; break; default: throw MakeStringException(WUERR_InvalidUploadFormat, "Unsupported local file upload type %s", getEnumText(upload->queryType(), localFileUploadTypes)); } } return false; } unsigned CLocalWorkUnit::addLocalFileUpload(LocalFileUploadType type, char const * source, char const * destination, char const * eventTag) { // For this to be legally called, we must have the write-able interface. So we are already locked for write. CriticalBlock block(crit); IPropertyTree * s = p->queryPropTree("LocalFileUploads"); if(!s) s = p->addPropTree("LocalFileUploads", createPTree()); unsigned id = s->numChildren(); Owned upload = new CLocalFileUpload(id, type, source, destination, eventTag); s->addPropTree("LocalFileUpload", upload->getTree()); return id; } #if 0 void testConstWorkflow(IConstWorkflowItem * cwf, bool * okay, bool * dep) { DBGLOG("Test workflow const iface %u", cwf->queryWfid()); unsigned deps = 0; Owned diter; switch(cwf->queryWfid()) { case 1: assertex(!cwf->isScheduled()); assertex(cwf->queryType() == WFTypeNormal); assertex(cwf->queryState() == WFStateNull); diter.setown(cwf->getDependencies()); for(diter->first(); diter->isValid(); diter->next()) deps++; assertex(deps==0); okay[0] = true; break; case 2: assertex(!cwf->isScheduled()); assertex(cwf->queryType() == WFTypeRecovery); assertex(cwf->queryState() == WFStateSkip); okay[1] = true; break; case 3: assertex(cwf->queryContingencyFor() == 4); okay[2] = true; break; case 4: assertex(cwf->isScheduled()); assertex(cwf->queryType() == WFTypeNormal); assertex(cwf->queryState() == WFStateReqd); assertex(cwf->querySuccess() == 0); assertex(cwf->queryFailure() == 3); assertex(cwf->queryRecovery() == 2); assertex(cwf->queryRetriesAllowed() == 10); assertex(cwf->queryRetriesRemaining() == 10); diter.setown(cwf->getDependencies()); for(diter->first(); diter->isValid(); diter->next()) { dep[diter->query()-1] = true; deps++; } assertex(deps==2); assertex(dep[0]); assertex(dep[1]); okay[3] = true; break; case 5: assertex(cwf->isScheduled()); assertex(!cwf->isScheduledNow()); assertex(cwf->querySchedulePriority() == 75); assertex(cwf->queryScheduleCount() == 5); assertex(cwf->queryScheduleCountRemaining() == 5); okay[4] = true; break; case 6: assertex(cwf->isScheduled()); assertex(!cwf->isScheduledNow()); assertex(cwf->querySchedulePriority() == 25); assertex(!cwf->hasScheduleCount()); okay[5] = true; break; default: assertex(!"unknown wfid in test"); } } void testRuntimeWorkflow(IRuntimeWorkflowItem * rwf, bool * okay) { DBGLOG("Test workflow runtime iface %u", rwf->queryWfid()); switch(rwf->queryWfid()) { case 1: case 2: case 3: okay[rwf->queryWfid()-1] = true; break; case 4: { unsigned tries = 0; while(rwf->testAndDecRetries()) tries++; assertex(tries == 10); assertex(rwf->queryRetriesRemaining() == 0); rwf->setState(WFStateFail); assertex(rwf->queryState() == WFStateFail); rwf->reset(); assertex(rwf->queryRetriesRemaining() == 10); assertex(rwf->queryState() == WFStateReqd); } okay[3] = true; break; case 5: { assertex(rwf->queryScheduleCountRemaining() == 5); unsigned count = 0; do count++; while(rwf->decAndTestScheduleCountRemaining()); assertex(count == 5); assertex(rwf->queryScheduleCountRemaining() == 0); rwf->reset(); assertex(rwf->queryScheduleCountRemaining() == 5); } okay[4] = true; break; case 6: { assertex(!rwf->hasScheduleCount()); unsigned count; for(count=0; count<20; count++) assertex(rwf->decAndTestScheduleCountRemaining()); } okay[5] = true; break; default: assertex(!"unknown wfid in test"); } } void testWorkflow() { DBGLOG("workunit.cpp : testWorkflow"); CLocalWorkUnit wu("W-WF-TEST", 0, 0, 0); Owned wf; wf.setown(wu.addWorkflowItem(1, WFTypeNormal, 0, 0, 0, 0, 0)); wf.setown(wu.addWorkflowItem(2, WFTypeRecovery, 0, 0, 0, 0, 0)); wf.setown(wu.addWorkflowItem(3, WFTypeFailure, 0, 0, 0, 0, 4)); wf.setown(wu.addWorkflowItem(4, WFTypeNormal, 0, 3, 2, 10, 0)); wf->setScheduledNow(); wf->addDependency(1); wf.setown(wu.addWorkflowItem(5, WFTypeNormal, 0, 0, 0, 0, 0)); wf->setScheduledOn("test", "foo*"); wf->setSchedulePriority(75); wf->setScheduleCount(5); wf.setown(wu.addWorkflowItem(6, WFTypeNormal, 0, 0, 0, 0, 0)); wf->setScheduledOn("test", "bar*"); wf->setSchedulePriority(25); unsigned const n = 6; bool okay[n]; bool dep[n]; unsigned i; for(i=0; i citer(wu.getWorkflowItems()); for(citer->first(); citer->isValid(); citer->next()) testConstWorkflow(citer->query(), okay, dep); for(i=0; i miter(wu.updateWorkflowItems()); for(miter->first(); miter->isValid(); miter->next()) { Owned rwf(miter->get()); testRuntimeWorkflow(rwf, okay); } for(i=0; i array(wu.getWorkflowClone()); unsigned wfid; for(wfid = 1; array->isValid(wfid); wfid++) testConstWorkflow(&array->queryWfid(wfid), okay, dep); for(i=0; iisValid(wfid); wfid++) testRuntimeWorkflow(&array->queryWfid(wfid), okay); for(i=0; i conn = querySDS().connect(wuRoot.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT); if (conn) { unsigned start = msTick(); loop { ret = (WUState) getEnum(conn->queryRoot(), "@state", states); switch (ret) { case WUStateCompiled: case WUStateUploadingFiles: if (!compiled) break; // fall into case WUStateCompleted: case WUStateFailed: case WUStateAborted: waiter.unsubscribe(); return ret; case WUStateWait: if(returnOnWaitState) { waiter.unsubscribe(); return ret; } break; case WUStateCompiling: case WUStateRunning: case WUStateDebugPaused: case WUStateDebugRunning: case WUStateBlocked: case WUStateAborting: if (queryDaliServerVersion().compare("2.1")>=0) { SessionId agent = conn->queryRoot()->getPropInt64("@agentSession", -1); if((agent>0) && querySessionManager().sessionStopped(agent, 0)) { waiter.unsubscribe(); conn->reload(); ret = (WUState) getEnum(conn->queryRoot(), "@state", states); bool isEcl = false; switch (ret) { case WUStateCompiling: isEcl = true; // drop into case WUStateRunning: case WUStateBlocked: ret = WUStateFailed; break; case WUStateAborting: ret = WUStateAborted; break; default: return ret; } WARNLOG("_waitForWorkUnit terminated: %"I64F"d state = %d",(__int64)agent,(int)ret); Owned factory = getWorkUnitFactory(); Owned wu = factory->updateWorkUnit(wuid); wu->setState(ret); Owned e = wu->createException(); e->setExceptionCode(isEcl ? 1001 : 1000); e->setExceptionMessage(isEcl ? "EclServer terminated unexpectedly" : "Workunit terminated unexpectedly"); return ret; } } break; } unsigned waited = msTick() - start; if (timeout==-1) { waiter.wait(20000); // recheck state every 20 seconds even if no timeout, in case eclagent has crashed. if (waiter.aborted) { ret = WUStateUnknown; // MORE - throw an exception? break; } } else if (waited > timeout || !waiter.wait(timeout-waited)) { ret = WUStateUnknown; // MORE - throw an exception? break; } conn->reload(); } } waiter.unsubscribe(); return ret; } extern WUState waitForWorkUnitToComplete(const char * wuid, int timeout, bool returnOnWaitState) { return _waitForWorkUnit(wuid, (unsigned)timeout, false, returnOnWaitState); } extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout, bool returnOnWaitState) { if (checkWuSecAccess(wuid, secmgr, &secuser, SecAccess_Read, "Wait for Complete", false, true)) return waitForWorkUnitToComplete(wuid, timeout, returnOnWaitState); return WUStateUnknown; } extern bool waitForWorkUnitToCompile(const char * wuid, int timeout) { switch(_waitForWorkUnit(wuid, (unsigned)timeout, true, true)) { case WUStateCompiled: case WUStateCompleted: case WUStateWait: case WUStateUploadingFiles: return true; default: return false; } } extern WORKUNIT_API bool secWaitForWorkUnitToCompile(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout) { if (checkWuSecAccess(wuid, secmgr, &secuser, SecAccess_Read, "Wait for Compile", false, true)) return waitForWorkUnitToCompile(wuid, timeout); return false; } extern WORKUNIT_API bool secDebugWorkunit(const char * wuid, ISecManager &secmgr, ISecUser &secuser, const char *command, StringBuffer &response) { if (strnicmp(command, " wu = factory->secOpenWorkUnit(wuid, false, &secmgr, &secuser); SCMStringBuffer ip; unsigned port; port = wu->getDebugAgentListenerPort(); wu->getDebugAgentListenerIP(ip); SocketEndpoint debugEP(ip.str(), port); Owned socket = ISocket::connect_timeout(debugEP, 1000); unsigned len = (size32_t)strlen(command); unsigned revlen = len; _WINREV(revlen); socket->write(&revlen, sizeof(revlen)); socket->write(command, len); for (;;) { socket->read(&len, sizeof(len)); _WINREV(len); if (len == 0) break; if (len & 0x80000000) { throwUnexpected(); } char * mem = (char*) response.reserve(len); socket->read(mem, len); } return true; } return false; } IWUResult * updateWorkUnitResult(IWorkUnit * w, const char *name, unsigned sequence) { switch ((int)sequence) { case ResultSequenceStored: return w->updateVariableByName(name); case ResultSequencePersist: return w->updateGlobalByName(name); case ResultSequenceInternal: case ResultSequenceOnce: return w->updateTemporaryByName(name); default: return w->updateResultBySequence(sequence); } } IConstWUResult * getWorkUnitResult(IConstWorkUnit * w, const char *name, unsigned sequence) { switch ((int)sequence) { case ResultSequenceStored: return w->getVariableByName(name); case ResultSequencePersist: return w->getGlobalByName(name); case ResultSequenceInternal: case ResultSequenceOnce: return w->getTemporaryByName(name); default: if (name && name[0]) return w->getResultByName(name);//name takes precedence over sequence else return w->getResultBySequence(sequence); } } extern WORKUNIT_API bool getWorkUnitCreateTime(const char *wuid,CDateTime &time) { if (wuid) { char prefchar; unsigned year,month,day,hour,min,sec; if (sscanf(wuid, "%c%4u%2u%2u-%2u%2u%2u", &prefchar, &year, &month, &day, &hour, &min, &sec)==7) { time.set(year, month, day, hour, min, sec, 0, true); // time.setDate(year, month, day); // time.setTime(hour, min, sec, 0, true); // for some reason time is local return true; } } return false; } extern WORKUNIT_API IStringVal& createToken(const char *wuid, const char *user, const char *password, IStringVal &str) { StringBuffer wu, token("X"); wu.append(wuid).append(';').append(user).append(';').append(password); encrypt(token,wu.str()); str.set(token.str()); return str; } // This will be replaced by something more secure! extern WORKUNIT_API void extractToken(const char *token, const char *wuid, IStringVal &user, IStringVal &password) { if (token && *token) { StringBuffer wu; decrypt(wu, token+1); const char *finger = strchr(wu.str(),';'); if (finger && strnicmp(wuid, wu.str(), finger-wu.str())==0) { const char *finger1 = strchr(++finger,';'); if(finger1) { user.setLen(finger, (size32_t)(finger1-finger)); password.setLen(++finger1, (size32_t)(wu.str() + wu.length() - finger1)); return; } } throw MakeStringException(WUERR_InvalidSecurityToken, "Invalid call to extractToken"); } } extern WORKUNIT_API WUState getWorkUnitState(const char* state) { return (WUState) getEnum(state, states); } const LogMsgCategory MCschedconn = MCprogress(1000); // Category used to inform about schedule synchronization class CWorkflowScheduleConnection : public CInterface, implements IWorkflowScheduleConnection { public: CWorkflowScheduleConnection(char const * wuid) { basexpath.append("/WorkflowSchedule/").append(wuid); flagxpath.append(basexpath.str()).append("/Active"); } IMPLEMENT_IINTERFACE; virtual void lock() { LOG(MCschedconn, "Locking base schedule connection"); baseconn.setown(querySDS().connect(basexpath.str(), myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, INFINITE)); if(!baseconn) throw MakeStringException(WUERR_ScheduleLockFailed, "Could not get base workflow schedule lock"); } virtual void unlock() { LOG(MCschedconn, "Unlocking base schedule connection"); baseconn.clear(); } virtual void setActive() { LOG(MCschedconn, "Setting active flag in schedule connection"); flagconn.setown(querySDS().connect(flagxpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, INFINITE)); if(!flagconn) throw MakeStringException(WUERR_ScheduleLockFailed, "Could not get active workflow schedule lock"); } virtual void resetActive() { LOG(MCschedconn, "Resetting active flag in schedule connection"); flagconn.clear(); } virtual bool queryActive() { return baseconn->queryRoot()->hasProp("Active"); } virtual bool pull(IWorkflowItemArray * workflow) { assertex(baseconn); Owned root = baseconn->getRoot(); Owned eventQueue = root->getPropTree("EventQueue"); if(!eventQueue) return false; if(!eventQueue->hasProp("Item")) return false; { Owned eventItems = eventQueue->getElements("Item"); Owned eventItem; Owned wfItems = workflow->getSequenceIterator(); Owned wfItem; for(eventItems->first(); eventItems->isValid(); eventItems->next()) { eventItem.setown(&eventItems->get()); const char * eventName = eventItem->queryProp("@name"); const char * eventText = eventItem->queryProp("@text"); for(wfItems->first(); wfItems->isValid(); wfItems->next()) { wfItem.setown(wfItems->get()); if(wfItem->queryState() != WFStateWait) continue; Owned targetEvent = wfItem->getScheduleEvent(); if(!targetEvent || !targetEvent->matches(eventName, eventText)) continue; wfItem->setEvent(eventName, eventText); wfItem->setState(WFStateReqd); resetDepState(workflow, *wfItem); } } } bool more; do more = eventQueue->removeProp("Item"); while(more); return true; } virtual void push(char const * name, char const * text) { assertex(baseconn); Owned root = baseconn->getRoot(); ensurePTree(root, "EventQueue"); Owned eventQueue = root->getPropTree("EventQueue"); Owned eventItem = createPTree(); eventItem->setProp("@name", name); eventItem->setProp("@text", text); eventQueue->addPropTree("Item", eventItem.getLink()); } private: void resetDepState(IWorkflowItemArray * workflow, IRuntimeWorkflowItem & item) const { Owned iter(item.getDependencies()); for(iter->first(); iter->isValid(); iter->next()) { IRuntimeWorkflowItem & dep = workflow->queryWfid(iter->query()); switch(dep.queryState()) { case WFStateDone: case WFStateFail: dep.setState(WFStateNull); resetDepState(workflow, dep); break; } } } private: StringBuffer basexpath; StringBuffer flagxpath; Owned baseconn; Owned flagconn; }; extern WORKUNIT_API IWorkflowScheduleConnection * getWorkflowScheduleConnection(char const * wuid) { return new CWorkflowScheduleConnection(wuid); } extern WORKUNIT_API IExtendedWUInterface * queryExtendedWU(IWorkUnit * wu) { return QUERYINTERFACE(wu, IExtendedWUInterface); } extern WORKUNIT_API void addExceptionToWorkunit(IWorkUnit * wu, WUExceptionSeverity severity, const char * source, unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column) { Owned we = wu->createException(); we->setSeverity(severity); we->setExceptionMessage(text); if (source) we->setExceptionSource(source); if (code) we->setExceptionCode(code); if (filename) we->setExceptionFileName(filename); if (lineno) { we->setExceptionLineNo(lineno); if (column) we->setExceptionColumn(lineno); } } #define UTF8_BOM "\357\273\277" extern WORKUNIT_API bool isArchiveQuery(const char * text) { if (!text) return false; //skip utf8 BOM, probably excessive if (memcmp(text, UTF8_BOM, 3) == 0) text += 3; loop { if (isspace(*text)) text++; else if (text[0] == '<' && text[1] == '!' && text[2] == '-' && text[3] == '-') { text += 4; loop { if (!*text) break; if (text[0] == '-' && text[1] == '-' && text[2] == '>') { text += 3; break; } text++; } } else break; } const char * archivePrefix = "getPropTree(xpath); } static IPropertyTree * resolveQueryByWuid(IPropertyTree * queryRegistry, const char * wuid) { StringBuffer xpath; xpath.append("Query[@wuid=\"").append(wuid).append("\"]"); return queryRegistry->getPropTree(xpath); } static void clearAliases(IPropertyTree * queryRegistry, const char * id) { StringBuffer lcId(id); lcId.toLowerCase(); StringBuffer xpath; xpath.append("Alias[@id=\"").append(lcId).append("\"]"); Owned iter = queryRegistry->getElements(xpath); ForEach(*iter) { queryRegistry->removeProp(xpath.str()); } } IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name, const char * wuid, const char * dll) { StringBuffer xpath; xpath.append("Query[@wuid=\"").append(wuid).append("\"]"); IPropertyTree *q = queryRegistry->queryPropTree(xpath.str()); if (q) return q; StringBuffer lcName(name); lcName.toLowerCase(); xpath.clear(); xpath.append("Query[@name=\"").append(lcName.str()).append("\"]"); Owned iter = queryRegistry->getElements(xpath); unsigned seq = 1; ForEach(*iter) { unsigned thisSeq = iter->query().getPropInt("@seq"); if (thisSeq >= seq) seq = thisSeq + 1; } StringBuffer id; id.append(lcName).append(".").append(seq); IPropertyTree * newEntry = createPTree("Query", ipt_caseInsensitive); newEntry->setProp("@name", lcName); newEntry->setProp("@wuid", wuid); newEntry->setProp("@dll", dll); newEntry->setProp("@id", id); newEntry->setPropInt("@seq", seq); return queryRegistry->addPropTree("Query", newEntry); } void removeNamedQuery(IPropertyTree * queryRegistry, const char * id) { StringBuffer lcId(id); lcId.toLowerCase(); clearAliases(queryRegistry, lcId); StringBuffer xpath; xpath.append("Query[@id=\"").append(lcId).append("\"]"); queryRegistry->removeProp(xpath); } void removeDllFromNamedQueries(IPropertyTree * queryRegistry, const char * dll) { Owned match = resolveQueryByDll(queryRegistry, dll); if (!match) return; clearAliases(queryRegistry, match->queryProp("@id")); queryRegistry->removeTree(match); } void removeWuidFromNamedQueries(IPropertyTree * queryRegistry, const char * wuid) { Owned match = resolveQueryByWuid(queryRegistry, wuid); if (!match) return; clearAliases(queryRegistry, match->queryProp("@id")); queryRegistry->removeTree(match); } void removeAliasesFromNamedQuery(IPropertyTree * queryRegistry, const char * id) { clearAliases(queryRegistry, id); } void setQueryAlias(IPropertyTree * queryRegistry, const char * name, const char * value) { StringBuffer lcName(name); lcName.toLowerCase(); StringBuffer xpath; xpath.append("Alias[@name=\"").append(lcName).append("\"]"); IPropertyTree * match = queryRegistry->queryPropTree(xpath); if (!match) { IPropertyTree * newEntry = createPTree("Alias"); newEntry->setProp("@name", lcName); match = queryRegistry->addPropTree("Alias", newEntry); } match->setProp("@id", value); } IPropertyTree * resolveQueryAlias(IPropertyTree * queryRegistry, const char * alias) { StringBuffer xpath; unsigned cnt = 0; const char * search = alias; loop { xpath.clear().append("Alias[@name=\"").append(search).append("\"]/@id"); const char * queryId = queryRegistry->queryProp(xpath); if (!queryId) break; //Check for too many alias indirections. if (cnt++ > 10) return NULL; search = queryId; } xpath.clear().append("Query[@id=\"").append(search).append("\"]"); return queryRegistry->getPropTree(xpath); } void setQuerySuspendedState(IPropertyTree * queryRegistry, const char *id, bool suspend) { StringBuffer lcId(id); lcId.toLowerCase(); StringBuffer xpath; xpath.append("Query[@id=\"").append(lcId).append("\"]"); IPropertyTree *tree = queryRegistry->queryPropTree(xpath); if (tree) { if (suspend) tree->addPropBool("@suspended", true); else tree->removeProp("@suspended"); } else throw MakeStringException((suspend)? QUERRREG_SUSPEND : QUERRREG_UNSUSPEND, "Modifying query suspended state failed. Could not find query %s", id); } void setQueryCommentForNamedQuery(IPropertyTree * queryRegistry, const char *id, const char *queryComment) { if (queryComment) { StringBuffer lcId(id); lcId.toLowerCase(); StringBuffer xpath; xpath.append("Query[@id=\"").append(lcId).append("\"]"); IPropertyTree *tree = queryRegistry->queryPropTree(xpath); if (tree) tree->setProp("@queryComment", queryComment); else throw MakeStringException(QUERRREG_COMMENT, "Could not find query %s", id); } } extern WORKUNIT_API IPropertyTree * getQueryRegistryRoot() { Owned globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!globalLock) return NULL; //Only lock the branch for the target we're interested in. StringBuffer xpath; xpath.append("/QuerySets"); Owned conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (conn) return conn->getRoot(); else return NULL; } extern WORKUNIT_API IPropertyTree * getQueryRegistry(const char * wsEclId, bool readonly) { Owned globalLock = querySDS().connect("/QuerySets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); //Only lock the branch for the target we're interested in. StringBuffer xpath; xpath.append("/QuerySets/QuerySet[@id=\"").append(wsEclId).append("\"]"); Owned conn = querySDS().connect(xpath.str(), myProcessSession(), readonly ? RTM_LOCK_READ : RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if (!conn) { if (readonly) return NULL; Owned querySet = createPTree(); querySet->setProp("@id", wsEclId); globalLock->queryRoot()->addPropTree("QuerySet", querySet.getClear()); globalLock->commit(); conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT)); if (!conn) throwUnexpected(); } return conn->getRoot(); } IPropertyTree * addNamedPackageSet(IPropertyTree * packageRegistry, const char * name, IPropertyTree *packageInfo, bool overWrite) { StringBuffer xpath; StringBuffer lcName(name); lcName.toLowerCase(); // see if "name" already exists xpath.append("Package[@id='").append(name).append("']"); IPropertyTree *pkgTree = packageRegistry->queryPropTree(xpath.str()); if (pkgTree) { if (overWrite) packageRegistry->removeTree(pkgTree); else throw MakeStringException(WUERR_PackageAlreadyExists, "Package name %s already exists, either delete it or specify overwrite",name); } IPropertyTree *tree = packageRegistry->addPropTree("Package", packageInfo); tree->setProp("@id", lcName); return tree; } void removeNamedPackage(IPropertyTree * packageRegistry, const char * id) { StringBuffer lcId(id); lcId.toLowerCase(); StringBuffer xpath; xpath.append("Package[@id=\"").append(lcId).append("\"]"); packageRegistry->removeProp(xpath); } extern WORKUNIT_API IPropertyTree * getPackageSetRegistry(const char * wsEclId, bool readonly) { Owned globalLock = querySDS().connect("/PackageSets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); //Only lock the branch for the target we're interested in. StringBuffer xpath; xpath.append("/PackageSets/PackageSet[@id=\"").append(wsEclId).append("\"]"); Owned conn = querySDS().connect(xpath.str(), myProcessSession(), readonly ? RTM_LOCK_READ : RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); if (!conn) { if (readonly) return NULL; Owned querySet = createPTree(); querySet->setProp("@id", wsEclId); globalLock->queryRoot()->addPropTree("PackageSet", querySet.getClear()); globalLock->commit(); conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT)); if (!conn) throwUnexpected(); } return conn->getRoot(); } void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const char *queryName, IPropertyTree *packageInfo, WUQueryActivationOptions activateOption, StringBuffer &newQueryId) { StringBuffer cleanQueryName(queryName); cleanQueryName.replace(' ', '_'); SCMStringBuffer dllName; Owned q = workunit->getQuery(); q->getQueryDllName(dllName); SCMStringBuffer wuid; workunit->getWuid(wuid); Owned queryRegistry = getQueryRegistry(querySetName, false); StringBuffer currentTargetClusterType; queryRegistry->getProp("@targetclustertype", currentTargetClusterType); SCMStringBuffer targetClusterType; workunit->getDebugValue("targetclustertype", targetClusterType); if (currentTargetClusterType.length() < 1) { queryRegistry->setProp("@targetclustertype", targetClusterType.str()); } else { if (strcmp(currentTargetClusterType.str(), "roxie") == 0 && strcmp(currentTargetClusterType.str(), targetClusterType.str())!=0) { throw MakeStringException(WUERR_MismatchClusterType, "TargetClusterTypes of workunit and queryset do not match."); } } IPropertyTree *newEntry = addNamedQuery(queryRegistry, cleanQueryName, wuid.str(), dllName.str()); newQueryId.append(newEntry->queryProp("@id")); workunit->setDebugValue("queryId", newQueryId.str(), true); if (activateOption == ACTIVATE_SUSPEND_PREVIOUS|| activateOption == ACTIVATE_DELETE_PREVIOUS) { Owned aliasTree = resolveQueryAlias(queryRegistry, cleanQueryName); setQueryAlias(queryRegistry, cleanQueryName, newQueryId); if (aliasTree) { if (activateOption == ACTIVATE_SUSPEND_PREVIOUS) setQuerySuspendedState(queryRegistry, cleanQueryName, true); else removeNamedQuery(queryRegistry, aliasTree->queryProp("@id")); } } else if (activateOption == MAKE_ACTIVATE || activateOption == MAKE_ACTIVATE_LOAD_DATA_ONLY) setQueryAlias(queryRegistry, cleanQueryName, newQueryId.str()); } bool removeQuerySetAlias(const char *querySetName, const char *alias) { Owned queryRegistry = getQueryRegistry(querySetName, true); StringBuffer xpath; xpath.appendf("Alias[@name='%s']", alias); IPropertyTree *t = queryRegistry->queryPropTree(xpath); return queryRegistry->removeTree(t); } void addQuerySetAlias(const char *querySetName, const char *alias, const char *id) { Owned queryRegistry = getQueryRegistry(querySetName, false); setQueryAlias(queryRegistry, alias, id); } void setSuspendQuerySetQuery(const char *querySetName, const char *id, bool suspend) { Owned queryRegistry = getQueryRegistry(querySetName, true); setQuerySuspendedState(queryRegistry, id, suspend); } void deleteQuerySetQuery(const char *querySetName, const char *id) { Owned queryRegistry = getQueryRegistry(querySetName, true); removeNamedQuery(queryRegistry, id); } void removeQuerySetAliasesFromNamedQuery(const char *querySetName, const char * id) { Owned queryRegistry = getQueryRegistry(querySetName, true); clearAliases(queryRegistry, id); } void setQueryCommentForNamedQuery(const char *querySetName, const char *id, const char *queryComment) { Owned queryRegistry = getQueryRegistry(querySetName, true); setQueryCommentForNamedQuery(queryRegistry, id, queryComment); } const char *queryIdFromQuerySetWuid(const char *querySetName, const char *wuid, IStringVal &id) { Owned queryRegistry = getQueryRegistry(querySetName, false); StringBuffer xpath; xpath.appendf("Query[@wuid='%s']", wuid); IPropertyTree *q = queryRegistry->queryPropTree(xpath.str()); if (q) { id.set(q->queryProp("@id")); } return id.str(); } bool looksLikeAWuid(const char * wuid) { if (!wuid) return false; if (wuid[0] != 'W') return false; if (!isdigit(wuid[1]) || !isdigit(wuid[2]) || !isdigit(wuid[3]) || !isdigit(wuid[4])) return false; if (!isdigit(wuid[5]) || !isdigit(wuid[6]) || !isdigit(wuid[7]) || !isdigit(wuid[8])) return false; return (wuid[9]=='-'); } IPropertyTree * resolveDefinitionInArchive(IPropertyTree * archive, const char * path) { IPropertyTree * module = archive; const char * dot = strrchr(path, '.'); StringBuffer xpath; if (dot) { xpath.clear().append("Module[@key='").appendLower(dot-path, path).append("']"); module = archive->queryPropTree(xpath); path = dot+1; } else module = archive->queryPropTree("Module[@key='']"); if (!module) return NULL; xpath.clear().append("Attribute[@key='").appendLower(strlen(path), path).append("']"); return module->queryPropTree(xpath); } extern WORKUNIT_API void associateLocalFile(IWUQuery * query, WUFileType type, const char * name, const char * description, unsigned crc) { StringBuffer hostname; queryHostIP().getIpText(hostname); StringBuffer fullPathname; makeAbsolutePath(name, fullPathname); query->addAssociatedFile(type, fullPathname, hostname, description, crc); }