Selaa lähdekoodia

Merge remote-tracking branch 'origin/candidate-3.10.x'

Conflicts:
	esp/files/ECLPlayground.css
	esp/files/dojox/html/_base.js
	esp/files/scripts/ESPResult.js
	esp/files/scripts/ResultsControl.js

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 vuotta sitten
vanhempi
commit
7712a3bd59
48 muutettua tiedostoa jossa 830 lisäystä ja 212 poistoa
  1. 30 1
      common/environment/dalienv.cpp
  2. 5 0
      common/environment/dalienv.hpp
  3. 1 0
      common/fileview2/fvtransform.cpp
  4. 115 5
      common/remote/rmtfile.cpp
  5. 8 0
      common/remote/rmtfile.hpp
  6. 57 29
      common/thorhelper/thorcommon.cpp
  7. 22 7
      common/thorhelper/thorcommon.hpp
  8. 37 3
      common/workunit/workunit.cpp
  9. 1 1
      dali/daliadmin/daliadmin.cpp
  10. 3 0
      ecl/eclagent/eclagent.cpp
  11. 1 0
      ecl/eclccserver/eclccserver.cpp
  12. 10 5
      ecl/hthor/hthor.cpp
  13. 1 1
      ecl/hthor/hthor.ipp
  14. 6 8
      esp/eclwatch/ws_XSLT/fs_sprayForm.xslt
  15. 0 1
      esp/eclwatch/ws_XSLT/targetclusters.xslt
  16. 13 6
      esp/eclwatch/ws_XSLT/wuidcommon.xslt
  17. 1 2
      initfiles/componentfiles/configxml/@temp/esp_service_WsSMC.xsl
  18. 7 0
      initfiles/componentfiles/configxml/thor.xsd.in
  19. 15 0
      plugins/fileservices/fileservices.cpp
  20. 16 7
      roxie/ccd/ccdserver.cpp
  21. 7 0
      system/jlib/jcomp.cpp
  22. 11 6
      system/jlib/jsocket.cpp
  23. 10 4
      system/jlib/jsocket.hpp
  24. 49 0
      testing/ecl/grouphashdedup.ecl
  25. 49 0
      testing/ecl/grouphashdedup2.ecl
  26. 30 0
      testing/ecl/key/grouphashdedup.xml
  27. 30 0
      testing/ecl/key/grouphashdedup2.xml
  28. 6 2
      thorlcr/activities/csvread/thcsvread.cpp
  29. 9 3
      thorlcr/activities/diskread/thdiskreadslave.cpp
  30. 29 12
      thorlcr/activities/enth/thenthslave.cpp
  31. 66 26
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  32. 42 20
      thorlcr/activities/join/thjoinslave.cpp
  33. 1 1
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  34. 1 1
      thorlcr/activities/loop/thloop.cpp
  35. 1 1
      thorlcr/activities/loop/thloopslave.cpp
  36. 2 2
      thorlcr/activities/merge/thmergeslave.cpp
  37. 1 1
      thorlcr/activities/nsplitter/thnsplitterslave.cpp
  38. 13 28
      thorlcr/activities/spill/thspillslave.cpp
  39. 6 1
      thorlcr/activities/thdiskbaseslave.cpp
  40. 40 7
      thorlcr/graph/thgraph.cpp
  41. 10 1
      thorlcr/graph/thgraph.hpp
  42. 7 0
      thorlcr/master/thmastermain.cpp
  43. 1 1
      thorlcr/msort/tsorta.cpp
  44. 6 6
      thorlcr/msort/tsorts.cpp
  45. 5 0
      thorlcr/slave/thslavemain.cpp
  46. 41 12
      thorlcr/thorutil/thmem.cpp
  47. 1 1
      thorlcr/thorutil/thmem.hpp
  48. 7 0
      thorlcr/thorutil/thormisc.hpp

+ 30 - 1
common/environment/dalienv.cpp

@@ -438,13 +438,42 @@ bool getRemoteRunInfo(const char * keyName, const char * exeName, const char * v
     return false;
 }
 
+#define SDS_CONNECT_TIMEOUT 30000
 bool envGetConfigurationDirectory(const char *category, const char *component,const char *instance, StringBuffer &dirout)
 {
     SessionId sessid = myProcessSession();
     if (!sessid)
         return false;
-    Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software/Directories",sessid, 0, 10000);
+    Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software/Directories",sessid, 0, SDS_CONNECT_TIMEOUT);
     if (conn) 
         return getConfigurationDirectory(conn->queryRoot(),category,component,instance,dirout);
     return false;
 }
+
+IPropertyTree *envGetNASConfiguration()
+{
+    SessionId sessid = myProcessSession();
+    if (!sessid)
+        return NULL;
+    Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Hardware/NAS", sessid, 0, SDS_CONNECT_TIMEOUT);
+    if (!conn)
+        return NULL;
+    return createPTreeFromIPT(conn->queryRoot());
+}
+
+IPropertyTree *envGetInstallNASHooks()
+{
+    IDaFileSrvHook *daFileSrvHook = queryDaFileSrvHook();
+    if (!daFileSrvHook) // probably always installed
+        return NULL;
+    daFileSrvHook->clearSubNetFilters();
+    Owned<IPropertyTree> nasPTree = envGetNASConfiguration();
+    if (!nasPTree)
+        return NULL;
+    return daFileSrvHook->addMySubnetFilters(nasPTree);
+}
+
+void envInstallNASHooks()
+{
+    Owned<IPropertyTree> installedFilters = envGetInstallNASHooks();
+}

+ 5 - 0
common/environment/dalienv.hpp

@@ -51,4 +51,9 @@ extern ENVIRONMENT_API bool getRemoteRunInfo(const char * keyName, const char *
 
 extern ENVIRONMENT_API bool envGetConfigurationDirectory(const char *category, const char *component,const char *instance, StringBuffer &dirout);
 
+extern ENVIRONMENT_API IPropertyTree *envGetNASConfiguration(); // return NAS config
+extern ENVIRONMENT_API void envInstallNASHooks(); // gets NAS config and sets up
+// like envInstallNASHooks, but also returns which filters were installed
+extern ENVIRONMENT_API IPropertyTree *envGetInstallNASHooks();
+
 #endif

+ 1 - 0
common/fileview2/fvtransform.cpp

@@ -296,6 +296,7 @@ void ViewTransformerRegistry::addPlugins(const char * name)
     {
         IHqlScope * scope = &scopes.item(i);
         HqlExprArray symbols;
+        scope->ensureSymbolsDefined(ctx);
         scope->getSymbols(symbols);
 
         ForEachItemIn(j, symbols)

+ 115 - 5
common/remote/rmtfile.cpp

@@ -62,8 +62,56 @@ void setLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mount
 
 
 
-class CDaliServixIntercept: public CInterface, implements IRemoteFileCreateHook
+class CDaliServixFilter : public CInterface
 {
+    IpSubNet ipSubNet;
+    StringAttr dir;
+    bool trace;
+public:
+    CDaliServixFilter(IPropertyTree &filter)
+    {
+        const char *subnet = filter.queryProp("@subnet");
+        const char *mask = filter.queryProp("@mask");
+        if (!ipSubNet.set(subnet, mask))
+            throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
+        dir.set(filter.queryProp("@directory"));
+        trace = filter.getPropBool("@trace");
+    }
+    CDaliServixFilter(const char *subnet, const char *mask, const char *_dir, bool _trace) : dir(_dir), trace(_trace)
+    {
+        if (!ipSubNet.set(subnet, mask))
+            throw MakeStringException(0, "Invalid sub net definition: %s, %s", subnet, mask);
+    }
+    bool queryTrace() const { return trace; }
+    const char *queryDirectory() const { return dir; }
+    const IpSubNet &querySubNet() const { return ipSubNet; }
+    bool testIp(const IpAddress &ip) const { return ipSubNet.test(ip); }
+    bool testPath(const char *path) const
+    {
+        if (!dir) // if no dir in filter, match any
+            return true;
+        else
+            return startsWith(path, dir.get());
+    }
+};
+
+class CDaliServixIntercept: public CInterface, implements IDaFileSrvHook
+{
+    CIArrayOf<CDaliServixFilter> filters;
+
+    void addSubnetFilter(CDaliServixFilter *filter)
+    {
+        const IpSubNet &ipSubNet = filter->querySubNet();
+        StringBuffer msg("DaFileSrvHook: adding translateToLocal(subnet=");
+        ipSubNet.getNetText(msg);
+        msg.append(", mask=");
+        ipSubNet.getMaskText(msg);
+        if (filter->queryDirectory())
+            msg.append(", dir=").append(filter->queryDirectory());
+        msg.append(", trace=").append(filter->queryTrace() ? "true" : "false").append(")");
+        PROGLOG("%s", msg.str());
+        filters.append(*filter);
+    }
 public:
     IMPLEMENT_IINTERFACE;
     virtual IFile * createIFile(const RemoteFilename & filename)
@@ -71,10 +119,33 @@ public:
         SocketEndpoint ep = filename.queryEndpoint();
         bool noport = (ep.port==0);
         setDafsEndpointPort(ep);
-        if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT)) {   // assume standard port is running on local machine 
+        if (!filename.isLocal()||(ep.port!=DAFILESRV_PORT)) // assume standard port is running on local machine
+        {
 #ifdef __linux__
-#ifndef USE_SAMBA   
-            return createDaliServixFile(filename);  
+#ifndef USE_SAMBA
+            if (noport && filters.ordinality())
+            {
+                ForEachItemIn(sn, filters)
+                {
+                    CDaliServixFilter &filter = filters.item(sn);
+                    if (filter.testIp(ep))
+                    {
+                        StringBuffer lPath;
+                        filename.getLocalPath(lPath);
+                        if (filter.testPath(lPath.str()))
+                        {
+                            if (filter.queryTrace())
+                            {
+                                StringBuffer fromPath;
+                                filename.getRemotePath(fromPath);
+                                PROGLOG("Redirecting path: '%s' to '%s", fromPath.str(), lPath.str());
+                            }
+                            return ::createIFile(lPath.str());
+                        }
+                    }
+                }
+            }
+            return createDaliServixFile(filename);
 #endif
 #endif
             if (!noport)            // expect all filenames that specify port to be dafilesrc or daliservix
@@ -87,7 +158,42 @@ public:
                 return createDaliServixFile(filename);  
         }
         return NULL;
-    }   
+    }
+    virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, bool trace)
+    {
+        Owned<CDaliServixFilter> filter = new CDaliServixFilter(subnet, mask, dir, trace);
+        addSubnetFilter(filter.getClear());
+    }
+    virtual IPropertyTree *addSubnetFilters(IPropertyTree *config, const IpAddress *myIp)
+    {
+        if (!config)
+            return NULL;
+        Owned<IPropertyTree> result;
+        Owned<IPropertyTreeIterator> iter = config->getElements("Filter");
+        ForEach(*iter)
+        {
+            Owned<CDaliServixFilter> filter = new CDaliServixFilter(iter->query());
+            // Only add filters where myIP is within subnet
+            if (!myIp || filter->testIp(*myIp))
+            {
+                addSubnetFilter(filter.getClear());
+                if (!result)
+                    result.setown(createPTree());
+                result->addPropTree("Filter", LINK(&iter->query()));
+            }
+        }
+        return result.getClear();
+    }
+    virtual IPropertyTree *addMySubnetFilters(IPropertyTree *config)
+    {
+        IpAddress ip;
+        GetHostIp(ip);
+        return addSubnetFilters(config, &ip);
+    }
+    virtual void clearSubNetFilters()
+    {
+        filters.kill();
+    }
 } *DaliServixIntercept = NULL;
 
 bool testDaliServixPresent(const SocketEndpoint &_ep)
@@ -717,3 +823,7 @@ MODULE_EXIT()
     removeFileHooks();
 }
 
+IDaFileSrvHook *queryDaFileSrvHook()
+{
+    return DaliServixIntercept;
+}

+ 8 - 0
common/remote/rmtfile.hpp

@@ -37,6 +37,14 @@ enum DAFS_OS
 
 extern REMOTE_API void filenameToUrl(StringBuffer & out, const char * filename);
 
+interface IDaFileSrvHook : extends IRemoteFileCreateHook
+{
+    virtual void addSubnetFilter(const char *subnet, const char *mask, const char *dir, bool trace) = 0;
+    virtual IPropertyTree *addSubnetFilters(IPropertyTree *filters, const IpAddress *ipAddress) = 0;
+    virtual IPropertyTree *addMySubnetFilters(IPropertyTree *filters) = 0;
+    virtual void clearSubNetFilters() = 0;
+};
+extern REMOTE_API IDaFileSrvHook *queryDaFileSrvHook();
 extern REMOTE_API unsigned short getDaliServixPort();  // assumed just the one for now
 extern REMOTE_API void setCanAccessDirectly(RemoteFilename & file,bool set);
 extern REMOTE_API void setDaliServixSocketCaching(bool set);

+ 57 - 29
common/thorhelper/thorcommon.cpp

@@ -1150,7 +1150,7 @@ class CRowStreamReader : public CSimpleInterface, implements IExtRowStream
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CRowStreamReader(IFileIO *_fileio,IMemoryMappedFile *_mmfile,offset_t _ofs, offset_t _len, IRowInterfaces *rowif,unsigned __int64 _maxrows,bool _tallycrc, bool _grouped)
+    CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, bool _grouped)
         : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL) 
     {
 #ifdef TRACE_CREATE
@@ -1287,34 +1287,40 @@ unsigned CRowStreamReader::rdnum;
 
 bool UseMemoryMappedRead = false;
 
-IExtRowStream *createRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped)
+IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp)
 {
-    IExtRowStream *ret;
-    if (UseMemoryMappedRead) {
+    bool compressed = TestRwFlag(rwFlags, rw_compress);
+    if (UseMemoryMappedRead && !compressed)
+    {
         PROGLOG("Memory Mapped read of %s",file->queryFilename());
         Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
         if (!mmfile)
             return NULL;
-        ret = new CRowStreamReader(NULL,mmfile,offset,len,rowif,maxrows,tallycrc,grouped);
+        return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
     }
-    else {
-        Owned<IFileIO> fileio = file->open(IFOread);
+    else
+    {
+        Owned<IFileIO> fileio;
+        if (compressed)
+        {
+            // JCSMORE: should probably pass in a flag for rw_compressblkcrc; neither this code
+            // nor anywhere else appears to check the block CRCs at the moment.
+            fileio.setown(createCompressedFileReader(file, eexp, UseMemoryMappedRead));
+        }
+        else
+            fileio.setown(file->open(IFOread));
         if (!fileio)
             return NULL;
-        ret = new CRowStreamReader(fileio,NULL,offset,len,rowif,maxrows,tallycrc,grouped);
+        return new CRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
     }
-    return ret;
 }
 
-IExtRowStream *createCompressedRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped,IExpander *eexp)
+IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowIf, unsigned rwFlags, IExpander *eexp)
 {
-    Owned<IFileIO> fileio = createCompressedFileReader(file, eexp, UseMemoryMappedRead);
-    if (!fileio)
-        return NULL;
-    IExtRowStream *ret = new CRowStreamReader(fileio,NULL,offset,len,rowif,maxrows,tallycrc,grouped);
-    return ret;
+    return createRowStreamEx(file, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, eexp);
 }
 
+
 void useMemoryMappedRead(bool on)
 {
 #if defined(_DEBUG) || defined(__64BIT__)
@@ -1526,20 +1532,44 @@ public:
 unsigned CRowStreamWriter::wrnum=0;
 #endif
 
-IExtRowWriter *createRowWriter(IFile *file,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped, bool tallycrc, bool extend)
+IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor)
 {
-    Owned<IFileIO> fileio = file->open(extend?IFOwrite:IFOcreate);
-    if (!fileio)
+    OwnedIFileIO iFileIO;
+    if (TestRwFlag(flags, rw_compress))
+    {
+        size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedMeta()->getFixedSize();
+        if (fixedSize && TestRwFlag(flags, rw_grouped))
+            ++fixedSize; // row writer will include a grouping byte
+        iFileIO.setown(createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, TestRwFlag(flags, rw_fastlz)));
+    }
+    else
+        iFileIO.setown(iFile->open((flags & rw_extend)?IFOwrite:IFOcreate));
+    if (!iFileIO)
         return NULL;
-    Owned<IFileIOStream> stream = createIOStream(fileio);
-    if (extend)
-        stream->seek(0,IFSend);
-    return createRowWriter(stream,serializer,allocator,grouped,tallycrc,true);
+    flags &= ~((unsigned)(rw_compress|rw_fastlz|rw_compressblkcrc));
+    return createRowWriter(iFileIO, rowIf, flags);
 }
 
-IExtRowWriter *createRowWriter(IFileIOStream *strm,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped, bool tallycrc, bool autoflush)
+IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags)
 {
-    Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, serializer, allocator, grouped, tallycrc, autoflush);
+    if (TestRwFlag(flags, rw_compress))
+        throw MakeStringException(0, "Unsupported createRowWriter flags");
+    Owned<IFileIOStream> stream;
+    if (TestRwFlag(flags, rw_buffered))
+        stream.setown(createBufferedIOStream(iFileIO));
+    else
+        stream.setown(createIOStream(iFileIO));
+    if (flags & rw_extend)
+        stream->seek(0, IFSend);
+    flags &= ~((unsigned)(rw_extend|rw_buffered));
+    return createRowWriter(stream, rowIf, flags);
+}
+
+IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags)
+{
+    if (0 != (flags & (rw_compress|rw_fastlz|rw_extend|rw_buffered|rw_compressblkcrc)))
+        throw MakeStringException(0, "Unsupported createRowWriter flags");
+    Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), TestRwFlag(flags, rw_grouped), TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
     return writer.getClear();
 }
 
@@ -1584,7 +1614,7 @@ public:
         tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
         IFile *file = createIFile(tempname.str());
         tempfiles.append(*file);
-        return createRowWriter(file,rowInterfaces->queryRowSerializer(),rowInterfaces->queryRowAllocator(),false,false,false); // flushed by close
+        return createRowWriter(file, rowInterfaces, 0); // flushed by close
     }
     void put(const void **rows,unsigned numrows)
     {
@@ -1611,7 +1641,7 @@ public:
         strms = (IRowStream **)calloc(numstrms,sizeof(IRowStream *));
         unsigned i;
         for (i=0;i<numstrms;i++) {
-            strms[i] = createSimpleRowStream(&tempfiles.item(i), rowInterfaces);
+            strms[i] = createRowStream(&tempfiles.item(i), rowInterfaces);
         }
         if (numstrms==1) 
             return LINK(strms[0]);
@@ -1632,9 +1662,7 @@ public:
             ++count;
         }
         return count;
-    }
-
-    
+    }    
 };
 
 IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase)

+ 22 - 7
common/thorhelper/thorcommon.hpp

@@ -68,6 +68,21 @@ extern THORHELPER_API void useMemoryMappedRead(bool on);
 
 extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context);
 
+
+enum RowReaderWriterFlags
+{
+    rw_grouped        = 0x1,
+    rw_crc            = 0x2,
+    rw_extend         = 0x4,
+    rw_compress       = 0x8,
+    rw_compressblkcrc = 0x10, // block compression, this sets/checks crc's at block level
+    rw_fastlz         = 0x20, // if rw_compress
+    rw_autoflush      = 0x40,
+    rw_buffered       = 0x80
+};
+#define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)
+inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag) { return 0 != (flags & flag); }
+
 interface IExtRowStream: extends IRowStream
 {
     virtual offset_t getOffset() = 0;
@@ -77,19 +92,19 @@ interface IExtRowStream: extends IRowStream
     virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
 };
 
-extern THORHELPER_API IExtRowStream *createRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped);
-inline IExtRowStream *createSimpleRowStream(IFile *file,IRowInterfaces *rowif) { return createRowStream(file, rowif,0,(offset_t)-1,(unsigned __int64)-1,false,false); }
-interface IExpander;
-extern THORHELPER_API IExtRowStream *createCompressedRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped,IExpander *eexp);
-
 interface IExtRowWriter: extends IRowWriter
 {
     virtual offset_t getPosition() = 0;
     virtual void flush(CRC32 *crcout=NULL) = 0;
 };
 
-extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped=false, bool tallycrc=false, bool extend=false); 
-extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped=false, bool tallycrc=false,bool autoflush=true); // strm should be unbuffered
+interface IExpander;
+extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
+extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
+interface ICompressor;
+extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL);
+extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS);
+extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered
 
 interface THORHELPER_API IDiskMerger : extends IInterface
 {

+ 37 - 3
common/workunit/workunit.cpp

@@ -1866,6 +1866,25 @@ public:
     }
 };      
 
+class asyncRemoveRemoteFileWorkItem: public CInterface, implements IWorkQueueItem // class only used in asyncRemoveDll
+{
+    RemoteFilename name;
+public:
+    IMPLEMENT_IINTERFACE;
+
+    asyncRemoveRemoteFileWorkItem(const char * _ip, const char * _name)
+    {
+        SocketEndpoint ep(_ip);
+        name.setPath(ep, _name);
+    }
+    void execute()
+    {
+        Owned<IFile> file = createIFile(name);
+        PROGLOG("WU removeDll %s",file->queryFilename());
+        file->remove();
+    }
+};
+
 #define WUID_VERSION 1 // recorded in each wuid created, useful for bkwd compat. checks
 
 class CWorkUnitFactory : public CInterface, implements IWorkUnitFactory, implements IDaliClientShutdown
@@ -2263,7 +2282,13 @@ public:
 
     void asyncRemoveDll(const char * name, bool removeDlls, bool removeDirectory)
     {
-        deletedllworkq->post(new asyncRemoveDllWorkItem(name,removeDlls,removeDirectory));
+        const char * tail = pathTail(name);
+        deletedllworkq->post(new asyncRemoveDllWorkItem(tail,removeDlls,removeDirectory));
+    }
+
+    void asyncRemoveFile(const char * ip, const char * name)
+    {
+        deletedllworkq->post(new asyncRemoveRemoteFileWorkItem(ip, name));
     }
 
     ISDSManager *sdsManager;
@@ -2673,12 +2698,21 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll,bool deleteOwned)
             {
                 Owned<IConstWUAssociatedFileIterator> iter = &q->getAssociatedFiles();
                 SCMStringBuffer name;
+                SCMStringBuffer ip;
                 ForEach(*iter)
                 {
                     IConstWUAssociatedFile & cur = iter->query();
                     cur.getName(name);
-                    bool removeDir = (cur.getType() == FileTypeDll);        // this is to keep the code the same as before, but I don't know why it only does it for the dll.
-                    factory->asyncRemoveDll(name.str(), true, removeDir);
+                    if (cur.getType() == FileTypeDll)
+                    {
+                        bool removeDir = true;        // this is to keep the code the same as before, but I don't know why it only does it for the dll.
+                        factory->asyncRemoveDll(name.str(), true, removeDir);
+                    }
+                    else
+                    {
+                        cur.getIp(ip);
+                        factory->asyncRemoveFile(ip.str(), name.str());
+                    }
                 }
             }
         }

+ 1 - 1
dali/daliadmin/daliadmin.cpp

@@ -107,7 +107,7 @@ void usage(const char *exe)
   printf("  daliping [ <num> ]              -- time dali server connect\n");
   printf("  getxref <destxmlfile>           -- get all XREF information\n");
   printf("  dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
-  printf("  unlock <sessid>                 -- unlocks an object\n");
+  printf("  unlock <xpath or logicalfile>   --  unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
   printf("\n");
   printf("Common options (can be placed in dfuutil.ini)\n");
   printf("  server=<dali-server-ip>         -- server ip\n");

+ 3 - 0
ecl/eclagent/eclagent.cpp

@@ -3256,6 +3256,9 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
             if (getConfigurationDirectory(agentTopology->queryPropTree("Directories"),"mirror","eclagent",agentTopology->queryProp("@name"),baseDir.clear()))
                 setBaseDirectory(baseDir.str(), true);
 
+            if (agentTopology->getPropBool("@useNASTranslation", true))
+                envInstallNASHooks();
+
             if (standAloneWorkUnit)
             {
                 //Stand alone program, but dali is specified => create a workunit in dali, and store the results there....

+ 1 - 0
ecl/eclccserver/eclccserver.cpp

@@ -220,6 +220,7 @@ class EclccCompileThread : public CInterface, implements IPooledThread
                     Owned<ILocalWorkUnit> embeddedWU = createLocalWorkUnit();
                     embeddedWU->loadXML(wuXML);
                     queryExtendedWU(workunit)->copyWorkUnit(embeddedWU, true);
+                    workunit->setIsClone(false);
                     SCMStringBuffer jobname;
                     if (embeddedWU->getJobName(jobname).length()) //let ECL win naming job during initial compile
                         workunit->setJobName(jobname.str());

+ 10 - 5
ecl/hthor/hthor.cpp

@@ -370,7 +370,7 @@ void CHThorDiskWriteActivity::ready()
     uncompressedBytesWritten = 0;
     numRecords = 0;
     sizeLimit = agent.queryWorkUnit()->getDebugValueInt64("hthorDiskWriteSizeLimit", defaultHThorDiskWriteSizeLimit);
-    rowSerializer.setown(input->queryOutputMeta()->createRowSerializer(agent.queryCodeContext(), activityId));
+    rowIf.setown(createRowInterfaces(input->queryOutputMeta(), activityId, agent.queryCodeContext()));
     open();
 }
 
@@ -517,8 +517,12 @@ void CHThorDiskWriteActivity::open()
     if(extend)
         diskout->seek(0, IFSend);
 
-    bool tallycrc = !agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck);
-    IExtRowWriter * writer = createRowWriter(diskout, rowSerializer, rowAllocator, grouped, tallycrc, true );
+    unsigned rwFlags = rw_autoflush;
+    if(grouped)
+        rwFlags |= rw_grouped;
+    if(!agent.queryWorkUnit()->getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck))
+        rwFlags |= rw_crc;
+    IExtRowWriter * writer = createRowWriter(diskout, rowIf, rwFlags);
     outSeq.setown(writer);
 
 }
@@ -2516,9 +2520,10 @@ const void * CHThorHashDedupActivity::nextInGroup()
     {
         OwnedConstHThorRow next(input->nextInGroup());
         if(!next)
-            next.setown(input->nextInGroup());
-        if(!next)
+        {
+            table.kill();
             return NULL;
+        }
         if(table.insert(next))
             return next.getClear();
     }

+ 1 - 1
ecl/hthor/hthor.ipp

@@ -290,7 +290,7 @@ protected:
     unsigned __int64 numRecords;
     Owned<ClusterWriteHandler> clusterHandler;
     offset_t sizeLimit;
-    Owned<IOutputRowSerializer> rowSerializer;
+    Owned<IRowInterfaces> rowIf;
     StringBuffer mangledHelperFileName;
     OwnedConstHThorRow nextrow; // needed for grouped spill
 

+ 6 - 8
esp/eclwatch/ws_XSLT/fs_sprayForm.xslt

@@ -624,14 +624,12 @@
                <input type="checkbox" name="nosplit" value="1"/>
             </td>
          </tr>
-         <xsl:if test="$method='SprayFixed' or $submethod='csv'">
-            <tr>
-               <td>Compress:</td>
-               <td>
-                  <input type="checkbox" id="compress" name="compress" value="1"/>
-               </td>
-            </tr>
-         </xsl:if>
+         <tr>
+            <td>Compress:</td>
+            <td>
+               <input type="checkbox" id="compress" name="compress" value="1"/>
+            </td>
+         </tr>
          <xsl:if test="$fullHtml='1'">
             <tr>
                <td/>

+ 0 - 1
esp/eclwatch/ws_XSLT/targetclusters.xslt

@@ -51,7 +51,6 @@
         <html>
             <head>
         <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
-        <meta http-equiv="Refresh" content="100"/>
         <link rel="stylesheet" type="text/css" href="/esp/files/default.css"/>
         <link rel="stylesheet" type="text/css" href="/esp/files/yui/build/fonts/fonts-min.css" />
         <link rel="stylesheet" type="text/css" href="/esp/files/yui/build/menu/assets/skins/sam/menu.css" />

+ 13 - 6
esp/eclwatch/ws_XSLT/wuidcommon.xslt

@@ -603,7 +603,7 @@
         </p>
       </xsl:if>
 
-      <xsl:if test="string-length(Query/Text)">
+      <xsl:if test="string-length(Query/Text) or string-length(Query/QueryMainDefinition)">
         <div>
           <div class="wugroup">
               <div class="WuGroupHdrLeft">
@@ -613,11 +613,18 @@
               </div>
           </div>
           <div id="querysection" class="wusectioncontent">
-            <div>
-              <textarea id="query" readonly="true" wrap="off" rows="10" STYLE="width:600">
-                <xsl:value-of select="Query/Text"/>
-              </textarea>
-            </div>
+              <xsl:if test="string-length(Query/Text)">
+                  <div>
+                      <textarea id="query" readonly="true" wrap="off" rows="10" STYLE="width:600">
+                          <xsl:value-of select="Query/Text"/>
+                      </textarea>
+                  </div>
+              </xsl:if>
+              <xsl:if test="string-length(Query/QueryMainDefinition)">
+                  <div>
+                      <b>QueryMainDefinition: </b><xsl:value-of select="Query/QueryMainDefinition"/>
+                  </div>
+              </xsl:if>
           </div>
         </div>
       </xsl:if>

Tiedoston diff-näkymää rajattu, sillä se on liian suuri
+ 1 - 2
initfiles/componentfiles/configxml/@temp/esp_service_WsSMC.xsl


+ 7 - 0
initfiles/componentfiles/configxml/thor.xsd.in

@@ -562,6 +562,13 @@
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
+      <xs:attribute name="compressInternalSpills" type="xs:boolean" default="true">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>Compress internal writes to disk when spilling</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
     </xs:complexType>
     <xs:key name="thorProcessKey1">
       <xs:selector xpath="./ThorMasterProcess|./ThorSlaveProcess"/>

+ 15 - 0
plugins/fileservices/fileservices.cpp

@@ -664,6 +664,9 @@ FILESERVICES_API char * FILESERVICES_CALL fsfSprayFixed(ICodeContext *ctx, const
     req->setOverwrite(overwrite);
     req->setReplicate(replicate);
     req->setCompress(compress);
+    if (maxConnections != -1)
+        req->setMaxConnections(maxConnections);
+
     Owned<IClientSprayFixedResponse> result = server.SprayFixed(req);
 
     StringBuffer wuid(result->getWuid());
@@ -718,6 +721,9 @@ FILESERVICES_API char * FILESERVICES_CALL fsfSprayVariable(ICodeContext *ctx, co
     req->setOverwrite(overwrite);
     req->setReplicate(replicate);
     req->setCompress(compress);
+    if (maxConnections != -1)
+        req->setMaxConnections(maxConnections);
+
     Owned<IClientSprayResponse> result = server.SprayVariable(req);
 
     StringBuffer wuid(result->getWuid());
@@ -776,6 +782,9 @@ FILESERVICES_API char * FILESERVICES_CALL fsfSprayXml(ICodeContext *ctx, const c
     req->setOverwrite(overwrite);
     req->setReplicate(replicate);
     req->setCompress(compress);
+    if (maxConnections != -1)
+        req->setMaxConnections(maxConnections);
+
     Owned<IClientSprayResponse> result = server.SprayVariable(req);
 
     StringBuffer wuid(result->getWuid());
@@ -822,6 +831,9 @@ FILESERVICES_API char * FILESERVICES_CALL fsfDespray(ICodeContext *ctx, const ch
     req->setDestIP(destinationIP);
     req->setDestPath(destinationPath);
     req->setOverwrite(overwrite);
+    if (maxConnections != -1)
+        req->setMaxConnections(maxConnections);
+
     Owned<IClientDesprayResponse> result = server.Despray(req);
 
     StringBuffer wuid(result->getWuid());
@@ -882,6 +894,9 @@ FILESERVICES_API char * FILESERVICES_CALL fsfCopy(ICodeContext *ctx, const char
         req->setPush(true);
     if (transferBufferSize>0)
         req->setTransferBufferSize(transferBufferSize);
+    if (maxConnections != -1)
+        req->setMaxConnections(maxConnections);
+
     Owned<IClientCopyResponse> result = server.Copy(req);
 
     StringBuffer wuid(result->getResult());

+ 16 - 7
roxie/ccd/ccdserver.cpp

@@ -6817,12 +6817,13 @@ public:
         {
             const void * next = input->nextInGroup();
             if(!next)
-                next = input->nextInGroup();
-            if(!next)
-            {   
-                eof = true;
-                break;
+            {
+                if (table.count() == 0)
+                    eof = true;
+                table.reset();
+                return NULL;
             }
+
             if(table.insert(next))
                 return next;
             else
@@ -10744,9 +10745,17 @@ public:
         diskout.setown(createBufferedIOStream(io));
         if (extend)
             diskout->seek(0, IFSend);
-        rowSerializer.setown(input->queryOutputMeta()->createRowSerializer(ctx->queryCodeContext(), activityId)); 
         tallycrc = !factory->queryQueryFactory().getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck) && !blockcompressed;
-        outSeq.setown(createRowWriter(diskout, rowSerializer, rowAllocator, grouped, tallycrc, true )); 
+        Owned<IRowInterfaces> rowIf = createRowInterfaces(input->queryOutputMeta(), activityId, ctx->queryCodeContext());
+        rowSerializer.set(rowIf->queryRowSerializer());
+        unsigned rwFlags = rw_autoflush;
+        if(grouped)
+            rwFlags |= rw_grouped;
+        if(tallycrc)
+            rwFlags |= rw_crc;
+        if(!factory->queryQueryFactory().getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck))
+            rwFlags |= rw_crc;
+        outSeq.setown(createRowWriter(diskout, rowIf, rwFlags));
     }
 
     virtual void stop(bool aborting)

+ 7 - 0
system/jlib/jcomp.cpp

@@ -426,6 +426,7 @@ bool CppCompiler::compile()
         cclog = queryCcLogName();
     Owned <IFile> dstfile = createIFile(cclog);
     dstfile->remove();
+
     Owned<IFileIO> dstIO = dstfile->open(IFOwrite);
     ForEachItemIn(i2, logFiles)
     {
@@ -437,6 +438,12 @@ bool CppCompiler::compile()
         }
     }
 
+    //Don't leave lots of blank log files around if the compile was successful
+    bool logIsEmpty = (dstIO->size() == 0);
+    dstIO.clear();
+    if (ret && logIsEmpty)
+        dstfile->remove();
+
     pool->joinAll(true, 1000);
     return ret;
 }

+ 11 - 6
system/jlib/jsocket.cpp

@@ -5130,7 +5130,7 @@ bool IpSubNet::set(const char *_net,const char *_mask)
     return true;
 }
 
-bool IpSubNet::test(const IpAddress &ip)
+bool IpSubNet::test(const IpAddress &ip) const
 {
     unsigned i;
     if (ip.getNetAddress(sizeof(i),&i)==sizeof(i)) {
@@ -5148,19 +5148,24 @@ bool IpSubNet::test(const IpAddress &ip)
     return false;
 }
 
-StringBuffer IpSubNet::getNetText(StringBuffer &text)
+StringBuffer IpSubNet::getNetText(StringBuffer &text) const
 {
     char tmp[INET6_ADDRSTRLEN];
-    return text.append(_inet_ntop(isIp4(net)?AF_INET:AF_INET6, &net, tmp, sizeof(tmp)));
+    const char *res  = ::isIp4(net) ? _inet_ntop(AF_INET, &net[3], tmp, sizeof(tmp))
+                                    : _inet_ntop(AF_INET6, &net, tmp, sizeof(tmp));
+    return text.append(res);
 }
 
-StringBuffer IpSubNet::getMaskText(StringBuffer &text)
+StringBuffer IpSubNet::getMaskText(StringBuffer &text) const
 {
     char tmp[INET6_ADDRSTRLEN];
-    return text.append(_inet_ntop(isIp4(net)?AF_INET:AF_INET6, &mask, tmp, sizeof(tmp))); // isIp4(net) is correct here
+    // isIp4(net) is correct here
+    const char *res  = ::isIp4(net) ? _inet_ntop(AF_INET, &mask[3], tmp, sizeof(tmp))
+                                    : _inet_ntop(AF_INET6, &mask, tmp, sizeof(tmp));
+    return text.append(res);
 }
 
-bool IpSubNet::isNull()
+bool IpSubNet::isNull() const
 {
     for (unsigned i=0;i<4;i++)
         if (net[i]||mask[i])

+ 10 - 4
system/jlib/jsocket.hpp

@@ -195,10 +195,16 @@ public:
     IpSubNet(const char *_net,const char *_mask)    { set(_net,_mask); }
     bool set(const char *_net,const char *_mask); // _net NULL means match everything
                                                   // _mask NULL means match exact
-    bool test(const IpAddress &ip);
-    StringBuffer getNetText(StringBuffer &text);
-    StringBuffer getMaskText(StringBuffer &text);
-    bool isNull();
+    bool test(const IpAddress &ip) const;
+    StringBuffer getNetText(StringBuffer &text) const;
+    StringBuffer getMaskText(StringBuffer &text) const;
+    bool isNull() const;
+    bool operator==(IpSubNet const &other) const
+    {
+        const bool sameNet = (0 == memcmp(net, other.net, sizeof(net)));    // network words match
+        const bool sameMask = (0 == memcmp(mask, other.mask, sizeof(mask))); // mask words match
+        return sameNet && sameMask;
+    }
 };
 
 

+ 49 - 0
testing/ecl/grouphashdedup.ecl

@@ -0,0 +1,49 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+idRecord :=
+            RECORD
+UNSIGNED        id1;
+UNSIGNED        id2;
+UNSIGNED        id3;
+            END;
+
+ds0 := dataset([
+    {1,1,1},
+    {1,1,2},
+    {1,2,1},
+    {2,1,1},
+    {2,2,1},
+    {2,3,1},
+    {3,1,1},
+    {99,99,99}
+    ], idRecord);
+ds := sorted(ds0, id1, id2);
+
+dedup1 := DEDUP(ds, id1, ALL);
+dedup2 := DEDUP(ds, id2, ALL);
+dedup4 := DEDUP(GROUP(ds,id1), id2, ALL); // should be within the grouping
+dedup5 := DEDUP(GROUP(ds,id1,id2), id1, ALL); // should be within the grouping
+
+sequential(
+    output(SORT(TABLE(dedup1, { id1, cnt := count(GROUP) }, id1),id1));
+    output(SORT(TABLE(group(nofold(dedup2)),{id2, cnt := count(GROUP)}, id2), id2));
+    output(SORT(TABLE(group(nofold(dedup4)), { id1, id2, cnt := count(GROUP) }, id1, id2), id1, id2));
+    output(SORT(TABLE(dedup5,{id1, id2, cnt := count(GROUP)}), id1, id2));
+);
+    

+ 49 - 0
testing/ecl/grouphashdedup2.ecl

@@ -0,0 +1,49 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+idRecord :=
+            RECORD
+UNSIGNED        id1;
+UNSIGNED        id2;
+UNSIGNED        id3;
+            END;
+
+ds0 := dataset([
+    {1,1,1},
+    {1,1,2},
+    {1,2,1},
+    {2,1,1},
+    {2,2,1},
+    {2,3,1},
+    {3,1,1},
+    {99,99,99}
+    ], idRecord);
+ds := sorted(ds0, id1, id2);
+
+dedup1 := DEDUP(ds, id1, ALL, HASH);
+dedup2 := DEDUP(ds, id2, ALL, HASH);
+dedup4 := DEDUP(GROUP(ds,id1), id2, ALL, HASH); // should be within the grouping
+dedup5 := DEDUP(GROUP(ds,id1,id2), id1, ALL, HASH); // should be within the grouping
+
+sequential(
+    output(SORT(TABLE(dedup1, { id1, cnt := count(GROUP) }, id1),id1));
+    output(SORT(TABLE(group(nofold(dedup2)),{id2, cnt := count(GROUP)}, id2), id2));
+    output(SORT(TABLE(group(nofold(dedup4)), { id1, id2, cnt := count(GROUP) }, id1, id2), id1, id2));
+    output(SORT(TABLE(dedup5,{id1, id2, cnt := count(GROUP)}), id1, id2));
+);
+    

+ 30 - 0
testing/ecl/key/grouphashdedup.xml

@@ -0,0 +1,30 @@
+<Dataset name='Result 1'>
+ <Row><id1>1</id1><cnt>1</cnt></Row>
+ <Row><id1>2</id1><cnt>1</cnt></Row>
+ <Row><id1>3</id1><cnt>1</cnt></Row>
+ <Row><id1>99</id1><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><id1>1</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>1</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id1>3</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>99</id1><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id1>1</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>1</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id1>3</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>99</id1><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>

+ 30 - 0
testing/ecl/key/grouphashdedup2.xml

@@ -0,0 +1,30 @@
+<Dataset name='Result 1'>
+ <Row><id1>1</id1><cnt>1</cnt></Row>
+ <Row><id1>2</id1><cnt>1</cnt></Row>
+ <Row><id1>3</id1><cnt>1</cnt></Row>
+ <Row><id1>99</id1><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><id1>1</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>1</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id1>3</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>99</id1><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id1>1</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>1</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>2</id2><cnt>1</cnt></Row>
+ <Row><id1>2</id1><id2>3</id2><cnt>1</cnt></Row>
+ <Row><id1>3</id1><id2>1</id2><cnt>1</cnt></Row>
+ <Row><id1>99</id1><id2>99</id2><cnt>1</cnt></Row>
+</Dataset>

+ 6 - 2
thorlcr/activities/csvread/thcsvread.cpp

@@ -52,8 +52,12 @@ public:
         if (headerLines)
         {
             dst.append((int)mpTag);
-            ISuperFileDescriptor *superFDesc = fileDesc->querySuperFileDescriptor();
-            unsigned subFiles = superFDesc ? superFDesc->querySubFiles() : 1;
+            unsigned subFiles = 0;
+            if (fileDesc)
+            {
+                ISuperFileDescriptor *superFDesc = fileDesc->querySuperFileDescriptor();
+                subFiles = superFDesc ? superFDesc->querySubFiles() : 1;
+            }
             dst.append(subFiles);
         }
     }

+ 9 - 3
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -200,9 +200,15 @@ void CDiskRecordPartHandler::open()
 {
     CDiskPartHandlerBase::open();
     in.clear();
+    unsigned rwFlags = DEFAULT_RWFLAGS;
+    if (checkFileCrc) // NB: if compressed, this will be turned off by base class
+        rwFlags |= rw_crc;
+    if (activity.grouped)
+        rwFlags |= rw_grouped;
     if (compressed)
     {
-        in.setown(createCompressedRowStream(iFile, activity.queryDiskRowInterfaces(), 0, (offset_t)-1, RCUNBOUND, checkFileCrc, activity.grouped, activity.eexp));
+        rwFlags |= rw_compress;
+        in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags, activity.eexp));
         if (!in.get())
         {
             if (!blockCompressed)
@@ -211,8 +217,8 @@ void CDiskRecordPartHandler::open()
                 throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
         }
     }
-    else 
-        in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), 0, (offset_t)-1, RCUNBOUND, checkFileCrc, activity.grouped));
+    else
+        in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags));
     if (!in)
         throw MakeActivityException(&activity, 0, "Failed to open file '%s'", filename.get());
     ActPrintLog(&activity, "%s[part=%d]: %s (%s)", kindStr, which, activity.isFixedDiskWidth ? "fixed" : "variable", filename.get());

+ 29 - 12
thorlcr/activities/enth/thenthslave.cpp

@@ -196,6 +196,23 @@ class CEnthSlaveActivity : public BaseEnthActivity
         msg.append(count);
         container.queryJob().queryJobComm().send(msg, container.queryJob().queryMyRank()+1, mpTag);
     }
+    bool getPrev() // reads prevRecCount from the previous rank (unless first node) and seeds the counter; false if the receive fails
+    {
+        if (!firstNode()) // no need if 1st node
+        {
+            CMessageBuffer msg;
+            if (!receiveMsg(msg, container.queryJob().queryMyRank()-1, mpTag))
+                return false; // nothing received from rank-1
+            msg.read(prevRecCount);
+        }
+        setInitialCounter(prevRecCount);
+        if (haveLocalCount()) // if local total count known, send total now
+            sendCount(prevRecCount + localRecCount);
+        else
+            prevRecCountSem.signal(); // local count not known yet: signal that prevRecCount has been set
+        return true;
+    }
+
 public:
     CEnthSlaveActivity(CGraphElementBase *container) : BaseEnthActivity(container)
     { 
@@ -225,18 +242,8 @@ public:
         if (first)
         {
             first = false;
-            if (!firstNode()) // no need if 1st node
-            {
-                CMessageBuffer msg;
-                if (!receiveMsg(msg, container.queryJob().queryMyRank()-1, mpTag))
-                    return NULL;
-                msg.read(prevRecCount);
-            }
-            setInitialCounter(prevRecCount);
-            if (haveLocalCount()) // if local total count known, send total now
-                sendCount(prevRecCount + localRecCount);
-            else
-                prevRecCountSem.signal();
+            if (!getPrev())
+                return NULL;
         }
         while (!abortSoon)
         {
@@ -251,6 +258,16 @@ public:
         }
         return NULL;        
     }
+    virtual void stop()
+    {
+        // Need to ensure the sequence continues if nextRow has never been called.
+        if (first)
+        {
+            first = false;
+            getPrev(); // return value is not checked here
+        }
+        BaseEnthActivity::stop();
+    }
     virtual void onInputFinished(rowcount_t localRecCount)
     {
         if (!haveLocalCount())

+ 66 - 26
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -742,11 +742,10 @@ public:
         fixedEstSize = meta->querySerializedMeta()->getFixedSize();
         rowManager = activity->queryJob().queryRowManager();
 
-        bool defaultAllowSpill = activity->queryJob().getWorkUnitValueBool("allowSpillHashDist", globals->getPropBool("@allowSpillHashDist", true));
-        allowSpill = activity->queryContainer().queryXGMML().getPropBool("hint[@name=\"allow_spill\"]/@value", defaultAllowSpill);
+        allowSpill = activity->getOptBool(THOROPT_HDIST_SPILL, true);
         if (allowSpill)
             ActPrintLog(activity, "Using spilling buffer (will spill if overflows)");
-        writerPoolSize = (unsigned)activity->queryJob().getWorkUnitValueInt("hashDistWritePoolSize", globals->getPropInt("@hashDistWritePoolSize", DEFAULT_WRITEPOOLSIZE));
+        writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
         if (writerPoolSize>numnodes)
             writerPoolSize = numnodes; // no point in more
         ActPrintLog(activity, "Writer thread pool size : %d", writerPoolSize);
@@ -1793,8 +1792,10 @@ public:
         ThorDataLinkMetaInfo info;
         in->getMetaInfo(info);
         offset_t sz = info.byteTotal;
-        if (sz==(offset_t)-1) {
+        if (sz==(offset_t)-1)
+        {
             // not great but hopefully exception not rule!
+            unsigned rwFlags = DEFAULT_RWFLAGS;
             sz = 0;
             StringBuffer tempname;
             GetTempName(tempname,"hdprop",true); // use alt temp dir
@@ -1803,10 +1804,13 @@ public:
                 ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "REDISTRIBUTE size unknown, spilling to disk");
                 MemoryAttr ma;
                 activity->startInput(in);
-                Owned<IExtRowWriter> out = createRowWriter(tempfile,serializer,activity->queryRowAllocator(),false, false, false);
+                if (activity->getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                    rwFlags |= rw_compress;
+                Owned<IExtRowWriter> out = createRowWriter(tempfile, activity, rwFlags);
                 if (!out)
                     throw MakeStringException(-1,"Could not created file %s",tempname.str());
-                loop {
+                loop
+                {
                     const void * row = in->ungroupedNextRow();
                     if (!row)
                         break;
@@ -1816,7 +1820,7 @@ public:
                 sz = out->getPosition();
                 activity->stopInput(in);
             }
-            ret.setown(createSimpleRowStream(tempfile,activity));
+            ret.setown(createRowStream(tempfile, activity, rwFlags));
         }
         CMessageBuffer mb;
         mb.append(sz);
@@ -2139,20 +2143,23 @@ public:
 
 class CSpill : public CSimpleInterface, implements IRowWriter
 {
+    CActivityBase &owner;
     IRowInterfaces *rowIf;
     rowcount_t count;
     Owned<CFileOwner> spillFile;
     IRowWriter *writer;
     StringAttr desc;
-    unsigned bucketN;
+    unsigned bucketN, rwFlags;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpill(IRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN) : rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
+    CSpill(CActivityBase &_owner, IRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
+        : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
     {
         count = 0;
         writer = NULL;
+        rwFlags = DEFAULT_RWFLAGS;
     }
     ~CSpill()
     {
@@ -2167,7 +2174,9 @@ public:
         GetTempName(tempname, prefix.str(), true);
         OwnedIFile iFile = createIFile(tempname.str());
         spillFile.setown(new CFileOwner(iFile.getLink()));
-        writer = createRowWriter(iFile, rowIf->queryRowSerializer(), rowIf->queryRowAllocator());
+        if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+            rwFlags |= rw_compress;
+        writer = createRowWriter(iFile, rowIf, rwFlags);
     }
     IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detatches ownership of 'fileOwner'
     {
@@ -2175,7 +2184,7 @@ public:
         Owned<CFileOwner> fileOwner = spillFile.getClear();
         if (!fileOwner)
             return NULL;
-        Owned<IExtRowStream> strm = createSimpleRowStream(&fileOwner->queryIFile(), rowIf);
+        Owned<IExtRowStream> strm = createRowStream(&fileOwner->queryIFile(), rowIf, rwFlags);
         Owned<CStreamFileOwner> fileStream = new CStreamFileOwner(fileOwner, strm);
         if (_count)
             *_count = count;
@@ -2188,7 +2197,7 @@ public:
             return;
         flush();
         ::Release(writer);
-        writer =NULL;
+        writer = NULL;
     }
 // IRowWriter
     virtual void putRow(const void *row)
@@ -2393,8 +2402,9 @@ class HashDedupSlaveActivityBase : public CSlaveActivity, public CThorDataLink
 {
 protected:
     IRowStream *input;      // can be changed
+    IRowStream *initialInput;
     Owned<IRowStream> currentInput;
-    bool inputstopped, eos, extractKey, local, isVariable;
+    bool inputstopped, eos, lastEog, extractKey, local, isVariable, grouped;
     const char *actTxt;
     IHThorHashDedupArg *helper;
     IHash *iHash, *iKeyHash;
@@ -2406,7 +2416,7 @@ protected:
     SpinLock stopSpin;
     PointerArrayOf<CHashTableRowTable> _hashTables;
     CHashTableRowTable **hashTables;
-    unsigned numHashTables;
+    unsigned numHashTables, initialNumBuckets;
     roxiemem::RoxieHeapFlags allocFlags;
 
     inline CHashTableRowTable &queryHashTable(unsigned n) const { return *hashTables[n]; }
@@ -2445,7 +2455,17 @@ public:
     HashDedupSlaveActivityBase(CGraphElementBase *_container, bool _local)
         : CSlaveActivity(_container), CThorDataLink(this), local(_local)
     {
-        inputstopped = false;
+        // Put every member into a known state. 'local' is set from _local in the initializer list and must not be reset here.
+        input = initialInput = NULL;
+        actTxt = NULL;
+        inputstopped = eos = lastEog = extractKey = isVariable = grouped = false;
+        helper = NULL;
+        iHash = iKeyHash = NULL;
+        iCompare = rowKeyCompare = NULL;
+        keyRowInterfaces = NULL;
+        hashTables = NULL;
+        numHashTables = initialNumBuckets = 0;
+        allocFlags = roxiemem::RHFnone; // assign the member; declaring a local of the same name would shadow it
     }
     ~HashDedupSlaveActivityBase()
     {
@@ -2458,8 +2478,6 @@ public:
         iHash = helper->queryHash();
         appendOutputLinked(this);
         iCompare = helper->queryCompare();
-        numHashTables = 0;
-        hashTables = NULL;
         allocFlags = queryJob().queryThorAllocator()->queryFlags();
 
         // JCSMORE - it may not be worth extracting the key,
@@ -2496,21 +2514,27 @@ public:
             rowKeyCompare = iCompare;
             iKeyHash = iHash;
         }
+        grouped = container.queryGrouped();
     }
     void start()
     {
         ActivityTimer s(totalCycles, timeActivities, NULL);
         inputstopped = false;
-        eos = false;
+        eos = lastEog = false;
         startInput(inputs.item(0));
-        input = inputs.item(0);
         ThorDataLinkMetaInfo info;
         inputs.item(0)->getMetaInfo(info);
+        initialInput = input = inputs.item(0);
         unsigned div = local ? 1 : queryJob().querySlaves(); // if global, hash values already modulated by # slaves
         bucketHandler.setown(new CBucketHandler(*this, this, keyRowInterfaces, iHash, iKeyHash, rowKeyCompare, extractKey, 0, div));
-        unsigned initialNumBuckets = container.queryXGMML().getPropInt("hint[@name=\"num_buckets\"]/@value");
+        initialNumBuckets = container.queryXGMML().getPropInt("hint[@name=\"num_buckets\"]/@value");
         if (0 == initialNumBuckets)
-            initialNumBuckets = bucketHandler->getBucketEstimate(info.totalRowsMax); // will use default if no meta total
+        {
+            if (grouped)
+                initialNumBuckets = HASHDEDUP_BUCKETS_MIN;
+            else
+                initialNumBuckets = bucketHandler->getBucketEstimate(info.totalRowsMax); // will use default if no meta total
+        }
         ensureNumHashTables(initialNumBuckets);
         bucketHandler->init(initialNumBuckets);
         dataLinkStart(actTxt, container.queryId());
@@ -2543,10 +2567,11 @@ public:
             OwnedConstThorRow row;
             {
                 SpinBlock b(stopSpin);
-                row.setown(input->ungroupedNextRow());
+                row.setown(grouped?input->nextRow():input->ungroupedNextRow());
             }
             if (row)
             {
+                lastEog = false;
                 if (bucketHandler->addRow(row)) // true if new, i.e. non-duplicate (does not take ownership)
                 {
                     dataLinkIncrement();
@@ -2558,7 +2583,7 @@ public:
                 Owned<CBucketHandler> nextBucketHandler;
                 loop
                 {
-                    // If spill event occured, disk buckets + key buckets will have been created by this stage.
+                    // If spill event occurred, disk buckets + key buckets will have been created by this stage.
                     bucketHandler->flushBuckets();
 
                     // pop off parents until one has a bucket left to read
@@ -2570,7 +2595,22 @@ public:
                         {
                             currentInput.clear();
                             bucketHandler.clear();
-                            eos = true;
+                            if (grouped)
+                            {
+                                if (lastEog)
+                                    eos = true;
+                                else
+                                {
+                                    lastEog = true;
+                                    // reset for next group
+                                    input = initialInput;
+                                    bucketHandler.setown(new CBucketHandler(*this, this, keyRowInterfaces, iHash, iKeyHash, rowKeyCompare, extractKey, 0, 1));
+                                    ensureNumHashTables(initialNumBuckets); // resets
+                                    bucketHandler->init(initialNumBuckets);
+                                }
+                            }
+                            else
+                                eos = true;
                             return NULL;
                         }
                         bucketHandler.setown(&bucketHandlerStack.popGet());
@@ -2591,7 +2631,7 @@ public:
         }
     }
 
-    virtual bool isGrouped() { return false; }
+    virtual bool isGrouped() { return grouped; }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) = 0;
 friend class CBucketHandler;
 friend class CHashTableRowTable;
@@ -2664,7 +2704,7 @@ bool CHashTableRowTable::rehash()
 
 CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable &_htRows)
     : owner(_owner), rowIf(_rowIf), keyIf(_keyIf), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
-      rowSpill(_rowIf, "rows", _bucketN), keySpill(_keyIf, "keys", _bucketN)
+      rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
 
 {
     spilt = false;

+ 42 - 20
thorlcr/activities/join/thjoinslave.cpp

@@ -455,18 +455,21 @@ public:
         Linked<IRowInterfaces> rowif1 = queryRowInterfaces(input1);
         Linked<IRowInterfaces> rowif2 = queryRowInterfaces(input2);
         // NB two near identical branches here - should be parameterized at some stage
-        if (rightpartition) {
+        if (rightpartition)
+        {
             ActPrintLog("JOIN partition right");
             rowcount_t totalrows;
             collaterev.collate = collate;
             collaterevupper.collate = collateupper;
-            if (nosortPrimary()) {
+            if (nosortPrimary())
+            {
                 OwnedConstThorRow partitionrow  = input2->ungroupedNextRow();
                 strm2.set(new cRowStreamPlus1Adaptor(input2,partitionrow));
                 sorter->Gather(rowif1,input1,compare1,&collaterev,collateupper?&collaterevupper:NULL,keyserializer2,partitionrow,nosortSecondary(),isUnstable(),abortSoon, rowif2); // keyserializer2 *is* correct
                 partitionrow.clear();
                 stopInput1();
-                if (abortSoon) {
+                if (abortSoon)
+                {
                     barrier->cancel();
                     return false;
                 }
@@ -477,11 +480,13 @@ public:
                 strm1.setown(sorter->startMerge(totalrows));
                 return true;
             }
-            else {
+            else
+            {
                 strm2.set(input2);
                 sorter->Gather(rowif2,input2,compare2,NULL,NULL,keyserializer2,NULL,false,isUnstable(),abortSoon, NULL); 
                 stopInput2();
-                if (abortSoon) {
+                if (abortSoon)
+                {
                     barrier->cancel();
                     return false;
                 }
@@ -493,8 +498,12 @@ public:
 
                 GetTempName(tempname.clear(),"joinspill",false); // don't use alt temp dir
                 Owned<IFile> tempf = createIFile(tempname.str());
-                Owned<IRowWriter> tmpstrm = createRowWriter(tempf,rowif2->queryRowSerializer(),rowif2->queryRowAllocator());
-                if (!tmpstrm) {
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                    rwFlags |= rw_compress;
+                Owned<IRowWriter> tmpstrm = createRowWriter(tempf, rowif2, rwFlags);
+                if (!tmpstrm)
+                {
                     ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Cannot open %s", tempname.toCharArray());
                     throw MakeErrnoException("JoinSlaveActivity::doglobaljoin");
                 }
@@ -502,8 +511,9 @@ public:
                 tmpstrm->flush();
                 tmpstrm.clear();
                 rstrm2.clear();
-                try {
-                    strm2.setown(createSimpleRowStream(tempf,rowif2));
+                try
+                {
+                    strm2.setown(createRowStream(tempf, rowif2, rwFlags));
 
                     ActPrintLog("JOIN waiting barrier.2");
                     if (!barrier->wait(false))
@@ -512,7 +522,8 @@ public:
                     sorter->stopMerge();
                     sorter->Gather(rowif1,input1,compare1,&collaterev,collateupper?&collaterevupper:NULL,keyserializer2,NULL,nosortSecondary(),isUnstable(),abortSoon,rowif2); // keyserializer2 *is* correct
                     stopInput1();
-                    if (abortSoon) {
+                    if (abortSoon)
+                    {
                         barrier->cancel();
                         return false;
                     }
@@ -534,15 +545,18 @@ public:
                 }
             }
         }
-        else  {
+        else
+        {
             rowcount_t totalrows;
-            if (nosortPrimary()) {
+            if (nosortPrimary())
+            {
                 OwnedConstThorRow partitionrow  = input1->ungroupedNextRow();
                 strm1.set(new cRowStreamPlus1Adaptor(input1,partitionrow));
                 sorter->Gather(rowif2,input2,compare2,collate,collateupper,keyserializer1,partitionrow,nosortSecondary(),isUnstable(),abortSoon, rowif1); // keyserializer1 *is* correct
                 partitionrow.clear();
                 stopInput2();
-                if (abortSoon) {
+                if (abortSoon)
+                {
                     barrier->cancel();
                     return false;
                 }
@@ -553,11 +567,13 @@ public:
                 strm2.setown(sorter->startMerge(totalrows));
                 return true;
             }
-            else {
+            else
+            {
                 strm1.set(input1);
                 sorter->Gather(rowif1,input1,compare1,NULL,NULL,keyserializer1,NULL,false,isUnstable(),abortSoon, NULL);
                 stopInput1();
-                if (abortSoon) {
+                if (abortSoon)
+                {
                     barrier->cancel();
                     return false;
                 }
@@ -572,8 +588,12 @@ public:
 
                 GetTempName(tempname.clear(),"joinspill",false); // don't use alt temp dir
                 Owned<IFile> tempf = createIFile(tempname.str());
-                Owned<IRowWriter> tmpstrm = createRowWriter(tempf,rowif1->queryRowSerializer(),rowif1->queryRowAllocator());
-                if (!tmpstrm) {
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                    rwFlags |= rw_compress;
+                Owned<IRowWriter> tmpstrm = createRowWriter(tempf, rowif1, rwFlags);
+                if (!tmpstrm)
+                {
                     ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Cannot open %s", tempname.toCharArray());
                     throw MakeErrnoException("JoinSlaveActivity::doglobaljoin");
                 }
@@ -581,8 +601,9 @@ public:
                 tmpstrm->flush();
                 tmpstrm.clear();
                 rstrm1.clear();
-                try {
-                    strm1.setown(createSimpleRowStream(tempf,rowif1));
+                try
+                {
+                    strm1.setown(createRowStream(tempf, rowif1, rwFlags));
 
                     ActPrintLog("JOIN waiting barrier.2");
                     if (!barrier->wait(false))
@@ -591,7 +612,8 @@ public:
                     sorter->stopMerge();
                     sorter->Gather(rowif2,input2,compare2,collate,collateupper,keyserializer1,NULL,nosortSecondary(),isUnstable(),abortSoon,rowif1); // keyserializer1 *is* correct
                     stopInput2();
-                    if (abortSoon) {
+                    if (abortSoon)
+                    {
                         barrier->cancel();
                         return false;
                     }

+ 1 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1241,7 +1241,7 @@ public:
         else // local
         {
             if (RCUNSET != rhsTotalCount)
-                rhsRows = rhsTotalCount;
+                rhsRows = (rowidx_t)rhsTotalCount;
             else // all join, or lookup if total count unknown
                 rhsRows = rhs.ordinality();
         }

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -107,7 +107,7 @@ public:
         global = !loopGraph->isLocalOnly();
         if (container.queryLocalOrGrouped())
             return;
-        maxEmptyLoopIterations = (unsigned)container.queryJob().getWorkUnitValueInt("@maxEmptyLoopIterations", 1000);
+        maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
     }
     void process()
     {

+ 1 - 1
thorlcr/activities/loop/thloopslave.cpp

@@ -165,7 +165,7 @@ public:
     {
         input = NULL;
         mpTag = TAG_NULL;
-        maxEmptyLoopIterations = (unsigned)container->queryJob().getWorkUnitValueInt("@maxEmptyLoopIterations", 1000);
+        maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 2 - 2
thorlcr/activities/merge/thmergeslave.cpp

@@ -316,7 +316,7 @@ public:
         StringBuffer tmpname;
         GetTempName(tmpname,"merge",true); // use alt temp dir
         tmpfile.setown(createIFile(tmpname.str()));
-        Owned<IRowWriter> writer =  createRowWriter(tmpfile,queryRowSerializer(),queryRowAllocator()); 
+        Owned<IRowWriter> writer =  createRowWriter(tmpfile, this);
         CThorKeyArray sample(*this, this, helper->querySerialize(), helper->queryCompare(), helper->queryCompareKey(), helper->queryCompareRowKey());
         sample.setSampling(MERGE_TRANSFER_BUFFER_SIZE);
         ActPrintLog("MERGE: start gather");
@@ -360,7 +360,7 @@ public:
         offset_t end = partitionpos[idx];
         if (pos>=end)
             return 0;
-        Owned<IExtRowStream> rs = createRowStream(tmpfile,queryRowInterfaces(this),pos,end,RCUNBOUND,false,false); // this is not good
+        Owned<IExtRowStream> rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end); // this is not good
         offset_t so = rs->getOffset();
         size32_t len = 0;
         size32_t chunksize = chunkmaxsize;

+ 1 - 1
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -310,7 +310,7 @@ public:
         ForEachItemIn(o, container.outputs)
             appendOutput(new CDelayedInput(*this));
         IHThorSplitArg *helper = (IHThorSplitArg *)queryHelper();
-        int dV = (int)container.queryJob().getWorkUnitValueInt("splitterSpills", -1);
+        int dV = getOptInt(THOROPT_SPLITTER_SPILL, -1);
         if (-1 == dV)
         {
             spill = !helper->isBalanced();

+ 13 - 28
thorlcr/activities/spill/thspillslave.cpp

@@ -82,29 +82,12 @@ public:
         cd->createDirectory();
 
         IHThorSpillArg *helper = (IHThorSpillArg *)queryHelper();
-        Owned<IRecordSize> rSz;
-        if (!grouped)
-            rSz.set(helper->queryDiskRecordSize());
-        else
-        {
-            class GroupedRecordSize : public CSimpleInterface, implements IRecordSize
-            {
-                IRecordSize *rSz;
-            public:
-                IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-                GroupedRecordSize(IRecordSize *_rSz) { rSz = LINK(_rSz); }
-                ~GroupedRecordSize() { ::Release(rSz); }
-                virtual size32_t getRecordSize(const void *rec) { return rSz->getRecordSize(rec) + 1; }
-                virtual size32_t getFixedSize() const { return rSz->getFixedSize()?(rSz->getFixedSize()+1):0; }
-            };
-            rSz.setown(new GroupedRecordSize(helper->queryDiskRecordSize()));
-        }
-
         void *ekey;
         size32_t ekeylen;
         helper->getEncryptKey(ekeylen,ekey);
         Owned<ICompressor> ecomp;
-        if (ekeylen!=0) {
+        if (ekeylen!=0)
+        {
             ecomp.setown(createAESCompressor256(ekeylen,ekey));
             memset(ekey,0,ekeylen);
             free(ekey);
@@ -114,23 +97,25 @@ public:
         Owned<IFileIO> iFileIO;
         bool fixedRecordSize = queryRowMetaData()->isFixedSize();
         size32_t minrecsize = queryRowMetaData()->getMinRecordSize();
-        if (compress)
-            iFileIO.setown(createCompressedFileWriter(file, fixedRecordSize?(minrecsize+(grouped?sizeof(byte):0)):0, false, true, ecomp));
-        else
-            iFileIO.setown(file->open(IFOcreate));
-        if (!iFileIO)
-            throw MakeActivityException(this, 0, "Failed to create temporary file: %s", fileName.str());
+
         if (fixedRecordSize)
             ActPrintLog("SPILL: created fixed output %s recsize=%u", (0!=ekeylen)?"[encrypted]":compress?"[compressed]":"",minrecsize);
         else
             ActPrintLog("SPILL: created variable output %s, minrecsize=%u", (0!=ekeylen)?"[encrypted]":compress?"[compressed]":"",minrecsize);
-        Owned<IFileIOStream> filestrm = createBufferedIOStream(iFileIO);
-        out.setown(createRowWriter(filestrm,queryRowSerializer(),queryRowAllocator(),grouped,!compress,false)); 
+        unsigned rwFlags = (DEFAULT_RWFLAGS & ~rw_autoflush); // flushed by close()
+        if (compress)
+            rwFlags |= rw_compress;
+        else
+            rwFlags |= rw_crc; // only if !compress
+        if (grouped)
+            rwFlags |= rw_grouped;
+        out.setown(createRowWriter(file, this, rwFlags));
     }
 
     void close()
     {
-        if (out) {
+        if (out)
+        {
             if (compress)
                 out->flush();
             else

+ 6 - 1
thorlcr/activities/thdiskbaseslave.cpp

@@ -340,7 +340,12 @@ void CDiskWriteSlaveActivityBase::open()
     else
     {
         stream.setown(createIOStream(iFileIO));
-        out.setown(createRowWriter(stream,::queryRowSerializer(input),::queryRowAllocator(input),grouped,calcFileCrc,false)); // flushed by close
+        unsigned rwFlags = 0;
+        if (grouped)
+            rwFlags |= rw_grouped;
+        if (calcFileCrc)
+            rwFlags |= rw_crc;
+        out.setown(createRowWriter(stream, ::queryRowInterfaces(input), rwFlags));
     }
     CDfsLogicalFileName dlfn;
     dlfn.set(logicalFilename);

+ 40 - 7
thorlcr/graph/thgraph.cpp

@@ -2356,11 +2356,11 @@ void CJobBase::init()
 #endif
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
-    unsigned memorySpillAt = getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
+    unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
     thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, crcChecking, usePackedAllocator));
 
     unsigned defaultMemMB = globalMemorySize*3/4;
-    unsigned largeMemSize = getOptInt("@largeMemSize", defaultMemMB);
+    unsigned largeMemSize = getOptUInt("@largeMemSize", defaultMemMB);
     if (globalMemorySize && largeMemSize >= globalMemorySize)
         throw MakeStringException(0, "largeMemSize(%d) can not exceed globalMemorySize(%d)", largeMemSize, globalMemorySize);
     PROGLOG("Global memory size = %d MB, memory spill at = %d%%, large mem size = %d MB", globalMemorySize, memorySpillAt, largeMemSize);
@@ -2597,16 +2597,29 @@ mptag_t CJobBase::deserializeMPTag(MemoryBuffer &mb)
     return tag;
 }
 
-unsigned CJobBase::getOptInt(const char *opt, unsigned dft)
+// these getX methods look up the property in the workunit settings, then the global settings, defaulting to the provided 'dft' if not present
+bool CJobBase::getOptBool(const char *opt, bool dft)
 {
-    const char *wOpt = (opt&&(*opt)=='@') ? opt+1 : opt; // strip @ for options in workunit
-    return (unsigned)getWorkUnitValueInt(wOpt, globals->getPropInt(opt, dft));
+    if (!opt || !*opt)
+        return dft; // null/empty option name: probably a caller error, so just hand back the default
+    VStringBuffer gOpt("@%s", opt); // global setting is read under the '@'-prefixed name; NOTE(review): opt is expected WITHOUT a leading '@' (a caller passing "@name" would query "@@name") - confirm all callers
+    return getWorkUnitValueBool(opt, globals->getPropBool(gOpt, dft)); // workunit value wins; the global value (or dft) is passed in as its default
+}
+
+int CJobBase::getOptInt(const char *opt, int dft)
+{
+    if (!opt || !*opt)
+        return dft; // null/empty option name: probably a caller error, so just hand back the default
+    VStringBuffer gOpt("@%s", opt); // global setting is read under the '@'-prefixed name; NOTE(review): opt is expected WITHOUT a leading '@' (a caller passing "@name" would query "@@name") - confirm all callers
+    return (int)getWorkUnitValueInt(opt, globals->getPropInt(gOpt, dft)); // workunit value wins; the global value (or dft) is passed in as its default; result is cast to int
 }
 
 __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
 {
-    const char *wOpt = (opt&&(*opt)=='@') ? opt+1 : opt; // strip @ for options in workunit
-    return getWorkUnitValueInt(opt, globals->getPropInt64(opt, dft));
+    if (!opt || !*opt)
+        return dft; // null/empty option name: probably a caller error, so just hand back the default
+    VStringBuffer gOpt("@%s", opt); // global setting is read under the '@'-prefixed name; opt itself is expected without a leading '@'
+    return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft)); // workunit value wins; the global value (or dft) is passed in as its default
 }
 
 // IGraphCallback
@@ -2780,3 +2793,23 @@ void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)
         container.queryJob().queryJobComm().cancel(rank, mpTag);
 }
 
+bool CActivityBase::getOptBool(const char *prop, bool defVal) const
+{
+    bool def = queryJob().getOptBool(prop, defVal); // job-level value (or defVal) becomes the fallback
+    VStringBuffer path("hint[@name=\"%s\"]/@value", prop); // xpath of the value of a hint element named 'prop' in this activity's graph node
+    return container.queryXGMML().getPropBool(path.str(), def); // presumably the hint, when present, overrides the job-level fallback - confirm getPropBool default semantics
+}
+
+int CActivityBase::getOptInt(const char *prop, int defVal) const
+{
+    int def = queryJob().getOptInt(prop, defVal); // job-level value (or defVal) becomes the fallback
+    VStringBuffer path("hint[@name=\"%s\"]/@value", prop); // xpath of the value of a hint element named 'prop' in this activity's graph node
+    return container.queryXGMML().getPropInt(path.str(), def); // presumably the hint, when present, overrides the job-level fallback - confirm getPropInt default semantics
+}
+
+__int64 CActivityBase::getOptInt64(const char *prop, __int64 defVal) const
+{
+    __int64 def = queryJob().getOptInt64(prop, defVal); // job-level value (or defVal) becomes the fallback
+    VStringBuffer path("hint[@name=\"%s\"]/@value", prop); // xpath of the value of a hint element named 'prop' in this activity's graph node
+    return container.queryXGMML().getPropInt64(path.str(), def); // presumably the hint, when present, overrides the job-level fallback - confirm getPropInt64 default semantics
+}

+ 10 - 1
thorlcr/graph/thgraph.hpp

@@ -898,8 +898,11 @@ public:
     mptag_t allocateMPTag();
     void freeMPTag(mptag_t tag);
     mptag_t deserializeMPTag(MemoryBuffer &mb);
-    unsigned getOptInt(const char *opt, unsigned dft=0);
+    bool getOptBool(const char *opt, bool dft=false);
+    int getOptInt(const char *opt, int dft=0);
+    unsigned getOptUInt(const char *opt, unsigned dft=0) { return (unsigned)getOptInt(opt, dft); }
     __int64 getOptInt64(const char *opt, __int64 dft=0);
+    unsigned __int64 getOptUInt64(const char *opt, unsigned __int64 dft=0) { return (unsigned __int64)getOptInt64(opt, dft); }
 
     virtual void abort(IException *e);
     virtual IBarrier *createBarrier(mptag_t tag) { UNIMPLEMENTED; return NULL; }
@@ -988,6 +991,12 @@ public:
     virtual IOutputMetaData *queryRowMetaData() { return baseHelper->queryOutputMeta(); }
     virtual unsigned queryActivityId() { return (unsigned)container.queryId(); }
     virtual ICodeContext *queryCodeContext() { return container.queryCodeContext(); }
+
+    bool getOptBool(const char *prop, bool defVal=false) const;
+    int getOptInt(const char *prop, int defVal=0) const;
+    unsigned getOptUInt(const char *prop, unsigned defVal=0) const { return (unsigned)getOptInt(prop, defVal); }
+    __int64 getOptInt64(const char *prop, __int64 defVal=0) const;
+    unsigned __int64 getOptUInt64(const char *prop, unsigned __int64 defVal=0) const { return (unsigned __int64)getOptInt64(prop, defVal); }
 };
 
 interface IFileInProgressHandler : extends IInterface

+ 7 - 0
thorlcr/master/thmastermain.cpp

@@ -606,6 +606,13 @@ int main( int argc, char *argv[]  )
             FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
             return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
         }
+
+        if (globals->getPropBool("@useNASTranslation", true))
+        {
+            Owned<IPropertyTree> filteredNasConfig = envGetInstallNASHooks();
+            if (filteredNasConfig)
+                globals->setPropTree("NAS", filteredNasConfig.getClear()); // for use by slaves
+        }
         
         HardwareInfo hdwInfo;
         getHardwareInfo(hdwInfo);

+ 1 - 1
thorlcr/msort/tsorta.cpp

@@ -455,7 +455,7 @@ void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample)
         if (pos==(offset_t)-1) 
             pos = 0;
         // should do bin-chop for fixed length but initially do sequential search
-        Owned<IRowStream> s = createRowStream(file,rowif,pos,(offset_t)-1,RCUNBOUND,false,false);
+        Owned<IRowStream> s = createRowStreamEx(file, rowif, pos);
         loop
         {
             OwnedConstThorRow rowcmp = s->nextRow();

+ 6 - 6
thorlcr/msort/tsorts.cpp

@@ -152,12 +152,12 @@ class CWriteIntercept : public CSimpleInterface
         Linked<CWriteIntercept> parent;
         Owned<IRowStream> stream;
         offset_t startOffset;
-        unsigned __int64 max;
+        rowcount_t max;
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-        CFileOwningStream(CWriteIntercept *_parent, offset_t _startOffset, unsigned __int64 _max) : parent(_parent), startOffset(_startOffset), max(_max)
+        CFileOwningStream(CWriteIntercept *_parent, offset_t _startOffset, rowcount_t _max) : parent(_parent), startOffset(_startOffset), max(_max)
         {
-            stream.setown(createRowStream(parent->dataFile, parent->rowIf, startOffset, (offset_t)-1, max, false, false));
+            stream.setown(createRowStreamEx(parent->dataFile, parent->rowIf, startOffset, (offset_t)-1, max));
         }
         virtual const void *nextRow() { return stream->nextRow(); }
         virtual void stop() { stream->stop(); }
@@ -187,7 +187,7 @@ public:
         StringBuffer tempname;
         GetTempName(tempname,"srtmrg",false);
         dataFile.setown(createIFile(tempname.str()));
-        Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), false, false, false);
+        Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf);
 
         bool overflowed = false;
         ActPrintLog(&activity, "Local Overflow Merge start");
@@ -236,7 +236,7 @@ public:
         ActPrintLog(&activity, "Local Overflow Merge done: overflow file '%s', size = %"I64F"d", dataFile->queryFilename(), dataFile->size());
         return end;
     }
-    IRowStream *getStream(offset_t startOffset, unsigned __int64 max)
+    IRowStream *getStream(offset_t startOffset, rowcount_t max)
     {
         return new CFileOwningStream(this, startOffset, max);
     }
@@ -1001,7 +1001,7 @@ public:
             return true;
         }
         Owned<IFile> file = createIFile(filename);
-        Owned<IExtRowStream> rowstream = createSimpleRowStream(file,auxrowif);
+        Owned<IExtRowStream> rowstream = createRowStream(file, auxrowif);
         OwnedConstThorRow row = rowstream->nextRow();
         if (!row) {
             rowbuf = NULL;

+ 5 - 0
thorlcr/slave/thslavemain.cpp

@@ -46,6 +46,7 @@
 #include "slave.hpp"
 #include "portlist.h"
 #include "dafdesc.hpp"
+#include "rmtfile.hpp"
 
 #include "slavmain.hpp"
 
@@ -335,6 +336,10 @@ int main( int argc, char *argv[]  )
             }
             setPasswordsFromSDS();
 #endif
+            IDaFileSrvHook *daFileSrvHook = queryDaFileSrvHook();
+            if (daFileSrvHook) // probably always installed
+                daFileSrvHook->addSubnetFilters(globals->queryPropTree("NAS"), NULL);
+
             StringBuffer thorPath;
             globals->getProp("@thorPath", thorPath);
             recursiveCreateDirectory(thorPath.str());

+ 41 - 12
thorlcr/thorutil/thmem.cpp

@@ -180,7 +180,7 @@ class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBuff
 protected:
     CActivityBase &activity;
     IRowInterfaces *rowIf;
-    bool preserveNulls, ownsRows;
+    bool preserveNulls, ownsRows, useCompression;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
     Owned<IRowStream> spillStream;
@@ -196,7 +196,7 @@ protected:
         GetTempName(tempname,"streamspill", true);
         spillFile.setown(createIFile(tempname.str()));
 
-        rows.save(*spillFile); // saves committed rows
+        rows.save(*spillFile, useCompression); // saves committed rows
         rows.noteSpilled(numRows);
         return true;
     }
@@ -208,6 +208,7 @@ public:
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls)
     {
         rows.swap(inRows);
+        useCompression = false;
         activity.queryJob().queryRowManager()->addRowBuffer(this);
     }
     ~CSpillableStreamBase()
@@ -266,7 +267,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
             else if (owner->spillFile) // i.e. has spilt
             {
                 assertex(((offset_t)-1) != outputOffset);
-                spillStream.setown(::createRowStream(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, false, owner->preserveNulls));
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (owner->preserveNulls)
+                    rwFlags |= rw_grouped;
+                spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
                 return spillStream->nextRow();
             }
             return owner->rows.get(pos++);
@@ -297,7 +301,12 @@ public:
             // already spilled?
             CThorSpillableRowArray::CThorSpillableRowArrayLock block(rows);
             if (spillFile)
-                return ::createRowStream(spillFile, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, false, preserveNulls);
+            {
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (preserveNulls)
+                    rwFlags |= rw_grouped;
+                return ::createRowStream(spillFile, rowIf, rwFlags);
+            }
         }
         return new CStream(*this);
     }
@@ -315,6 +324,7 @@ public:
     CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
     {
+        useCompression = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
         pos = numReadRows = 0;
         granularity = 500; // JCSMORE - rows
 
@@ -341,7 +351,12 @@ public:
             CThorSpillableRowArray::CThorSpillableRowArrayLock block(rows);
             if (spillFile)
             {
-                spillStream.setown(createRowStream(spillFile, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, false, preserveNulls));
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (preserveNulls)
+                    rwFlags |= rw_grouped;
+                if (useCompression)
+                    rwFlags |= rw_compress;
+                spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
                 return spillStream->nextRow();
             }
             rowidx_t fetch = rows.numCommitted();
@@ -1015,15 +1030,24 @@ void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
     }
 }
 
-rowidx_t CThorSpillableRowArray::save(IFile &iFile, rowidx_t watchRecNum, offset_t *watchFilePosResult)
+rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
 {
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
+    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RIPF"d rows", n);
+
+    if (useCompression)
+        assertex(0 == writeCallbacks.ordinality()); // incompatible
+
+    unsigned rwFlags = DEFAULT_RWFLAGS;
+    if (useCompression)
+        rwFlags |= rw_compress;
+    if (allowNulls)
+        rwFlags |= rw_grouped;
+    Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags);
+
     const void **rows = getBlock(n);
-    Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), allowNulls, false, true);
-    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RIPF"d rows", numRows);
-    offset_t startPos = writer->getPosition();
     for (rowidx_t i=0; i < n; i++)
     {
         const void *row = rows[i];
@@ -1041,7 +1065,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, rowidx_t watchRecNum, offset
         }
     }
     writer->flush();
-    offset_t bytesWritten = writer->getPosition() - startPos;
+    offset_t bytesWritten = writer->getPosition();
     writer.clear();
     ActPrintLog(&activity, "CThorSpillableRowArray::save done, bytes = %"I64F"d", (__int64)bytesWritten);
     return n;
@@ -1138,7 +1162,7 @@ protected:
         GetTempName(tempname,"srtspill",true);
         Owned<IFile> iFile = createIFile(tempname.str());
         spillFiles.append(new CFileOwner(iFile.getLink()));
-        spillableRows.save(*iFile); // saves committed rows
+        spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true)); // saves committed rows
         spillableRows.noteSpilled(numRows);
 
         ++overflowCount;
@@ -1213,7 +1237,12 @@ protected:
         ForEachItemIn(f, spillFiles)
         {
             CFileOwner *fileOwner = spillFiles.item(f);
-            Owned<IExtRowStream> strm = createRowStream(&fileOwner->queryIFile(), rowIf, 0, (offset_t) -1, (unsigned __int64)-1, false, preserveGrouping);
+            unsigned rwFlags = DEFAULT_RWFLAGS;
+            if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                rwFlags |= rw_compress;
+            if (preserveGrouping)
+                rwFlags |= rw_grouped;
+            Owned<IExtRowStream> strm = createRowStream(&fileOwner->queryIFile(), rowIf, rwFlags);
             instrms.append(* new CStreamFileOwner(fileOwner, strm));
         }
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -416,7 +416,7 @@ public:
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
-    rowidx_t save(IFile &file, rowidx_t watchRecNum=(rowidx_t)-1, offset_t *watchFilePosResult=NULL);
+    rowidx_t save(IFile &file, bool useCompression);
     const void **getBlock(rowidx_t readRows);
     inline void noteSpilled(rowidx_t spilledRows)
     {

+ 7 - 0
thorlcr/thorutil/thormisc.hpp

@@ -45,6 +45,13 @@
     #define graph_decl
 #endif
 
+/// Thor option names that can be supplied as activity hints, workunit options, or global settings (read via the getOptBool/getOptInt/getOptInt64 family)
+#define THOROPT_COMPRESS_SPILLS       "compressInternalSpills" // bool; when set, callers add rw_compress to the row reader/writer flags used for spill files
+#define THOROPT_HDIST_SPILL           "hdistSpill"
+#define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize"
+#define THOROPT_SPLITTER_SPILL        "splitterSpill" // NOTE(review): the call this replaces read the workunit value "splitterSpills" (plural) - confirm the rename is intended
+#define THOROPT_LOOP_MAX_EMPTY        "loopMaxEmpty" // NOTE(review): the calls this replaces read "@maxEmptyLoopIterations" - confirm the rename is intended
+
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning
 
 #define THOR_SEM_RETRY_TIMEOUT 2