Просмотр исходного кода

Merge branch 'candidate-6.4.22'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 лет назад
Родитель
Сommit
a2abf03dfa

+ 3 - 2
dali/dfu/dfurun.cpp

@@ -1604,8 +1604,9 @@ public:
                         if (needrep)
                             feedback.repmode=cProgressReporter::REPbefore;
                         fsys.import(fdesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
-                        if (!abortnotify.abortRequested()) {
-                            if (needrep)
+                        if (!abortnotify.abortRequested())
+                        {
+                            if (needrep && !recovery->getPropBool("@noFileMatch"))
                                 replicating = true;
                             else
                                 dstFile->attach(dstName.get(), userdesc);

+ 8 - 3
dali/ft/filecopy.cpp

@@ -2875,8 +2875,13 @@ void FileSprayer::spray()
     aindex_t sourceSize = sources.ordinality();
     bool failIfNoSourceFile = options->getPropBool("@failIfNoSourceFile");
 
-    if ((sourceSize == 0) && failIfNoSourceFile)
-        throwError(DFTERR_NoFilesMatchWildcard);
+    if (sourceSize == 0)
+    {
+       if (failIfNoSourceFile)
+           throwError(DFTERR_NoFilesMatchWildcard);
+       else
+           progressTree->setPropBool("@noFileMatch", true);
+    }
 
     LOG(MCdebugInfo, job, "compressedInput:%d, compressOutput:%d", compressedInput, compressOutput);
 
@@ -3250,7 +3255,7 @@ void FileSprayer::updateTargetProperties()
                 distributedSource->queryPart(0).getFilename(remoteFile, 0);
                 splitAndCollectFileInfo(newRecord, remoteFile);
             }
-            else
+            else if (sources.ordinality())
             {
                 FilePartInfo & firstSource = sources.item((aindex_t)0);
                 RemoteFilename &remoteFile = firstSource.filename;

+ 11 - 8
ecl/hql/hqlrepository.cpp

@@ -73,15 +73,18 @@ void getImplicitScopes(HqlScopeArray& implicitScopes, IEclRepository * repositor
 extern HQL_API void importRootModulesToScope(IHqlScope * scope, HqlLookupContext & ctx)
 {
     IEclRepository * eclRepository = ctx.queryRepository();
-    HqlScopeArray rootScopes;
-    getRootScopes(rootScopes, eclRepository, ctx);
-    ForEachItemIn(i, rootScopes)
+    if (eclRepository)
     {
-        IHqlScope & cur = rootScopes.item(i);
-        IIdAtom * curName = cur.queryId();
-        OwnedHqlExpr resolved = eclRepository->queryRootScope()->lookupSymbol(curName, LSFpublic, ctx);
-        if (resolved)
-            scope->defineSymbol(curName, NULL, resolved.getClear(), false, true, ob_import);
+        HqlScopeArray rootScopes;
+        getRootScopes(rootScopes, eclRepository, ctx);
+        ForEachItemIn(i, rootScopes)
+        {
+            IHqlScope & cur = rootScopes.item(i);
+            IIdAtom * curName = cur.queryId();
+            OwnedHqlExpr resolved = eclRepository->queryRootScope()->lookupSymbol(curName, LSFpublic, ctx);
+            if (resolved)
+                scope->defineSymbol(curName, NULL, resolved.getClear(), false, true, ob_import);
+        }
     }
 }
 

+ 10 - 0
ecl/hqlcpp/hqltcppc.cpp

@@ -735,6 +735,16 @@ void CMemberInfo::gatherMaxRowSize(SizeStruct & totalSize, IHqlExpression * newS
 }
 
 
+void CMemberInfo::checkConditionalAssignOk(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, size32_t fixedSize)
+{
+    if (isConditional())
+    {
+        OwnedHqlExpr size = getSizetConstant(fixedSize);
+        checkAssignOk(translator, ctx, selector, size, 0);
+    }
+}
+
+
 void CMemberInfo::checkAssignOk(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * newSize, unsigned fixedExtra)
 {
     //If no size beyond the constant value then this can't be increasing the size of the row => no need to check

+ 1 - 0
ecl/hqlcpp/hqltcppc.ipp

@@ -126,6 +126,7 @@ protected:
     void doBuildDeserialize(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * helper, IHqlExpression * boundSize);
     void doBuildSkipInput(HqlCppTranslator & translator, BuildCtx & ctx, IHqlExpression * helper, size32_t size);
     void checkAssignOk(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * newLength, unsigned fixedExtra);
+    void checkConditionalAssignOk(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, size32_t fixedSize);
     void defaultSetColumn(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * value);
     void ensureTargetAvailable(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, size32_t thisSize);
     IHqlExpression * createSelfPeekDeserializer(HqlCppTranslator & translator, IHqlExpression * helper);

+ 3 - 2
ecl/hqlcpp/hqltcppc2.cpp

@@ -652,8 +652,7 @@ IHqlExpression * CChildLinkedDatasetColumnInfo::buildSizeOfUnbound(HqlCppTransla
 
 void CChildLinkedDatasetColumnInfo::buildDeserialize(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * helper, IAtom * serializeFormat)
 {
-    if (isConditional())
-        checkAssignOk(translator, ctx, selector, queryZero(), sizeof(size32_t) + sizeof(const byte * *));
+    //There is no need to check for enough space in the row because that has been done in CIfBlockInfo::buildDeserialize
 
     OwnedHqlExpr addressSize = getColumnAddress(translator, ctx, selector, sizetType, 0);
     OwnedHqlExpr addressData = getColumnAddress(translator, ctx, selector, queryType(), sizeof(size32_t));
@@ -746,6 +745,8 @@ bool CChildLinkedDatasetColumnInfo::modifyColumn(HqlCppTranslator & translator,
 
 void CChildLinkedDatasetColumnInfo::setColumn(HqlCppTranslator & translator, BuildCtx & ctx, IReferenceSelector * selector, IHqlExpression * _value)
 {
+    checkConditionalAssignOk(translator, ctx, selector, sizeof(size32_t) + sizeof(const byte * *));
+
     OwnedHqlExpr addressSize = getColumnAddress(translator, ctx, selector, sizetType, 0);
     OwnedHqlExpr addressData = getColumnAddress(translator, ctx, selector, queryType(), sizeof(size32_t));
 

+ 1 - 5
esp/clients/wsecl/ws_ecl_client.cpp

@@ -17,11 +17,7 @@
 
 #pragma warning(disable : 4786)
 
-#ifdef _WIN32
-#define WS_ECL_CLIENT_API _declspec (dllexport)
-#else
-#define WS_ECL_CLIENT_API
-#endif
+#define WS_ECL_CLIENT_API DECL_EXPORT
 
 //Jlib
 #include "jliball.hpp"

+ 1 - 1
esp/scm/ws_ecl_client.ecm

@@ -16,7 +16,7 @@
 ############################################################################## */
 
 #ifndef WS_ECL_CLIENT_API
-#define WS_ECL_CLIENT_API __declspec(dllimport)
+#define WS_ECL_CLIENT_API DECL_IMPORT
 #endif
 
 SCMenum WsEclClientRequestState

+ 5 - 0
roxie/ccd/ccdquery.cpp

@@ -232,9 +232,12 @@ public:
 
     virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
     {
+        if (calculatingOnce)   // NOTE - this must be outside the critsec or you deadlock. It is still effectively protected by the critsec
+            return;
         CriticalBlock b(onceCrit);
         if (!onceContext)
         {
+            calculatingOnce = true;
             onceContext.setown(createPTree(ipt_lowmem));
             onceResultStore.setown(createDeserializedResultStore());
             Owned <IRoxieServerContext> ctx = createOnceServerContext(factory, logctx);
@@ -254,6 +257,7 @@ public:
                 ctx->done(true);
                 onceException.setown(MakeStringException(ROXIE_INTERNAL_ERROR, "Unknown exception in ONCE code"));
             }
+            calculatingOnce = false;
         }
         if (onceException)
             throw onceException.getLink();
@@ -265,6 +269,7 @@ protected:
     mutable Owned<IPropertyTree> onceContext;
     mutable Owned<IDeserializedResultStore> onceResultStore;
     mutable Owned<IException> onceException;
+    mutable bool calculatingOnce = false;
 
 };
 

+ 25 - 1
system/security/LdapSecurity/ldapconnection.cpp

@@ -345,9 +345,33 @@ public:
         for (int numHosts=0; numHosts < getHostCount(); numHosts++)
         {
             getLdapHost(hostbuf);
+            unsigned port = strieq("ldaps",m_protocol) ? m_ldap_secure_port : m_ldapport;
+            StringBuffer sysUserDN, decPwd;
+
+            {
+                StringBuffer pwd;
+                cfg->getProp(".//@systemPassword", pwd);
+                if (pwd.isEmpty())
+                    throw MakeStringException(-1, "systemPassword is empty");
+                decrypt(decPwd, pwd.str());
+
+                StringBuffer sysUserCN;
+                cfg->getProp(".//@systemCommonName", sysUserCN);
+                if (sysUserCN.isEmpty())
+                    throw MakeStringException(-1, "systemCommonName is empty");
+
+                StringBuffer sysBasedn;
+                cfg->getProp(".//@systemBasedn", sysBasedn);
+                if (sysBasedn.isEmpty())
+                    throw MakeStringException(-1, "systemBasedn is empty");
+
+                //Guesstimate system user baseDN based on config settings. It will be used if anonymous bind fails
+                sysUserDN.append("cn=").append(sysUserCN.str()).append(",").append(sysBasedn.str());
+            }
+
             for(int retries = 0; retries <= LDAPSEC_MAX_RETRIES; retries++)
             {
-                rc = LdapUtils::getServerInfo(hostbuf.str(), m_ldapport, dcbuf, m_serverType, ldapDomain, m_timeout);
+                rc = LdapUtils::getServerInfo(hostbuf.str(), sysUserDN.str(), decPwd.str(), m_protocol, port, dcbuf, m_serverType, ldapDomain, m_timeout);
                 if(!LdapServerDown(rc) || retries >= LDAPSEC_MAX_RETRIES)
                     break;
                 sleep(LDAPSEC_RETRY_WAIT);

+ 46 - 12
system/security/LdapSecurity/ldaputils.cpp

@@ -30,7 +30,7 @@
 //------------------------------------
 // LdapUtils implementation
 //------------------------------------
-LDAP* LdapUtils::LdapInit(const char* protocol, const char* host, int port, int secure_port)
+LDAP* LdapUtils::LdapInit(const char* protocol, const char* host, int port, int secure_port, bool throwOnError)
 {
     LDAP* ld = NULL;
     if(stricmp(protocol, "ldaps") == 0)
@@ -74,7 +74,10 @@ LDAP* LdapUtils::LdapInit(const char* protocol, const char* host, int port, int
         int rc = LDAP_INIT(&ld, uri.str());
         if(rc != LDAP_SUCCESS)
         {
-            throw MakeStringException(-1, "ldap_initialize error %s", ldap_err2string(rc));
+            if (throwOnError)
+                throw MakeStringException(-1, "ldap_initialize error %s", ldap_err2string(rc));
+            DBGLOG("ldap_initialize error %s", ldap_err2string(rc));
+            return nullptr;
         }
         int reqcert = LDAP_OPT_X_TLS_NEVER;
         ldap_set_option(NULL, LDAP_OPT_X_TLS_REQUIRE_CERT, &reqcert);
@@ -83,7 +86,6 @@ LDAP* LdapUtils::LdapInit(const char* protocol, const char* host, int port, int
     else
     {
         // Initialize an LDAP session
-        DBGLOG("connecting to ldap://%s:%d", host, port);
 #ifdef _WIN32
         ld = LDAP_INIT(host, port);
         if(NULL == ld)
@@ -93,10 +95,14 @@ LDAP* LdapUtils::LdapInit(const char* protocol, const char* host, int port, int
 #else
         StringBuffer uri("ldap://");
         uri.appendf("%s:%d", host, port);
+        DBGLOG("connecting to %s", uri.str());
         int rc = LDAP_INIT(&ld, uri.str());
         if(rc != LDAP_SUCCESS)
         {
-            throw MakeStringException(-1, "ldap_initialize(%s,%d) error %s", host, port, ldap_err2string(rc));
+            if (throwOnError)
+                throw MakeStringException(-1, "ldap_initialize(%s,%d) error %s", host, port, ldap_err2string(rc));
+            DBGLOG("ldap_initialize error %s", ldap_err2string(rc));
+            return nullptr;
         }
 #endif
     }
@@ -208,20 +214,48 @@ int LdapUtils::LdapBind(LDAP* ld, int ldapTimeout, const char* domain, const cha
     return rc;
 }
 
-int LdapUtils::getServerInfo(const char* ldapserver, int ldapport, StringBuffer& domainDN, LdapServerType& stype, const char* domainname, int timeout)
+LDAP* LdapUtils::ldapInitAndSimpleBind(const char* ldapserver, const char* userDN, const char* pwd, const char* ldapprotocol, int ldapport, int timeout, int * err)
+{
+    LDAP* ld = LdapInit(ldapprotocol, ldapserver, ldapport, ldapport, false);
+    if (ld == nullptr)
+    {
+        VStringBuffer uri("%s://%s:%d", ldapprotocol, ldapserver, ldapport);
+        ERRLOG("ldap init error(%s)",uri.str());
+        *err = -1;
+        return nullptr;
+    }
+    *err = LdapSimpleBind(ld, timeout, (char*)userDN, (char*)pwd);
+    if (*err != LDAP_SUCCESS)
+    {
+        DBGLOG("LdapSimpleBind error (%d) - %s for admin user %s", *err, ldap_err2string(*err), isEmptyString(userDN) ? "NULL" : userDN);
+        if (!isEmptyString(userDN))
+            DBGLOG("Please make sure your LDAP configuration 'systemBasedn' contains the complete path, including the complete 'dc=domainComponent'");
+        return nullptr;
+    }
+    return ld;
+}
+
+int LdapUtils::getServerInfo(const char* ldapserver, const char* userDN, const char* pwd, const char* ldapprotocol, int ldapport, StringBuffer& domainDN, LdapServerType& stype, const char* domainname, int timeout)
 {
     LdapServerType deducedSType = LDAPSERVER_UNKNOWN;
-    LDAP* ld = LdapInit("ldap", ldapserver, ldapport, 636);
-    if(ld == NULL)
+
+    //First try anonymous bind using selected protocol/port
+    int err = -1;
+    LDAP* ld = ldapInitAndSimpleBind(ldapserver, nullptr, nullptr, ldapprotocol, ldapport, timeout, &err);
+
+    //if that failed, try bind with credentials
+    if (nullptr == ld)
     {
-        ERRLOG("ldap init error");
-        return false;
+        ld = ldapInitAndSimpleBind(ldapserver, userDN, pwd, ldapprotocol, ldapport, timeout, &err);
+
+        //if that failed, and was for ldaps, see if we can do anonymous bind using ldap/389
+        if (nullptr == ld  && strieq(ldapprotocol,"ldaps"))
+            ld = ldapInitAndSimpleBind(ldapserver, nullptr, nullptr, "ldap", 389, timeout, &err);
     }
 
-    int err = LdapSimpleBind(ld, timeout,NULL, NULL);
-    if(err != LDAP_SUCCESS)
+    if(nullptr == ld)
     {
-        DBGLOG("ldap anonymous bind error (%d) - %s", err, ldap_err2string(err));
+        DBGLOG("ldap bind error (%d) - %s", err, ldap_err2string(err));
 
         // for new versions of openldap, version 2.2.*
         if(err == LDAP_PROTOCOL_ERROR)

+ 3 - 2
system/security/LdapSecurity/ldaputils.hpp

@@ -37,12 +37,13 @@
 class LdapUtils
 {
 public:
-    static LDAP* LdapInit(const char* protocol, const char* host, int port, int secure_port);
+    static LDAP* LdapInit(const char* protocol, const char* host, int port, int secure_port, bool throwOnError = true);
     static int LdapSimpleBind(LDAP* ld, int ldapTimeout, char* userdn, char* password);
     // userdn is required for ldap_simple_bind_s, not really necessary for ldap_bind_s.
     static int LdapBind(LDAP* ld, int ldapTimeout, const char* domain, const char* username, const char* password, const char* userdn, LdapServerType server_type, const char* method="kerberos");
     static void bin2str(MemoryBuffer& from, StringBuffer& to);
-    static int getServerInfo(const char* ldapserver, int ldapport, StringBuffer& domainDN, LdapServerType& stype, const char* domainname, int timeout);
+    static LDAP* ldapInitAndSimpleBind(const char* ldapserver, const char* userDN, const char* pwd, const char* ldapprotocol, int ldapport, int timeout, int * err);
+    static int getServerInfo(const char* ldapserver, const char * user, const char *pwd, const char* ldapprotocol, int ldapport, StringBuffer& domainDN, LdapServerType& stype, const char* domainname, int timeout);
     static void normalizeDn(const char* dn, const char* basedn, StringBuffer& dnbuf);
     static bool containsBasedn(const char* str);
     static void cleanupDn(const char* dn, StringBuffer& dnbuf);

+ 65 - 0
testing/regress/ecl/ifblock4.ecl

@@ -0,0 +1,65 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+lOperacaoRaw := RECORD
+    STRING cnpj;
+    STRING unico;
+    STRING prefix;
+    STRING numero;
+END;
+
+lClienteRaw := RECORD
+    STRING tipo;
+    STRING identificacao;
+END;
+
+lContestacaoRaw := RECORD
+    STRING numero;
+    STRING tipo;
+    STRING hora;
+    IFBLOCK(SELF.TIPO = 'O')
+        STRING justificativa;
+    END;
+    DATASET(lClienteRaw) cliente;
+    IFBLOCK(SELF.TIPO = 'O')
+        DATASET(lOperacaoRaw) operacao;
+    END;
+END;
+
+mkOp(unsigned i) := TRANSFORM(lOperacaoRaw,
+    SELF.cnpj := 'Op' + (STRING)i;
+    SELF.unico := (STRING)i + '!';
+    SELF.prefix := (STRING)i + '?';
+    SELF.numero := (STRING)i + '@';
+);
+
+mkCl(unsigned i) := TRANSFORM(lClienteRaw,
+    SELF.tipo := 'Cl' + (STRING)i;
+    SELF.identificacao := (STRING)i + '!';
+);
+
+mkCo(unsigned i) := TRANSFORM(lContestacaoRaw,
+    SELF.numero := 'Co' + (STRING)i;
+    SELF.tipo := CHOOSE(i % 3, 'O', 'X', 'z');
+    SELF.hora := (STRING)i + '?';
+    SELF.justificativa := (STRING)i + '*';
+    SELF.cliente := DATASET(i % 4, mkCl(COUNTER));
+    SELF.operacao := DATASET(i % 5, mkOp(COUNTER));
+);
+
+d := DATASET(3*4*5, mkco(COUNTER));
+output(count(nofold(d))-3*4*5);

+ 3 - 0
testing/regress/ecl/key/ifblock4.xml

@@ -0,0 +1,3 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>0</Result_1></Row>
+</Dataset>

+ 3 - 0
testing/regress/ecl/key/once2.xml

@@ -0,0 +1,3 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>0</Result_1></Row>
+</Dataset>

+ 23 - 0
testing/regress/ecl/key/spray_dir_test.xml

@@ -0,0 +1,23 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><result>Despray Pass</result></Row>
+</Dataset>
+<Dataset name='Empty_Dir_No_FailIFNoSourceFile'>
+ <Row><result>Pass</result></Row>
+</Dataset>
+<Dataset name='Empty_Dir_No_FailIFNoSourceFile_Rec_Count'>
+ <Row><Empty_Dir_No_FailIFNoSourceFile_Rec_Count>0</Empty_Dir_No_FailIFNoSourceFile_Rec_Count></Row>
+</Dataset>
+<Dataset name='Empty_Dir_FailIFNoSourceFile'>
+ <Row><result>Pass</result></Row>
+</Dataset>
+<Dataset name='Not_Empty_Dir_No_FailIFNoSourceFile'>
+ <Row><result>Pass</result></Row>
+</Dataset>
+<Dataset name='Not_Empty_Dir_No_FailIFNoSourceFile_Rec_Count'>
+ <Row><Not_Empty_Dir_No_FailIFNoSourceFile_Rec_Count>3</Not_Empty_Dir_No_FailIFNoSourceFile_Rec_Count></Row>
+</Dataset>
+<Dataset name='Not_Empty_Dir_FailIFNoSourceFile'>
+ <Row><result>Pass</result></Row>
+</Dataset>

+ 25 - 0
testing/regress/ecl/once2.ecl

@@ -0,0 +1,25 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Checking you can have more than one ONCE section
+
+d1 := DICTIONARY([{1=>2}], { unsigned a=>unsigned b}) : ONCE;
+d2 := DICTIONARY([{3=>4}], { unsigned a=>unsigned b}) : ONCE;
+
+unsigned v := 0 : STORED('v');
+
+d1[v].b + d2[v].b;

+ 232 - 0
testing/regress/ecl/spray_dir_test.ecl

@@ -0,0 +1,232 @@
+/*##############################################################################
+
+    Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// The aim of this code is to test the wildcard spray from an empty and a not empty directory
+// and test:
+// 1. No failure if empty directory sprayed with wildcard
+// 2. 'failifnosourcefile' feature works as it suppose to be
+//
+// When an empty directory sprayed with wildcard and 'failifnosourcefile' is enabled then
+// it forces the DFU server to throw exeption.
+//
+// NOTHOR() effectively forces something to be executed globally.  At the moment if a global operation
+// fails then the query fails - rather than continuing and only failing if the result of using that
+// operation causes a failure.
+// So, to avoid to abort this code is excluded from Thor target
+//nothor
+
+//nohthor
+//class=spray
+
+
+import std.system.thorlib;
+import Std.File AS FileServices;
+import ^ as root;
+
+engine := thorlib.platform();
+prefix := '~regress::' + engine + '-';
+suffix := '-' + WORKUNIT;
+
+dropzonePath := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath');
+
+unsigned VERBOSE := 0;
+
+Layout_Person := RECORD
+  STRING3  name;
+  UNSIGNED2 age;
+  BOOLEAN good;
+END;
+
+empty := DATASET([], Layout_Person);
+
+allPeople := DATASET([ {'foo', 10, 1},
+                       {'bar', 12, 0},
+                       {'baz', 32, 1}]
+            ,Layout_Person);
+
+sprayPrepFileName := prefix + 'spray_prep' + suffix;
+sprayOutFileName := prefix + 'spray_test' + suffix;
+sprayOutFileName2 := prefix + 'spray_test2' + suffix;
+dsSetup := allPeople;
+emptyDirPath := dropzonePath+'empty';
+notEmptyDirPath := dropzonePath+'notempty';
+
+// Create an 'empty' directory
+FileServices.CreateExternalDirectory('.', emptyDirPath);
+
+
+// Create a 'notempty' directory
+FileServices.CreateExternalDirectory('.', notEmptyDirPath);
+
+//  Create a small logical file
+setupFile := output(dsSetup, , DYNAMIC(sprayPrepFileName), CSV, OVERWRITE);
+
+desprayRec := RECORD
+  string result;
+  string msg;
+end;
+
+desprayOutFileName := notEmptyDirPath + '/'+ 'spray_input' + suffix;
+
+// Despray it to 'notempty' dir in default drop zone
+desprayRec despray(desprayRec l) := TRANSFORM
+  SELF.msg := FileServices.fDespray(
+                       LOGICALNAME := sprayPrepFileName
+                      ,DESTINATIONIP := '.'
+                      ,DESTINATIONPATH := desprayOutFileName
+                      ,ALLOWOVERWRITE := True
+                      );
+  SELF.result := 'Despray Pass';
+end;
+
+dst0 := NOFOLD(DATASET([{'', ''}], desprayRec));
+p0 := NOTHOR(PROJECT(NOFOLD(dst0), despray(LEFT)));
+c0 := CATCH(NOFOLD(p0), ONFAIL(TRANSFORM(desprayRec,
+                                 SELF.result := 'Despray Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    desprayOut := output(c0);
+#else
+    desprayOut := output(c0, {result});
+#end
+
+
+sprayRec := RECORD
+  string sourcepath;
+  string destinationLogicalName;
+  boolean failifnosourcefile;
+  string result;
+  string msg;
+end;
+
+
+// Try to spray from an empty directory with default failifnosourcefile (=0)
+
+sprayrec spray(sprayRec l) := TRANSFORM
+    SELF.msg := FileServices.fSprayVariable(
+                        SOURCEIP := '.',
+                        SOURCEPATH := l.sourcepath,
+                        //RECORDSIZE := RecordSize,
+                        DESTINATIONGROUP := 'mythor',
+                        DESTINATIONLOGICALNAME := l.destinationLogicalName,
+                        TIMEOUT := -1,
+                        ESPSERVERIPPORT := 'http://127.0.0.1:8010/FileSpray',
+                        ALLOWOVERWRITE := true,
+                        FAILIFNOSOURCEFILE := l.failifnosourcefile
+                        );
+    self.sourcepath := l.sourcepath;
+    self.destinationLogicalName := l.destinationLogicalName;
+    self.failifnosourcefile := l.failifnosourcefile;
+    self.result := l.result;
+end;
+
+
+dst1 := NOFOLD(DATASET([{emptyDirPath+'/*', sprayOutFileName, false, 'Pass', ''}], sprayRec));
+p1 := PROJECT(NOFOLD(dst1), spray(LEFT));
+c1 := CATCH(NOFOLD(p1), ONFAIL(TRANSFORM(sprayRec,
+                                SELF.sourcepath := emptyDirPath+'/*',
+                                SELF.destinationLogicalName := sprayOutFileName,
+                                SELF.failifnosourcefile := false,
+                                SELF.result := 'Fail',
+                                SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    sprayOut1 := output(c1);
+#else
+    sprayOut1 := output(c1, {result}, NAMED('Empty_Dir_No_FailIFNoSourceFile'));
+#end
+
+dst1Res := DATASET(DYNAMIC(sprayOutFileName), Layout_Person, CSV);
+CheckSprayOut1 := output(count(dst1Res), NAMED('Empty_Dir_No_FailIFNoSourceFile_Rec_Count'));
+
+
+// Try to spray from an empty directory with failifnosourcefile=1
+// It should always fail so if it pass something broken
+
+dst2 := NOFOLD(DATASET([{emptyDirPath+'/*', sprayOutFileName, true, 'Fail', ''}], sprayRec));
+p2 := PROJECT(NOFOLD(dst2), spray(LEFT));
+c2 := CATCH(NOFOLD(p2), ONFAIL(TRANSFORM(sprayRec,
+                                 SELF.sourcepath := emptyDirPath+'/*',
+                                 SELF.destinationLogicalName := sprayOutFileName,
+                                 SELF.failifnosourcefile := true,
+                                 SELF.result := 'Pass',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    sprayOut2 := output(c2);
+#else
+    sprayOut2 := output(c2, {result}, NAMED('Empty_Dir_FailIFNoSourceFile'));
+#end
+
+
+// Try to spray from a not empty directory with default failifnosourcefile (=0)
+
+dst3 := NOFOLD(DATASET([{notEmptyDirPath+'/*', sprayOutFileName2, false, 'Pass', ''}], sprayRec));
+p3 := PROJECT(NOFOLD(dst3), spray(LEFT));
+c3 := CATCH(NOFOLD(p3), ONFAIL(TRANSFORM(sprayRec,
+                                 SELF.sourcepath := notEmptyDirPath+'/*',
+                                 SELF.destinationLogicalName := sprayOutFileName2,
+                                 SELF.failifnosourcefile := false,
+                                 SELF.result := 'Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    sprayOut3 := output(c3);
+#else
+    sprayOut3 := output(c3, {result}, NAMED('Not_Empty_Dir_No_FailIFNoSourceFile'));
+#end
+
+dst3Res := DATASET(DYNAMIC(sprayOutFileName2), Layout_Person, CSV);
+CheckSprayOut3 := output(count(dst3Res), NAMED('Not_Empty_Dir_No_FailIFNoSourceFile_Rec_Count'));
+
+// Try to spray from a not empty directory with failifnosourcefile=1
+
+dst4 := NOFOLD(DATASET([{notEmptyDirPath+'/*', sprayOutFileName2, true, 'Pass', ''}], sprayRec));
+p4 := PROJECT(NOFOLD(dst4), spray(LEFT));
+c4 := CATCH(NOFOLD(p4), ONFAIL(TRANSFORM(sprayRec,
+                                 SELF.sourcepath := notEmptyDirPath+'/*',
+                                 SELF.destinationLogicalName := sprayOutFileName2,
+                                 SELF.failifnosourcefile := true,
+                                 SELF.result := 'Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    sprayOut4 := output(c4);
+#else
+    sprayOut4 := output(c4, {result}, NAMED('Not_Empty_Dir_FailIFNoSourceFile'));
+#end
+
+
+SEQUENTIAL(
+    setupFile,
+    desprayOut,
+    sprayOut1,
+    CheckSprayOut1,
+    sprayOut2,
+    sprayOut3,
+    CheckSprayOut3,
+    sprayOut4,
+
+    // Clean-up
+    FileServices.DeleteLogicalFile(sprayOutFileName),
+    FileServices.DeleteLogicalFile(sprayOutFileName2),
+    FileServices.DeleteLogicalFile(sprayPrepFileName),
+    FileServices.DeleteExternalFile('.', desprayOutFileName),
+    FileServices.DeleteExternalFile('.', notEmptyDirPath),
+    FileServices.DeleteExternalFile('.', emptyDirPath)
+);

+ 2 - 1
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -471,6 +471,8 @@ public:
                 info.unknownRowsOutput = info.canReduceNumRows = true;
                 info.byteTotal = (offset_t)-1;
             }
+            else
+                info.fastThrough = true;
         }
     };
 
@@ -563,7 +565,6 @@ public:
             initMetaInfo(cachedMetaInfo);
             cachedMetaInfo.isSource = true;
             getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
-            cachedMetaInfo.fastThrough = true;
         }
         info = cachedMetaInfo;
         if (info.totalRowsMin==info.totalRowsMax)

+ 0 - 1
thorlcr/activities/filter/thfilterslave.cpp

@@ -42,7 +42,6 @@ public:
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
     {
         initMetaInfo(info);
-        info.fastThrough = true;
         info.canReduceNumRows = true;
         calcMetaInfoSize(info, queryInput(0));
     }

+ 2 - 0
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3773,6 +3773,7 @@ public:
             lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this, "LHS"));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), leftInputStream, ihashL, icompareL, nullptr);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, stableSort_earlyAlloc, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
+        loaderL->setTracingPrefix("Join left");
         strmL.setown(loaderL->load(reader, abortSoon));
         loaderL.clear();
         reader.clear();
@@ -3784,6 +3785,7 @@ public:
             rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag2, false, this, "RHS"));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), rightInputStream, ihashR, icompareR, nullptr));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
+        loaderL->setTracingPrefix("Join right");
         strmR.setown(loaderR->load(reader, abortSoon));
         loaderR.clear();
         reader.clear();

+ 4 - 0
thorlcr/activities/join/thjoinslave.cpp

@@ -183,7 +183,10 @@ public:
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         if (islocal)
+        {
             iLoaderL.setown(createThorRowLoader(*this, ::queryRowInterfaces(queryInput(0)), leftCompare, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN));
+            iLoaderL->setTracingPrefix("Join left");
+        }
         else
         {
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
@@ -470,6 +473,7 @@ public:
         else
         {
             Owned<IThorRowLoader> iLoaderR = createThorRowLoader(*this, ::queryRowInterfaces(rightInput), rightCompare, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
+            iLoaderR->setTracingPrefix("Join right");
             rightStream.setown(iLoaderR->load(rightInputStream, abortSoon));
             stopRightInput();
             mergeStats(spillStats, iLoaderR);

+ 3 - 0
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2163,6 +2163,7 @@ protected:
             // i.e. will fire OOM if runs out of memory loading local right
             channelCollector.setown(createThorRowCollector(*this, queryRowInterfaces(rightITDL), cmp, stable ? stableSort_lateAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE));
         }
+        channelCollector->setTracingPrefix("Join right");
         Owned<IRowWriter> writer = channelCollector->getWriter();
         while (!abortSoon)
         {
@@ -2196,6 +2197,7 @@ protected:
             CChannelDistributor(CLookupJoinActivityBase &_owner, ICompare *cmp) : owner(_owner)
             {
                 channelCollector.setown(createThorRowCollector(owner, queryRowInterfaces(owner.rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_DISABLE));
+                channelCollector->setTracingPrefix("Join right");
                 channelCollectorWriter.setown(channelCollector->getWriter());
                 channelDistributors = ((CLookupJoinActivityBase *)owner.channels[0])->channelDistributors;
                 channelDistributors[owner.queryJobChannelNumber()] = this;
@@ -2364,6 +2366,7 @@ protected:
             throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
 
         Owned<IThorRowLoader> rowLoader = createThorRowLoader(*this, queryRowInterfaces(leftITDL), helper->isLeftAlreadyLocallySorted() ? NULL : compareLeft);
+        rowLoader->setTracingPrefix("Join left");
         left.setown(rowLoader->load(left, abortSoon, false));
         leftITDL = queryInput(0); // reset
         ActPrintLog("LHS loaded/sorted");

+ 17 - 7
thorlcr/activities/thactivityutil.cpp

@@ -374,36 +374,46 @@ void calcMetaInfoSize(ThorDataLinkMetaInfo &info, CThorInputArray &inputs)
         info.totalRowsMin = 0; // a good bet
 }
 
-void calcMetaInfoSize(ThorDataLinkMetaInfo &info, ThorDataLinkMetaInfo *infos, unsigned num)
+void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *infos, unsigned num)
 {
     if (!infos||(num<=1))
     {
         if (1 == num)
             info = infos[0];
+        else
+        {
+            info.fastThrough = true;
+            info.totalRowsMin = info.totalRowsMax = 0;
+        }
         return;
     }
     if (!info.unknownRowsOutput)
     {
         __int64 min=0;
         __int64 max=0;
-        for (unsigned i=0;i<num;i++ )
+        for (unsigned i=0; i<num; i++)
         {
-            ThorDataLinkMetaInfo &prev = infos[i];
+            const ThorDataLinkMetaInfo &currentInfo = infos[i];
             if (min>=0)
             {
-                if (prev.totalRowsMin>=0)
-                    min += prev.totalRowsMin;
+                if (currentInfo.totalRowsMin>=0)
+                    min += currentInfo.totalRowsMin;
                 else
                     min = -1;
             }
             if (max>=0)
             {
-                if (prev.totalRowsMax>=0)
-                    max += prev.totalRowsMax;
+                if (currentInfo.totalRowsMax>=0)
+                    max += currentInfo.totalRowsMax;
                 else
                     max = -1;
             }
+            if (0 == i)
+                info.fastThrough = currentInfo.fastThrough;
+            else if (info.fastThrough && !currentInfo.fastThrough) // i.e. if was true and this one is false, set return fastThrough to false
+                info.fastThrough = false;
         }
+
         if (info.totalRowsMin<=0)
         {
             if (!info.canReduceNumRows)

+ 1 - 1
thorlcr/activities/thactivityutil.ipp

@@ -71,7 +71,7 @@ IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPar
 void initMetaInfo(ThorDataLinkMetaInfo &info);
 void calcMetaInfoSize(ThorDataLinkMetaInfo &info, IThorDataLink *link);
 void calcMetaInfoSize(ThorDataLinkMetaInfo &info, CThorInputArray &inputs);
-void calcMetaInfoSize(ThorDataLinkMetaInfo &info, ThorDataLinkMetaInfo *infos, unsigned num);
+void calcMetaInfoSize(ThorDataLinkMetaInfo &info, const ThorDataLinkMetaInfo *infos, unsigned num);
 
 interface ILookAheadStopNotify
 {

+ 159 - 150
thorlcr/thorutil/thmem.cpp

@@ -165,6 +165,7 @@ protected:
     unsigned spillPriority = SPILL_PRIORITY_DISABLE;
     IThorRowInterfaces *rowIf = nullptr;
     roxiemem::IRowManager *rowManager = nullptr;
+    StringAttr tracingPrefix;
     CActivityBase &activity;
 public:
     CSpillable(CActivityBase &_activity, IThorRowInterfaces *_rowIf, unsigned _spillPriority) : activity(_activity), rowIf(_rowIf), spillPriority(_spillPriority)
@@ -176,6 +177,13 @@ public:
     {
         ensureSpillingCallbackRemoved();
     }
+    void setTracingPrefix(const char *tracing)
+    {
+        if (isEmptyString(tracing))
+            return;
+        VStringBuffer str("%s: ", tracing);
+        tracingPrefix.set(str);
+    }
     inline bool spillingEnabled() const { return SPILL_PRIORITY_DISABLE != spillPriority; }
     inline void activateSpillingCallback()
     {
@@ -213,14 +221,15 @@ public:
         }
     }
 // IBufferedRowCallback
-    virtual unsigned getSpillCost() const
+    virtual unsigned getSpillCost() const override
     {
         return spillPriority;
     }
-    virtual unsigned getActivityId() const
+    virtual unsigned getActivityId() const override
     {
         return activity.queryActivityId();
     }
+    virtual bool freeBufferedRows(bool critical) override = 0; // must be implemented by derived implementations
 };
 
 //====
@@ -229,7 +238,6 @@ public:
 class CSpillableStreamBase : public CSpillable
 {
 protected:
-    bool ownsRows;
     EmptyRowSemantics emptyRowSemantics;
     unsigned spillCompInfo;
     CThorSpillableRowArray rows;
@@ -247,7 +255,7 @@ protected:
         GetTempName(tempName, tempPrefix.str(), true);
         spillFile.setown(createIFile(tempName.str()));
 
-        VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
+        VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
         rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
@@ -257,7 +265,6 @@ public:
         : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics)
     {
         assertex(inRows.isFlushed());
-        ownsRows = false;
         spillCompInfo = 0x0;
         rows.setup(rowIf, emptyRowSemantics);
         rows.swap(inRows);
@@ -269,7 +276,7 @@ public:
             spillFile->remove();
     }
 // IBufferedRowCallback
-    virtual bool freeBufferedRows(bool critical)
+    virtual bool freeBufferedRows(bool critical) override
     {
         if (spillFile) // i.e. if spilt already. NB: this is thread-safe, as 'spillFile' only set by spillRows() call below and can't be called on multiple threads concurrently.
             return false;
@@ -411,13 +418,14 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 public:
     IMPLEMENT_IINTERFACE_USING(CSpillableStreamBase);
 
-    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority, unsigned _spillCompInfo)
+    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority, unsigned _spillCompInfo, const char *tracingPrefix)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _emptyRowSemantics, _spillPriority)
     {
         spillCompInfo = _spillCompInfo;
         pos = numReadRows = 0;
         granularity = 500; // JCSMORE - rows
 
+        setTracingPrefix(tracingPrefix);
         activateSpillingCallback(); // NB: it's possible the small allocate below will trigger this, 'readRows' will be free as soon as nextRow sees the spill.
 
         // a small amount of rows to read from swappable rows
@@ -1375,12 +1383,12 @@ static int callbackSortRev(IInterface * const *cb2, IInterface * const *cb1)
     return 1;
 }
 
-rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, bool skipNulls, const char *tracingPrefix)
+rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, bool skipNulls, const char *_tracingPrefix)
 {
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
-    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, emptyRowSemantics=%u) max rows = %"  RIPF "u", tracingPrefix, boolToStr(skipNulls), emptyRowSemantics, n);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, emptyRowSemantics=%u) max rows = %"  RIPF "u", _tracingPrefix, boolToStr(skipNulls), emptyRowSemantics, n);
 
     if (_spillCompInfo)
         assertex(0 == writeCallbacks.ordinality()); // incompatible
@@ -1454,7 +1462,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
     firstRow += n;
     offset_t bytesWritten = writer->getPosition();
     writer.clear();
-    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u", tracingPrefix, rowsWritten, (__int64)bytesWritten);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u", _tracingPrefix, rowsWritten, (__int64)bytesWritten);
     return n;
 }
 
@@ -1604,10 +1612,10 @@ void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwn
     }
 }
 
-IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, unsigned spillCompInfo)
+IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, unsigned spillCompInfo, const char *tracingPrefix)
 {
     assertex(rowIf);
-    return new CSpillableStream(activity, *this, rowIf, emptyRowSemantics, spillPriority, spillCompInfo);
+    return new CSpillableStream(activity, *this, rowIf, emptyRowSemantics, spillPriority, spillCompInfo, tracingPrefix);
 }
 
 
@@ -1627,7 +1635,6 @@ protected:
     ICompare *iCompare;
     StableSortFlag stableSort;
     EmptyRowSemantics emptyRowSemantics = ers_forbidden;
-    CriticalSection readerLock;
     Owned<CSharedSpillableRowSet> spillableRowSet;
     unsigned options;
     unsigned spillCompInfo = 0;
@@ -1646,17 +1653,17 @@ protected:
         StringBuffer tempPrefix, tempName;
         if (iCompare)
         {
-            ActPrintLog(&activity, "Sorting %" RIPF "d rows", spillableRows.numCommitted());
+            ActPrintLog(&activity, "%sSorting %" RIPF "d rows", tracingPrefix.str(), spillableRows.numCommitted());
             CCycleTimer timer;
             spillableRows.sort(*iCompare, maxCores); // sorts committed rows
             sortCycles += timer.elapsedCycles();
-            ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
+            ActPrintLog(&activity, "%sSort took: %f", tracingPrefix.str(), ((float)timer.elapsedMs())/1000);
             tempPrefix.append("srt");
         }
         tempPrefix.appendf("spill_%d", activity.queryId());
         GetTempName(tempName, tempPrefix.str(), true);
         Owned<IFile> iFile = createIFile(tempName.str());
-        VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
+        VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority);
         spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
         spillFiles.append(new CFileOwner(iFile.getLink()));
         ++overflowCount;
@@ -1700,7 +1707,7 @@ protected:
                 // This is a good time to shrink the row table back. shrink() force a flush.
                 StringBuffer info;
                 if (shrink(&info))
-                    activity.ActPrintLog("CThorRowCollectorBase: shrink - %s", info.str());
+                    activity.ActPrintLog("%sCThorRowCollectorBase: shrink - %s", tracingPrefix.str(), info.str());
 
                 if (!spillableRows.append(row))
                     oom = true;
@@ -1716,31 +1723,66 @@ protected:
     }
     IRowStream *getStream(CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool shared)
     {
-        CriticalBlock b(readerLock);
-        if (0 == outStreams)
         {
-            flush();
-            if (spillingEnabled())
+            CThorArrayLockBlock block(spillableRows); // ensure locked until deactivated
+            if (0 == outStreams++)
             {
-                // i.e. all disk OR (some on disk already AND allDiskOrAllMem)
-                if (((rc_allDisk == diskMemMix) || ((rc_allDiskOrAllMem == diskMemMix) && overflowCount)))
+                flush();
+                if (spillingEnabled())
                 {
-                    CThorArrayLockBlock block(spillableRows);
-                    if (spillableRows.numCommitted())
+                    // i.e. all disk OR (some on disk already AND allDiskOrAllMem)
+                    if (((rc_allDisk == diskMemMix) || ((rc_allDiskOrAllMem == diskMemMix) && overflowCount)))
                     {
-                        spillRows(false);
-                        spillableRows.kill();
+                        if (spillableRows.numCommitted())
+                        {
+                            spillRows(false);
+                            spillableRows.kill();
+                        }
                     }
                 }
+
+                /* Ensure existing callback is cleared, before:
+                 * a) instrms are built, since new spillFiles can't be added to as long as existing callback is active
+                 * b) streams created based on spillableRows, which take ownership of spillableRows and in turn add their own callbacks
+                 */
+                deactivateSpillingCallback(); // NB: spillableRows can no longer be altered asynchronously
+
+                if (spillableRows.numCommitted())
+                {
+                    totalRows += spillableRows.numCommitted();
+                    if (iCompare)
+                    {
+                        CCycleTimer timer;
+                        spillableRows.sort(*iCompare, maxCores);
+                        sortCycles += timer.elapsedCycles();
+                    }
+
+                    if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
+                        (nullptr != allMemRows && (rc_allMem == diskMemMix)) ||
+                        (nullptr != allMemRows && (rc_mixed == diskMemMix) && 0 == overflowCount) // if allMemRows given, only if no spilling
+                       )
+                    {
+                        assertex(allMemRows);
+                        if (memUsage)
+                            *memUsage = spillableRows.getMemUsage(); // a bit expensive if variable rows
+                        allMemRows->transferFrom(spillableRows);
+                        // stream cannot be used
+                        return nullptr;
+                    }
+                    if (shared)
+                    {
+                        spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillPriority));
+                        spillableRowSet->setTracingPrefix(tracingPrefix);
+                    }
+                }
+                else
+                {
+                    // If 0 rows, no overflow, don't return stream, except for rc_allDisk which will never fill allMemRows
+                    if (allMemRows && (0 == overflowCount) && (diskMemMix != rc_allDisk))
+                        return nullptr;
+                }
             }
         }
-        ++outStreams;
-
-        /* Ensure existing callback is cleared, before:
-         * a) instrms are built, since new spillFiles can't be added to as long as existing callback is active
-         * b) streams created based on spillableRows, which take ownership of spillableRows and in turn add their own callbacks
-         */
-        deactivateSpillingCallback();
 
         // NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
         // which may be one of these streams or CThorRowCollectorBase itself
@@ -1759,49 +1801,14 @@ protected:
             instrms.append(* new CStreamFileOwner(fileOwner, strm));
         }
 
-        if (spillableRowSet)
-            instrms.append(*spillableRowSet->createRowStream());
-        else if (spillableRows.numCommitted())
+        if (shared)
         {
-            totalRows += spillableRows.numCommitted();
-            if (iCompare && (1 == outStreams))
-            {
-                // Option(rcflag_noAllInMemSort) - avoid sorting allMemRows
-                if ((NULL == allMemRows) || (0 == (options & rcflag_noAllInMemSort)))
-                {
-                    CCycleTimer timer;
-                    spillableRows.sort(*iCompare, maxCores);
-                    sortCycles += timer.elapsedCycles();
-                }
-            }
-
-            if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
-                (NULL!=allMemRows && (rc_allMem == diskMemMix)) ||
-                (NULL!=allMemRows && (rc_mixed == diskMemMix) && 0 == overflowCount) // if allMemRows given, only if no spilling
-               )
-            {
-                assertex(allMemRows);
-                assertex(1 == outStreams);
-                if (memUsage)
-                    *memUsage = spillableRows.getMemUsage(); // a bit expensive if variable rows
-                allMemRows->transferFrom(spillableRows);
-                // stream cannot be used
-                return NULL;
-            }
-            if (!shared)
-                instrms.append(*spillableRows.createRowStream(spillPriority, spillCompInfo)); // NB: stream will take ownership of rows in spillableRows
-            else
-            {
-                spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillPriority));
+            if (spillableRowSet)
                 instrms.append(*spillableRowSet->createRowStream());
-            }
-        }
-        else
-        {
-            // If 0 rows, no overflow, don't return stream, except for rc_allDisk which will never fill allMemRows
-            if (allMemRows && (0 == overflowCount) && (diskMemMix != rc_allDisk))
-                return NULL;
         }
+        else if (spillableRows.numCommitted())
+            instrms.append(*spillableRows.createRowStream(spillPriority, spillCompInfo, tracingPrefix)); // NB: stream will take ownership of rows in spillableRows
+
         if (0 == instrms.ordinality())
             return createNullRowStream();
         else if (1 == instrms.ordinality())
@@ -1857,7 +1864,7 @@ public:
              * memory usage.
              */
             size32_t compBlkSz = activity.getOptUInt(THOROPT_SORT_COMPBLKSZ, DEFAULT_SORT_COMPBLKSZ);
-            activity.ActPrintLog("Spilling will use compressed block size = %u", compBlkSz);
+            activity.ActPrintLog("%sSpilling will use compressed block size = %u", tracingPrefix.str(), compBlkSz);
             spillableRows.setCompBlockSize(compBlkSz);
         }
     }
@@ -1866,6 +1873,22 @@ public:
         reset();
         ensureSpillingCallbackRemoved();
     }
+// for IThorRowCollectorCommon implementation
+    rowcount_t numRows() const
+    {
+        return totalRows+spillableRows.numCommitted();
+    }
+    unsigned numOverflows() const
+    {
+        return overflowCount;
+    }
+    unsigned overflowScale() const
+    {
+        // 1 if no spill
+        if (!overflowCount)
+            return 1;
+        return overflowCount*2+3; // bit arbitrary
+    }
     void transferRowsOut(CThorExpandingRowArray &out, bool sort)
     {
         CThorArrayLockBlock block(spillableRows);
@@ -1879,41 +1902,25 @@ public:
         }
         out.transferFrom(spillableRows);
     }
-// IThorRowCollectorCommon
-    virtual rowcount_t numRows() const
-    {
-        return totalRows+spillableRows.numCommitted();
-    }
-    virtual unsigned numOverflows() const
-    {
-        return overflowCount;
-    }
-    virtual unsigned overflowScale() const
-    {
-        // 1 if no spill
-        if (!overflowCount)
-            return 1;
-        return overflowCount*2+3; // bit arbitrary
-    }
-    virtual void transferRowsIn(CThorExpandingRowArray &src)
+    void transferRowsIn(CThorExpandingRowArray &src)
     {
         reset();
         spillableRows.transferFrom(src);
         activateSpillingCallback();
     }
-    virtual void transferRowsIn(CThorSpillableRowArray &src)
+    void transferRowsIn(CThorSpillableRowArray &src)
     {
         reset();
         spillableRows.transferFrom(src);
         activateSpillingCallback();
     }
-    virtual const void *probeRow(unsigned r)
+    const void *probeRow(unsigned r)
     {
         if (r>=spillableRows.numCommitted())
             return NULL;
         return spillableRows.query(r);
     }
-    virtual void setup(ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
+    void setup(ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
     {
         iCompare = _iCompare;
         stableSort = _stableSort;
@@ -1928,29 +1935,15 @@ public:
         }
         spillableRows.setup(rowIf, ers_forbidden, stableSort);
     }
-    virtual void resize(rowidx_t max)
+    void resize(rowidx_t max)
     {
         spillableRows.resize(max);
     }
-    virtual void setOptions(unsigned _options)
+    void setOptions(unsigned _options)
     {
         options = _options;
     }
-    virtual bool hasSpilt() const { return overflowCount >= 1; }
-
-// IThorArrayLock
-    virtual void lock() const { spillableRows.lock(); }
-    virtual void unlock() const { spillableRows.unlock(); }
-
-// IBufferedRowCallback
-    virtual bool freeBufferedRows(bool critical)
-    {
-        if (!mmActivated || !spillingEnabled())
-            return false;
-        CThorArrayLockBlock block(spillableRows);
-        return spillRows(critical);
-    }
-    virtual unsigned __int64 getStatistic(StatisticKind kind)
+    unsigned __int64 getStatistic(StatisticKind kind)
     {
         switch (kind)
         {
@@ -1969,6 +1962,20 @@ public:
         }
         return 0;
     }
+    bool hasSpilt() const { return overflowCount >= 1; }
+
+// for IThorArrayLock implementation
+    void lock() const { spillableRows.lock(); }
+    void unlock() const { spillableRows.unlock(); }
+
+// IBufferedRowCallback
+    virtual bool freeBufferedRows(bool critical) override
+    {
+        CThorArrayLockBlock block(spillableRows);
+        if (!mmActivated || !spillingEnabled())
+            return false;
+        return spillRows(critical);
+    }
 };
 
 enum TRLGroupFlag { trl_ungroup, trl_preserveGrouping, trl_stopAtEog };
@@ -2010,31 +2017,32 @@ public:
     }
 // IThorRowCollectorCommon
     virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
-    virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
-    virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
-    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
-    virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void transferRowsIn(CThorSpillableRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual const void *probeRow(unsigned r) { return CThorRowCollectorBase::probeRow(r); }
-    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual unsigned numOverflows() const override { return CThorRowCollectorBase::numOverflows(); }
+    virtual unsigned overflowScale() const override { return CThorRowCollectorBase::overflowScale(); }
+    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) override { CThorRowCollectorBase::transferRowsOut(dst, sort); }
+    virtual void transferRowsIn(CThorExpandingRowArray &src) override { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual void transferRowsIn(CThorSpillableRowArray &src) override { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual const void *probeRow(unsigned r) override { return CThorRowCollectorBase::probeRow(r); }
+    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) override
     {
         CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
     }
-    virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
-    virtual void setOptions(unsigned options)  { CThorRowCollectorBase::setOptions(options); }
-    virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
-    virtual bool hasSpilt() const { return CThorRowCollectorBase::hasSpilt(); }
+    virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
+    virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
+    virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
+    virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
 
 // IThorArrayLock
-    virtual void lock() const { CThorRowCollectorBase::lock(); }
-    virtual void unlock() const { CThorRowCollectorBase::unlock(); }
+    virtual void lock() const override { CThorRowCollectorBase::lock(); }
+    virtual void unlock() const override { CThorRowCollectorBase::unlock(); }
 // IThorRowLoader
-    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
+    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset) override
     {
         assertex(!iCompare || !preserveGrouping); // can't sort if group preserving
         return load(in, abort, preserveGrouping?trl_preserveGrouping:trl_ungroup, allMemRows, memUsage, doReset);
     }
-    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
+    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset) override
     {
         return load(in, abort, trl_stopAtEog, allMemRows, memUsage, doReset);
     }
@@ -2062,31 +2070,32 @@ public:
     {
     }
 // IThorRowCollectorCommon
-    virtual void setEmptyRowSemantics(EmptyRowSemantics emptyGroupSemantics)
-    {
-        assertex(!iCompare || (ers_forbidden == emptyGroupSemantics)); // can't sort if preserving end of groups or nulls
-        CThorRowCollectorBase::setEmptyRowSemantics(emptyGroupSemantics);
-    }
-    virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
-    virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
-    virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
-    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
-    virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void transferRowsIn(CThorSpillableRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual const void *probeRow(unsigned r) { return CThorRowCollectorBase::probeRow(r); }
-    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual rowcount_t numRows() const override { return CThorRowCollectorBase::numRows(); }
+    virtual unsigned numOverflows() const override { return CThorRowCollectorBase::numOverflows(); }
+    virtual unsigned overflowScale() const override { return CThorRowCollectorBase::overflowScale(); }
+    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) override { CThorRowCollectorBase::transferRowsOut(dst, sort); }
+    virtual void transferRowsIn(CThorExpandingRowArray &src) override { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual void transferRowsIn(CThorSpillableRowArray &src) override { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual const void *probeRow(unsigned r) override { return CThorRowCollectorBase::probeRow(r); }
+    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) override
     {
         CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
     }
-    virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
-    virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
-    virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
-    virtual bool hasSpilt() const { return CThorRowCollectorBase::hasSpilt(); }
+    virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
+    virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
+    virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
+    virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
 // IThorArrayLock
     virtual void lock() const { CThorRowCollectorBase::lock(); }
     virtual void unlock() const { CThorRowCollectorBase::unlock(); }
 // IThorRowCollector
-    virtual IRowWriter *getWriter()
+    virtual void setEmptyRowSemantics(EmptyRowSemantics emptyGroupSemantics) override
+    {
+        assertex(!iCompare || (ers_forbidden == emptyGroupSemantics)); // can't sort if preserving end of groups or nulls
+        CThorRowCollectorBase::setEmptyRowSemantics(emptyGroupSemantics);
+    }
+    virtual IRowWriter *getWriter() override
     {
         class CWriter : public CSimpleInterface, implements IRowWriter
         {
@@ -2113,21 +2122,21 @@ public:
         };
         return new CWriter(this);
     }
-    virtual void reset()
+    virtual void reset() override
     {
         CThorRowCollectorBase::reset();
     }
-    virtual IRowStream *getStream(bool shared, CThorExpandingRowArray *allMemRows)
+    virtual IRowStream *getStream(bool shared, CThorExpandingRowArray *allMemRows) override
     {
         return CThorRowCollectorBase::getStream(allMemRows, NULL, shared);
     }
-    virtual bool spill(bool critical)
+    virtual bool spill(bool critical) override
     {
         CThorArrayLockBlock block(spillableRows);
         return spillRows(critical);
     }
-    virtual bool flush() { return CThorRowCollectorBase::flush(); }
-    virtual bool shrink(StringBuffer *traceInfo) { return CThorRowCollectorBase::shrink(traceInfo); }
+    virtual bool flush() override { return CThorRowCollectorBase::flush(); }
+    virtual bool shrink(StringBuffer *traceInfo) override { return CThorRowCollectorBase::shrink(traceInfo); }
 };
 
 IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, EmptyRowSemantics emptyRowSemantics)

+ 2 - 2
thorlcr/thorutil/thmem.hpp

@@ -493,7 +493,7 @@ public:
     void transferFrom(CThorExpandingRowArray &src);
     void transferFrom(CThorSpillableRowArray &src);
 
-    IRowStream *createRowStream(unsigned spillPriority, unsigned spillCompInfo);
+    IRowStream *createRowStream(unsigned spillPriority, unsigned spillCompInfo, const char *tracingPrefix=nullptr);
 
     offset_t serializedSize()
     {
@@ -528,7 +528,6 @@ private:
 
 
 enum RowCollectorSpillFlags { rc_mixed, rc_allMem, rc_allDisk, rc_allDiskOrAllMem };
-enum RowCollectorOptionFlags { rcflag_noAllInMemSort=0x01 };
 interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock
 {
     virtual rowcount_t numRows() const = 0;
@@ -543,6 +542,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock
     virtual void setOptions(unsigned options) = 0;
     virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
     virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1
+    virtual void setTracingPrefix(const char *tracing) = 0;
 };
 
 interface IThorRowLoader : extends IThorRowCollectorCommon

+ 1 - 1
version.cmake

@@ -1,5 +1,5 @@
 ###
-## Version Information
+##  Version Information
 ###
 set ( HPCC_PROJECT "community" )
 set ( HPCC_MAJOR 7 )