Selaa lähdekoodia

HPCC-12250 Allow pluggable interface to workunit creation

Cleanup ILocalWorkUnit interface and usage prior to splitting implementation.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 vuotta sitten
vanhempi
commit
747d7f91cb

+ 168 - 140
common/workunit/workunit.cpp

@@ -1017,12 +1017,12 @@ template <>  struct CachedTags<CLocalWUAppValue, IConstWUAppValue>
 };
 
 
-class CLocalWorkUnit : public CInterface, implements IConstWorkUnit , implements ISDSSubscription, implements IExtendedWUInterface
+class CLocalWorkUnit : public CInterface, implements IWorkUnit , implements ISDSSubscription, implements IExtendedWUInterface
 {
     friend StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool decodeGraphs, bool includeProgress);
     friend void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool decodeGraphs, bool includeProgress);
 
-    // NOTE - order is important - we need to construct connection before p and (especially) destruct after p
+    // NOTE - order is important - we need to construct connection before p and (especially) destroy after p
     Owned<IRemoteConnection> connection;
     Owned<IPropertyTree> p;
     bool dirty;
@@ -1067,7 +1067,7 @@ public:
     CLocalWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser);
     CLocalWorkUnit(IRemoteConnection *_conn, IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser);
     ~CLocalWorkUnit();
-    CLocalWorkUnit(const char *dummyWuid, ISecManager *secmgr, ISecUser *secuser);
+    CLocalWorkUnit(const char *dummyWuid, const char *xml, ISecManager *secmgr, ISecUser *secuser);
     IPropertyTree *getUnpackedTree(bool includeProgress) const;
 
     ISecManager *querySecMgr(){return secMgr.get();}
@@ -1248,7 +1248,7 @@ public:
     void deserialize(MemoryBuffer &src);
 
     IWorkUnit &lockRemote(bool commit);
-    void unlockRemote(bool closing);
+    void unlockRemote();
     void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL);
     void abort();
     void cleanupAndDelete(bool deldll,bool deleteOwned, const StringArray *deleteExclusions=NULL);
@@ -1260,6 +1260,138 @@ public:
     bool getAllowAutoQueueSwitch() const;
     void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash);
 
+    virtual void setResultInt(const char * name, unsigned sequence, __int64 val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultInt(val);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultUInt(const char * name, unsigned sequence, unsigned __int64 val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultUInt(val);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultReal(const char *name, unsigned sequence, double val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultReal(val);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultVarString(const char * stepname, unsigned sequence, const char *val)
+    {
+        setResultString(stepname, sequence, strlen(val), val);
+    }
+    virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
+    {
+        setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
+    }
+    virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val)
+    {
+        doSetResultString(type_string, stepname, sequence, len, val);
+    }
+    virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val)
+    {
+        doSetResultString(type_data, stepname, sequence, len, (const char *)val);
+    }
+    virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultRaw(len, val, ResultFormatRaw);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultIsAll(isAll);
+            r->setResultRaw(len, val, ResultFormatRaw);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultUnicode((char const *)val, len);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultBool(const char *name, unsigned sequence, bool val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultBool(val);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultDecimal(val, len);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+    virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            __int64 totalRows = numRows;
+            if (extend)
+            {
+                totalRows += r->getResultRowCount();
+                r->addResultRaw(len, val, ResultFormatRaw);
+            }
+            else
+                r->setResultRaw(len, val, ResultFormatRaw);
+
+            r->setResultStatus(ResultStatusCalculated);
+            r->setResultRowCount(totalRows);
+            r->setResultTotalRowCount(totalRows);
+        }
+    }
+
+protected:
+    IWUResult *updateResult(const char *name, unsigned sequence)
+    {
+        Owned <IWUResult> result = updateWorkUnitResult(this, name, sequence);
+        if (result)
+        {
+            SCMStringBuffer rname;
+            if (!result->getResultName(rname).length())
+                result->setResultName(name);
+        }
+        return result.getClear();
+    }
+
+    void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val)
+    {
+        Owned<IWUResult> r = updateResult(name, sequence);
+        if (r)
+        {
+            r->setResultString(val, len);
+            r->setResultStatus(ResultStatusCalculated);
+        }
+    }
+
 private:
     void init();
     IWUGraph *createGraph();
@@ -1374,14 +1506,14 @@ public:
             PrintLog("Releasing locked workunit %s", x.get());
         }
         if (c)
-            c->unlockRemote(c->IsShared());
+            c->unlockRemote();
     }
 
     void setSecIfcs(ISecManager *mgr, ISecUser*usr){c->setSecIfcs(mgr, usr);}
 
     virtual IConstWorkUnit * unlock()
     {
-        c->unlockRemote(c->IsShared());
+        c->unlockRemote();
         return c.getClear();
     }
     virtual bool aborting() const
@@ -1548,13 +1680,13 @@ public:
     virtual void requestAbort()
             { c->requestAbort(); }
     virtual unsigned calculateHash(unsigned prevHash)
-            { return c->calculateHash(prevHash); }
+            { return queryExtendedWU(c)->calculateHash(prevHash); }
     virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
-            { c->copyWorkUnit(cached, all); }
+            { queryExtendedWU(c)->copyWorkUnit(cached, all); }
     virtual bool archiveWorkUnit(const char *base,bool del,bool deldll,bool deleteOwned)
-            { return c->archiveWorkUnit(base,del,deldll,deleteOwned); }
+            { return queryExtendedWU(c)->archiveWorkUnit(base,del,deldll,deleteOwned); }
     virtual void packWorkUnit(bool pack)
-            { c->packWorkUnit(pack); }
+            { queryExtendedWU(c)->packWorkUnit(pack); }
     virtual unsigned queryFileUsage(const char *filename) const
             { return c->queryFileUsage(filename); }
     virtual IJlibDateTime & getTimeScheduled(IJlibDateTime &val) const
@@ -1723,8 +1855,6 @@ public:
             { c->setHash(hash); }
 
 // ILocalWorkUnit - used for debugging etc
-    virtual void loadXML(const char *xml)
-            { c->loadXML(xml); }
     virtual void serialize(MemoryBuffer &tgt)
             { c->serialize(tgt); }
     virtual void deserialize(MemoryBuffer &src)
@@ -1744,139 +1874,32 @@ public:
             { return c->getAllowAutoQueueSwitch(); }
     virtual void setLibraryInformation(const char * name, unsigned interfaceHash, unsigned definitionHash)
             { c->setLibraryInformation(name, interfaceHash, definitionHash); }
-
     virtual void setResultInt(const char * name, unsigned sequence, __int64 val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultInt(val);   
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultInt(name, sequence, val); }
     virtual void setResultUInt(const char * name, unsigned sequence, unsigned __int64 val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultUInt(val);  
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultUInt(name, sequence, val); }
     virtual void setResultReal(const char *name, unsigned sequence, double val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultReal(val);  
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultReal(name, sequence, val); }
     virtual void setResultVarString(const char * stepname, unsigned sequence, const char *val)
-    {
-        setResultString(stepname, sequence, strlen(val), val);
-    }
+            { c->setResultVarString(stepname, sequence, val); }
     virtual void setResultVarUnicode(const char * stepname, unsigned sequence, UChar const *val)
-    {
-        setResultUnicode(stepname, sequence, rtlUnicodeStrlen(val), val);
-    }
+            { c->setResultVarUnicode(stepname, sequence, val); }
     virtual void setResultString(const char * stepname, unsigned sequence, int len, const char *val)
-    {
-        doSetResultString(type_string, stepname, sequence, len, val);
-    }
+            { c->setResultString(stepname, sequence, len, val); }
     virtual void setResultData(const char * stepname, unsigned sequence, int len, const void *val)
-    {
-        doSetResultString(type_data, stepname, sequence, len, (const char *)val);
-    }
+            { c->setResultData(stepname, sequence, len, val); }
     virtual void setResultRaw(const char * name, unsigned sequence, int len, const void *val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultRaw(len, val, ResultFormatRaw); 
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
-    virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultIsAll(isAll);
-            r->setResultRaw(len, val, ResultFormatRaw); 
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultRaw(name, sequence, len, val); }
+    virtual void setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *xform)
+            { c->setResultSet(name, sequence, isAll, len, val, xform); }
     virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultUnicode((char const *)val, len);
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultUnicode(name, sequence, len, val); }
     virtual void setResultBool(const char *name, unsigned sequence, bool val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultBool(val);
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultBool(name, sequence, val); }
     virtual void setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultDecimal(val, len);
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
+            { c->setResultDecimal(name, sequence, len,  precision, isSigned, val); }
     virtual void setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            __int64 totalRows = numRows;
-            if (extend)
-            {
-                totalRows += r->getResultRowCount();
-                r->addResultRaw(len, val, ResultFormatRaw);
-            }
-            else
-                r->setResultRaw(len, val, ResultFormatRaw);
-
-            r->setResultStatus(ResultStatusCalculated);
-            r->setResultRowCount(totalRows);
-            r->setResultTotalRowCount(totalRows);
-        }
-    }
-
-protected:
-    IWUResult *updateResult(const char *name, unsigned sequence)
-    {
-        Owned <IWUResult> result = updateWorkUnitResult(this, name, sequence);
-        if (result)
-        {
-            SCMStringBuffer rname;
-            if (!result->getResultName(rname).length())
-                result->setResultName(name);
-        }
-        return result.getClear();
-    }
-
-    void doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val)
-    {
-        Owned<IWUResult> r = updateResult(name, sequence);
-        if (r)
-        {
-            r->setResultString(val, len);   
-            r->setResultStatus(ResultStatusCalculated);
-        }
-    }
-
+            { c->setResultDataset(name, sequence, len, val, numRows, extend); }
 };
 
 class CLocalWUAssociated : public CInterface, implements IConstWUAssociatedFile
@@ -3402,12 +3425,17 @@ void CLocalWorkUnit::init()
 }
 
 // Dummy workunit support
-CLocalWorkUnit::CLocalWorkUnit(const char *_wuid, ISecManager *secmgr, ISecUser *secuser)
+CLocalWorkUnit::CLocalWorkUnit(const char *_wuid, const char *xml, ISecManager *secmgr, ISecUser *secuser)
 {
     connectAtRoot = true;
     init();
-    p.setown(createPTree(_wuid));
-    p->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
+    if (xml)
+        p.setown(createPTreeFromXMLString(xml));
+    else
+    {
+        p.setown(createPTree(_wuid));
+        p->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
+    }
     secMgr.set(secmgr);
     secUser.set(secuser);
 }
@@ -4009,11 +4037,11 @@ void CLocalWorkUnit::unsubscribe()
     }
 }
 
-void CLocalWorkUnit::unlockRemote(bool commit)
+void CLocalWorkUnit::unlockRemote()
 {
     CriticalBlock block(crit);
     locked.unlock();
-    if (commit)  
+    if (IsShared())  // Is this right? Doesn't feel right! Commit on last unlock would seem smarter
     {
         try
         {
@@ -9026,9 +9054,9 @@ bool CLocalWULegacyTiming::matches(const IStatisticsFilter * filter) const
 
 //==========================================================================================
 
-extern WORKUNIT_API ILocalWorkUnit * createLocalWorkUnit()
+extern WORKUNIT_API ILocalWorkUnit * createLocalWorkUnit(const char *xml)
 {
-    Owned<CLocalWorkUnit> cw = new CLocalWorkUnit("W_LOCAL", (ISecManager*)NULL, NULL);
+    Owned<CLocalWorkUnit> cw = new CLocalWorkUnit("W_LOCAL", xml, (ISecManager*)NULL, NULL);
     ILocalWorkUnit* ret = QUERYINTERFACE(&cw->lockRemote(false), ILocalWorkUnit);
     return ret;
 }

+ 1 - 2
common/workunit/workunit.hpp

@@ -1199,7 +1199,6 @@ interface ILocalWorkUnit : extends IWorkUnit
 {
     virtual void serialize(MemoryBuffer & tgt) = 0;
     virtual void deserialize(MemoryBuffer & src) = 0;
-    virtual void loadXML(const char * xml) = 0;
     virtual IConstWorkUnit * unlock() = 0;
 };
 
@@ -1365,7 +1364,7 @@ extern WORKUNIT_API void addExceptionToWorkunit(IWorkUnit * wu, WUExceptionSever
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory();
 extern WORKUNIT_API IWorkUnitFactory * getSecWorkUnitFactory(ISecManager &secmgr, ISecUser &secuser);
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory(ISecManager *secmgr, ISecUser *secuser);
-extern WORKUNIT_API ILocalWorkUnit* createLocalWorkUnit();
+extern WORKUNIT_API ILocalWorkUnit* createLocalWorkUnit(const char *XML);
 extern WORKUNIT_API IStringVal& exportWorkUnitToXML(const IConstWorkUnit *wu, IStringVal &str, bool unpack, bool includeProgress);
 extern WORKUNIT_API StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool unpack, bool includeProgress);
 extern WORKUNIT_API void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool unpack, bool includeProgress);

+ 1 - 2
ecl/eclagent/eclagent.cpp

@@ -3310,8 +3310,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
         if (wuXML)
         {
             //Create workunit from XML
-            standAloneWorkUnit.setown(createLocalWorkUnit());
-            standAloneWorkUnit->loadXML(wuXML->str());
+            standAloneWorkUnit.setown(createLocalWorkUnit(wuXML->str()));
             wuXML->kill();  // free up text as soon as possible.
         }
 

+ 2 - 2
ecl/eclcc/eclcc.cpp

@@ -1389,7 +1389,7 @@ void EclCC::processFile(EclCompileInstance & instance)
     if (optArchive || optGenerateDepend)
         instance.archive.setown(createAttributeArchive());
 
-    instance.wu.setown(createLocalWorkUnit());
+    instance.wu.setown(createLocalWorkUnit(NULL));
     if (optSaveQueryText)
     {
         Owned<IWUQuery> q = instance.wu->updateQuery();
@@ -1608,7 +1608,7 @@ void EclCC::processReference(EclCompileInstance & instance, const char * queryAt
 {
     const char * outputFilename = instance.outputFilename;
 
-    instance.wu.setown(createLocalWorkUnit());
+    instance.wu.setown(createLocalWorkUnit(NULL));
     if (optArchive || optGenerateDepend)
         instance.archive.setown(createAttributeArchive());
 

+ 1 - 2
ecl/eclccserver/eclccserver.cpp

@@ -392,8 +392,7 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
                 StringBuffer wuXML;
                 if (getWorkunitXMLFromFile(realdllfilename, wuXML))
                 {
-                    Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit();
-                    embeddedWU->loadXML(wuXML);
+                    Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit(wuXML);
                     queryExtendedWU(workunit)->copyWorkUnit(embeddedWU, true);
                     workunit->setIsClone(false);
                     SCMStringBuffer jobname;

+ 1 - 2
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -3819,8 +3819,7 @@ void deploySharedObject(IEspContext &context, StringBuffer &wuid, const char *fi
     StringBuffer dllXML;
     if (getWorkunitXMLFromFile(dllpath.str(), dllXML))
     {
-        Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit();
-        embeddedWU->loadXML(dllXML.str());
+        Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit(dllXML.str());
         queryExtendedWU(wu)->copyWorkUnit(embeddedWU, true);
     }
 

+ 1 - 2
roxie/ccd/ccddali.cpp

@@ -601,8 +601,7 @@ public:
             StringBuffer wuXML;
             if (getEmbeddedWorkUnitXML(source, wuXML))
             {
-                Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
-                localWU->loadXML(wuXML);
+                Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
                 queryExtendedWU(w)->copyWorkUnit(localWU, true);
             }
             else

+ 1 - 2
roxie/ccd/ccdquery.cpp

@@ -90,8 +90,7 @@ public:
         StringBuffer wuXML;
         if (getEmbeddedWorkUnitXML(dll, wuXML))
         {
-            Owned<ILocalWorkUnit> localWU = createLocalWorkUnit();
-            localWU->loadXML(wuXML);
+            Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
             wu.setown(localWU->unlock());
         }
         CriticalBlock b(dllCacheLock);

+ 1 - 2
tools/wuget/wuget.cpp

@@ -56,8 +56,7 @@ int main(int argc, char **argv)
                 StringBuffer xml;
                 if (getWorkunitXMLFromFile(argv[i], xml))
                 {
-                    Owned<ILocalWorkUnit> wu = createLocalWorkUnit();
-                    wu->loadXML(xml);
+                    Owned<ILocalWorkUnit> wu = createLocalWorkUnit(xml);
                     exportWorkUnitToXML(wu, xml.clear(), true, false);
                     printf("%s\n", xml.str());
                 }