Selaa lähdekoodia

HPCC-13731 WorkunitsService does not always go via workunit.hpp

Removed the code that tried to fetch workunit subtrees from sasha - it was not
correct and thus could never have worked.

All workUnitsService code - unless explicitly querying Sasha - now uses
proper workunit.cpp interface and should work against Cassandra too.

Extended the lightweight workunit interface to include priority values since
they are part of the API implemented by this plugin.

Extended workunit API's getWorkUnitsSorted() capabilities to allow filtering
by files written.

Updated Cassandra workunit support to handle filesWritten and the new priority
values in the lightweight interface.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 vuotta sitten
vanhempi
commit
1667a276b9

+ 30 - 10
common/workunit/workunit.cpp

@@ -769,6 +769,14 @@ mapEnums actions[] = {
    { WUActionSize, NULL },
 };
 
+mapEnums priorityClasses[] = {
+   { PriorityClassUnknown, "unknown" },
+   { PriorityClassLow, "low" },
+   { PriorityClassNormal, "normal" },
+   { PriorityClassHigh, "high" },
+   { PriorityClassSize, NULL },
+};
+
 const char * getWorkunitStateStr(WUState state)
 {
     dbgassertex(state < WUStateSize);
@@ -825,6 +833,8 @@ public:
         timeScheduled.set(p.queryProp("@timeScheduled"));
         state = (WUState) getEnum(&p, "@state", states);
         action = (WUAction) getEnum(&p, "Action", actions);
+        priority = (WUPriorityClass) getEnum(&p, "@priorityClass", priorityClasses);
+        priorityLevel = calcPriorityValue(&p);
         wuscope.set(p.queryProp("@scope"));
         appvalues.load(&p,"Application/*");
         totalThorTime = nanoToMilli(extractTimeCollatable(p.queryProp("@totalThorTime"), false));
@@ -839,6 +849,9 @@ public:
     virtual const char *queryStateDesc() const { return getEnumText(state, states); }
     virtual WUAction getAction() const { return action; }
     virtual const char *queryActionDesc() const { return getEnumText(action, actions); }
+    virtual WUPriorityClass getPriority() const { return priority; }
+    virtual const char *queryPriorityDesc() const { return getEnumText(priority, priorityClasses); }
+    virtual int getPriorityLevel() const { return priorityLevel; }
     virtual bool isProtected() const { return _isProtected; }
     virtual IJlibDateTime & getTimeScheduled(IJlibDateTime & val) const
     {
@@ -855,6 +868,8 @@ protected:
     unsigned totalThorTime;
     WUState state;
     WUAction action;
+    WUPriorityClass priority;
+    int priorityLevel;
     bool _isProtected;
 };
 
@@ -1201,6 +1216,8 @@ public:
             { return c->getLibraries(); }
     virtual WUPriorityClass getPriority() const
             { return c->getPriority(); }
+    virtual const char *queryPriorityDesc() const
+            { return c->queryPriorityDesc(); }
     virtual int getPriorityLevel() const
             { return c->getPriorityLevel(); }
     virtual int getPriorityValue() const
@@ -1830,6 +1847,7 @@ mapEnums workunitSortFields[] =
    { WUSFwuidhigh, "@" },
    { WUSFwildwuid, "@" },
    { WUSFappvalue, "Application" },
+   { WUSFfilewritten, "Files/File/@name" },
    { WUSFterm, NULL }
 };
 
@@ -3753,14 +3771,6 @@ const char *CLocalWorkUnit::queryWuScope() const
     return ret;
 }
 
-mapEnums priorityClasses[] = {
-   { PriorityClassUnknown, "unknown" },
-   { PriorityClassLow, "low" },
-   { PriorityClassNormal, "normal" },
-   { PriorityClassHigh, "high" },
-   { PriorityClassSize, NULL },
-};
-
 void CLocalWorkUnit::setPriority(WUPriorityClass cls) 
 {
     CriticalBlock block(crit);
@@ -3773,6 +3783,11 @@ WUPriorityClass CLocalWorkUnit::getPriority() const
     return (WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses);
 }
 
+const char *CLocalWorkUnit::queryPriorityDesc() const
+{
+    return getEnumText(getPriority(), priorityClasses);
+}
+
 void CLocalWorkUnit::setState(WUState value) 
 {
     if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait)
@@ -6048,7 +6063,6 @@ void CLocalWorkUnit::noteFileRead(IDistributedFile *file)
     if (file)
     {
         CriticalBlock block(crit);
-        _loadFilesRead();
         IPropertyTree *files = p->queryPropTree("FilesRead");
         if (!files)
             files = p->addPropTree("FilesRead", createPTree());
@@ -6056,6 +6070,11 @@ void CLocalWorkUnit::noteFileRead(IDistributedFile *file)
     }
 }
 
+void CLocalWorkUnit::_loadFilesWritten() const
+{
+    // Nothing to do
+}
+
 static void addFile(IPropertyTree *files, const char *fileName, const char *cluster, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
 {
     StringBuffer path("File[@name=\"");
@@ -6083,7 +6102,7 @@ void CLocalWorkUnit::addFile(const char *fileName, StringArray *clusters, unsign
     if (!files)
         files = p->addPropTree("Files", createPTree());
     if (!clusters)
-        addFile(fileName, NULL, usageCount, fileKind, graphOwner);
+        ::addFile(files, fileName, NULL, usageCount, fileKind, graphOwner);
     else
     {
         ForEachItemIn(c, *clusters)
@@ -6180,6 +6199,7 @@ void CLocalWorkUnit::addDiskUsageStats(__int64 _avgNodeUsage, unsigned _minNode,
 IPropertyTreeIterator & CLocalWorkUnit::getFileIterator() const
 {
     CriticalBlock block(crit);
+    _loadFilesWritten();
     return * p->getElements("Files/File");
 }
 

+ 4 - 2
common/workunit/workunit.hpp

@@ -950,6 +950,9 @@ interface IConstWorkUnitInfo : extends IInterface
     virtual const char *queryStateDesc() const = 0;
     virtual WUAction getAction() const = 0;
     virtual const char *queryActionDesc() const = 0;
+    virtual WUPriorityClass getPriority() const = 0;
+    virtual const char *queryPriorityDesc() const = 0;
+    virtual int getPriorityLevel() const = 0;
     virtual bool isProtected() const = 0;
     virtual IJlibDateTime & getTimeScheduled(IJlibDateTime & val) const = 0;
 
@@ -989,8 +992,6 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual IConstWUPlugin * getPluginByName(const char * name) const = 0;
     virtual IConstWUPluginIterator & getPlugins() const = 0;
     virtual IConstWULibraryIterator & getLibraries() const = 0;
-    virtual WUPriorityClass getPriority() const = 0;
-    virtual int getPriorityLevel() const = 0;
     virtual IConstWUQuery * getQuery() const = 0;
     virtual bool getRescheduleFlag() const = 0;
     virtual IConstWUResult * getResultByName(const char * name) const = 0;
@@ -1192,6 +1193,7 @@ enum WUSortField
     WUSFecl = 13,
     // WUSFcustom = 14, obsolete
     WUSFappvalue=15,
+    WUSFfilewritten = 16,
     WUSFterm = 0,
     WUSFreverse = 256,
     WUSFnocase = 512,

+ 2 - 0
common/workunit/workunit.ipp

@@ -262,6 +262,7 @@ public:
     virtual IConstWUPluginIterator & getPlugins() const;
     virtual IConstWULibraryIterator & getLibraries() const;
     virtual WUPriorityClass getPriority() const;
+    virtual const char *queryPriorityDesc() const;
     virtual int getPriorityLevel() const;
     virtual int getPriorityValue() const;
     virtual IConstWUQuery * getQuery() const;
@@ -552,6 +553,7 @@ protected:
     virtual void _lockRemote() {};
     virtual void _unlockRemote() {};
     virtual void _loadFilesRead() const;
+    virtual void _loadFilesWritten() const;
     virtual void _loadGraphs(bool heavy) const;
     virtual void _loadResults() const;
     virtual void _loadVariables() const;

+ 11 - 0
ecl/wutest/CMakeLists.txt

@@ -39,6 +39,14 @@ include_directories (
          ./../../testing/unittests 
     )
 
+if ( USE_CPPUNIT )
+  include_directories(
+         ./../../plugins/workunitservices
+         ./../../rtl/include
+         ./../../rtl/eclrtl
+    )
+endif()
+
 ADD_DEFINITIONS( -D_CONSOLE )
 if ( FORCE_WORKUNITS_TO_CASSANDRA )
   ADD_DEFINITIONS( -DFORCE_WORKUNITS_TO_CASSANDRA )
@@ -63,3 +71,6 @@ if ( FORCE_WORKUNITS_TO_CASSANDRA )
   target_link_libraries ( wutest cassandraembed )
 endif ()
 
+if ( USE_CPPUNIT )
+  target_link_libraries ( wutest workunitservices )
+endif()

+ 196 - 0
ecl/wutest/wutest.cpp

@@ -31,6 +31,8 @@
 #include "dalienv.hpp"
 
 #ifdef _USE_CPPUNIT
+#include "workunitservices.hpp"
+#include "eclrtl.hpp"
 #include <cppunit/extensions/TestFactoryRegistry.h>
 #include <cppunit/ui/text/TestRunner.h>
 #endif
@@ -415,8 +417,10 @@ class WuTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testListByAppValue);
         CPPUNIT_TEST(testListByAppValueWild);
         CPPUNIT_TEST(testListByFilesRead);
+        CPPUNIT_TEST(testListByFilesWritten);
         CPPUNIT_TEST(testSet);
         CPPUNIT_TEST(testResults);
+        CPPUNIT_TEST(testWorkUnitServices);
         CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testCopy);
         CPPUNIT_TEST(testQuery);
@@ -470,6 +474,9 @@ protected:
                 " </FilesRead>", i % 10, i % 10);
             p->setPropTree("FilesRead", createPTreeFromXMLString(fileinfo));
             wu->noteFileRead(NULL); // Make sure we notice that it was modified
+
+            VStringBuffer myFileW("myfilewritten%02d", i % 10);
+            wu->addFile(myFileW, NULL, i % 3, WUFileStandard, NULL);
         }
         unsigned after = factory->numWorkUnits();
         DBGLOG("%u workunits created in %d ms (%d total)", testSize, msTick()-start, after);
@@ -1556,6 +1563,27 @@ protected:
         ASSERT(numIterated == (testSize+9)/10);
         numIterated++;
     }
+    void testListByFilesWritten()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        // Test filter by filesRead
+        WUSortField filterByFilesRead[] = { WUSFfilewritten, WUSFterm };
+        StringAttr prevValue;
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByFilesRead, "myfilewritten00", 0, 10000, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            if (numIterated)
+                ASSERT(strcmp(wu.queryWuid(), prevValue)<0);
+            prevValue.set(wu.queryWuid());
+            numIterated++;
+        }
+        DBGLOG("%d workunits by filewritten wild in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+9)/10);
+        numIterated++;
+    }
     void testGlobal()
     {
         // Is global workunit ever actually used any more? For scalar persists, perhaps
@@ -1577,6 +1605,174 @@ protected:
         ASSERT(cresult->isResultScalar());
         ASSERT(cresult->getResultInt()==53);
     }
+    void testWorkUnitServices()
+    {
+        class DummyContext: implements ICodeContext
+        {
+            virtual const char *loadResource(unsigned id) { throwUnexpected(); }
+
+            // Fetching interim results from workunit/query context
+
+            virtual bool getResultBool(const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { throwUnexpected(); }
+            virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
+            virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
+            virtual __int64 getResultInt(const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual double getResultReal(const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
+            virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual char *getResultVarString(const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { throwUnexpected(); }
+
+            // Writing results to workunit/query context/output
+
+            virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
+            virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
+            virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
+            virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) { throwUnexpected(); }
+            virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
+            virtual void setResultReal(const char * stepname, unsigned sequence, double value) { throwUnexpected(); }
+            virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { throwUnexpected(); }
+            virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { throwUnexpected(); }
+            virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) { throwUnexpected(); }
+            virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { throwUnexpected(); }
+            virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { throwUnexpected(); }
+            virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
+
+            // Checking persists etc are up to date
+
+            virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
+            virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 crc) { throwUnexpected(); }
+
+            // Fetching various environment information, typically accessed via std.system
+
+            virtual char *getClusterName() { throwUnexpected(); } // caller frees return string.
+            virtual char *getEnv(const char *name, const char *defaultValue) const { throwUnexpected(); }
+            virtual char *getGroupName() { throwUnexpected(); } // caller frees return string.
+            virtual char *getJobName() { throwUnexpected(); } // caller frees return string.
+            virtual char *getJobOwner() { throwUnexpected(); } // caller frees return string.
+            virtual unsigned getNodeNum() { throwUnexpected(); }
+            virtual unsigned getNodes() { throwUnexpected(); }
+            virtual char *getOS() { throwUnexpected(); } // caller frees return string
+            virtual char *getPlatform() { throwUnexpected(); } // caller frees return string.
+            virtual unsigned getPriority() const { throwUnexpected(); }
+            virtual char *getWuid() { throwUnexpected(); } // caller frees return string.
+
+            // Exception handling
+
+            virtual void addWuException(const char*, unsigned int, unsigned int, const char*) { throwUnexpected(); } //n.b. this might be better named: it should only be used for adding user-generated exceptions (via the logging plug-in) --- there's a call in IAgentContext which takes a source argument too
+            virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
+
+            // File resolution etc
+
+            virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
+            virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
+            virtual char *getFilePart(const char *logicalPart, bool create=false) { throwUnexpected(); } // caller frees return string.
+            virtual IDistributedFileTransaction *querySuperFileTransaction() { throwUnexpected(); }
+            virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
+
+            // Graphs, child queries etc
+
+            virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { throwUnexpected(); }
+            virtual unsigned getGraphLoopCounter() const { return 0; }
+            virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal) { throwUnexpected(); }
+            virtual IEclGraphResults * resolveLocalQuery(__int64 activityId) { return NULL; }
+
+            // Logging etc
+
+            virtual unsigned logString(const char *text) const { throwUnexpected(); }
+            virtual IDebuggableContext *queryDebugContext() const { return NULL; }
+
+            // Memory management
+
+            virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { throwUnexpected(); }
+            virtual const char * cloneVString(const char *str) const { throwUnexpected(); }
+            virtual const char * cloneVString(size32_t len, const char *str) const { throwUnexpected(); }
+
+            // Called from generated code for FROMXML/TOXML
+
+            virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace) { throwUnexpected(); }
+            virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { throwUnexpected(); }
+
+            // Miscellaneous
+
+            virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }    // shouldn't really be here, but it broke thor.
+            virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
+
+            // Called from generated code for FROMJSON
+
+            virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace) { throwUnexpected(); }
+            virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { throwUnexpected(); }
+
+            virtual const IContextLogger &queryContextLogger() const
+            {
+                return queryDummyContextLogger();
+            }
+            virtual IEngineContext *queryEngineContext() { return NULL; }
+            virtual char *getDaliServers() { throwUnexpected(); }
+            virtual IWorkUnit* updateWorkUnit() const { throwUnexpected(); }
+
+        } ctx;
+
+        size32_t lenResult;
+        void * result;
+        wsWorkunitList(&ctx, lenResult, result, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, true, false);
+        /* export WsWorkunitRecord := record "
+                                    " string24 wuid;"                         0
+                                    " string owner{maxlength(64)};"           24
+                                    " string cluster{maxlength(64)};"         92
+                                    " string roxiecluster{maxlength(64)};"    160
+                                    " string job{maxlength(256)};"            228
+                                    " string10 state;"                        488
+                                    " string7 priority;"                      498
+                                    " integer2 priorityvalue;"                505
+                                    " string20 created;"                      507
+                                    " string20 modified;"                     527
+                                    " boolean online;"                        547
+                                    " boolean protected;"                     548
+                                  " end;\n"  Total size 549
+                                  */
+        #pragma pack(push, 1)
+        struct resultStruct
+        {
+            char wuid[24];   // space filled
+            unsigned _len_owner; char owner[64];
+            unsigned _len_cluster; char cluster[64];
+            unsigned _len_roxiecluster; char roxiecluster[64];
+            unsigned _len_job; char job[256];
+            char state[10];
+            char priority[7];
+            unsigned short priorityValue;
+            char created[20];
+            char modified[20];
+            bool online;
+            bool isProtected;
+        };
+        #pragma pack(pop)
+        ASSERT(lenResult == testSize * sizeof(resultStruct));
+        rtlFree(result);
+
+        // Now filter owner via generic mechanism
+        unsigned start = msTick();
+        wsWorkunitList(&ctx, lenResult, result, NULL, NULL, "WuTestUser00", NULL, NULL, "completed", NULL, NULL, NULL, NULL, NULL, true, false);
+        ASSERT(lenResult % sizeof(resultStruct) == 0);
+        unsigned numResults = lenResult/sizeof(resultStruct);
+        resultStruct *it = (resultStruct *) result;
+        for (unsigned i = 0; i < numResults; i++)
+        {
+            ASSERT(memicmp(it[i].state, "completed ", 10)==0);
+            ASSERT(memicmp(it[i].owner, "WuTestUser00       ", 20)==0);
+        }
+        DBGLOG("%d owned workunits listed the hard way in %d ms", numResults, msTick()-start);
+        ASSERT(numResults <= (testSize+49)/50);  // Not sure what the exact answer should be!
+
+        rtlFree(result);
+    }
 };
 StringArray WuTest::wuids;
 

+ 90 - 32
plugins/cassandra/cassandrawu.cpp

@@ -845,6 +845,7 @@ static const CassandraXmlMapping workunitsMappings [] =
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
     {"priorityclass", "text", "@priorityClass", stringColumnMapper},
+    {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
     {"wuScope", "text", "@scope", stringColumnMapper},
     {"submitID", "text", "@submitID", stringColumnMapper},
     {"state", "text", "@state", stringColumnMapper},
@@ -857,14 +858,14 @@ static const CassandraXmlMapping workunitsMappings [] =
 
     {"agentSession", "bigint", "@agentSession", bigintColumnMapper},
     {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
-    {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper},  // name is the suppression list, note trailing @
+    {"attributes", "map<text, text>", "@agentSession@wuid@clusterName@jobName@priorityClass@priorityLevel@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper},  // name is the suppression list, note trailing @
     {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
     {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
     {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
 
     // These are catchalls for anything not processed above or in a child table
 
-    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@FilesRead@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
+    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
     {"subtrees", "map<text, text>", "@DiskUsageStats@Parameters@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
 
     { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
@@ -877,6 +878,7 @@ static const CassandraXmlMapping workunitInfoMappings [] =  // A cut down versio
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
     {"priorityclass", "text", "@priorityClass", stringColumnMapper},
+    {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
     {"wuScope", "text", "@scope", stringColumnMapper},
     {"submitID", "text", "@submitID", stringColumnMapper},
     {"state", "text", "@state", stringColumnMapper},
@@ -900,6 +902,7 @@ static const CassandraXmlMapping searchMappings [] =
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
     {"priorityclass", "text", "@priorityClass", stringColumnMapper},
+    {"prioritylevel", "int", "@priorityLevel", intColumnMapper},
     {"scope", "text", "@scope", stringColumnMapper},
     {"submitID", "text", "@submitID", stringColumnMapper},
     {"state", "text", "@state", stringColumnMapper},
@@ -930,11 +933,12 @@ static const CassandraXmlMapping uniqueSearchMappings [] =
 
 const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
 
-static const CassandraXmlMapping filesReadSearchMappings [] =
+static const CassandraXmlMapping filesSearchMappings [] =
 {
     {"name", "text", "@name", stringColumnMapper},
+    {"read", "boolean", "@read", boolColumnMapper},
     {"wuid", "text", NULL, suppliedStringColumnMapper},
-    { NULL, "filesReadSearchValues", "((name), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
+    { NULL, "filesSearchValues", "((name, read), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };
 
 // The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
@@ -964,7 +968,7 @@ static const CassandraXmlMapping versionMappings [] =
 
 // The following describe child tables - all keyed by wuid
 
-enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, ChildTablesSize };
+enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, WuFilesWrittenChild, ChildTablesSize };
 
 struct ChildTableInfo
 {
@@ -1128,8 +1132,7 @@ static const CassandraXmlMapping wuFilesReadMappings [] =
     {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"name", "text", "@name", stringColumnMapper},
-    {"cluster", "text", "@cluster", stringColumnMapper},
-    {"useCount", "int", "@useCount", intColumnMapper}, // NOTE - could think about using a counter column, but would mess up the commit paradigm
+    {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},  /* name is the suppression list */
     {"subfiles", "list<text>", NULL, subfileListColumnMapper},
     { NULL, "wuFilesRead", "((partition, wuid), name)", stringColumnMapper}
 };
@@ -1141,8 +1144,24 @@ static const ChildTableInfo wuFilesReadTable =
     wuFilesReadMappings
 };
 
+static const CassandraXmlMapping wuFilesWrittenMappings [] =
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"name", "text", "@name", stringColumnMapper},
+    {"attributes", "map<text, text>", "@name@", attributeMapColumnMapper},  /* name is the suppression list */
+    { NULL, "wuFilesWritten", "((partition, wuid), name)", stringColumnMapper}
+};
+
+static const ChildTableInfo wuFilesWrittenTable =
+{
+    "Files", "File",
+    WuFilesWrittenChild,
+    wuFilesWrittenMappings
+};
+
 // Order should match the enum above
-static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, NULL };
+static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, &wuFilesWrittenTable, NULL };
 
 // Graph progress tables are read directly, XML mappers not used
 
@@ -1305,20 +1324,30 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
     check(cass_batch_add_statement(batch, update));
 }
 
-extern void deleteSimpleXML(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
+extern void deleteFileSearch(const ICassandraSession *session, CassBatch *batch, const char *name, bool read, const char *wuid)
 {
     StringBuffer names;
     StringBuffer tableName;
-    getFieldNames(mappings, names, tableName);
-    VStringBuffer deleteQuery("DELETE from %s where name=? and wuid=?", tableName.str());
+    getFieldNames(filesSearchMappings, names, tableName);
+    VStringBuffer deleteQuery("DELETE from %s where name=? and read=? and wuid=?", tableName.str());
     CassandraStatement update(session->prepareStatement(deleteQuery));
-    unsigned bindidx = 0;
-    while (mappings->columnName)
-    {
-        if (mappings->mapper.fromXML(&update, bindidx, inXML, mappings->xpath, userVal))
-            bindidx++;
-        mappings++;
-    }
+    update.bindString(0, name);
+    update.bindBool(1, read ? cass_true : cass_false);
+    update.bindString(2, wuid);
+    check(cass_batch_add_statement(batch, update));
+}
+
+extern void addFileSearch(const ICassandraSession *session, CassBatch *batch, const char *name, bool read, const char *wuid)
+{
+    StringBuffer bindings;
+    StringBuffer names;
+    StringBuffer tableName;
+    getBoundFieldNames(filesSearchMappings, names, bindings, NULL, NULL, tableName);
+    VStringBuffer insertQuery("INSERT INTO %s (%s) values (%s)", tableName.str(), names.str()+1, bindings.str()+1);
+    CassandraStatement update(session->prepareStatement(insertQuery));
+    update.bindString(0, name);
+    update.bindBool(1, read ? cass_true : cass_false);
+    update.bindString(2, wuid);
     check(cass_batch_add_statement(batch, update));
 }
 
@@ -2151,6 +2180,7 @@ public:
                 childXMLtoCassandra(sessionCache, *batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuFilesReadMappings, p, "FilesRead/File", 0);
+                childXMLtoCassandra(sessionCache, *batch, wuFilesWrittenMappings, p, "Files/File", 0);
                 IPTree *query = p->queryPropTree("Query");
                 if (query)
                     childXMLRowtoCassandra(sessionCache, *batch, wuQueryMappings, wuid, *query, 0);
@@ -2352,6 +2382,7 @@ public:
     {
         if (file)
         {
+            childLoaded[WuFilesReadChild] = true;  // Prevent duplicates if someone tries to read back files read (unlikely)
             CPersistedWorkUnit::noteFileRead(file);
             VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
             noteDirty(xpath, wuFilesReadMappings);
@@ -2367,6 +2398,16 @@ public:
             }
         }
     }
+    virtual void addFile(const char *fileName, StringArray *clusters, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
+    {
+        if (fileName)
+        {
+            childLoaded[WuFilesWrittenChild] = true;  // Prevent duplicates if someone tries to read back files written from same object (unlikely)
+            CPersistedWorkUnit::addFile(fileName, clusters, usageCount, fileKind, graphOwner);
+            VStringBuffer xpath("Files/File[@name='%s']", fileName);
+            noteDirty(xpath, wuFilesWrittenMappings);
+        }
+    }
 
     virtual void clearGraphProgress() const
     {
@@ -2537,6 +2578,12 @@ public:
         CPersistedWorkUnit::_loadFilesRead();
     }
 
+    virtual void _loadFilesWritten() const
+    {
+        checkChildLoaded(wuFilesWrittenTable);    // Lazy populate the Files branch of p from Cassandra
+        CPersistedWorkUnit::_loadFilesWritten();
+    }
+
     virtual void _loadResults() const
     {
         checkChildLoaded(wuResultsTable);        // Lazy populate the Results branch of p from Cassandra
@@ -2698,10 +2745,13 @@ protected:
         Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
         ForEach(*filesRead)
         {
-            IPTree &file = filesRead->query();
-            deleteSimpleXML(sessionCache, *batch, filesReadSearchMappings, &file, wuid);
+            deleteFileSearch(sessionCache, *batch, filesRead->query().queryProp("@name"), true, wuid);
+        }
+        Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
+        ForEach(*filesWritten)
+        {
+            deleteFileSearch(sessionCache, *batch, filesWritten->query().queryProp("@name"), false, wuid);
         }
-        // MORE deleteFilesReadSecondaries(*p, wuid);
     }
 
     void updateSecondaries(const char *wuid)
@@ -2730,8 +2780,12 @@ protected:
         Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
         ForEach(*filesRead)
         {
-            IPTree &file = filesRead->query();
-            simpleXMLtoCassandra(sessionCache, *batch, filesReadSearchMappings, &file, wuid);
+            addFileSearch(sessionCache, *batch, filesRead->query().queryProp("@name"), true, wuid);
+        }
+        Owned<IPropertyTreeIterator> filesWritten = &getFileIterator();
+        ForEach(*filesWritten)
+        {
+            addFileSearch(sessionCache, *batch, filesWritten->query().queryProp("@name"), false, wuid);
         }
     }
 
@@ -2989,6 +3043,8 @@ public:
             cached->touch();
         else
             cached.setown(new CCassandraWuUQueryCacheEntry());
+        if (pageSize > INT_MAX)
+            pageSize = INT_MAX;
         const WUSortField *thisFilter = filters;
         IArrayOf<IPostFilter> goodFilters;
         IArrayOf<IPostFilter> wuidFilters;
@@ -3076,6 +3132,7 @@ public:
                         mergeFilter(wuidFilters, field, fv);
                     break;
                 case WUSFfileread:
+                case WUSFfilewritten:
                     fileFilters.append(*new PostFilter(field, fv, true));
                     break;
                 case WUSFtotalthortime:
@@ -3113,11 +3170,13 @@ public:
             {
                 // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
                 // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
+                // MORE read and written are not the same
                 assertex(fileFilters.length()==1);  // If we supported more there would be a join phase here
                 merger->addPostFilters(goodFilters, 0);
                 merger->addPostFilters(poorFilters, 0);
                 merger->addPostFilters(remoteWildFilters, 0);
-                CassandraResult wuids(fetchDataForFileRead(fileFilters.item(0).queryValue(), wuidFilters, 0));
+                const IPostFilter &fileFilter = fileFilters.item(0);
+                CassandraResult wuids(fetchDataForFiles(fileFilter.queryValue(), wuidFilters, fileFilter.queryField()==WUSFfileread));
                 CassandraIterator rows(cass_iterator_from_result(wuids));
                 StringBuffer value;
                 while (cass_iterator_next(rows))
@@ -3371,7 +3430,7 @@ public:
         ensureTable(querySession(), workunitsMappings);
         ensureTable(querySession(), searchMappings);
         ensureTable(querySession(), uniqueSearchMappings);
-        ensureTable(querySession(), filesReadSearchMappings);
+        ensureTable(querySession(), filesSearchMappings);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
             ensureTable(querySession(), table[0]->mappings);
         ensureTable(querySession(), wuGraphProgressMappings);
@@ -3827,27 +3886,26 @@ private:
         return executeQuery(querySession(), select);
     }
 
-    // Fetch rows from the file search table
+    // Fetch rows from the file search table (covers files read and files written)
 
-    const CassResult *fetchDataForFileRead(const char *name, const IArrayOf<IPostFilter> &wuidFilters, unsigned limit) const
+    const CassResult *fetchDataForFiles(const char *name, const IArrayOf<IPostFilter> &wuidFilters, bool read) const
     {
         StringBuffer names;
         StringBuffer tableName;
-        getFieldNames(filesReadSearchMappings+1, names, tableName);  // mappings+3 means we don't return the key column (name)
-        VStringBuffer selectQuery("select %s from %s where name=?", names.str()+1, tableName.str());
+        getFieldNames(filesSearchMappings+2, names, tableName);  // mappings+2 means we don't return the key columns (name and readmode)
+        VStringBuffer selectQuery("select %s from %s where name=? and read=?", names.str()+1, tableName.str());
         ForEachItemIn(idx, wuidFilters)
         {
             const IPostFilter &wuidFilter = wuidFilters.item(idx);
             selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
         }
-        if (limit)
-            selectQuery.appendf(" LIMIT %u", limit);
         CassandraStatement select(prepareStatement(selectQuery));
         select.bindString(0, name);
+        select.bindBool(1, read ? cass_true : cass_false);
         ForEachItemIn(idx2, wuidFilters)
         {
             const IPostFilter &wuidFilter = wuidFilters.item(idx2);
-            select.bindString(idx2+1, wuidFilter.queryValue());
+            select.bindString(idx2+2, wuidFilter.queryValue());
         }
         return executeQuery(querySession(), select);
     }

+ 158 - 271
plugins/workunitservices/workunitservices.cpp

@@ -123,7 +123,7 @@ static const char * EclDefinition =
                             " string unit;"
                         " end;\n"
 "export WorkunitServices := SERVICE\n"
-"   boolean WorkunitExists(const varstring wuid, boolean online=true, boolean archived=false) : c,entrypoint='wsWorkunitExists'; \n"
+"   boolean WorkunitExists(const varstring wuid, boolean online=true, boolean archived=false) : c,context,entrypoint='wsWorkunitExists'; \n"
 "   dataset(WsWorkunitRecord) WorkunitList("
                                         " const varstring lowwuid," 
                                         " const varstring highwuid=''," 
@@ -199,13 +199,12 @@ static void getSashaNodes(SocketEndpointArray &epa)
     }
 }
 
-
 static IWorkUnitFactory * getWorkunitFactory(ICodeContext * ctx)
 {
     IEngineContext *engineCtx = ctx->queryEngineContext();
     if (engineCtx && !engineCtx->allowDaliAccess())
     {
-        Owned<IException> e = MakeStringException(-1, "wokunitservices cannot access Dali in this context - this normally means it is being called from a thor slave");
+        Owned<IException> e = MakeStringException(-1, "workunitservices cannot access Dali in this context - this normally means it is being called from a thor slave");
         EXCLOG(e, NULL);
         throw e.getClear();
     }
@@ -216,57 +215,23 @@ static IWorkUnitFactory * getWorkunitFactory(ICodeContext * ctx)
     return getWorkUnitFactory(secmgr, secuser);
 }
 
-static IConstWorkUnit * getWorkunit(ICodeContext * ctx, const char * wuid)
-{
-    Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
-    return wuFactory->openWorkUnit(wuid);
-}
-
-static IConstWorkUnit * getWorkunit(ICodeContext * ctx)
-{
-    StringAttr wuid;
-    wuid.setown(ctx->getWuid());
-    return getWorkunit(ctx, wuid);
-}
-
-static IWorkUnit * updateWorkunit(ICodeContext * ctx)
-{
-    // following bit of a kludge, as 
-    // 1) eclagent keeps WU locked, and 
-    // 2) rtti not available in generated .so's to convert to IAgentContext
-    IAgentContext * actx = dynamic_cast<IAgentContext *>(ctx);
-    if (actx == NULL) { // fall back to pure ICodeContext
-        // the following works for thor only 
-        char * platform = ctx->getPlatform();
-        if (strcmp(platform,"thor")==0) {  
-            CTXFREE(parentCtx, platform);
-            Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-            StringAttr wuid;
-            wuid.setown(ctx->getWuid());
-            return factory->updateWorkUnit(wuid);
-        }
-        CTXFREE(parentCtx, platform);
-        return NULL;
-    }
-    return actx->updateWorkUnit();
-}
-
-
-
+static bool securityDisabled = false;
 
-static bool checkScopeAuthorized(IUserDescriptor *user,IPropertyTree &pt,bool &securitydisabled)
+static bool checkScopeAuthorized(IUserDescriptor *user, const char *scopename)
 {
-    if (securitydisabled)
+    if (securityDisabled)
         return true;
     unsigned auditflags = DALI_LDAP_AUDIT_REPORT|DALI_LDAP_READ_WANTED;
     int perm = 255;
-    const char *scopename = pt.queryProp("@scope");
-    if (scopename&&*scopename) {
+    if (scopename && *scopename)
+    {
         perm = querySessionManager().getPermissionsLDAP("workunit",scopename,user,auditflags);
-        if (perm<0) {
-            if (perm==-1) {
+        if (perm<0)
+        {
+            if (perm==-1)
+            {
                 perm = 255;
-                securitydisabled = true;
+                securityDisabled = true;
             }
             else 
                 perm = 0;
@@ -277,6 +242,31 @@ static bool checkScopeAuthorized(IUserDescriptor *user,IPropertyTree &pt,bool &s
     }
     return true;
 }
+
+static IConstWorkUnit * getWorkunit(ICodeContext * ctx, const char * wuid)
+{
+    StringBuffer _wuid(wuid);
+    if (!_wuid.length())
+        return NULL;
+    wuid = _wuid.toUpperCase().str();
+    Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
+    Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid);
+    if (wu)
+    {
+        if (!checkScopeAuthorized(ctx->queryUserDescriptor(), wu->queryWuScope()))
+            wu.clear();
+    }
+    return wu.getClear();
+}
+
+static IConstWorkUnit *getWorkunit(ICodeContext * ctx)
+{
+    StringAttr wuid;
+    wuid.setown(ctx->getWuid());
+    // One assumes we have read access to our own wu
+    return getWorkunit(ctx, wuid);
+}
+
 static StringBuffer &getWUIDonDate(StringBuffer &wuid,unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
 {
     if ((year==0)||(month==0)||(day==0)) {
@@ -316,181 +306,42 @@ static StringBuffer &getWUIDdaysAgo(StringBuffer &wuid,int daysago)
     return getWUIDonDate(wuid,y,m,d,h,mn);
 }
 
-
-
-class COnlineWorkunitIterator: public CInterface, implements IPropertyTreeIterator
+static bool addWUQueryFilter(WUSortField *filters, unsigned &count, MemoryBuffer &buff, const char *name, WUSortField value)
 {
-    Owned<IRemoteConnection> conn;
-    Owned<IPropertyTreeIterator> iter;
-    Linked<IUserDescriptor> user;
-    bool securitydisabled;
-    StringAttr namehi;
-    StringAttr namelo;
-
-
-    bool postFilterOk()
-    {
-        IPropertyTree &t = query();
-        const char *name = t.queryName();
-        if (stricmp(name,namelo.get())<0)
-            return false;
-        if (stricmp(name,namehi.get())>0)
-            return false;
-        if (!checkScopeAuthorized(user,t,securitydisabled))
-            return false;
-        return true;
-    }
-
-public:
-    IMPLEMENT_IINTERFACE;   
-    
-    COnlineWorkunitIterator (
-                    IUserDescriptor* _user,
-                    const char *_namelo,
-                    const char *_namehi,
-                    const char *user,
-                    const char *cluster,
-                    const char *jobname,
-                    const char *state,
-                    const char *priority,
-                    const char *fileread,
-                    const char *filewritten,
-                    const char *ecl
-
-    ) : user(_user), namelo(_namelo), namehi(_namehi)
-    {
-        securitydisabled = false;
-        if (namelo.isEmpty()) 
-            namelo.set("W");
-        if (namehi.isEmpty()) {
-            StringBuffer tmp;
-            namehi.set(getWUIDdaysAgo(tmp,-1).str());
-        }
-        const char *lo = namelo;
-        const char *hi = namehi;
-        StringBuffer query;
-        while (*lo&&(toupper(*lo)==toupper(*hi))) {
-            query.append((char)toupper(*lo));
-            lo++;
-            hi++;
-        }
-        if (*lo||*hi)
-            query.append("*");
-        if (user&&*user) 
-            query.appendf("[@submitID=~?\"%s\"]",user);
-        if (cluster&&*cluster) 
-            query.appendf("[@clusterName=~?\"%s\"]",cluster);
-        if (jobname&&*jobname) 
-            query.appendf("[@jobName=~?\"%s\"]",jobname);
-        if (state&&*state) 
-            query.appendf("[@state=?\"%s\"]",state);
-        if (priority&&*priority) 
-            query.appendf("[@priorityClass=?\"%s\"]",priority);
-        if (fileread&&*fileread) 
-            query.appendf("[FilesRead/File/@name=~?\"%s\"]",fileread);
-        if (filewritten&&*filewritten) 
-            query.appendf("[Files/File/@name=~?\"%s\"]",filewritten);
-        if (ecl&&*ecl)
-            query.appendf("[Query/Text=?~\"*%s*\"]",ecl);
-        conn.setown(querySDS().connect("WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT));
-        if (conn.get()) {
-            iter.setown(conn->getElements(query.str()));
-            if (!iter.get()) 
-                conn.clear();
-        }
-    }
-
-
-    bool first()
-    {
-        if (!iter.get()||!iter->first())
-            return false;
-        if (postFilterOk())
-            return true;
-        return next();
-    }
-
-    bool next()
-    {
-        while (iter.get()&&iter->next()) 
-            if (postFilterOk())
-                return true;
+    if (!name || !*name)
         return false;
-    }
-
-    bool isValid() 
-    { 
-        return iter&&iter->isValid(); 
-    }
-
-    IPropertyTree & query() 
-    { 
-        assertex(iter); 
-        return iter->query(); 
-    }
-
-
-    bool serialize(MemoryBuffer &mb)
-    {
-        IPropertyTree &pt = query();
-        return serializeWUSrow(pt,mb,true);
-    }
-
-};
-
-
+    filters[count++] = value;
+    buff.append(name);
+    return true;
+}
 
-static IPropertyTree *getWorkUnitBranch(ICodeContext *ctx,const char *wuid,const char *branch)
+static bool serializeWUInfo(IConstWorkUnitInfo &info,MemoryBuffer &mb)
 {
-    if (!wuid||!*wuid)
-        return NULL;
-    StringBuffer _wuid(wuid);
-    _wuid.trimRight();
-    wuid = _wuid.toUpperCase().str();
-    StringBuffer query;
-    query.append("WorkUnits/").append(wuid);
-    Owned<IRemoteConnection> conn =  querySDS().connect(query.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-    if (conn) {
-        IPropertyTree *t = conn->queryRoot();
-        if (!t)
-            return NULL;
-        bool securitydisabled = false;
-        if (!checkScopeAuthorized(ctx->queryUserDescriptor(),*t,securitydisabled))
-            return NULL;
-        IPropertyTree *ret = t->queryBranch(branch);
-        if (!ret)
-            return NULL;
-        return createPTreeFromIPT(ret);
-    }
-    // look up in sasha - this could be improved with server support
-    SocketEndpointArray sashaeps;
-    getSashaNodes(sashaeps);
-    ForEachItemIn(i,sashaeps) {
-        Owned<ISashaCommand> cmd = createSashaCommand();    
-        cmd->setAction(SCA_GET);
-        cmd->setOnline(false);                              
-        cmd->setArchived(true); 
-        cmd->addId(wuid);
-        Owned<INode> sashanode = createINode(sashaeps.item(i));
-        if (cmd->send(sashanode,SASHA_TIMEOUT)) {
-            if (cmd->numIds()) {
-                StringBuffer res;
-                if (cmd->getResult(0,res)) 
-                    return createPTreeFromXMLString(res.str());
-            }
-            if (i+1>=sashaeps.ordinality()) 
-                break;
-        }
+    fixedAppend(mb,24,info.queryWuid());
+    varAppend(mb,64,info.queryUser());
+    varAppend(mb,64,info.queryClusterName());
+    varAppend(mb,64,""); // roxiecluster is obsolete
+    varAppend(mb,256,info.queryJobName());
+    fixedAppend(mb,10,info.queryStateDesc());
+    fixedAppend(mb,7,info.queryPriorityDesc());
+    short int prioritylevel = info.getPriorityLevel();
+    mb.append(prioritylevel);
+    fixedAppend(mb,20,"");  // Created timestamp
+    fixedAppend(mb,20,"");  // Modified timestamp
+    mb.append(true);
+    mb.append(info.isProtected());
+    if (mb.length()>WORKUNIT_SERVICES_BUFFER_MAX) {
+        mb.clear().append(WUS_STATUS_OVERFLOWED);
+        return false;
     }
-    return NULL;
+    return true;
 }
 
+
 }//namespace
 
 using namespace nsWorkunitservices;
 
-
-
 WORKUNITSERVICES_API void wsWorkunitList(
     ICodeContext *ctx,
     size32_t & __lenResult,
@@ -556,10 +407,42 @@ WORKUNITSERVICES_API void wsWorkunitList(
             }
         }
     }
-    if (online) {
-        Owned<COnlineWorkunitIterator> oniter = new COnlineWorkunitIterator(ctx->queryUserDescriptor(),lowwuid,highwuid,username,cluster,jobname,state,priority,fileread,filewritten,eclcontains);
-        ForEach(*oniter) {
-            if (!oniter->serialize(mb)) 
+    if (online)
+    {
+        WUSortField filters[20];  // NOTE - increase if you add a LOT more parameters!
+        unsigned filterCount = 0;
+        MemoryBuffer filterbuf;
+
+        if (state && *state)
+        {
+            filters[filterCount++] = WUSFstate;
+            if (!strieq(state, "unknown"))
+                filterbuf.append(state);
+            else
+                filterbuf.append("");
+        }
+        if (priority && *priority)
+        {
+            filters[filterCount++] = WUSFpriority;
+            if (!strieq(state, "unknown"))
+                filterbuf.append(state);
+            else
+                filterbuf.append("");
+        }
+        addWUQueryFilter(filters, filterCount, filterbuf, cluster, WUSFcluster);
+        addWUQueryFilter(filters, filterCount, filterbuf, fileread, (WUSortField) (WUSFfileread | WUSFnocase));
+        addWUQueryFilter(filters, filterCount, filterbuf, filewritten, (WUSortField) (WUSFfilewritten | WUSFnocase));
+        addWUQueryFilter(filters, filterCount, filterbuf, username, (WUSortField) (WUSFuser | WUSFnocase));
+        addWUQueryFilter(filters, filterCount, filterbuf, jobname, (WUSortField) (WUSFjob | WUSFnocase));
+        addWUQueryFilter(filters, filterCount, filterbuf, eclcontains, (WUSortField) (WUSFecl | WUSFwild));
+        addWUQueryFilter(filters, filterCount, filterbuf, lowwuid, WUSFwuid);
+        addWUQueryFilter(filters, filterCount, filterbuf, highwuid, WUSFwuidhigh);
+        filters[filterCount] = WUSFterm;
+        Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
+        Owned<IConstWorkUnitIterator> it = wuFactory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL); // MORE - need security flags here!
+        ForEach(*it)
+        {
+            if (!serializeWUInfo(it->query(), mb))
                 throw MakeStringException(-1,"WORKUNITSERVICES: Result buffer overflowed");
         }
     }
@@ -567,21 +450,20 @@ WORKUNITSERVICES_API void wsWorkunitList(
     __result = mb.detach();
 }
 
-
-WORKUNITSERVICES_API bool wsWorkunitExists(const char *wuid, bool online, bool archived)
+WORKUNITSERVICES_API bool wsWorkunitExists(ICodeContext *ctx, const char *wuid, bool online, bool archived)
 {
     if (!wuid||!*wuid)
         return false;
     StringBuffer _wuid(wuid);
     wuid = _wuid.toUpperCase().str();
-    if (online) {
-        StringBuffer s("WorkUnits/");
-        s.append(wuid);
-        Owned<IRemoteConnection> conn = querySDS().connect(s.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-            if (conn.get()) 
-                return true;
+    if (online)
+    {
+        Owned<IWorkUnitFactory> wuFactory = getWorkunitFactory(ctx);
+        Owned<IConstWorkUnit> wu = wuFactory->openWorkUnit(wuid);  // Note - we don't use getWorkUnit as we don't need read access
+        return wu != NULL;
     }
-    if (archived) {
+    if (archived)
+    {
         SocketEndpointArray sashaeps;
         getSashaNodes(sashaeps);
         ForEachItemIn(i,sashaeps) {
@@ -599,7 +481,6 @@ WORKUNITSERVICES_API bool wsWorkunitExists(const char *wuid, bool online, bool a
     return false;
 }
 
-
 WORKUNITSERVICES_API char * wsWUIDonDate(unsigned year,unsigned month,unsigned day,unsigned hour,unsigned minute)
 {
     StringBuffer ret;
@@ -612,16 +493,22 @@ WORKUNITSERVICES_API char * wsWUIDdaysAgo(unsigned daysago)
     return getWUIDdaysAgo(ret,(int)daysago).detach();
 }
 
-WORKUNITSERVICES_API void wsWorkunitTimeStamps( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
+WORKUNITSERVICES_API void wsWorkunitTimeStamps(ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid)
 {
+    Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
     MemoryBuffer mb;
-    Owned<IPropertyTree> pt = getWorkUnitBranch(ctx,wuid,"TimeStamps");
-    if (pt) {
+    if (wu)
+    {
+/*
+        // Workunit timestamps have not been stored like this for a while - so this code has not been working
+        // Should look at fixing but perhaps as a separate Jira
         Owned<IPropertyTreeIterator> iter = pt->getElements("TimeStamp");
-        ForEach(*iter) {
+        ForEach(*iter)
+        {
             IPropertyTree &item = iter->query();
             Owned<IPropertyTreeIterator> iter2 = item.getElements("*");
-            ForEach(*iter2) {
+            ForEach(*iter2)
+            {
                 IPropertyTree &item2 = iter2->query();
                 fixedAppend(mb, 32, item, "@application");              // item correct here
                 fixedAppend(mb, 16, item2.queryName());                 // id
@@ -629,6 +516,7 @@ WORKUNITSERVICES_API void wsWorkunitTimeStamps( ICodeContext *ctx, size32_t & __
                 fixedAppend(mb,16, item, "@instance");                  // item correct here
             }
         }
+  */
     }
     __lenResult = mb.length();
     __result = mb.detach();
@@ -636,26 +524,27 @@ WORKUNITSERVICES_API void wsWorkunitTimeStamps( ICodeContext *ctx, size32_t & __
 
 WORKUNITSERVICES_API void wsWorkunitMessages( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
 {
+    Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
     MemoryBuffer mb;
-    unsigned tmpu;
-    int tmpi;
-    Owned<IPropertyTree> pt = getWorkUnitBranch(ctx,wuid,"Exceptions");
-    if (pt) {
-        Owned<IPropertyTreeIterator> iter = pt->getElements("Exception");
-        ForEach(*iter) {
-            IPropertyTree &item = iter->query();
-            tmpu = (unsigned)item.getPropInt("@severity");
-            mb.append(sizeof(tmpu),&tmpu);
-            tmpi = (int)item.getPropInt("@code");
-            mb.append(sizeof(tmpi),&tmpi);
-            fixedAppend(mb, 32, item, "@filename");             
-            tmpu = (unsigned)item.getPropInt("@row");
-            mb.append(sizeof(tmpu),&tmpu);
-            tmpu = (unsigned)item.getPropInt("@col");
-            mb.append(sizeof(tmpu),&tmpu);
-            fixedAppend(mb, 16, item, "@source");               
-            fixedAppend(mb, 20, item, "@time");             
-            varAppend(mb, 1024, item, NULL);                
+    if (wu)
+    {
+        SCMStringBuffer s;
+        Owned<IConstWUExceptionIterator> exceptions = &wu->getExceptions();
+        ForEach(*exceptions)
+        {
+            IConstWUException &e = exceptions->query();
+            mb.append((unsigned) e.getSeverity());
+            mb.append((int) e.getExceptionCode());
+            e.getExceptionFileName(s);
+            fixedAppend(mb, 32, s.str(), s.length());
+            mb.append((unsigned) e.getExceptionLineNo());
+            mb.append((unsigned)  e.getExceptionColumn());
+            e.getExceptionSource(s);
+            fixedAppend(mb, 16, s.str(), s.length());
+            e.getTimeStamp(s);
+            fixedAppend(mb, 20, s.str(), s.length());
+            e.getExceptionMessage(s);
+            varAppendMax(mb, 1024, s.str(), s.length());
         }
     }
     __lenResult = mb.length();
@@ -665,17 +554,17 @@ WORKUNITSERVICES_API void wsWorkunitMessages( ICodeContext *ctx, size32_t & __le
 WORKUNITSERVICES_API void wsWorkunitFilesRead( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
 {
     MemoryBuffer mb;
-    Owned<IPropertyTree> pt = getWorkUnitBranch(ctx,wuid,"FilesRead");
-    if (pt) {
-        Owned<IPropertyTreeIterator> iter = pt->getElements("File");
-        ForEach(*iter) {
-            IPropertyTree &item = iter->query();
+    Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
+    if (wu)
+    {
+        Owned<IPropertyTreeIterator> sourceFiles = &wu->getFilesReadIterator();
+        ForEach(*sourceFiles)
+        {
+            IPropertyTree &item = sourceFiles->query();
             varAppend(mb, 256, item, "@name");              
             varAppend(mb, 64, item, "@cluster");                
-            byte b = item.getPropBool("@super")?1:0;
-            mb.append(sizeof(b),&b);
-            unsigned uc = (unsigned)item.getPropInt("@useCount");
-            mb.append(sizeof(uc),&uc);
+            mb.append(item.getPropBool("@super"));
+            mb.append((unsigned) item.getPropInt("@useCount"));
         }
     }
     __lenResult = mb.length();
@@ -685,16 +574,17 @@ WORKUNITSERVICES_API void wsWorkunitFilesRead( ICodeContext *ctx, size32_t & __l
 WORKUNITSERVICES_API void wsWorkunitFilesWritten( ICodeContext *ctx, size32_t & __lenResult, void * & __result, const char *wuid )
 {
     MemoryBuffer mb;
-    Owned<IPropertyTree> pt = getWorkUnitBranch(ctx,wuid,"Files");
-    if (pt) {
-        Owned<IPropertyTreeIterator> iter = pt->getElements("File");
-        ForEach(*iter) {
-            IPropertyTree &item = iter->query();
+    Owned<IConstWorkUnit> wu = getWorkunit(ctx, wuid);
+    if (wu)
+    {
+        Owned<IPropertyTreeIterator> sourceFiles = &wu->getFileIterator();
+        ForEach(*sourceFiles)
+        {
+            IPropertyTree &item = sourceFiles->query();
             varAppend(mb, 256, item, "@name");              
             fixedAppend(mb, 10, item, "@graph");                
             varAppend(mb, 64, item, "@cluster");                
-            unsigned k = (unsigned)item.getPropInt("@kind");
-            mb.append(sizeof(k),&k);
+            mb.append( (unsigned) item.getPropInt("@kind"));
         }
     }
     __lenResult = mb.length();
@@ -723,12 +613,9 @@ WORKUNITSERVICES_API void wsWorkunitTimings( ICodeContext *ctx, size32_t & __len
             unsigned __int64 max = cur.getMax();
             cur.getDescription(desc, true);
 
-            tmp = (unsigned)count;
-            mb.append(sizeof(tmp),&tmp);
-            tmp = (unsigned)(value / 1000000);
-            mb.append(sizeof(tmp),&tmp);
-            tmp = (unsigned)max;
-            mb.append(sizeof(tmp),&tmp);
+            mb.append((unsigned) count);
+            mb.append((unsigned) (value / 1000000));
+            mb.append((unsigned) max);
             varAppend(mb, desc.str());
         }
     }

+ 1 - 1
plugins/workunitservices/workunitservices.hpp

@@ -39,7 +39,7 @@ WORKUNITSERVICES_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
 WORKUNITSERVICES_API void setPluginContext(IPluginContext * _ctx);
 WORKUNITSERVICES_API char * WORKUNITSERVICES_CALL wsGetBuildInfo(void);
 
-WORKUNITSERVICES_API bool WORKUNITSERVICES_CALL wsWorkunitExists(const char *wuid, bool online, bool archived);
+WORKUNITSERVICES_API bool WORKUNITSERVICES_CALL wsWorkunitExists(ICodeContext *ctx, const char *wuid, bool online, bool archived);
 
 WORKUNITSERVICES_API void WORKUNITSERVICES_CALL wsWorkunitList(
                                                                 ICodeContext *ctx,

+ 13 - 0
plugins/workunitservices/workunitservices.ipp

@@ -65,6 +65,19 @@ inline void varAppend(MemoryBuffer &mb,unsigned w,IPropertyTree &pt,const char *
     mb.append(sz).append(sz,s.str());
 }
 
+inline void varAppendMax(MemoryBuffer &mb,unsigned w,const char *str, size32_t l=0)
+{
+    if (!str)
+        l = 0;
+    else if (l==0)
+        l = strlen(str);
+    if (l>w)
+        l = w;
+    mb.append(l).append(l, str);
+}
+
+// This is use by sasha - it's a real mess
+
 inline bool serializeWUSrow(IPropertyTree &pt,MemoryBuffer &mb, bool isonline)
 {
     mb.setEndian(__LITTLE_ENDIAN);