浏览代码

Merge branch 'candidate-7.6.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父节点
当前提交
ae23ca61b3
共有 53 个文件被更改，包括 1133 次插入和 561 次删除
  1. 5 2
      common/thorhelper/thorcommon.cpp
  2. 3 2
      common/workunit/workunit.cpp
  3. 1 1
      dali/daliadmin/daliadmin.cpp
  4. 11 1
      dali/server/daserver.cpp
  5. 1 1
      docs/EN_US/ECLLanguageReference/ECLR-includer.xml
  6. 1 1
      docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-ROUND.xml
  7. 3 3
      docs/common/Version.xml
  8. 4 4
      docs/common/Version.xml.in
  9. 1 1
      ecl/eclagent/eclagent.cpp
  10. 6 3
      ecl/eclagent/start_eclagent
  11. 1 1
      ecl/hql/hqlgram2.cpp
  12. 3 1
      ecl/hql/hqllex.l
  13. 2 1
      ecl/hqlcpp/hqlcppds.cpp
  14. 10 1
      ecl/hthor/hthor.cpp
  15. 13 4
      ecl/hthor/hthorkey.cpp
  16. 1 1
      ecl/regress/workflow3.ecl
  17. 1 0
      esp/src/eclwatch/WUQueryWidget.js
  18. 56 19
      fs/dafsserver/dafsserver.cpp
  19. 1 1
      initfiles/bash/etc/init.d/dafilesrv.in
  20. 141 11
      plugins/cryptolib/cryptolib.cpp
  21. 1 0
      plugins/workunitservices/workunitservices.cpp
  22. 1 1
      roxie/ccd/ccdcontext.cpp
  23. 1 0
      roxie/ccd/ccdmain.cpp
  24. 33 3
      roxie/ccd/ccdserver.cpp
  25. 3 3
      system/jlib/jstats.cpp
  26. 3 1
      system/jlib/jthread.hpp
  27. 179 0
      testing/esp/wudetails/wucheckstartstops.py
  28. 267 0
      testing/esp/wudetails/wucommon.py
  29. 83 326
      testing/esp/wudetails/wutest.py
  30. 2 0
      testing/regress/ecl/childindex.ecl
  31. 2 2
      testing/regress/ecl/cryptoplugin_pke.ecl
  32. 34 0
      testing/regress/ecl/issue23168.ecl
  33. 7 0
      testing/regress/ecl/key/issue23168.xml
  34. 3 0
      testing/regress/ecl/key/keyed_join5.xml
  35. 13 0
      testing/regress/ecl/keyed_join5.ecl
  36. 11 0
      thorlcr/activities/fetch/thfetchslave.cpp
  37. 19 7
      thorlcr/activities/indexread/thindexreadslave.cpp
  38. 3 3
      thorlcr/activities/join/thjoinslave.cpp
  39. 1 13
      thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp
  40. 47 12
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  41. 2 0
      thorlcr/activities/loop/thloopslave.cpp
  42. 3 3
      thorlcr/activities/msort/thmsortslave.cpp
  43. 3 3
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  44. 49 2
      thorlcr/graph/thgraph.cpp
  45. 14 4
      thorlcr/graph/thgraph.hpp
  46. 19 10
      thorlcr/graph/thgraphmaster.cpp
  47. 2 2
      thorlcr/graph/thgraphmaster.ipp
  48. 2 1
      thorlcr/graph/thgraphslave.cpp
  49. 3 2
      thorlcr/graph/thgraphslave.hpp
  50. 37 5
      thorlcr/slave/slavmain.cpp
  51. 1 1
      thorlcr/slave/thslavemain.cpp
  52. 18 66
      thorlcr/thorutil/thorport.cpp
  53. 2 32
      thorlcr/thorutil/thorport.hpp

+ 5 - 2
common/thorhelper/thorcommon.cpp

@@ -2102,8 +2102,11 @@ static bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<con
         if ((projectedFormat != sourceFormat) && (projectedCrc != sourceCrc))
         {
             translator.setown(createRecordTranslator(projectedFormat->queryRecordAccessor(true), sourceFormat->queryRecordAccessor(true)));
-            DBGLOG("Record layout translator created for %s", tracing);
-            translator->describe();
+            if (expectedCrc && publishedCrc && expectedCrc != publishedCrc)
+            {
+                DBGLOG("Record layout translator created for %s", tracing);
+                translator->describe();
+            }
 
             if (!translator->canTranslate())
                 throw MakeStringException(0, "Untranslatable record layout mismatch detected for file %s", tracing);

+ 3 - 2
common/workunit/workunit.cpp

@@ -398,7 +398,7 @@ public:
     }
     virtual const char * queryScope() const
     {
-        return scope;
+        return scope ? scope : "";
     }
     virtual IStringVal & getFormattedValue(IStringVal & str) const
     {
@@ -1100,7 +1100,8 @@ public:
 
     virtual const char * queryScope() const  override
     {
-        return notes.item(baseIndex).queryScope();
+        const char * scope = notes.item(baseIndex).queryScope();
+        return scope ? scope : "";
     }
 
     virtual StatisticScopeType getScopeType() const override

+ 1 - 1
dali/daliadmin/daliadmin.cpp

@@ -2709,7 +2709,7 @@ public:
 
         xml.appendf("<attr source='%s' message='%s' timestamp='%s' exceptionCode='%u' severity='%u' scope='%s' cost='%u'",
                     source.str(), message.str(), timestamp.str(),
-                    exception.getExceptionCode(), exception.getSeverity(), exception.queryScope(), exception.getPriority());
+                    exception.getExceptionCode(), exception.getSeverity(), nullText(exception.queryScope()), exception.getPriority());
         xml.append("/>");
         printf(" %s\n", xml.str());
     }

+ 11 - 1
dali/server/daserver.cpp

@@ -255,12 +255,22 @@ static bool populateWhiteListFromEnvironment(IWhiteListWriter &writer)
                 {
                     const char *masterCompName = component.queryProp("ThorMasterProcess/@computer");
                     StringBuffer ipSB;
-                    const char *ip = resolveComputer(masterCompName, component.queryProp("@netAddress"), ipSB);
+                    const char *ip = resolveComputer(masterCompName, nullptr, ipSB);
                     if (ip)
                     {
                         writer.add(ip, DCR_ThorMaster);
                         writer.add(ip, DCR_DaliAdmin);
                     }
+
+                    // NB: slaves are currently seen as foreign clients and are only used by Std.File.GetUniqueInteger (which calls Dali v. occassionally)
+                    Owned<IPropertyTreeIterator> slaveIter = component.getElements("ThorSlaveProcess");
+                    ForEach(*slaveIter)
+                    {
+                        const char *slaveCompName = component.queryProp("@computer");
+                        const char *ip = resolveComputer(slaveCompName, nullptr, ipSB.clear());
+                        if (ip)
+                            writer.add(ip, DCR_ThorSlave);
+                    }
                     break;
                 }
                 case EclAgentProcess:

+ 1 - 1
docs/EN_US/ECLLanguageReference/ECLR-includer.xml

@@ -45,7 +45,7 @@
                 xpointer="xpointer(//*[@id='DateVer'])"
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
 
-    <releaseinfo>© 2019 HPCC Systems<superscript>®</superscript>. All rights
+    <releaseinfo>© 2020 HPCC Systems<superscript>®</superscript>. All rights
     reserved. Except where otherwise noted, ECL Language Reference content
     licensed under Creative Commons public license.</releaseinfo>
 

+ 1 - 1
docs/EN_US/ECLLanguageReference/ECLR_mods/BltInFunc-ROUND.xml

@@ -51,7 +51,7 @@
 
   <programlisting>SomeRealValue1 := 3.14159;
 INTEGER4 MyVal1 := ROUND(SomeRealValue1);   // MyVal1 is 3
-INTEGER4 MyVal2 := ROUND(SomeRealValue1,2); // MyVal2 is 3.14
+REAL     MyVal2 := ROUND(SomeRealValue1,2); // MyVal2 is 3.14
 
 SomeRealValue2 := 3.5;
 INTEGER4 MyVal3 := ROUND(SomeRealValue2); // MyVal is 4

+ 3 - 3
docs/common/Version.xml

@@ -5,11 +5,11 @@
   <chapterinfo>
     <date id="DateVer">DEVELOPER NON-GENERATED VERSION</date>
 
-    <releaseinfo id="FooterInfo">© 2019 HPCC
+    <releaseinfo id="FooterInfo">© 2020 HPCC
     Systems<superscript>®</superscript>. All rights reserved</releaseinfo>
 
     <copyright id="Copyright">
-      <year>2019 HPCC Systems<superscript>®</superscript>. All rights
+      <year>2020 HPCC Systems<superscript>®</superscript>. All rights
       reserved</year>
     </copyright>
   </chapterinfo>
@@ -24,7 +24,7 @@
     serve one purpose and that is to store the chapterinfo the above sections
     that are being used by several other documents.</para>
 
-    <para id="CHMVer">2019 Version ${DOC_VERSION}</para>
+    <para id="CHMVer">2020 Version ${DOC_VERSION}</para>
 
     <para>The following line is the code to be put into the document you wish
     to include the above version info in:</para>

+ 4 - 4
docs/common/Version.xml.in

@@ -3,13 +3,13 @@
 "http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
 <chapter>
   <chapterinfo>
-    <date id="DateVer">2019 Version ${DOC_VERSION}</date>
+    <date id="DateVer">2020 Version ${DOC_VERSION}</date>
 
-    <releaseinfo id="FooterInfo">© 2019 HPCC
+    <releaseinfo id="FooterInfo">© 2020 HPCC
     Systems<superscript>®</superscript>. All rights reserved</releaseinfo>
 
     <copyright id="Copyright">
-      <year>2019 HPCC Systems<superscript>®</superscript>. All rights
+      <year>2020 HPCC Systems<superscript>®</superscript>. All rights
       reserved</year>
     </copyright>
   </chapterinfo>
@@ -24,7 +24,7 @@
     serve one purpose and that is to store the chapterinfo the above sections
     that are being used by several other documents.</para>
 
-    <para id="CHMVer">2019 Version ${DOC_VERSION}</para>
+    <para id="CHMVer">2020 Version ${DOC_VERSION}</para>
 
     <para>The following line is the code to be put into the document you wish
     to include the above version info in:</para>

+ 1 - 1
ecl/eclagent/eclagent.cpp

@@ -1877,7 +1877,7 @@ void EclAgent::doProcess()
         {
             MTIME_SECTION(queryActiveTimer(), "Process");
             Owned<IEclProcess> process = loadProcess();
-            QueryTerminationCleanup threadCleanup;
+            QueryTerminationCleanup threadCleanup(false);
 
             if (checkVersion && (process->getActivityVersion() != eclccCodeVersion))
                 failv(0, "Inconsistent interface versions.  Workunit was created using eclcc for version %u, but the c++ compiler used version %u", eclccCodeVersion, process->getActivityVersion());

+ 6 - 3
ecl/eclagent/start_eclagent

@@ -20,11 +20,14 @@ ulimit -c unlimited
 
 # dont lower limit
 moflim=$(ulimit -n 2>/dev/null)
-if [ -n "$moflim" ] && [ "$moflim" != "unlimited" ]
+if [ -n "$moflim" ]
 then
-    if [ $moflim -lt 8192 ]
+    if [ "$moflim" != "unlimited" ]
     then
-        ulimit -n 8192
+        if [ $moflim -lt 8192 ]
+        then
+            ulimit -n 8192
+        fi
     fi
 else
     ulimit -n 8192

+ 1 - 1
ecl/hql/hqlgram2.cpp

@@ -4121,7 +4121,7 @@ IHqlExpression* HqlGram::checkServiceDef(IHqlScope* serviceScope,IIdAtom * name,
                 checkSvcAttrNoValue(attr, errpos);
             }
             else if ((name == userMatchFunctionAtom) || (name == costAtom) || (name == allocatorAtom) || (name == extendAtom) || (name == passParameterMetaAtom) ||
-                     (name == namespaceAtom) || (name==prototypeAtom) || (name == foldAtom) || (name == nofoldAtom))
+                     (name == namespaceAtom) || (name==prototypeAtom) || (name == foldAtom) || (name == nofoldAtom) || (name == deprecatedAtom))
             {
             }
             else if (name == holeAtom)

+ 3 - 1
ecl/hql/hqllex.l

@@ -101,7 +101,10 @@ int HqlLex::lookupIdentifierToken(attribute & returnToken, HqlLex * lexer, Lexer
         return UNKNOWN_ID;
     }
 
+    node_operator op = expr->getOperator();
     IHqlExpression * deprecated = queryMetaAttribute(deprecatedAtom, expr);
+    if (!deprecated && (op == no_funcdef))
+        deprecated = expr->queryChild(0)->queryAttribute(deprecatedAtom);
     if (deprecated)
     {
         IHqlExpression * alternative = deprecated->queryChild(0);
@@ -130,7 +133,6 @@ int HqlLex::lookupIdentifierToken(attribute & returnToken, HqlLex * lexer, Lexer
     }
 
     returnToken.setExpr(expr);
-    node_operator op = expr->getOperator();
     int token = UNKNOWN_ID;
     bool isFunction = expr->isFunction();
     ITypeInfo * exprType = expr->queryType();

+ 2 - 1
ecl/hqlcpp/hqlcppds.cpp

@@ -3969,7 +3969,8 @@ BoundRow * HqlCppTranslator::buildDatasetIterateProject(BuildCtx & ctx, IHqlExpr
     Owned<BoundRow> tempRow = declareTempAnonRow(ctx, ctx, expr);
     if (counter)
     {
-        ctx.associateExpr(counter, counterVar);
+        OwnedHqlExpr castCounterVar = ensureExprType(counterVar, counter->queryType());
+        ctx.associateExpr(counter, castCounterVar);
         OwnedHqlExpr inc = createValue(no_postinc, LINK(unsignedType), LINK(counterVar));
         ctx.addExpr(inc);
     }

+ 10 - 1
ecl/hthor/hthor.cpp

@@ -8134,6 +8134,7 @@ void CHThorDiskReadBaseActivity::ready()
     Owned<IOutputMetaData> publishedMeta;
     unsigned publishedCrc = 0;
     RecordTranslationMode translationMode = getLayoutTranslationMode();
+    StringBuffer traceName;
     if (dFile)
     {
         const char *kind = queryFileKind(dFile);
@@ -8144,10 +8145,18 @@ void CHThorDiskReadBaseActivity::ready()
             if (publishedMeta)
                 publishedCrc = props.getPropInt("@formatCrc");
         }
+        dFile->getLogicalName(traceName);
     }
-    translators.setown(::getTranslators("hthor-diskread", expectedCrc, expectedDiskMeta, publishedCrc, publishedMeta, projectedCrc, projectedDiskMeta, translationMode));
+    else
+        traceName.set("hthor-diskread");
+    translators.setown(::getTranslators(traceName.str(), expectedCrc, expectedDiskMeta, publishedCrc, publishedMeta, projectedCrc, projectedDiskMeta, translationMode));
     if (translators)
     {
+        if (publishedCrc && expectedCrc && publishedCrc != expectedCrc)
+        {
+            VStringBuffer msg("Record layout translation required for %s", traceName.str());
+            agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
+        }
         translator = &translators->queryTranslator();
         keyedTranslator = translators->queryKeyedTranslator();
         actualDiskMeta.set(&translators->queryActualFormat());

+ 13 - 4
ecl/hthor/hthorkey.cpp

@@ -742,6 +742,8 @@ const IDynamicTransform * CHThorIndexReadActivityBase::getLayoutTranslator(IDist
             actualTranslator->describe();
             if (actualTranslator->keyedTranslated())
                 throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
+            VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
+            agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
 
             actualLayouts.append(actualFormat.getLink());  // ensure adequate lifespan
         }
@@ -2520,9 +2522,8 @@ protected:
                     {
                         if (getLayoutTranslationMode()==RecordTranslationMode::None)
                             throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
-#ifdef _DEBUG
-                        translator->describe();
-#endif
+                        VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
+                        agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
                     }
                     else
                         throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());
@@ -4156,11 +4157,17 @@ protected:
         if (actualFormat)
         {
             actualLayouts.append(actualFormat.getLink());  // ensure adequate lifespan
-            Owned<const IDynamicTransform> payloadTranslator =  createRecordTranslator(helper.queryProjectedIndexRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
+            Owned<const IDynamicTransform> payloadTranslator = createRecordTranslator(helper.queryProjectedIndexRecordSize()->queryRecordAccessor(true), actualFormat->queryRecordAccessor(true));
+            DBGLOG("Record layout translator created for %s", f->queryLogicalName());
+            payloadTranslator->describe();
             if (!payloadTranslator->canTranslate())
                 throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s", f->queryLogicalName());
             if (payloadTranslator->keyedTranslated())
                 throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - keyed fields do not match", f->queryLogicalName());
+            if (getLayoutTranslationMode()==RecordTranslationMode::None)
+                throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
+            VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
+            agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
             return payloadTranslator.getClear();
         }
         throw MakeStringException(0, "Untranslatable key layout mismatch reading index %s - key layout information not found", f->queryLogicalName());
@@ -4206,6 +4213,8 @@ protected:
                     {
                         if (getLayoutTranslationMode()==RecordTranslationMode::None)
                             throw MakeStringException(0, "Translatable file layout mismatch reading file %s but translation disabled", f->queryLogicalName());
+                        VStringBuffer msg("Record layout translation required for %s", f->queryLogicalName());
+                        agent.addWuExceptionEx(msg.str(), WRN_UseLayoutTranslation, SeverityInformation, MSGAUD_user, "hthor");
                     }
                     else
                         throw MakeStringException(0, "Untranslatable file layout mismatch reading file %s", f->queryLogicalName());

+ 1 - 1
ecl/regress/workflow3.ecl

@@ -16,7 +16,7 @@
 ############################################################################## */
 
 export display := SERVICE
- echo(const string src) : eclrtl,library='eclrtl',entrypoint='rtlEcho';
+ echo(const string src) : eclrtl,library='eclrtl',entrypoint='rtlEcho',deprecated('Use function echoecho instead.');
 END;
 
 person := dataset('person', { unsigned8 person_id, string1 per_sex, string2 per_st, string40 per_first_name, string40 per_last_name}, thor);

+ 1 - 0
esp/src/eclwatch/WUQueryWidget.js

@@ -273,6 +273,7 @@ define([
 
                 if (this.params.searchResults) {
                     this.filter.disable(true);
+                    this.mineControl.set("disabled", true);
                 }
 
                 this.clusterTargetSelect.init({

+ 56 - 19
fs/dafsserver/dafsserver.cpp

@@ -207,6 +207,9 @@ const char *RFCStrings[] =
     RFCText(RFCreadfilteredblob),
     RFCText(RFCStreamRead),
     RFCText(RFCStreamReadTestSocket),
+    RFCText(RFCStreamGeneral),
+    RFCText(RFCStreamReadJSON),
+    RFCText(RFCmaxnormal),
 };
 
 static const char *getRFCText(RemoteFileCommandType cmd)
@@ -765,6 +768,7 @@ interface IRemoteActivity : extends IInterface
     virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
     virtual void restoreCursor(MemoryBuffer &src) = 0;
     virtual bool isGrouped() const = 0;
+    virtual void flushStatistics(CClientStats &stats) = 0;
     virtual IRemoteReadActivity *queryIsReadActivity() { return nullptr; }
     virtual IRemoteWriteActivity *queryIsWriteActivity() { return nullptr; }
 };
@@ -1042,7 +1046,7 @@ public:
     IRemoteActivity *queryActivity() const { return activity; }
     ICompressor *queryCompressor() const { return compressor; }
 
-    void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb)
+    void process(IPropertyTree *requestTree, MemoryBuffer &restMb, MemoryBuffer &responseMb, CClientStats &stats)
     {
         if (requestTree->hasProp("replyLimit"))
             replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
@@ -1064,6 +1068,7 @@ public:
             processRead(requestTree, responseMb);
         else if (activity->queryIsWriteActivity())
             processWrite(requestTree, restMb, responseMb);
+        activity->flushStatistics(stats);
 
         if (outFmt_Binary != format)
         {
@@ -1185,6 +1190,10 @@ public:
     {
         throwUnexpected();
     }
+    virtual void flushStatistics(CClientStats &stats) override
+    {
+        throwUnexpected();
+    }
     virtual IRemoteReadActivity *queryIsReadActivity()
     {
         return this;
@@ -1213,7 +1222,7 @@ public:
 };
 
 
-class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity
+class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity, implements IFileSerialStreamCallback
 {
     typedef CRemoteDiskBaseActivity PARENT;
 
@@ -1227,6 +1236,7 @@ protected:
     // virtual field values
     unsigned partNum = 0;
     offset_t baseFpos = 0;
+    unsigned __int64 bytesRead = 0;
 
     virtual bool refreshCursor()
     {
@@ -1271,7 +1281,7 @@ protected:
                 compressed = false;
             }
         }
-        inputStream.setown(createFileSerialStream(iFileIO, startPos));
+        inputStream.setown(createFileSerialStream(iFileIO, startPos, (offset_t)-1, (size32_t)-1, this));
 
         opened = true;
         eofSeen = false;
@@ -1283,6 +1293,11 @@ protected:
         opened = false;
         eofSeen = true;
     }
+// IFileSerialStreamCallback impl.
+    virtual void process(offset_t ofs, size32_t sz, const void *buf) override
+    {
+        bytesRead += sz;
+    }
 public:
     CRemoteStreamReadBaseActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc)
     {
@@ -1292,6 +1307,12 @@ public:
         partNum = config.getPropInt("virtualFields/partNum");
         baseFpos = (offset_t)config.getPropInt64("virtualFields/baseFpos");
     }
+    virtual void flushStatistics(CClientStats &stats) override
+    {
+        // NB: will be called by same thread that is reading.
+        stats.addRead(bytesRead);
+        bytesRead = 0;
+    }
 // IVirtualFieldCallback impl.
     virtual unsigned __int64 getFilePosition(const void * row) override
     {
@@ -1961,6 +1982,10 @@ public:
     {
         input->restoreCursor(src);
     }
+    virtual void flushStatistics(CClientStats &stats) override
+    {
+        input->flushStatistics(stats);
+    }
     virtual IRemoteReadActivity *queryIsReadActivity()
     {
         return this;
@@ -2034,6 +2059,10 @@ public:
         allowPreload = config.getPropBool("allowPreload");
         fileCrc = config.getPropInt("crc");
     }
+    virtual void flushStatistics(CClientStats &stats) override
+    {
+        // TBD, IKeyCursor should probably have a getStatistic(StatisticKind kind) implementation
+    }
 };
 
 class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity
@@ -2123,6 +2152,7 @@ protected:
     StringAttr fileName; // physical filename
     Linked<IOutputMetaData> meta;
     unsigned __int64 processed = 0;
+    unsigned __int64 bytesWritten = 0;
     bool opened = false;
     bool eofSeen = false;
 
@@ -2176,6 +2206,12 @@ public:
     {
         throwUnexpected(); // method should be implemented in derived classes.
     }
+    virtual void flushStatistics(CClientStats &stats) override
+    {
+        // NB: will be called by same thread that is writing.
+        stats.addWrite(bytesWritten);
+        bytesWritten = 0;
+    }
     virtual IRemoteWriteActivity *queryIsWriteActivity()
     {
         return this;
@@ -2243,6 +2279,7 @@ public:
     {
         checkOpen();
         iFileIOStream->write(sz, rowData);
+        bytesWritten += sz;
     }
     virtual void serializeCursor(MemoryBuffer &tgt) const override
     {
@@ -4262,12 +4299,12 @@ public:
             extracted.item(i).serialize(reply);
     }
 
-    void cmdStreamGeneral(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client)
+    void cmdStreamGeneral(MemoryBuffer &msg, MemoryBuffer &reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         size32_t jsonSz;
         msg.read(jsonSz);
         Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
-        cmdStreamCommon(requestTree, msg, reply, client);
+        cmdStreamCommon(requestTree, msg, reply, client, stats);
     }
 
     /* Notes on protocol:
@@ -4302,7 +4339,7 @@ public:
      * "fileName" is only used for unsecured non signed connections (normally forbidden), and specifies the fully qualified path to a physical file.
      *
      */
-    void cmdStreamCommon(IPropertyTree *requestTree, MemoryBuffer &rest, MemoryBuffer &reply, CRemoteClientHandler &client)
+    void cmdStreamCommon(IPropertyTree *requestTree, MemoryBuffer &rest, MemoryBuffer &reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         /* Example JSON request:
          *
@@ -4554,7 +4591,7 @@ public:
         }
 
         if (cursorHandle)
-            remoteRequest->process(requestTree, rest, reply);
+            remoteRequest->process(requestTree, rest, reply, stats);
         else
         {
             const char *outputFmtStr = requestTree->queryProp("format");
@@ -4585,23 +4622,23 @@ public:
         }
     }
 
-    void cmdStreamReadCommon(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadCommon(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         size32_t jsonSz = msg.remaining();
         Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonSz, (const char *)msg.readDirect(jsonSz));
-        cmdStreamCommon(requestTree, msg, reply, client);
+        cmdStreamCommon(requestTree, msg, reply, client, stats);
     }
 
 
     // NB: JSON header to message, for some requests (e.g. write), there will be trailing raw data (e.g. row data)
 
-    void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadStd(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         reply.append(RFEnoerror); // gets patched if there is a follow on error
-        cmdStreamReadCommon(msg, reply, client);
+        cmdStreamReadCommon(msg, reply, client, stats);
     }
 
-    void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadJSON(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         /* NB: exactly the same handling as cmdStreamReadStd(RFCStreamRead) for now,
          * may want to differentiate later
@@ -4609,13 +4646,13 @@ public:
          * errorcode = 0 means no error
          */
         reply.append(RFEnoerror); // gets patched if there is a follow on error
-        cmdStreamReadCommon(msg, reply, client);
+        cmdStreamReadCommon(msg, reply, client, stats);
     }
 
-    void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    void cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
     {
         reply.append('J');
-        cmdStreamReadCommon(msg, reply, client);
+        cmdStreamReadCommon(msg, reply, client, stats);
     }
 
     // legacy version
@@ -4793,26 +4830,26 @@ public:
                 {
                     checkAuthorizedStreamCommand(*client);
                     reply.append(RFEnoerror); // gets patched if there is a follow on error
-                    cmdStreamGeneral(msg, reply, *client);
+                    cmdStreamGeneral(msg, reply, *client, *stats);
                     break;
                 }
                 case RFCStreamRead:
                 {
                     checkAuthorizedStreamCommand(*client);
-                    cmdStreamReadStd(msg, reply, *client);
+                    cmdStreamReadStd(msg, reply, *client, *stats);
                     break;
                 }
                 case RFCStreamReadJSON:
                 {
                     checkAuthorizedStreamCommand(*client);
-                    cmdStreamReadJSON(msg, reply, *client);
+                    cmdStreamReadJSON(msg, reply, *client, *stats);
                     break;
                 }
                 case RFCStreamReadTestSocket:
                 {
                     testSocketFlag = true;
                     checkAuthorizedStreamCommand(*client);
-                    cmdStreamReadTestSocket(msg, reply, *client);
+                    cmdStreamReadTestSocket(msg, reply, *client, *stats);
                     break;
                 }
             default:

+ 1 - 1
initfiles/bash/etc/init.d/dafilesrv.in

@@ -182,7 +182,7 @@ for C in ${component} ; do
     if [ ${DEBUG} != "NO_DEBUG" ]; then
         echo $xcmd
     fi
-    if [ "${cmd}" == "start" ]; then
+    if [[ "${cmd}" == "start" && "${thisos}" == "Linux" ]]; then
         (
             flock 99
             eval $xcmd

+ 141 - 11
plugins/cryptolib/cryptolib.cpp

@@ -14,8 +14,6 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 ############################################################################## */
-
-#include "jutil.hpp"
 #include "jexcept.hpp"
 #include "digisign.hpp"
 #include "ske.hpp"
@@ -24,6 +22,7 @@
 #include <openssl/rand.h>
 #include <string>
 #include <unordered_map>
+#include "jthread.hpp"
 
 #include "cryptolib.hpp"
 
@@ -436,14 +435,12 @@ public:
         }
         else
         {
-            IDigitalSignatureManager * pDSM = nullptr;
             if (!isEmptyString(pubKeyFS) || !isEmptyString(privKeyFS))
-                pDSM = createDigitalSignatureManagerInstanceFromFiles(pubKeyFS, privKeyFS, passphrase);
+                ret = createDigitalSignatureManagerInstanceFromFiles(pubKeyFS, privKeyFS, passphrase);
             else
-                pDSM = createDigitalSignatureManagerInstanceFromKeys(pubKeyBuff, privKeyBuff, passphrase);
+                ret = createDigitalSignatureManagerInstanceFromKeys(pubKeyBuff, privKeyBuff, passphrase);
 
-            dsmCache.insert(pair<string, IDigitalSignatureManager*>(searchKey.str(), pDSM));
-            ret = pDSM;
+            dsmCache.emplace(searchKey.str(), ret);
         }
         return LINK(ret);
     }
@@ -528,6 +525,139 @@ CRYPTOLIB_API bool CRYPTOLIB_CALL clPKIVerifySignatureBuff(const char * pkalgori
 }
 
 
+//-----------------------------------------------------------------
+//  Simple cache for loaded keys
+//
+//-----------------------------------------------------------------
+#ifdef _USE_HASHMAP
+class CKeyCache : public CInterface
+{
+private:
+    typedef std::unordered_map<string, Owned<CLoadedKey>> KeyCache;
+    KeyCache keyCache;
+
+public:
+
+    CLoadedKey * getInstance(bool isPublic, const char * keyFS, const char * keyBuff, const char * passphrase)
+    {
+        if (isEmptyString(keyFS) && isEmptyString(keyBuff))
+            throw MakeStringException(-1, "Must specify a key filename or provide a key buffer");
+        VStringBuffer searchKey("%s_%s_%s", isEmptyString(keyFS) ? "" : keyFS, isEmptyString(keyBuff) ? "" : keyBuff, isEmptyString(passphrase) ? "" : passphrase);
+        KeyCache::iterator it = keyCache.find(searchKey.str());
+        CLoadedKey * ret = nullptr;
+        if (it != keyCache.end())//exists in cache?
+        {
+            ret = (*it).second;
+        }
+        else
+        {
+            if (!isEmptyString(keyFS))
+            {
+                //Create CLoadedKey from filespec
+                if (isPublic)
+                    ret = loadPublicKeyFromFile(keyFS, passphrase);
+                else
+                    ret = loadPrivateKeyFromFile(keyFS, passphrase);
+            }
+            else
+            {
+                //Create CLoadedKey from key contents
+                if (isPublic)
+                    ret = loadPublicKeyFromMemory(keyBuff, passphrase);
+                else
+                    ret = loadPrivateKeyFromMemory(keyBuff, passphrase);
+            }
+
+            keyCache.emplace(searchKey.str(), ret);
+        }
+        return LINK(ret);
+    }
+};
+
+#else
+
+class CKeyCache : public CInterface
+{
+private:
+    bool        m_isPublic = false;
+    StringAttr  m_keyFS;
+    StringAttr  m_keyBuff;
+    StringAttr  m_passphrase;
+    Owned<CLoadedKey> m_loadedKey;
+
+    //String compare that treats null ptr and ptr to empty string as matching
+    inline bool sameString(const char * left, const char * right)
+    {
+        if (isEmptyString(left))
+            return isEmptyString(right);
+        else if (isEmptyString(right))
+            return false;
+        return strcmp(left, right) == 0;
+    }
+
+public:
+
+    CLoadedKey * getInstance(bool isPublic, const char * keyFS, const char * keyBuff, const char * passphrase)
+    {
+        if (!m_loadedKey ||
+            isPublic != m_isPublic ||
+            !sameString(passphrase, m_passphrase.str())  ||
+            !sameString(keyFS, m_keyFS.str()) ||
+            !sameString(keyBuff, m_keyBuff.str()))
+        {
+            CLoadedKey *newKey;
+
+            if (!isEmptyString(keyFS))
+            {
+                //Create CLoadedKey from filespec
+                if (isPublic)
+                    newKey = loadPublicKeyFromFile(keyFS, passphrase);
+                else
+                    newKey = loadPrivateKeyFromFile(keyFS, passphrase);
+            }
+            else if (!isEmptyString(keyBuff))
+            {
+                //Create CLoadedKey from key contents
+                if (isPublic)
+                    newKey = loadPublicKeyFromMemory(keyBuff, passphrase);
+                else
+                    newKey = loadPrivateKeyFromMemory(keyBuff, passphrase);
+            }
+            else
+                throw makeStringException(-1, "Must specify a key filename or provide a key buffer");
+
+            m_loadedKey.setown(newKey);//releases previous ptr
+            m_isPublic = isPublic;
+            m_keyFS.set(keyFS);
+            m_keyBuff.set(keyBuff);
+            m_passphrase.set(passphrase);
+        }
+        return LINK(m_loadedKey);
+    }
+};
+#endif//_USE_HASHMAP
+
+//----------------------------------------------------------------------------
+// TLS storage of Key cache
+//----------------------------------------------------------------------------
+static thread_local CKeyCache *pKC = nullptr;
+
+static bool clearupKeyCache(bool isPooled)
+{
+    delete pKC;
+    pKC = nullptr;
+    return false;
+}
+
+static CLoadedKey * getCachedKey(bool isPublic, const char * keyFS, const char * keyBuff, const char * passphrase)
+{
+    if (!pKC)
+    {
+        pKC = new CKeyCache();
+        addThreadTermFunc(clearupKeyCache);
+    }
+    return pKC->getInstance(isPublic, keyFS, keyBuff, passphrase);
+}
 //------------------------------------
 //Encryption helper
 //------------------------------------
@@ -570,7 +700,7 @@ CRYPTOLIB_API void CRYPTOLIB_CALL clPKIEncrypt(size32_t & __lenResult,void * & _
                                             size32_t lenInputdata,const void * inputdata)
 {
     verifyPKIAlgorithm(pkalgorithm);
-    Owned<CLoadedKey> publicKey = loadPublicKeyFromFile(publickeyfile, passphrase);
+    Owned<CLoadedKey> publicKey = getCachedKey(true, publickeyfile, nullptr, passphrase);
     doPKIEncrypt(__lenResult, __result, publicKey, lenInputdata, inputdata);
 }
 
@@ -582,7 +712,7 @@ CRYPTOLIB_API void CRYPTOLIB_CALL clPKIDecrypt(size32_t & __lenResult,void * & _
                                             size32_t lenEncrypteddata,const void * encrypteddata)
 {
     verifyPKIAlgorithm(pkalgorithm);
-    Owned<CLoadedKey> privateKey = loadPrivateKeyFromFile(privatekeyfile, passphrase);
+    Owned<CLoadedKey> privateKey = getCachedKey(false, privatekeyfile, nullptr, passphrase);
     doPKIDecrypt(__lenResult, __result, privateKey, lenEncrypteddata, encrypteddata);
 }
 
@@ -595,7 +725,7 @@ CRYPTOLIB_API void CRYPTOLIB_CALL clPKIEncryptBuff(size32_t & __lenResult,void *
                                                 size32_t lenInputdata,const void * inputdata)
 {
     verifyPKIAlgorithm(pkalgorithm);
-    Owned<CLoadedKey> publicKey = loadPublicKeyFromMemory(publickeybuff, passphrase);
+    Owned<CLoadedKey> publicKey = getCachedKey(true, nullptr, publickeybuff, passphrase);
     doPKIEncrypt(__lenResult, __result, publicKey, lenInputdata, inputdata);
 }
 
@@ -606,6 +736,6 @@ CRYPTOLIB_API void CRYPTOLIB_CALL clPKIDecryptBuff(size32_t & __lenResult,void *
                                                 size32_t lenEncrypteddata,const void * encrypteddata)
 {
     verifyPKIAlgorithm(pkalgorithm);
-    Owned<CLoadedKey> privateKey = loadPrivateKeyFromMemory(privatekeybuff, passphrase);
+    Owned<CLoadedKey> privateKey = getCachedKey(false, nullptr, privatekeybuff, passphrase);
     doPKIDecrypt(__lenResult, __result, privateKey, lenEncrypteddata, encrypteddata);
 }

+ 1 - 0
plugins/workunitservices/workunitservices.cpp

@@ -730,6 +730,7 @@ public:
         extra.getCreator(creator);
         StatisticScopeType scopeType = extra.getScopeType();
         const char * scope = extra.queryScope();
+        if (!scope) scope = "";
         extra.getDescription(description, true);
         StatisticMeasure measure = extra.getMeasure();
 

+ 1 - 1
roxie/ccd/ccdcontext.cpp

@@ -2959,7 +2959,7 @@ public:
     virtual void process()
     {
         MTIME_SECTION(myTimer, "Process");
-        QueryTerminationCleanup threadCleanup;
+        QueryTerminationCleanup threadCleanup(true);
         EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
         Owned<IEclProcess> p = pf();
         try

+ 1 - 0
roxie/ccd/ccdmain.cpp

@@ -699,6 +699,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             topology->setPropInt("RoxieFarmProcess/@port", port);
             topology->setProp("@daliServers", globals->queryProp("--daliServers"));
             topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
+            topology->setPropBool("@traceStartStop", globals->getPropInt("--traceStartStop", 0));
             topology->setPropInt("@allFilesDynamic", globals->getPropInt("--allFilesDynamic", 1));
             topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
             topology->setPropInt64("@totalMemoryLimit", globals->getPropInt("--totalMemoryLimitMb", 0) * (memsize_t) 0x100000);

+ 33 - 3
roxie/ccd/ccdserver.cpp

@@ -5282,6 +5282,26 @@ IRoxieServerActivityFactory *createRoxieServerNullActivityFactory(unsigned _id,
 
 //=================================================================================
 
+class CRoxieServerNullSinkActivity : public CRoxieServerInternalSinkActivity
+{
+public:
+    CRoxieServerNullSinkActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, 0)
+    {
+    }
+
+    virtual void onExecute() override
+    {
+    }
+};
+
+IRoxieServerActivity * createRoxieServerNullSinkActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+{
+    return new CRoxieServerNullSinkActivity(_ctx, _factory, _probeManager);
+}
+
+//=================================================================================
+
 class CRoxieServerPassThroughActivity : public CRoxieServerActivity
 {
 public:
@@ -12104,13 +12124,18 @@ public:
         case 0:
             switch (kind)
             {
-            case TAKdiskwrite: return new CRoxieServerDiskWriteActivity(_ctx, this, _probeManager);
-            case TAKcsvwrite: return new CRoxieServerCsvWriteActivity(_ctx, this, _probeManager);
+            case TAKdiskwrite:
+                return new CRoxieServerDiskWriteActivity(_ctx, this, _probeManager);
+            case TAKcsvwrite:
+                return new CRoxieServerCsvWriteActivity(_ctx, this, _probeManager);
             case TAKxmlwrite:
             case TAKjsonwrite:
                 return new CRoxieServerXmlWriteActivity(_ctx, this, _probeManager, kind);
+            case TAKspillwrite:
+                return new CRoxieServerNullSinkActivity(_ctx, this, _probeManager);
             };
             throwUnexpected();
+
         case 1:
             return new CRoxieServerSpillWriteActivity(_ctx, this, _probeManager);
         default:
@@ -12120,7 +12145,7 @@ public:
 
     virtual bool isSink() const
     {
-        return numOutputs == 0 && !isTemp; // MORE - check with Gavin if this is right if not a temp but reread in  same job...
+        return numOutputs == 0 && (kind==TAKspillwrite || !isTemp); // MORE - check with Gavin if this is right if not a temp but reread in  same job...
     }
 
 };
@@ -21686,6 +21711,11 @@ public:
         return new CRoxieServerRemoteResultActivity(_ctx, this, _probeManager, usageCount);
     }
 
+    virtual bool isSink() const override
+    {
+        return CRoxieServerInternalSinkFactory::isSink() || dependentCount == 0;  // Codegen normally optimizes these away, but if it doesn't we need to treat as a sink rather than a dependency or upstream activities are not stopped properly
+    }
+
 };
 
 IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)

+ 3 - 3
system/jlib/jstats.cpp

@@ -122,16 +122,16 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 
 extern jlib_decl int compareScopeName(const char * left, const char * right)
 {
-    if (!*left)
+    if (!left || !*left)
     {
-        if (!*right)
+        if (!right || !*right)
             return 0;
         else
             return -1;
     }
     else
     {
-        if (!*right)
+        if (!right || !*right)
             return +1;
     }
 

+ 3 - 1
system/jlib/jthread.hpp

@@ -67,8 +67,10 @@ extern jlib_decl void callThreadTerminationHooks(bool isPooled);
 //An exception safe way of ensuring that the thread termination hooks are called.
 class jlib_decl QueryTerminationCleanup
 {
+    bool isPooled;
 public:
-    inline ~QueryTerminationCleanup() { callThreadTerminationHooks(true); }
+    inline QueryTerminationCleanup(bool _isPooled) : isPooled(_isPooled) { }
+    inline ~QueryTerminationCleanup() { callThreadTerminationHooks(isPooled); }
 };
 
 class jlib_decl Thread : public CInterface, public IThread

+ 179 - 0
testing/esp/wudetails/wucheckstartstops.py

@@ -0,0 +1,179 @@
+#! /usr/bin/python3
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+import wucommon
+import logging
+from wucommon import TestCase
+
+def execTestCase(jobname, wuid, tcase):
+    logging.debug('Executing %s',jobname)
+    wuresp = wutest.makeWuDetailsRequest(jobname, wuid, tcase)
+
+    # Check numstop and numstarts
+    if (wuresp['Scopes'] != None):
+        for scope in wuresp['Scopes']['Scope']:
+            properties = scope['Properties']
+            if (properties == None):
+                continue
+            numStarts = ''
+            numStops = ''
+            for property in properties['Property']:
+                if (property['Name'] == 'NumStarts'):
+                    numStarts = property['Formatted']
+                elif (property['Name'] == 'NumStops'):
+                    numStops = property['Formatted']
+            if (numStarts != numStops):
+                logging.error('Job %s WUID %s ScopeName %s NumStarts %s NumStops %s', jobname, wuid, scope['ScopeName'], numStarts, numStops)
+                return False
+    return True
+
+###############################################################################
+print('wucheckstartstops.py - Check NumStarts matches NumStops')
+print('-------------------------------------------------------')
+print('')
+
+requiredJobs = ( ('childds1',      ['thor']),
+                 ('sort',          ['thor']),
+                 ('key',           ['thor']),
+                 ('dict1',         ['thor']),
+                 ('indexread2-multiPart(true)',['thor']),
+                 ('sets',          ['thor']),
+                 ('HeadingExample',['thor']),
+                 ('aaawriteresult',['thor']),
+                 ('action1',       ['thor']),
+                 ('action1a',      ['thor']),
+                 ('action2',       ['thor']),
+                 ('action4',       ['thor']),
+                 ('action5',       ['thor']),
+                 ('aggds1-multiPart(true)',                      ['thor']),
+                 ('aggds1-multiPart(false)-useSequential(true)', ['thor']),
+                 ('aggds2-multiPart(true)',                      ['thor']),
+                 ('aggds2-multiPart(false)-useSequential(true)', ['thor']),
+                 ('aggds3-multiPart(true)',                      ['thor']),
+                 ('aggds3-multiPart(false)-useSequential(true)', ['thor']),
+                 ('aggds3-keyedFilters(true)',                   ['thor']),
+                 ('diskread-multiPart(false)',                   ['thor']),
+                 ('diskGroupAggregate-multiPart(false)',         ['thor']),
+                 ('diskAggregate-multiPart(false)',              ['thor']),
+                 ('dict_once',     ['thor']),
+                 ('dict_null',     ['thor']),
+                 ('dict_matrix',   ['thor']),
+                 ('dict_map',      ['thor']),
+                 ('dict_keyed-multiPart(true)',  ['thor']),
+                 ('dict_keyed-multiPart(false)', ['thor']),
+                 ('dict_int',      ['thor']),
+                 ('dict_indep',    ['thor']),
+                 ('dict_if',       ['thor']),
+                 ('dict_choose',   ['thor']),
+                 ('dict_case',     ['thor']),
+                 ('dict_dsout',    ['thor']),
+                 ('dict_dups',     ['thor']),
+                 ('dict_field',    ['thor']),
+                 ('dict_field2',   ['thor']),
+                 ('dict_func',     ['thor']),
+                 ('dict5c',        ['thor']),
+                 ('dict5b',        ['thor']),
+                 ('dict5a',        ['thor']),
+                 ('dict5',         ['thor']),
+                 ('dict3a',        ['thor']),
+                 ('dict3',         ['thor']),
+                 ('dict2',         ['thor']),
+                 ('dict17',        ['thor']),
+                 ('dict16',        ['thor']),
+                 ('dict15c-multiPart(true)',   ['thor']),
+                 ('dict15c-multiPart(false)',  ['thor']),
+                 ('dict15b-multiPart(true)',   ['thor']),
+                 ('dict15b-multiPart(false)',  ['thor']),
+                 ('dict15a-multiPart(true)',   ['thor']),
+                 ('dict15a-multiPart(false)',  ['thor']),
+                 ('dict15-multiPart(true)',    ['thor']),
+                 ('dict15-multiPart(false)',   ['thor']),
+                 ('dict12',         ['thor']),
+                 ('dict11',         ['thor']),
+                 ('dict10',         ['thor']),
+                 ('dict1',          ['thor']),
+                 ('dfsrecordof',    ['thor']),
+                 ('dfsj',           ['thor']),
+                 ('dfsirecordof',   ['thor']),
+                 ('groupread-multiPart(true)', ['thor']),
+                 ('groupread-multiPart(false)',['thor']),
+                 ('groupjoin1',     ['thor']),
+                 ('grouphashdedup2',['thor']),
+                 ('grouphashdedup', ['thor']),
+                 ('grouphashagg',   ['thor']),
+                 ('groupglobal3c',  ['thor']),
+                 ('groupglobal3b',  ['thor']),
+                 ('groupglobal3a',  ['thor']),
+                 ('groupglobal2c',  ['thor']),
+                 ('groupglobal2b',  ['thor']),
+                 ('groupglobal2a',  ['thor']),
+                 ('groupglobal1c',  ['thor']),
+                 ('groupglobal1b',  ['thor']),
+                 ('groupglobal1a',  ['thor']),
+                 ('groupchild',     ['thor']),
+                 ('group',          ['thor']),
+                 ('globals',        ['thor']),
+                 ('globalmerge',    ['thor']),
+                 ('globalid',       ['thor']),
+                 ('globalfile',     ['thor']),
+                 ('global',         ['thor']),
+                 ('genjoin3',       ['thor']),
+                 ('fullkeyed-multiPart(true)', ['thor']),
+                 ('fullkeyed-multiPart(false)',['thor']),
+                 ('full_test',      ['thor']),
+                 ('fromxml5',       ['thor']),
+                 ('fromjson4',      ['thor']),
+                 ('formatstored',   ['thor']),
+                 ('filterproject2', ['thor']),
+                 ('filtergroup',    ['thor']),
+                 ('fileservice',    ['thor']),
+                 ('diskGroupAggregate-multiPart(false)', ['thor']),
+                 ('denormalize1',   ['thor']),
+                 ('dataset_transform_inline', ['thor']),
+                 ('choosesets',     ['thor']),
+                 ('bloom2',         ['thor']),
+                 ('badindex-newIndexReadMapping(false)', ['thor']),
+                 ('all_denormalize-multiPart(true)',         ['thor']))
+
+wutest = wucommon.WuTest()
+
+logging.info('Gathering workunits')
+wu = wutest.getTestWorkunits(requiredJobs)
+
+if (wutest.getMatchedJobCount()==0):
+    logging.error('There are no matching jobs.  Has the performance regression suite been executed?')
+    logging.error('Aborting')
+    exit(1)
+
+wuDetailsReq = TestCase(wutest.scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':['edge']}),
+                        wutest.nestedFilter(),
+                        wutest.propertiesToReturn(Properties={'Property':['NumStarts','NumStops']}),
+                        wutest.scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='0', IncludeScopeType='0'),
+                        wutest.propertyOptions(IncludeName='1', IncludeRawValue='0', IncludeFormatted='1', IncludeMeasure='0', IncludeCreator='0', IncludeCreatorType='0'))
+
+logging.info('Checking NumStart and NumStop matches')
+stats = wucommon.Statistics()
+for jobname, wuid in wu.items():
+    success = execTestCase(jobname, wuid, wuDetailsReq)
+    logging.debug('Job %-33s WUID %-20s Success: %s', jobname, wuid, success)
+    stats.addCount(success)
+
+logging.info('Missing count: %d', wutest.getMissingJobCount(requiredJobs))
+logging.info('Matched jobs:  %d', len(wu))
+logging.info('Success count: %d', stats.successCount)
+logging.info('Failure count: %d', stats.failureCount)
+

+ 267 - 0
testing/esp/wudetails/wucommon.py

@@ -0,0 +1,267 @@
+#! /usr/bin/python3
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+import argparse
+import logging
+import datetime
+import traceback
+import requests.packages.urllib3
+import sys
+import inspect
+from requests import Session
+from zeep import Client
+from zeep.transports import Transport
+from pathlib import Path
+from collections import namedtuple
+from zeep.cache import SqliteCache
+
+TestCase = namedtuple('testCase', ['ScopeFilter',
+                                   'NestedFilter',
+                                   'PropertiesToReturn',
+                                   'ScopeOptions',
+                                   'PropertyOptions'])
+
+def safeMkdir(path):
+    try:
+        path.mkdir(parents=True)
+    except FileExistsError as e:
+        pass
+    except (FileNotFoundError, PermissionError) as e:
+        logging.error("'%s' \nExit." % (str(e)))
+        exit(-1)
+    except:
+        print("Unexpected error:" + str(sys.exc_info()[0]) + " (line: " + str(inspect.stack()[0][2]) + ")" )
+        traceback.print_stack()
+        exit(-1)
+
+class DebugTransport(Transport):
+    def post(self, address, message, headers):
+        self.xml_request = message.decode('utf-8')
+        response = super().post(address, message, headers)
+        self.response = response
+        return response
+
+class Statistics:
+    def __init__(self):
+        self.successCount = 0
+        self.failureCount = 0
+
+    def addCount(self, success):
+        if (success):
+            self.successCount += 1
+        else:
+            self.failureCount += 1
+
+class WuTest:
+    def connect(self):
+        # Consume WSDL and generate soap structures
+        try:
+            session = Session()
+            if (self.args.nosslverify):
+                session.verify = False
+                session.sslverify = False
+            self.transport = DebugTransport(cache=SqliteCache(), session=session)
+            self.wudetails = Client(self.wudetailsservice_wsdl_url, transport=self.transport)
+        except:
+            logging.critical ('Unable to connect/obtain WSDL from ' + self.wudetailsservice_wsdl_url)
+            raise
+
+    def __init__(self, maskFields=(), maskMeasureTypes=(), maskVersion=False, maskWUID=False):
+        argumentparser = argparse.ArgumentParser()
+        argumentparser.add_argument('-o', '--outdir', help='Results directory', default='results', metavar='dir')
+        argumentparser.add_argument('-d', '--debug', help='Enable debug', action='store_true', default=False)
+        argumentparser.add_argument('--logresp', help='Log wudetails responses (in results directory)', action='store_true', default=False)
+        argumentparser.add_argument('--logreq', help='Log wudetails requests (in results directory)', action='store_true', default=False)
+        argumentparser.add_argument('-n', '--lastndays', help="Use workunits from last 'n' days", type=int, default=1, metavar='days')
+        argumentparser.add_argument('--nosslverify', help="Disable SSL certificate verification", action='store_true', default=False)
+        argumentparser.add_argument('--baseurl', help="Base url for both WUQuery and WUDetails", default='http://localhost:8010', metavar='url')
+        argumentparser.add_argument('--user', help='Username for authentication', metavar='username')
+        argumentparser.add_argument('--pw', help='Password for authentication', metavar='password')
+        argumentparser.add_argument('--httpdigestauth', help='User HTTP digest authentication(Basic auth default)', action='store_true')
+        self.args = argumentparser.parse_args()
+
+        if (self.args.debug):
+            loglevel = logging.DEBUG
+        else:
+            loglevel=logging.INFO
+        logging.basicConfig(level=loglevel, format='[%(levelname)s] %(message)s')
+        requests.packages.urllib3.disable_warnings()
+        self.keydir = 'key'
+        self.wuqueryservice_wsdl_url = self.args.baseurl + '/WsWorkunits/WUQuery.json?ver_=1.71&wsdl'
+        self.wudetailsservice_wsdl_url = self.args.baseurl + '/WsWorkunits/WUDetails.json?ver_=1.71&wsdl'
+        self.outdir = self.args.outdir
+
+        if (len(maskFields)==0 and len(maskMeasureTypes)==0 and maskWUID==False and maskVersion==False):
+            self.enableMasking = False
+        else:
+            self.enableMasking = True
+
+        self.maskFields = maskFields
+        self.maskMeasureTypes = maskMeasureTypes
+        self.maskVersion = maskVersion
+        self.maskWUID = maskWUID
+
+        self.resultdir = Path(self.args.outdir)
+        safeMkdir(self.resultdir)
+
+        self.tcasekeydir = Path(self.keydir)
+        safeMkdir(self.tcasekeydir)
+        try:
+            self.connect()
+            try:
+                self.scopeFilter = self.wudetails.get_type('ns0:WUScopeFilter')
+                self.nestedFilter = self.wudetails.get_type('ns0:WUNestedFilter')
+                self.propertiesToReturn = self.wudetails.get_type('ns0:WUPropertiesToReturn')
+                self.scopeOptions = self.wudetails.get_type('ns0:WUScopeOptions')
+                self.propertyOptions = self.wudetails.get_type('ns0:WUPropertyOptions')
+                self.extraProperties = self.wudetails.get_type('ns0:WUExtraProperties')
+            except:
+                logging.critical ('WSDL different from expected')
+                raise
+        except:
+            sys.exit('Aborting!')
+
+        self.wuquery = Client(self.wuqueryservice_wsdl_url, transport=self.transport)
+
+    # Mask out fields in the response structure
+    #
+    def maskoutFields(self, wudetails_resp, wuid):
+        try:
+            if (self.maskWUID and wudetails_resp['WUID']==wuid):
+                wudetails_resp['WUID'] = '{masked WUID - matches request}'
+
+            if (self.maskVersion and wudetails_resp['MaxVersion'].isnumeric()):
+                wudetails_resp['MaxVersion'] = '{masked number}'
+
+            if (wudetails_resp['Scopes'] != None):
+                for scope in wudetails_resp['Scopes']['Scope']:
+                    properties = scope['Properties']
+                    if (properties == None):
+                        continue
+                    for property in properties['Property']:
+                        if ((property['Name'] in self.maskFields) or (property['Measure'] in self.maskMeasureTypes)):
+                            if (property['RawValue'] != None):
+                                property['RawValue'] = '{masked}'
+                            if (property['Formatted'] != None):
+                                property['Formatted'] = '{masked}'
+                        property['Creator'] = '{masked}'
+        except:
+            logging.critical('Unable to process WUDetails response: %s', wuid)
+            raise
+
+    def makeWuDetailsRequest(self,testfilename,wuid,tcase):
+        outfile = (self.resultdir / testfilename).with_suffix('.json')
+        errfile = outfile.with_suffix('.err')
+
+        if (outfile.exists()): outfile.unlink()
+        if (errfile.exists()): errfile.unlink()
+
+        try:
+            wuresp = self.wudetails.service.WUDetails(WUID=wuid,
+                                            ScopeFilter=tcase.ScopeFilter,
+                                            NestedFilter=tcase.NestedFilter,
+                                            PropertiesToReturn=tcase.PropertiesToReturn,
+                                            ScopeOptions=tcase.ScopeOptions,
+                                            PropertyOptions=tcase.PropertyOptions)
+        except:
+            logging.critical('Unable to submit WUDetails request: %s', testfilename)
+            raise
+        finally:
+            if (self.args.logreq):
+                reqfile = outfile.with_suffix('.req')
+                try:
+                    if (reqfile.exists()): reqfile.unlink()
+                    with reqfile.open(mode='w') as f:
+                        print (self.transport.xml_request, file=f)
+                except:
+                    logging.critical('Unable write logrequest to file: %s', reqfile)
+                    pass
+
+        if (self.args.logresp):
+            respfile = outfile.with_suffix('.resp')
+            if (respfile.exists()): respfile.unlink()
+            with respfile.open(mode='w') as f:
+                print (wuresp, file=f)
+
+        if (self.enableMasking):
+            self.maskoutFields(wuresp, wuid)
+        return wuresp
+
+    # Get a list of workunits that will be used for testing wudetails
+    def getTestWorkunits(self, requiredJobs):
+        # Calculate date range (LastNDays not processed correctly by wuquery)
+        enddate = datetime.datetime.now()
+        startdate = enddate - datetime.timedelta(days=self.args.lastndays)
+        self.matchedWU = {}
+
+        logging.debug ('Gathering Workunits')
+        for reqjob in requiredJobs:
+            reqJobname = reqjob[0]
+            reqClusters = reqjob[1]
+            nextPage = 0
+            while (nextPage >=0):
+                wuqueryresp = self.wuquery.service.WUQuery(Owner='regress',
+                                                    State='completed',
+                                                    PageSize=500,
+                                                    PageStartFrom=nextPage,
+                                                    LastNDays=self.args.lastndays,
+                                                    StartDate=startdate.strftime('%Y-%m-%dT00:00:00'),
+                                                    EndDate=enddate.strftime('%Y-%m-%dT23:59:59'),
+                                                    Jobname=reqJobname + '*',
+                                                    Descending='1')
+                try:
+                    nextPage = wuqueryresp['NextPage']
+                    workunits = wuqueryresp['Workunits']['ECLWorkunit']
+                except:
+                    break
+
+                try:
+                    logging.debug('jobname %s count: %d', reqJobname,len(workunits))
+                    workunits.sort(key=lambda k: k['Jobname'], reverse=True)
+
+                    # Extract jobname from jobname with date postfix
+                    for wu in workunits:
+                        s = wu['Jobname'].split('-')
+                        cluster = wu['Cluster']
+                        if (len(s) >2):
+                            sep = '-'
+                            job = sep.join(s[0:len(s)-2])
+                        else:
+                            job = wu['Jobname']
+                        key = job + '_' + cluster
+                        if ( (job == reqJobname) and (cluster in reqClusters) and (key not in self.matchedWU)):
+                            self.matchedWU[key] = wu['Wuid']
+                except:
+                    logging.error('Unexpected response from WUQuery: %s', wuqueryresp)
+                    raise
+
+        return self.matchedWU
+
+    def getMissingJobCount(self,requiredJobs):
+        missingjobcount = 0
+        for reqjob in requiredJobs:
+            jobname = reqjob[0]
+            for cluster in reqjob[1]:
+                key = jobname + '_' + cluster
+                if (key not in self.matchedWU):
+                    logging.error('Missing job: %s (%s)', jobname, cluster)
+                    missingjobcount += 1
+        return missingjobcount
+
+    def getMatchedJobCount(self):
+        return len(self.matchedWU)
+

+ 83 - 326
testing/esp/wudetails/wutest.py

@@ -1,6 +1,6 @@
 #! /usr/bin/python3
 ################################################################################
-#    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+#    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License");
 #    you may not use this file except in compliance with the License.
@@ -15,51 +15,40 @@
 #    limitations under the License.
 ################################################################################
 
-import argparse
-import filecmp
+import wucommon
 import logging
-import logging.config
-import datetime
-import requests.packages.urllib3
-import inspect
-import traceback
-import sys
-from requests import Session
-from requests.auth import HTTPBasicAuth
-from requests.auth import HTTPDigestAuth
-from zeep import Client 
-from zeep.transports import Transport
-from zeep.cache import SqliteCache
-from collections import namedtuple
-from pathlib import Path
+import filecmp
+from wucommon import TestCase
 
-print('WUDetails Regression (wutest.py)')
-print('--------------------------------')
-print('')
+def execTestCase(jobname, wuid, tcase, tcasename):
+    testfilename = jobname + '_' + tcasename
+    logging.debug('Executing %s',testfilename)
 
-argumentparser = argparse.ArgumentParser()
-argumentparser.add_argument('-o', '--outdir', help='Results directory', default='results', metavar='dir')
-argumentparser.add_argument('-d', '--debug', help='Enable debug', action='store_true', default=False)
-argumentparser.add_argument('--logresp', help='Log wudetails responses (in results directory)', action='store_true', default=False)
-argumentparser.add_argument('--logreq', help='Log wudetails requests (in results directory)', action='store_true', default=False)
-argumentparser.add_argument('-n', '--lastndays', help="Use workunits from last 'n' days", type=int, default=1, metavar='days')
-argumentparser.add_argument('--nosslverify', help="Disable SSL certificate verification", action='store_true', default=False)
-argumentparser.add_argument('-u', '--baseurl', help="Base url for both WUQuery and WUDetails", default='http://localhost:8010', metavar='url')
-argumentparser.add_argument('--user', help='Username for authentication', metavar='username')
-argumentparser.add_argument('--pw', help='Password for authentication', metavar='password')
-argumentparser.add_argument('--httpdigestauth', help='User HTTP digest authentication(Basic auth default)', action='store_true')
-args = argumentparser.parse_args()
+    wuresp = wutest.makeWuDetailsRequest(testfilename, wuid, tcase)
 
-if (args.debug):
-    loglevel = logging.DEBUG
-else:
-    loglevel=logging.INFO
-logging.basicConfig(level=loglevel, format='[%(levelname)s] %(message)s')
+    outfile = (wutest.resultdir / testfilename).with_suffix('.json')
+    if (outfile.exists()): outfile.unlink()
+    with outfile.open(mode='w') as f:
+        print (tcase, file=f)
+        print (wuresp, file=f)
+
+    keyfile = (wutest.tcasekeydir / testfilename).with_suffix('.json')
+    if (not keyfile.exists()):
+        logging.error('Missing key file %s', str(keyfile))
+        return False
+
+    # Compare actual and expected
+    if (not filecmp.cmp(str(outfile),str(keyfile))):
+        logging.error('Regression check Failed: %s', testfilename)
+        return False
+    else:
+        logging.debug('PASSED %s', testfilename)
+        return True
 
-keydir = 'key'
-wuqueryservice_url = args.baseurl + '/WsWorkunits/WUQuery.json?ver_=1.71'
-wuqueryservice_wsdl_url = args.baseurl + '/WsWorkunits/WUQuery.json?ver_=1.71&wsdl'
-wudetailsservice_wsdl_url = args.baseurl + '/WsWorkunits/WUDetails.json?ver_=1.71&wsdl'
+###############################################################################
+print('WUDetails Regression (wutest.py)')
+print('--------------------------------')
+print('')
 
 requiredJobs = ( ('childds1',      ('roxie','thor','hthor')),
                  ('dedup_all',     ('roxie','hthor')),
@@ -69,381 +58,249 @@ requiredJobs = ( ('childds1',      ('roxie','thor','hthor')),
                  ('indexread2-multiPart(true)',('roxie', 'thor','hthor')),
                  ('sets',           ('roxie','thor','hthor')) )
 
-maskValueFields = ('Definition','DefinitionList','SizePeakMemory', 'WhenFirstRow', 'TimeElapsed', 'TimeTotalExecute', 'TimeFirstExecute', 'TimeLocalExecute',
+maskFields = ('Definition','DefinitionList','SizePeakMemory', 'WhenFirstRow', 'TimeElapsed', 'TimeTotalExecute', 'TimeFirstExecute', 'TimeLocalExecute',
                    'WhenStarted', 'TimeMinLocalExecute', 'TimeMaxLocalExecute', 'TimeAvgLocalExecute', 'SkewMinLocalExecute', 'SkewMaxLocalExecute',
                    'NodeMaxLocalExecute', 'NodeMaxDiskWrites', 'NodeMaxLocalExecute', 'NodeMaxLocalExecute', 'NodeMaxSortElapsed', 'NodeMinDiskWrites',
                    'NodeMinLocalExecute', 'NodeMinLocalExecute', 'NodeMinLocalExecute', 'NodeMinSortElapsed', 'SkewMaxDiskWrites', 'SkewMaxLocalExecute',
                    'SkewMaxLocalExecute', 'SkewMaxSortElapsed', 'SkewMinDiskWrites', 'SkewMinLocalExecute', 'SkewMinLocalExecute', 'SkewMinSortElapsed',
                    'TimeAvgSortElapsed', 'TimeMaxSortElapsed', 'TimeMinSortElapsed')
-maskMeasureTypes = ('ts','ns', 'skw', 'node')
-
-requests.packages.urllib3.disable_warnings()
-
-class DebugTransport(Transport):
-    def post(self, address, message, headers):
-        self.xml_request = message.decode('utf-8')
-        response = super().post(address, message, headers)
-        self.response = response
-        
-        return response
-
-# Get a list of workunits that will be used for testing wudetails
-#
-def GetTestWorkunits():
-    try:
-        session = Session()
-        if (args.nosslverify):
-            session.verify = False
-            session.sslverify = False
-        if (args.pw and args.user):
-            if (args.httpdigestauth):
-                session.auth = HTTPDigestAuth(args.user, args.pw)
-            else:
-                session.auth = HTTPBasicAuth(args.user, args.pw)
-
-        transport = DebugTransport(cache=SqliteCache(), session=session)
-        wuquery = Client(wuqueryservice_wsdl_url, transport=transport)
-    except:
-        logging.critical ('Unable to obtain WSDL from %s', wuqueryservice_wsdl_url)
-        raise
-   
-    # Calculate date range (LastNDays not processed correctly by wuquery)
-    enddate = datetime.datetime.now()
-    startdate = enddate - datetime.timedelta(days=args.lastndays)
-    matchedWU = {}
-
-    logging.debug ('Gathering Workunits')
-    for reqjob in requiredJobs:
-        reqJobname = reqjob[0]
-        reqClusters = reqjob[1]
-        nextPage = 0 
-        while (nextPage >=0):
-            wuqueryresp = wuquery.service.WUQuery(Owner='regress',
-                                                  State='completed',
-                                                  PageSize=500,
-                                                  PageStartFrom=nextPage,
-                                                  LastNDays=args.lastndays, 
-                                                  StartDate=startdate.strftime('%Y-%m-%dT00:00:00'),
-                                                  EndDate=enddate.strftime('%Y-%m-%dT23:59:59'),
-                                                  Jobname=reqJobname + '*',
-                                                  Descending='1')
-                
-            try:
-                nextPage = wuqueryresp['NextPage']
-                workunits = wuqueryresp['Workunits']['ECLWorkunit']
-            except:
-                return matchedWU
-
-            try:
-                logging.debug('Workunit count: %d', len(workunits))
-                workunits.sort(key=lambda k: k['Jobname'], reverse=True)
-
-                # Extract jobname from jobname with date postfix
-                for wu in workunits:
-                    s = wu['Jobname'].split('-')
-                    cluster = wu['Cluster']
-                    if (len(s) >2):
-                        sep = '-'
-                        job = sep.join(s[0:len(s)-2])
-                    else:
-                        job = wu['Jobname']
-                    key = job + '_' + cluster
-                    if ( (job == reqJobname) and (cluster in reqClusters) and (key not in matchedWU)):
-                        matchedWU[key] = wu['Wuid']
-            except:
-                logging.error('Unexpected response from WUQuery: %s', wuqueryresp) 
-                raise
-
-    return matchedWU
 
-def GetMissingJobCount(wu):
-    missingjobs = 0
-    for reqjob in requiredJobs:
-        jobname = reqjob[0]
-        for cluster in reqjob[1]:
-            key = jobname + '_' + cluster
-            if (key not in wu):
-                logging.error('Missing job: %s (%s)', jobname, cluster)
-                missingjobs += 1
-    return missingjobs
-        
-# Mask out fields in the response structure
-#
-def maskoutFields(wudetails_resp) :
-    if (wudetails_resp['MaxVersion'].isnumeric()):
-        wudetails_resp['MaxVersion'] = '{masked number}'
-
-    if (wudetails_resp['Scopes'] != None):
-        for scope in wudetails_resp['Scopes']['Scope']:
-            properties = scope['Properties']
-            if (properties == None):
-                continue
-            for property in properties['Property']:
-                if ((property['Name'] in maskValueFields) or (property['Measure'] in maskMeasureTypes)):
-                    if (property['RawValue'] != None):
-                        property['RawValue'] = '{masked}'
-                    if (property['Formatted'] != None):
-                        property['Formatted'] = '{masked}'
-            
-                property['Creator'] = '{masked}'
-
-# Main
-#
-# Consume WSDL and generate soap structures
-try:
-    session = Session()
-    if (args.nosslverify):
-        session.verify = False
-        session.sslverify = False
-    transport = DebugTransport(cache=SqliteCache(), session=session)
-    wudetails = Client(wudetailsservice_wsdl_url, transport=transport)
-except:
-    logging.critical ('Unable to obtain WSDL from ' +wudetailsservice_wsdl_url)
-    raise
-
-try:
-    scopeFilter = wudetails.get_type('ns0:WUScopeFilter')
-    nestedFilter = wudetails.get_type('ns0:WUNestedFilter')
-    propertiesToReturn = wudetails.get_type('ns0:WUPropertiesToReturn')
-    scopeOptions = wudetails.get_type('ns0:WUScopeOptions')
-    propertyOptions = wudetails.get_type('ns0:WUPropertyOptions')
-    extraProperties = wudetails.get_type('ns0:WUExtraProperties')
-except:
-    logging.critical ('WSDL different from expected')
-    raise
+maskMeasureTypes = ('ts','ns', 'skw', 'node')
 
+wutest = wucommon.WuTest(maskFields, maskMeasureTypes, True, True)
 
-# Generate Test cases
-testCase = namedtuple('testCase', ['ScopeFilter',
-                                   'NestedFilter',
-                                   'PropertiesToReturn',
-                                   'ScopeOptions',
-                                   'PropertyOptions'])
+scopeFilter = wutest.scopeFilter
+nestedFilter = wutest.nestedFilter
+propertiesToReturn = wutest.propertiesToReturn
+scopeOptions = wutest.scopeOptions
+propertyOptions = wutest.propertyOptions
+extraProperties = wutest.extraProperties
 
+# Test cases
 #scopeFilter(MaxDepth='999', Scopes=set(), Ids=set(), ScopeTypes=set()),
 #nestedFilter(Depth='999', ScopeTypes=set()),
 #propertiesToReturn(AllProperties='1', MinVersion='0', Measure='', Properties=set(), ExtraProperties=set()),
 #scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
 #propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeFormatted='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
-testCases = [ 
-             testCase(
+TestCases = [
+             TestCase(
                  scopeFilter(MaxDepth='999'),
                  nestedFilter(),
                  propertiesToReturn(AllProperties='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeFormatted='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999'),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999'),
                  nestedFilter(),
                  propertiesToReturn(AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999'),
                  nestedFilter(),
                  propertiesToReturn(AllHints='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', Scopes={'Scope':'w1:graph1'}),
                  nestedFilter(),
                  propertiesToReturn(AllProperties='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='1', ScopeTypes={'ScopeType':'graph'}),
                  nestedFilter(Depth='1'),
                  propertiesToReturn(AllProperties='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'global'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'activity'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'allocator'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2'),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions()
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='1'),
                  nestedFilter(Depth='1'),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(),
                  propertyOptions()
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2'),
                  nestedFilter(),
                  propertiesToReturn(Properties={'Property':['WhenStarted','WhenCreated']}),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions()
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2'),
                  nestedFilter(),
                  propertiesToReturn(Measure='ts'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2'),
                  nestedFilter(),
                  propertiesToReturn(Measure='cnt'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions()
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeScope='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeId='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='0')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeRawValue='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeMeasure='0')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeCreator='0')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', ScopeTypes={'ScopeType':'subgraph'}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeCreatorType='0')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2', Scopes={'Scope':'w1:graph1:sg1'}),
                  nestedFilter(Depth=0),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='2', Scopes={'Scope':'w1:graph1:sg1'}),
                  nestedFilter(Depth=1),
                  propertiesToReturn(Properties={'Property':['WhenStarted','WhenCreated']}, ExtraProperties={'Extra':{'scopeType':'edge','Properties':{'Property':['NumStarts','NumStops']}}}),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', PropertyFilters={'PropertyFilter':{'Name':'NumRowsProcessed','MinValue':'10000','MaxValue':'20000'}}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', PropertyFilters={'PropertyFilter':{'Name':'NumIndexSeeks','MaxValue':'3'}}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', PropertyFilters={'PropertyFilter':{'Name':'NumIndexSeeks','ExactValue':'4'}}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(MaxDepth='999', PropertyFilters={'PropertyFilter':[{'Name':'NumIndexSeeks','ExactValue':'4'},{'Name':'NumAllocations','MinValue':'5','MaxValue':'10'}]}),
                  nestedFilter(),
                  propertiesToReturn(AllStatistics='1', AllAttributes='1'),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(ScopeTypes={'ScopeType':'workflow'}, MaxDepth='999',),
                  nestedFilter(Depth='0'),
                  propertiesToReturn(AllAttributes='1', Properties=[{'Property':'IdDependencyList'}]),
                  scopeOptions(IncludeMatchedScopesInResults='1', IncludeScope='1', IncludeId='1', IncludeScopeType='1'),
                  propertyOptions(IncludeName='1', IncludeRawValue='1', IncludeMeasure='1', IncludeCreator='1', IncludeCreatorType='1')
              ),
-             testCase(
+             TestCase(
                  scopeFilter(ScopeTypes={'ScopeType':'workflow'}, MaxDepth='999',),
                  nestedFilter(Depth='0'),
                  propertiesToReturn(Properties=[{'Property':'IdDependency'}]),
@@ -452,136 +309,36 @@ testCases = [
              ),
             ]
 
-def ExecTestCase(jobname, wuid, tcase, tcasename):
-    testfilename = jobname + '_' + tcasename
-    logging.debug('Executing %s',testfilename)
- 
-    keyfile = (tcasekeydir / testfilename).with_suffix('.json')
-    outfile = (resultdir / testfilename).with_suffix('.json')
-    errfile = outfile.with_suffix('.err')
-    reqfile = outfile.with_suffix('.req')
-    respfile = outfile.with_suffix('.resp')
-
-    if (outfile.exists()): outfile.unlink()
-    if (errfile.exists()): errfile.unlink()
-    if (reqfile.exists()): reqfile.unlink()
-    if (respfile.exists()): respfile.unlink()
-
-    try:
-        wuresp = wudetails.service.WUDetails(WUID=wuid,
-                                             ScopeFilter=tcase.ScopeFilter,
-                                             NestedFilter=tcase.NestedFilter,
-                                             PropertiesToReturn=tcase.PropertiesToReturn,
-                                             ScopeOptions=tcase.ScopeOptions,
-                                             PropertyOptions=tcase.PropertyOptions)
-    except:
-        logging.critical('Unable to submit WUDetails request')
-        raise
-
-    if (args.logreq):
-        with reqfile.open(mode='w') as f:
-            print (transport.xml_request, file=f)
-    if (args.logresp):
-        with respfile.open(mode='w') as f:
-            print (wuresp, file=f)
-    try:
-        if (wuresp['WUID']==wuid):
-            wuresp['WUID'] = '{masked WUID - matches request}'
-
-        maskoutFields(wuresp)
-    except:
-        logging.critical('Unable to process WUDetails response')
-        logging.critical('Request & response content in %s', str(errfile))
-        with errfile.open(mode='w') as f:
-            print ('====== Request ===========', file=f)
-            print (transport.xml_request, file=f)
-            print ('====== Response ==========', file=f)
-            print (wuresp, file=f)
-        return False
-
-    with outfile.open(mode='w') as f:
-        print (tcase, file=f)
-        print (wuresp, file=f)
-
-    if (not keyfile.exists()):       
-        logging.error('FAILED %s', testfilename)
-        logging.error('Missing key file %s', str(keyfile))
-        return False
-
-    # Compare actual and expectetd
-    if (not filecmp.cmp(str(outfile),str(keyfile))):
-        logging.error('FAILED %s', testfilename)
-        logging.error('WUDetails response %s', str(outfile))
-        return False
-    else:
-        logging.debug('PASSED %s', testfilename)
-        return True
-
-class Statistics:
-    def __init__(self):
-        self.successCount = 0
-        self.failureCount = 0 
-       
-    def addCount(self, success):
-        if (success):
-            self.successCount += 1
-        else:
-            self.failureCount += 1
-
-# To make this Python3.4 compatible
-def safeMkdir(path):
-    try:
-        path.mkdir(parents=True)
-    except FileExistsError as e:
-        # It is ok if alrady exists
-        pass
-    except (FileNotFoundError, PermissionError) as e:
-        logging.error("'%s' \nExit." % (str(e)))
-        exit(-1)
-    except:
-        print("Unexpected error:" + str(sys.exc_info()[0]) + " (line: " + str(inspect.stack()[0][2]) + ")" )
-        traceback.print_stack()
-        exit(-1)
-
-resultdir = Path(args.outdir)
-safeMkdir(resultdir)
-
-tcasekeydir = Path(keydir)
-safeMkdir(tcasekeydir)
-
 logging.info('Gathering workunits')
-try:
-    wu = GetTestWorkunits()
-except:
-    raise
+wu = wutest.getTestWorkunits(requiredJobs)
 
-logging.info('Matched job count: %d', len(wu))
-if (len(wu)==0):
+logging.info('Matched job count: %d', wutest.getMatchedJobCount())
+if (wutest.getMatchedJobCount()==0):
     logging.error('There are no matching jobs.  Has the regression suite been executed?')
     logging.error('Aborting')
     exit(1)
 
-missingjobs = GetMissingJobCount(wu)
+missingjobs = wutest.getMissingJobCount(requiredJobs)
 if (missingjobs > 0):
     logging.warning('There are %d missing jobs.  Full regression will not be executed', missingjobs)
 
 logging.info('Executing regression test cases')
-stats = Statistics()
+stats = wucommon.Statistics()
 for jobname, wuid in wu.items():
     logging.debug('Job %s (WUID %s)', jobname, wuid)
 
     if (jobname == 'sort_thor'):
-        for index, t in enumerate(testCases):
+        for index, t in enumerate(TestCases):
             tcasename = 'testcase' + str(index+1)
-            success = ExecTestCase(jobname, wuid, t, tcasename)
+            success = execTestCase(jobname, wuid, t, tcasename)
             stats.addCount(success)
     elif (jobname in ['sets_thor','sets_roxie', 'sets_hthor']):
-        success = ExecTestCase(jobname, wuid, testCases[30], 'testcase31')
+        success = execTestCase(jobname, wuid, TestCases[30], 'testcase31')
         stats.addCount(success)
-        success = ExecTestCase(jobname, wuid, testCases[31], 'testcase32')
+        success = execTestCase(jobname, wuid, TestCases[31], 'testcase32')
         stats.addCount(success)
     else:
-        success = ExecTestCase(jobname, wuid, testCases[0], 'testcase1')
+        success = execTestCase(jobname, wuid, TestCases[0], 'testcase1')
         stats.addCount(success)
 logging.info('Success count: %d', stats.successCount)
 logging.info('Failure count: %d', stats.failureCount)

+ 2 - 0
testing/regress/ecl/childindex.ecl

@@ -15,6 +15,8 @@
     limitations under the License.
 ############################################################################## */
 
+//nohthor
+
 //class=file
 //class=index
 //version multiPart=false

+ 2 - 2
testing/regress/ecl/cryptoplugin_pke.ecl

@@ -69,8 +69,8 @@ DATA signature := encModule.Sign((DATA)'The quick brown fox jumps over the lazy
 output( TRUE = encModule.VerifySignature(signature, (DATA)'The quick brown fox jumps over the lazy dog'));
 output(FALSE = encModule.VerifySignature(signature, (DATA)'Your Name Here'));
 
-DATA bogus := (DATA)'Not a valid signature';
-output(FALSE = encModule.VerifySignature(bogus, (DATA)'Not a valid signature'));
+DATA bogus := (DATA)'Not a valid signaturexxx';
+output(FALSE = encModule.VerifySignature(bogus, (DATA)'Not a valid signaturexxx'));
 
 
 DATA sig256Ex := encModule.Sign((DATA)'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTTUVWXYZ`~!@#$%^&*()_-+=|}]{[":;?/>.<,');

+ 34 - 0
testing/regress/ecl/issue23168.ecl

@@ -0,0 +1,34 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+import dbglog from Std.System.log;
+
+namesRecord :=
+            RECORD
+unsigned        id;
+string          name;
+            END;
+
+names2 := DEDUP(NOCOMBINE(DATASET([{1,'Gavin'},{2,'Bill'},{3,'John'},{4,'Gerald'},{5,'Fluffy'}], namesRecord)),id);
+names1 := DEDUP(NOCOMBINE(DATASET([{1,'Oscar'},{2,'Charles'},{3,'Freddie'},{4,'Winifred'},{5,'Bouncer'}], namesRecord)), id);
+
+s := nofold(sort(names2, name));
+
+j := join(names1, s, left.id = right.id, transform(namesRecord, self.id := left.id + s[2].id; self.name := right.name), left outer, keep(1));
+
+output(names2);
+dbglog('Hello ' + (string)names2[3].name + ' and ' + (string)count(j) + 'again');

+ 7 - 0
testing/regress/ecl/key/issue23168.xml

@@ -0,0 +1,7 @@
+<Dataset name='Result 1'>
+ <Row><id>1</id><name>Gavin</name></Row>
+ <Row><id>2</id><name>Bill</name></Row>
+ <Row><id>3</id><name>John</name></Row>
+ <Row><id>4</id><name>Gerald</name></Row>
+ <Row><id>5</id><name>Fluffy</name></Row>
+</Dataset>

+ 3 - 0
testing/regress/ecl/key/keyed_join5.xml

@@ -55,3 +55,6 @@
 <Dataset name='Result 10'>
  <Row><key>0</key><f1>limit hit </f1><f2>          </f2></Row>
 </Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>59</Result_11></Row>
+</Dataset>

+ 13 - 0
testing/regress/ecl/keyed_join5.ecl

@@ -89,6 +89,17 @@ j5 := TABLE(JOIN(GROUP(lhs, someid), i, LEFT.lhsKey=RIGHT.key, KEEP(2)), { lhsKe
 // test helper->getRowLimit, generated inside KJ by enclosing within LIMIT()
 j6 := LIMIT(JOIN(lhs, i, LEFT.lhsKey=RIGHT.key, doHKJoinTrans(LEFT, RIGHT), KEEP(3)), 2, onFail(TRANSFORM(rhsRec, SELF.f1 := 'limit hit'; SELF := [])));
 
+
+childFunc(unsigned v) := FUNCTION
+ j := JOIN(lhs, i, v>20 AND v<80 AND LEFT.someid=RIGHT.key);
+ RETURN IF(COUNT(j)>0, j[1].key, 0);
+END;
+
+parentDs := DATASET(100, TRANSFORM({unsigned4 id1; unsigned4 id2}, SELF.id1 := COUNTER; SELF.id2 := 0));
+j7 := PROJECT(parentDs, TRANSFORM(RECORDOF(parentDs), SELF.id2 := childFunc(LEFT.id1); SELF := LEFT));
+j7sumid2 := SUM(j7, id2);
+
+
 SEQUENTIAL(
  OUTPUT(rhs, , '~REGRESS::'+WORKUNIT+'::rhsDs', OVERWRITE);
  BUILD(i, OVERWRITE);
@@ -102,5 +113,7 @@ SEQUENTIAL(
 
   OUTPUT(j5);
   OUTPUT(j6);
+
+  OUTPUT(j7sumid2);
  );
 );

+ 11 - 0
thorlcr/activities/fetch/thfetchslave.cpp

@@ -292,6 +292,7 @@ protected:
 
     IPointerArrayOf<ISourceRowPrefetcher> prefetchers;
     IConstPointerArrayOf<ITranslator> translators;
+    bool initialized = false;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
@@ -310,6 +311,16 @@ public:
 
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
+        if (initialized)
+        {
+            parts.kill();
+            offsetMapBytes.clear();
+            prefetchers.kill();
+            translators.kill();
+            eexp.clear();
+        }
+        else
+            initialized = true;
         unsigned numParts;
         data.read(numParts);
         offsetCount = 0;

+ 19 - 7
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -70,6 +70,7 @@ protected:
     Owned<IKeyManager> keyMergerManager;
     Owned<IKeyIndexSet> keyIndexSet;
     IConstPointerArrayOf<ITranslator> translators;
+    bool initialized = false;
 
     rowcount_t keyedLimitCount = RCMAX;
     rowcount_t keyedLimit = RCMAX;
@@ -487,6 +488,13 @@ public:
         helper = (IHThorIndexReadBaseArg *)container->queryHelper();
         limitTransformExtra = nullptr;
         fixedDiskRecordSize = helper->queryDiskRecordSize()->querySerializedDiskMeta()->getFixedSize(); // 0 if variable and unused
+        allocator.set(queryRowAllocator());
+        deserializer.set(queryRowDeserializer());
+        serializer.set(queryRowSerializer());
+        helper->setCallback(&callback);
+        _statsArr.append(0);
+        _statsArr.append(0);
+        statsArr = _statsArr.getArray();
         reInit = 0 != (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     rowcount_t getLocalCount(const rowcount_t keyedLimit, bool hard)
@@ -546,18 +554,22 @@ public:
         data.read(logicalFilename);
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJobChannel().deserializeMPTag(data); // channel to pass back partial counts for aggregation
+        if (initialized)
+        {
+            partDescs.kill();
+            keyIndexSet.clear();
+            translators.kill();
+            keyManagers.kill();
+            keyMergerManager.clear();
+        }
+        else
+            initialized = true;
+        
         unsigned parts;
         data.read(parts);
         if (parts)
             deserializePartFileDescriptors(data, partDescs);
         localKey = partDescs.ordinality() ? partDescs.item(0).queryOwner().queryProperties().getPropBool("@local", false) : false;
-        allocator.set(queryRowAllocator());
-        deserializer.set(queryRowDeserializer());
-        serializer.set(queryRowSerializer());
-        helper->setCallback(&callback);
-        _statsArr.append(0);
-        _statsArr.append(0);
-        statsArr = _statsArr.getArray();
         lastSeeks = lastScans = 0;
         localMerge = (localKey && partDescs.ordinality()>1) || seekGEOffset;
 

+ 3 - 3
thorlcr/activities/join/thjoinslave.cpp

@@ -176,7 +176,7 @@ public:
     ~JoinSlaveActivity()
     {
         if (portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
     }
 
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
@@ -214,7 +214,7 @@ public:
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
             barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-            portbase = allocPort(NUMSLAVEPORTS);
+            portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             ActPrintLog("SortJoinSlaveActivity::init portbase = %d, mpTagRPC=%d",portbase,(int)mpTagRPC);
             server.setLocalHost(portbase); 
             sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
@@ -410,7 +410,7 @@ public:
         rightInput.clear();
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         CSlaveActivity::kill();

+ 1 - 13
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -549,7 +549,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     unsigned currentMatchIdx, currentJoinGroupSize, currentAdded, currentMatched;
     Owned<CJoinGroup> djg, doneJG;
     OwnedConstThorRow defaultRight;
-    unsigned portbase, node;
+    unsigned node;
     IArrayOf<IDelayedFile> fetchFiles;
     FPosTableEntry *localFPosToNodeMap; // maps fpos->local part #
     FPosTableEntry *globalFPosToNodeMap; // maps fpos->node for all parts of file. If file is remote, localFPosToNodeMap will have all parts
@@ -1657,7 +1657,6 @@ public:
         tlkKeySet.setown(createKeyIndexSet());
         pool = NULL;
         currentMatchIdx = currentJoinGroupSize = currentAdded = currentMatched = 0;
-        portbase = 0;
         pendingGroups = 0;
         superWidth = 0;
         additionalStats = 0;
@@ -1689,8 +1688,6 @@ public:
         }
         ::Release(fetchHandler);
         ::Release(inputHelper);
-        if (portbase)
-            freePort(portbase, NUMSLAVEPORTS*3);
         ::Release(resultDistStream);
         defaultRight.clear();
         if (pool) delete pool;
@@ -2075,15 +2072,6 @@ public:
             resultDistStream = new CKeyLocalLookup(*this, helper->queryIndexRecordSize()->queryRecordAccessor(true));
         }
     }
-    virtual void kill() override
-    {
-        if (portbase)
-        {
-            freePort(portbase, NUMSLAVEPORTS);
-            portbase = 0;
-        }
-        PARENT::kill();
-    }
     virtual void abort() override
     {
         PARENT::abort();

+ 47 - 12
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -158,7 +158,7 @@ static const unsigned defaultFetchLookupProcessBatchLimit = 10000;
 class CJoinGroup;
 
 
-enum AllocatorTypes { AT_Transform=1, AT_LookupWithJG, AT_LookupWithJGRef, AT_JoinFields, AT_FetchRequest, AT_FetchResponse, AT_JoinGroup, AT_JoinGroupRhsRows, AT_FetchDisk, AT_LookupResponse };
+enum AllocatorTypes { AT_Transform=1, AT_LookupWithJG, AT_JoinFields, AT_FetchRequest, AT_FetchResponse, AT_JoinGroup, AT_JoinGroupRhsRows, AT_FetchDisk, AT_LookupResponse };
 
 
 struct Row
@@ -695,9 +695,7 @@ class CJoinGroupList
     CJoinGroup *head = nullptr, *tail = nullptr;
     unsigned count = 0;
 
-public:
-    CJoinGroupList() { }
-    ~CJoinGroupList()
+    void removeAll()
     {
         while (head)
         {
@@ -706,6 +704,17 @@ public:
             head = next;
         }
     }
+public:
+    CJoinGroupList() { }
+    ~CJoinGroupList()
+    {
+        removeAll();
+    }
+    void clear()
+    {
+        removeAll();
+        head = tail = nullptr;
+    }
     inline unsigned queryCount() const { return count; }
     inline CJoinGroup *queryHead() const { return head; }
     inline CJoinGroup *queryTail() const { return tail; }
@@ -1475,7 +1484,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             {
                 IPartDescriptor &part = activity.allIndexParts.item(partNo);
                 unsigned crc;
-                part.getCrc(crc);
+                if (!part.getCrc(crc))
+                    crc = 0;
                 RemoteFilename rfn;
                 part.getFilename(copy, rfn);
                 StringBuffer fname;
@@ -1486,6 +1496,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                 activity.queryHelper()->serializeCreateContext(msg);
                 sizeMark.write();
 
+                size32_t parentExtractSz;
+                const byte *parentExtract = activity.queryGraph().queryParentExtract(parentExtractSz);
+                msg.append(parentExtractSz);
+                msg.append(parentExtractSz, parentExtract);
+                msg.append(activity.startCtxMb.length());
+                msg.append(activity.startCtxMb.length(), activity.startCtxMb.toByteArray());
+
                 msg.append(activity.messageCompression);
                 // NB: potentially translation per part could be different if dealing with superkeys
                 IPropertyTree &props = part.queryOwner().queryProperties();
@@ -2047,6 +2064,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
         }
     };
 
+    const unsigned startSpillAmountPercentage = 10;   // Initial percentage to spill, will grow if memory pressure keeps calling callback
+    const memsize_t startSpillAmount = 10 * 0x100000; // (10MB) Initial amount to try to spill
+    const memsize_t minRhsJGSpillSz = 1024;           // (1K) Threshold, if a join group is smaller than this, don't attempt to spill
+    const unsigned minimumQueueLimit = 5;             // When spilling the pending and group limits will gradually reduce until this limit
+    const unsigned unknownCopyNum = 0xff;             // in a partCopy denotes that a copy is unknown.
+
     IHThorKeyedJoinArg *helper = nullptr;
     StringAttr indexName;
     size32_t fixedRecordSize = 0;
@@ -2067,7 +2090,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     CLimiter doneListLimiter;
 
     CPartDescriptorArray allDataParts;
-    IArrayOf<IPartDescriptor> allIndexParts;
+    CPartDescriptorArray allIndexParts;
     std::vector<unsigned> localIndexParts, localFetchPartMap;
     IArrayOf<IKeyIndex> tlkKeyIndexes;
     Owned<IEngineRowAllocator> joinFieldsAllocator;
@@ -2117,20 +2140,18 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     std::atomic<bool> waitingForDoneGroups{false};
     Semaphore waitingForDoneGroupsSem;
     CJoinGroupList pendingJoinGroupList, doneJoinGroupList;
-    Owned<IException> abortLimitException;
     Owned<CJoinGroup> currentJoinGroup;
     unsigned currentMatchIdx = 0;
     CJoinGroup::JoinGroupRhsState rhsState;
 
-    unsigned spillAmountPercentage = 10;   // Initial percentage to spill, will grow if memory pressure keeps calling callback
-    memsize_t spillAmount = 10 * 0x100000; // (10MB) Initial amount to try to spill
-    memsize_t minRhsJGSpillSz = 1024; // (1K) Threshold, if a join group is smaller than this, don't attempt to spill
-    const unsigned minimumQueueLimit = 5; // When spilling the pending and group limits will gradually reduce until this limit
+    unsigned spillAmountPercentage = startSpillAmountPercentage;
+    memsize_t spillAmount = startSpillAmount;
     bool memoryCallbackInstalled = false;
 
     roxiemem::IRowManager *rowManager = nullptr;
     unsigned currentAdded = 0;
     unsigned currentJoinGroupSize = 0;
+    MemoryBuffer startCtxMb;
 
     Owned<IThorRowInterfaces> fetchInputMetaRowIf; // fetch request rows, header + fetch fields
     Owned<IThorRowInterfaces> fetchOutputMetaRowIf; // fetch request reply rows, header + [join fields as child row]
@@ -2138,7 +2159,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
     CriticalSection fetchFileCrit;
     std::vector<PartIO> openFetchParts;
-    const unsigned unknownCopyNum = 0xff; // in a partCopy denotes that a copy is unknown.
 
     PartIO getFetchPartIO(unsigned partNo, unsigned copy, bool compressed, bool encrypted)
     {
@@ -2855,11 +2875,18 @@ public:
             tlkKeyIndexes.kill();
             allIndexParts.kill();
             localIndexParts.clear();
+            localFetchPartMap.clear();
 
             allDataParts.kill();
             globalFPosToSlaveMap.clear();
             keyLookupHandlers.clear();
             fetchLookupHandlers.clear();
+            openFetchParts.clear();
+
+            indexPartToSlaveMap.clear();
+            dataPartToSlaveMap.clear();
+            tlkKeyManagers.kill();
+            partitionKey = false;
         }
         for (auto &a : statsArr)
             a = 0;
@@ -3005,6 +3032,7 @@ public:
         rowLimit = (rowcount_t)helper->getRowLimit();
         if (rowLimit < keepLimit)
             keepLimit = rowLimit+1; // if keepLimit is small, let it reach rowLimit+1, but any more is pointless and a waste of time/resources.
+        helper->serializeStartContext(startCtxMb.clear());
 
         inputHelper.set(input->queryFromActivity()->queryContainer().queryHelper());
         preserveOrder = 0 == (joinFlags & JFreorderable);
@@ -3014,12 +3042,19 @@ public:
         currentMatchIdx = 0;
         rhsState.clear();
         currentAdded = 0;
+        currentJoinGroupSize = 0;
         eos = false;
         endOfInput = false;
         lookupThreadLimiter.reset();
         fetchThreadLimiter.reset();
         keyLookupHandlers.init();
         fetchLookupHandlers.init();
+        pendingKeyLookupLimiter.reset();
+        doneListLimiter.reset();
+        pendingJoinGroupList.clear();
+        doneJoinGroupList.clear();
+        currentJoinGroup.clear();
+        waitingForDoneGroups = false;
 
         if ((pendingKeyLookupLimiter.queryMax() > minimumQueueLimit) || (doneListLimiter.queryMax() > minimumQueueLimit))
         {

+ 2 - 0
thorlcr/activities/loop/thloopslave.cpp

@@ -519,6 +519,8 @@ public:
                 if (condLoopCounter)
                     boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
                 sendLoopingCount(loopCounter, 0);
+                size32_t parentExtractSz;
+                const byte *parentExtract = queryGraph().queryParentExtract(parentExtractSz);
                 boundGraph->queryGraph()->executeChild(parentExtractSz, parentExtract, results, loopResults);
             }
             int iNumResults = loopResults->count();

+ 3 - 3
thorlcr/activities/msort/thmsortslave.cpp

@@ -66,14 +66,14 @@ public:
     ~MSortSlaveActivity()
     {
         if (portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase,NUMSLAVEPORTS);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
         mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
         barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-        portbase = allocPort(NUMSLAVEPORTS);
+        portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
         ActPrintLog("MSortSlaveActivity::init portbase = %d, mpTagRPC = %d",portbase,(int)mpTagRPC);
         server.setLocalHost(portbase); 
         helper = (IHThorSortArg *)queryHelper();
@@ -183,7 +183,7 @@ public:
         }
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         PARENT::kill();

+ 3 - 3
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -109,7 +109,7 @@ public:
     ~SelfJoinSlaveActivity()
     {
         if(portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
     }
 
 // IThorSlaveActivity
@@ -120,7 +120,7 @@ public:
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
             barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-            portbase = allocPort(NUMSLAVEPORTS);
+            portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             server.setLocalHost(portbase);
             sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
             server.serialize(slaveData);
@@ -146,7 +146,7 @@ public:
         sorter.clear();
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         PARENT::kill();

+ 49 - 2
thorlcr/graph/thgraph.cpp

@@ -27,6 +27,7 @@
 #include "thmem.hpp"
 #include "rtlformat.hpp"
 #include "thorsoapcall.hpp"
+#include "thorport.hpp"
 
 
 PointerArray createFuncs;
@@ -1385,6 +1386,7 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
         throw MakeGraphException(this, 0, "subgraph aborted");
     }
     GraphPrintLog("Processing graph");
+    setParentCtx(parentExtractSz, parentExtract);
     Owned<IException> exception;
     try
     {
@@ -2936,6 +2938,8 @@ CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel
     jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
     myrank = job.queryJobGroup().rank(queryMyNode());
     graphExecutor.setown(new CGraphExecutor(*this));
+    myBasePort = mpServer->queryMyNode()->endpoint().port;
+    portMap.setown(createBitSet());
 }
 
 CJobChannel::~CJobChannel()
@@ -3055,6 +3059,51 @@ IThorResult *CJobChannel::getOwnedResult(graph_id gid, activity_id ownerId, unsi
     return result.getClear();
 }
 
+unsigned short CJobChannel::allocPort(unsigned num)
+{
+    CriticalBlock block(portAllocCrit);
+    if (num==0)
+        num = 1;
+    unsigned sp=0;
+    unsigned p;
+    for (;;)
+    {
+        p = portMap->scan(sp, false);
+        unsigned q;
+        for (q=p+1; q<p+num; q++)
+        {
+            if (portMap->test(q))
+                break;
+        }
+        if (q == p+num)
+        {
+            while (q != p)
+                portMap->set(--q);
+            break;
+        }
+        sp = p+1;
+    }
+
+    return (unsigned short)(p+queryMyBasePort());
+}
+
+void CJobChannel::freePort(unsigned short p, unsigned num)
+{
+    CriticalBlock block(portAllocCrit);
+    if (!p)
+        return;
+    if (num == 0)
+        num = 1;
+    while (num--) 
+        portMap->set(p-queryMyBasePort()+num, false);
+}
+
+void CJobChannel::reservePortKind(ThorPortKind kind)
+{
+    CriticalBlock block(portAllocCrit);
+    portMap->set(getPortOffset(kind), true);
+}
+
 void CJobChannel::abort(IException *e)
 {
     aborted = true;
@@ -3096,8 +3145,6 @@ CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_contai
     mpTag = TAG_NULL;
     abortSoon = receiving = cancelledReceive = initialized = reInit = false;
     baseHelper.set(container.queryHelper());
-    parentExtractSz = 0;
-    parentExtract = NULL;
 
     defaultRoxieMemHeapFlags = (roxiemem::RoxieHeapFlags)container.getOptInt("heapflags", defaultHeapFlags);
     if (container.queryJob().queryUsePackedAllocators())

+ 14 - 4
thorlcr/graph/thgraph.hpp

@@ -47,6 +47,7 @@
 #include "workunit.hpp"
 #include "thorcommon.hpp"
 #include "thmem.hpp"
+#include "thorport.hpp"
 
 #include "thor.hpp"
 #include "eclhelper.hpp"
@@ -658,6 +659,11 @@ public:
         parentExtractMb.swapWith(newParentExtract);
         return (const byte *)parentExtractMb.toByteArray();
     }
+    const byte *queryParentExtract(size32_t &sz) const
+    {
+        sz = parentExtractSz;
+        return (const byte *)parentExtractMb.toByteArray();
+    }
     virtual ICodeContext *queryCodeContext() { return &graphCodeContext; }
     void setLoopCounter(unsigned _counter) { counter = _counter; }
     unsigned queryLoopCounter() const { return counter; }
@@ -861,7 +867,7 @@ public:
 
     inline bool queryUsePackedAllocators() const { return usePackedAllocator; }
     unsigned queryMaxLfnBlockTimeMins() const { return maxLfnBlockTimeMins; }
-    virtual void addChannel(IMPServer *mpServer) = 0;
+    virtual CJobChannel *addChannel(IMPServer *mpServer) = 0;
     CJobChannel &queryJobChannel(unsigned c) const;
     CActivityBase &queryChannelActivity(unsigned c, graph_id gid, activity_id id) const;
     unsigned queryChannelsPerSlave() const { return channelsPerSlave; }
@@ -971,6 +977,9 @@ protected:
     Owned<CThorCodeContextBase> sharedMemCodeCtx;
     unsigned channel;
     bool cleaned = false;
+    unsigned myBasePort = 0;
+    CriticalSection portAllocCrit;
+    Owned<IBitSet> portMap;
 
     void removeAssociates(CGraphBase &graph)
     {
@@ -1026,9 +1035,13 @@ public:
     ICommunicator &queryJobComm() const { return *jobComm; }
     IMPServer &queryMPServer() const { return *mpServer; }
     const rank_t &queryMyRank() const { return myrank; }
+    unsigned queryMyBasePort() const { return myBasePort; }
     mptag_t deserializeMPTag(MemoryBuffer &mb);
     IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, activity_id activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
     roxiemem::IRowManager *queryRowManager() const;
+    unsigned short allocPort(unsigned num);
+    void freePort(unsigned short p, unsigned num);
+    void reservePortKind(ThorPortKind type);
 
     virtual void abort(IException *e);
     virtual IBarrier *createBarrier(mptag_t tag) { UNIMPLEMENTED; return NULL; }
@@ -1064,8 +1077,6 @@ protected:
     mptag_t mpTag; // to be used by any direct inter master<->slave communication
     bool abortSoon;
     bool timeActivities; // purely for access efficiency
-    size32_t parentExtractSz;
-    const byte *parentExtract;
     bool receiving, cancelledReceive, initialized, reInit;
     Owned<IThorGraphResults> ownedResults; // NB: probably only to be used by loop results
 
@@ -1089,7 +1100,6 @@ public:
     inline bool queryTimeActivities() const { return timeActivities; }
     inline roxiemem::RoxieHeapFlags queryHeapFlags() const { return defaultRoxieMemHeapFlags; }
 
-    void onStart(size32_t _parentExtractSz, const byte *_parentExtract) { parentExtractSz = _parentExtractSz; parentExtract = _parentExtract; }
     bool receiveMsg(ICommunicator &comm, CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     void cancelReceiveMsg(ICommunicator &comm, const rank_t rank, const mptag_t mpTag);

+ 19 - 10
thorlcr/graph/thgraphmaster.cpp

@@ -1359,7 +1359,11 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     }
     sharedAllocator.setown(::createThorAllocator(globalMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
     Owned<IMPServer> mpServer = getMPServer();
-    addChannel(mpServer);
+    CJobChannel *channel = addChannel(mpServer);
+    channel->reservePortKind(TPORT_mp); 
+    channel->reservePortKind(TPORT_watchdog);
+    channel->reservePortKind(TPORT_debug);
+
     slavemptag = allocateMPTag();
     slaveMsgHandler.setown(new CSlaveMessageHandler(*this, slavemptag));
     tmpHandler.setown(createTempHandler(true));
@@ -1377,9 +1381,11 @@ void CJobMaster::endJob()
     PARENT::endJob();
 }
 
-void CJobMaster::addChannel(IMPServer *mpServer)
+CJobChannel *CJobMaster::addChannel(IMPServer *mpServer)
 {
-    jobChannels.append(*new CJobMasterChannel(*this, mpServer, jobChannels.ordinality()));
+    CJobChannel *channel = new CJobMasterChannel(*this, mpServer, jobChannels.ordinality());
+    jobChannels.append(*channel);
+    return channel;
 }
 
 
@@ -2961,19 +2967,22 @@ CTimingInfo::CTimingInfo(CJobBase &ctx) : CThorStats(ctx, StTimeLocalExecute)
 
 ProgressInfo::ProgressInfo(CJobBase &ctx) : CThorStats(ctx, StNumRowsProcessed)
 {
-    startcount = stopcount = 0;
+    startCount = stopCount = 0;
 }
 void ProgressInfo::processInfo() // reimplement as counts have special flags (i.e. stop/start)
 {
     reset();
-    startcount = stopcount = 0;
+    startCount = stopCount = 0;
     ForEachItemIn(n, counts)
     {
         unsigned __int64 thiscount = counts.item(n);
-        if (thiscount & THORDATALINK_STARTED)
-            startcount++;
         if (thiscount & THORDATALINK_STOPPED)
-            stopcount++;
+        {
+            startCount++;
+            stopCount++;
+        }
+        else if (thiscount & THORDATALINK_STARTED)
+            startCount++;
         thiscount = thiscount & THORDATALINK_COUNT_MASK;
         tallyValue(thiscount, n+1);
     }
@@ -2985,8 +2994,8 @@ void ProgressInfo::getStats(IStatisticGatherer & stats)
     CThorStats::getStats(stats, true);
     stats.addStatistic(kind, tot);
     stats.addStatistic(StNumSlaves, counts.ordinality());
-    stats.addStatistic(StNumStarts, startcount);
-    stats.addStatistic(StNumStops, stopcount);
+    stats.addStatistic(StNumStarts, startCount);
+    stats.addStatistic(StNumStops, stopCount);
 }
 
 

+ 2 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -119,7 +119,7 @@ public:
 
 class graphmaster_decl ProgressInfo : public CThorStats
 {
-    unsigned startcount, stopcount;
+    unsigned startCount, stopCount;
 public:
     ProgressInfo(CJobBase &ctx);
 
@@ -215,7 +215,7 @@ public:
     CJobMaster(IConstWorkUnit &workunit, const char *_graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp);
     virtual void endJob() override;
 
-    virtual void addChannel(IMPServer *mpServer);
+    virtual CJobChannel *addChannel(IMPServer *mpServer) override;
 
     void registerFile(const char *logicalName, StringArray &clusters, unsigned usageCount=0, WUFileKind fileKind=WUFileStandard, bool temp=false);
     void deregisterFile(const char *logicalName, bool kept=false);

+ 2 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1703,7 +1703,7 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
         actInitWaitTimeMins = queryMaxLfnBlockTimeMins()+1;
 }
 
-void CJobSlave::addChannel(IMPServer *mpServer)
+CJobChannel *CJobSlave::addChannel(IMPServer *mpServer)
 {
     unsigned nextChannelNum = jobChannels.ordinality();
     CJobSlaveChannel *channel = new CJobSlaveChannel(*this, mpServer, nextChannelNum);
@@ -1711,6 +1711,7 @@ void CJobSlave::addChannel(IMPServer *mpServer)
     unsigned slaveNum = channel->queryMyRank();
     jobChannelSlaveNumbers[nextChannelNum] = slaveNum;
     jobSlaveChannelNum[slaveNum-1] = nextChannelNum;
+    return channel;
 }
 
 void CJobSlave::startJob()

+ 3 - 2
thorlcr/graph/thgraphslave.hpp

@@ -79,7 +79,8 @@ public:
 
     inline void dataLinkStop()
     {
-        count = (count & THORDATALINK_COUNT_MASK) | THORDATALINK_STOPPED;
+        if (hasStarted())
+            count = (count & THORDATALINK_COUNT_MASK) | THORDATALINK_STOPPED;
 #ifdef _TESTING
         owner.ActPrintLog("ITDL output %d stopped, count was %" RCPF "d", outputId, getDataLinkCount());
 #endif
@@ -486,7 +487,7 @@ public:
 
     CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _slavemptag);
 
-    virtual void addChannel(IMPServer *mpServer);
+    virtual CJobChannel *addChannel(IMPServer *mpServer) override;
     virtual void startJob() override;
     virtual void endJob() override;
     const char *queryFindString() const { return key.get(); } // for string HT

+ 37 - 5
thorlcr/slave/slavmain.cpp

@@ -454,6 +454,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         Linked<CKeyLookupContext> ctx;
         Owned<IKeyManager> keyManager;
         unsigned handle = 0;
+        Owned<IHThorKeyedJoinArg> helper;
     public:
         CKMContainer(CKJService &_service, CKeyLookupContext *_ctx)
             : service(_service), ctx(_ctx)
@@ -464,6 +465,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
             if (translator)
                 keyManager->setLayoutTranslator(translator);
             handle = service.getUniqId();
+            helper.set(ctx->queryHelper());
         }
         ~CKMContainer()
         {
@@ -472,6 +474,14 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CKeyLookupContext &queryCtx() const { return *ctx; }
         IKeyManager *queryKeyManager() const { return keyManager; }
         unsigned queryHandle() const { return handle; }
+        void setContexts(MemoryBuffer &parentCtxMb, MemoryBuffer &startCtxMb, MemoryBuffer &createCtxMb)
+        {
+            // Only create a new helper, if either parent or start are present, in which case onStart evaluation may vary.
+            if (parentCtxMb.length() || startCtxMb.length())
+                helper.setown(service.createHelper(*service.currentJob, ctx->queryKey().id, createCtxMb));
+            helper->onStart((const byte *)parentCtxMb.toByteArray(), startCtxMb.length() ? &startCtxMb : nullptr);
+        }
+        inline IHThorKeyedJoinArg *queryHelper() const { return helper; }
     };
     template<class KEY, class ITEM>
     class CKeyedCacheEntry : public CInterface
@@ -505,7 +515,6 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     {
     protected:
         Linked<CActivityContext> activityCtx;
-        IHThorKeyedJoinArg *helper;
         std::vector<const void *> rows;
         rank_t sender;
         mptag_t replyTag;
@@ -516,7 +525,6 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CLookupRequest(CActivityContext *_activityCtx, rank_t _sender, mptag_t _replyTag)
             : activityCtx(_activityCtx), sender(_sender), replyTag(_replyTag)
         {
-            helper = activityCtx->queryHelper();
         }
         ~CLookupRequest()
         {
@@ -654,6 +662,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     };
     class CKeyLookupRequest : public CLookupRequest
     {
+        IHThorKeyedJoinArg *helper = nullptr;
         Linked<CKMContainer> kmc;
 
         rowcount_t abortLimit = 0;
@@ -719,6 +728,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         CKeyLookupRequest(CKJService &_service, CKeyLookupContext *_ctx, CKMContainer *_kmc, rank_t _sender, mptag_t _replyTag)
             : CLookupRequest(_ctx->queryActivityCtx(), _sender, _replyTag), kmc(_kmc)
         {
+            helper = kmc->queryHelper();
             allocator = activityCtx->queryLookupInputAllocator();
             deserializer = activityCtx->queryLookupInputDeserializer();
             joinFieldsAllocator = activityCtx->queryJoinFieldsAllocator();
@@ -818,6 +828,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     };
     class CFetchLookupRequest : public CLookupRequest
     {
+        IHThorKeyedJoinArg *helper = nullptr;
         Linked<CFetchContext> fetchContext;
         const unsigned defaultMaxFetchLookupReplySz = 0x100000;
         const IDynamicTransform *translator = nullptr;
@@ -877,6 +888,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
             StringBuffer tracing;
             translator = fetchContext->queryTranslator(fetchContext->queryKey().getTracing(tracing));
             prefetcher = fetchContext->queryPrefetcher();
+            helper = queryCtx().queryHelper();
         }
         virtual void process(bool &abortSoon) override
         {
@@ -977,7 +989,7 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
     CriticalSection kMCrit, lCCrit;
     Owned<IThreadPool> processorPool;
 
-    CActivityContext *createActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
+    IHThorKeyedJoinArg *createHelper(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
     {
         VStringBuffer helperName("fAc%u", (unsigned)id);
         EclHelperFactory helperFactory = (EclHelperFactory) job.queryDllEntry().getEntry(helperName.str());
@@ -987,7 +999,13 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
         ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
         Owned<IHThorKeyedJoinArg> helper = static_cast<IHThorKeyedJoinArg *>(helperFactory());
         helper->onCreate(&codeCtx, nullptr, &createCtxMb); // JCS->GH - will I ever need colocalParent here?
-        return new CActivityContext(*this, id, helper.getClear(), &codeCtx);
+        return helper.getClear();
+    }
+    CActivityContext *createActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
+    {
+        IHThorKeyedJoinArg *helper = createHelper(job, id, createCtxMb);
+        ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
+        return new CActivityContext(*this, id, helper, &codeCtx);
     }
     CActivityContext *ensureActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
     {
@@ -1290,6 +1308,16 @@ public:
                         MemoryBuffer createCtxMb;
                         createCtxMb.setBuffer(createCtxSz, (void *)msg.readDirect(createCtxSz)); // NB: read only
 
+                        size32_t parentCtxSz;
+                        msg.read(parentCtxSz);
+                        MemoryBuffer parentCtxMb;
+                        parentCtxMb.setBuffer(parentCtxSz, (void *)msg.readDirect(parentCtxSz)); // NB: read only
+
+                        size32_t startCtxSz;
+                        msg.read(startCtxSz);
+                        MemoryBuffer startCtxMb;
+                        startCtxMb.setBuffer(startCtxSz, (void *)msg.readDirect(startCtxSz)); // NB: read only
+
                         bool created;
                         Owned<CKeyLookupContext> keyLookupContext = ensureKeyLookupContext(*currentJob, key, createCtxMb, &created); // ensure entry in keyLookupContextsHT, will be removed by last CKMContainer
                         bool messageCompression;
@@ -1313,6 +1341,7 @@ public:
                                 keyLookupContext->setTranslation(translationMode, publishedFormat, publishedFormatCrc, projectedFormat);
                         }
                         Owned<CKMContainer> kmc = createActiveKeyManager(keyLookupContext); // owns keyLookupContext
+                        kmc->setContexts(parentCtxMb, startCtxMb, createCtxMb);
                         processKeyLookupRequest(msg, kmc, sender, replyTag);
                         break;
                     }
@@ -1839,7 +1868,10 @@ public:
                         Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, querySo, slaveMsgTag);
                         job->setXGMML(deps);
                         for (unsigned sc=0; sc<channelsPerSlave; sc++)
-                            job->addChannel(&mpServers.item(sc));
+                        {
+                            CJobChannel *channel = job->addChannel(&mpServers.item(sc));
+                            channel->reservePortKind(TPORT_mp);
+                        }
                         jobs.replace(*job.getLink());
                         job->startJob();
 

+ 1 - 1
thorlcr/slave/thslavemain.cpp

@@ -370,7 +370,7 @@ int main( int argc, char *argv[]  )
 
         setSlaveAffinity(globals->getPropInt("@SLAVEPROCESSNUM"));
 
-        startMPServer(getFixedPort(TPORT_mp));
+        startMPServer(DCR_ThorSlave, getFixedPort(TPORT_mp), false);
 
         if (globals->getPropBool("@MPChannelReconnect"))
             getMPServer()->setOpt(mpsopt_channelreopen, "true");

+ 18 - 66
thorlcr/thorutil/thorport.cpp

@@ -35,27 +35,11 @@
 #include "portlist.h"
 #include "thorport.hpp"
 
+// NB: these are offsets from slave/channel start port
 #define MPPORT       0
 #define WATCHDOGPORT 1
 #define DEBUGPORT 2
 
-static CriticalSection *portallocsection;
-static IBitSet *portmap;
-MODULE_INIT(INIT_PRIORITY_STANDARD)
-{
-    portallocsection = new CriticalSection;
-    portmap = createThreadSafeBitSet();
-    portmap->set(MPPORT, true);
-    portmap->set(WATCHDOGPORT, true);
-    portmap->set(DEBUGPORT, true);
-    return true;
-}
-MODULE_EXIT()
-{
-    portmap->Release();
-    delete portallocsection;
-}
-
 static unsigned short masterportbase=0;
 static unsigned short machineportbase=0;
 
@@ -74,56 +58,9 @@ unsigned short getExternalFixedPort(unsigned short masterBase, unsigned short ma
 {
     if (!masterBase) masterBase = THOR_BASE_PORT;
     if (!machineBase) machineBase = THOR_BASESLAVE_PORT;
-    switch (category) {
-    case TPORT_watchdog:
-        return machineBase+WATCHDOGPORT;
-    case TPORT_mp:
-        return machineBase+MPPORT; 
-    case TPORT_debug:
-        return machineBase+DEBUGPORT;
-    }
-    LOG(MCerror,unknownJob,"getFixedPort: Unknown Port Kind!");
-    return 0;
-}
-
-unsigned short allocPort(unsigned num)
-{
-    CriticalBlock proc(*portallocsection);
-    if (num==0)
-        num = 1;
-    unsigned sp=0;
-    unsigned p;
-    for (;;) {
-        p = portmap->scan(sp,false);
-        unsigned q;
-        for (q=p+1;q<p+num;q++) {
-            if (portmap->test(q))
-                break;
-        }
-        if (q==p+num) {
-            while (q!=p)
-                portmap->set(--q);
-            break;
-        }
-        sp=p+1;
-    }
-
-    return (unsigned short)(p+machineportbase);
-}
-
-void freePort(unsigned short p,unsigned num)
-{
-    CriticalBlock proc(*portallocsection);
-    if (!p)
-        return;
-    if (!portmap) 
-        return;
-    if (num==0)
-        num = 1;
-    while (num--) 
-        portmap->set(p-machineportbase+num,false);
+    return machineBase + getPortOffset(category);
 }
-        
+  
 void setMachinePortBase(unsigned short base)
 {
     machineportbase = base?base:THOR_BASESLAVE_PORT;
@@ -143,3 +80,18 @@ unsigned short getMachinePortBase()
 {
     return machineportbase?machineportbase:THOR_BASESLAVE_PORT;
 }
+
+unsigned getPortOffset(ThorPortKind category)
+{
+    switch (category)
+    {
+        case TPORT_watchdog:
+            return WATCHDOGPORT;
+        case TPORT_mp:
+            return MPPORT; 
+        case TPORT_debug:
+            return DEBUGPORT;
+        default:
+            throwUnexpected();
+    }
+}

+ 2 - 32
thorlcr/thorutil/thorport.hpp

@@ -35,44 +35,14 @@ enum ThorPortKind
     TPORT_debug
 };
 
+// NB: these helpers are all based on the slave or master base port and do not relate to channels
 graph_decl unsigned short getFixedPort(ThorPortKind category);
 graph_decl unsigned short getFixedPort(unsigned short base, ThorPortKind category);
 graph_decl unsigned short getExternalFixedPort(unsigned short masterbase, unsigned short machinebase, ThorPortKind category);
-graph_decl unsigned short allocPort(unsigned num=1);
-graph_decl void           freePort(unsigned short,unsigned num=1);
 graph_decl void           setMachinePortBase(unsigned short base);
 graph_decl void           setMasterPortBase(unsigned short base);
 graph_decl unsigned short         getMasterPortBase();
 graph_decl unsigned short         getMachinePortBase();
-
-typedef UnsignedShortArray PortArray;
-
-class CPortGroup
-{
-public:
-    unsigned short allocPort(unsigned n=1)
-    {
-        unsigned short p=::allocPort(n);
-        while (n--)
-            portsinuse.append(p+n);
-        return p;
-    }
-    void freePort(unsigned short p,unsigned n=1)
-    {
-        unsigned i;
-        for (i=0;i<n;i++)
-            portsinuse.zap(p+i);
-        ::freePort(p,n);
-    }
-    virtual ~CPortGroup()
-    {
-        ForEachItemIn(i,portsinuse) {
-            freePort(portsinuse.item(i));
-        }
-    }
-protected:
-    PortArray portsinuse;
-};
-
+graph_decl unsigned getPortOffset(ThorPortKind category);
 
 #endif