瀏覽代碼

HPCC-9920 Add and use new LZ4 compression algos for spill

Remove Density support until next update due to too many issues
Common up flz and lz4 into fcmp base class

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 10 年之前
父節點
當前提交
821c455a75

+ 3 - 0
.gitmodules

@@ -34,3 +34,6 @@
 [submodule "plugins/kafka/librdkafka"]
 	path = plugins/kafka/librdkafka
 	url = https://github.com/hpcc-systems/librdkafka.git
+[submodule "system/lz4_sm/lz4"]
+    path = system/lz4_sm/lz4
+    url = https://github.com/hpcc-systems/lz4.git

+ 3 - 3
common/thorhelper/thorcommon.cpp

@@ -1549,13 +1549,13 @@ IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned fla
         size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize();
         if (fixedSize && TestRwFlag(flags, rw_grouped))
             ++fixedSize; // row writer will include a grouping byte
-        iFileIO.setown(createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, TestRwFlag(flags, rw_fastlz)));
+        iFileIO.setown(createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, getCompMethod(flags)));
     }
     else
         iFileIO.setown(iFile->open((flags & rw_extend)?IFOwrite:IFOcreate));
     if (!iFileIO)
         return NULL;
-    flags &= ~((unsigned)(rw_compress|rw_fastlz|rw_compressblkcrc));
+    flags &= ~COMP_MASK;
     return createRowWriter(iFileIO, rowIf, flags);
 }
 
@@ -1576,7 +1576,7 @@ IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned
 
 IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags)
 {
-    if (0 != (flags & (rw_compress|rw_fastlz|rw_extend|rw_buffered|rw_compressblkcrc)))
+    if (0 != (flags & (rw_extend|rw_buffered|COMP_MASK)))
         throw MakeStringException(0, "Unsupported createRowWriter flags");
     Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), TestRwFlag(flags, rw_grouped), TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
     return writer.getClear();

+ 49 - 1
common/thorhelper/thorcommon.hpp

@@ -20,6 +20,7 @@
 
 #include "jiface.hpp"
 #include "jcrc.hpp"
+#include "jlzw.hpp"
 #include "jsort.hpp"
 #include "jdebug.hpp"
 #include "jfile.hpp"
@@ -90,11 +91,58 @@ enum RowReaderWriterFlags
     rw_compressblkcrc = 0x10, // block compression, this sets/checks crc's at block level
     rw_fastlz         = 0x20, // if rw_compress
     rw_autoflush      = 0x40,
-    rw_buffered       = 0x80
+    rw_buffered       = 0x80,
+    rw_lzw            = 0x100, // if rw_compress
+    rw_lz4            = 0x200  // if rw_compress
 };
 #define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)
 inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag) { return 0 != (flags & flag); }
 
+#define COMP_MASK (rw_compress|rw_compressblkcrc|rw_fastlz|rw_lzw|rw_lz4)
+#define COMP_TYPE_MASK (rw_fastlz|rw_lzw|rw_lz4)
+inline void setCompFlag(const StringBuffer compStr, unsigned &flags)
+{
+    flags &= ~COMP_TYPE_MASK;
+    if (compStr.length())
+    {
+        if (0 == stricmp("FLZ", compStr.str()))
+            flags |= rw_fastlz;
+        else if (0 == stricmp("LZ4", compStr.str()))
+            flags |= rw_lz4;
+        else // not specifically FLZ or LZ4 so set to LZW (or rowdif)
+            flags |= rw_lzw;
+    }
+    else // default is LZ4
+        flags |= rw_lz4;
+}
+
+inline unsigned getCompMethod(unsigned flags)
+{
+    unsigned compMethod = COMPRESS_METHOD_LZW;
+    if (TestRwFlag(flags, rw_lzw))
+        compMethod = COMPRESS_METHOD_LZW;
+    else if (TestRwFlag(flags, rw_fastlz))
+        compMethod = COMPRESS_METHOD_FASTLZ;
+    else if (TestRwFlag(flags, rw_lz4))
+        compMethod = COMPRESS_METHOD_LZ4;
+    return compMethod;
+}
+
+inline unsigned getCompMethod(const StringBuffer compStr)
+{
+    unsigned compMethod = COMPRESS_METHOD_LZW;
+    if (compStr.length())
+    {
+        if (0 == stricmp("FLZ", compStr.str()))
+            compMethod = COMPRESS_METHOD_FASTLZ;
+        else if (0 == stricmp("LZ4", compStr.str()))
+            compMethod = COMPRESS_METHOD_LZ4;
+    }
+    else // default is LZ4
+        compMethod = COMPRESS_METHOD_LZ4;
+    return compMethod;
+}
+
 interface IExtRowStream: extends IRowStream
 {
     virtual offset_t getOffset() = 0;

+ 1 - 0
system/CMakeLists.txt

@@ -17,6 +17,7 @@ HPCC_ADD_SUBDIRECTORY (hrpc)
 HPCC_ADD_SUBDIRECTORY (include "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (jhtree)
 HPCC_ADD_SUBDIRECTORY (jlib)
+HPCC_ADD_SUBDIRECTORY (lz4_sm)
 HPCC_ADD_SUBDIRECTORY (lzma)
 HPCC_ADD_SUBDIRECTORY (mp)
 HPCC_ADD_SUBDIRECTORY (security)

+ 5 - 4
system/jlib/CMakeLists.txt

@@ -48,8 +48,6 @@ set (    SRCS
          jexcept.cpp 
          jfile.cpp 
          jflz.cpp 
-         jlz4.cpp
-         lz4.c
          jhash.cpp 
          jiface.cpp 
          jio.cpp 
@@ -57,6 +55,7 @@ set (    SRCS
          jkeyboard.cpp 
          jlib.cpp 
          jlog.cpp 
+         jlz4.cpp
          jlzma.cpp 
          jlzw.cpp 
          jmalloc.cpp 
@@ -101,11 +100,10 @@ set (    INCLUDES
         jencrypt.hpp
         jerror.hpp
         jexcept.hpp
+        jfcmp.hpp
         jfile.hpp
         jfile.ipp
         jflz.hpp
-        jlz4.hpp
-        lz4.h
         jhash.hpp
         jhash.ipp
         jheap.hpp
@@ -122,6 +120,7 @@ set (    INCLUDES
         jliball.hpp
         jlog.hpp
         jlog.ipp
+        jlz4.hpp
         jlzma.hpp
         jlzw.hpp
         jlzw.ipp
@@ -176,6 +175,7 @@ include_directories (
          ../../system/win32 
          ../../system/include 
          ../../system/lzma
+         ../../system/lz4_sm/lz4/lib
          ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file 
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
@@ -187,6 +187,7 @@ HPCC_ADD_LIBRARY( jlib SHARED ${SRCS} ${INCLUDES} )
 
 target_link_libraries ( jlib
         lzma
+        lz4
        )
 
 if ( ${USE_TBB} )

+ 477 - 0
system/jlib/jfcmp.hpp

@@ -0,0 +1,477 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "jlzw.hpp"
+
+#define COMMITTED ((size32_t)-1)
+
+#define FCMP_BUFFER_SIZE (0x100000)
+
+class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>
+{
+protected:
+    size32_t blksz;
+    size32_t bufalloc;
+    MemoryBuffer inma;      // equals blksize len
+    MemoryBuffer *outBufMb; // used when dynamic output buffer (when open() used)
+    size32_t outBufStart;
+    byte *inbuf;
+    size32_t inmax;         // remaining
+    size32_t inlen;
+    size32_t inlenblk;      // set to COMMITTED when so
+    bool trailing;
+    byte *outbuf;
+    size32_t outlen;
+    size32_t wrmax;
+    size32_t dynamicOutSz;
+
+    virtual void setinmax() = 0;
+    virtual void flushcommitted() = 0;
+
+    void initCommon()
+    {
+        blksz = inma.capacity();
+        *(size32_t *)outbuf = 0;
+        outlen = sizeof(size32_t);
+        inlen = 0;
+        inlenblk = COMMITTED;
+        setinmax();
+    }
+
+public:
+    CFcmpCompressor()
+    {
+        outlen = 0;
+        outbuf = NULL;      // only set on close
+        bufalloc = 0;
+        wrmax = 0;          // set at open
+        dynamicOutSz = 0;
+        outBufMb = NULL;
+        outBufStart = 0;
+        inbuf = NULL;
+    }
+
+    virtual ~CFcmpCompressor()
+    {
+        if (bufalloc)
+            free(outbuf);
+    }
+
+    virtual void open(void *buf,size32_t max)
+    {
+        if (max<1024)
+            throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", blksz);
+        wrmax = max;
+        if (buf)
+        {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = 0;
+            outbuf = (byte *)buf;
+        }
+        else if (max>bufalloc)
+        {
+            if (bufalloc)
+                free(outbuf);
+            outbuf = (byte *)malloc(max);
+            if (!outbuf)
+                throw MakeStringException(-1,"CFcmpCompressor::open - out of memory, requesting %d bytes", max);
+            bufalloc = max;
+        }
+        outBufMb = NULL;
+        outBufStart = 0;
+        dynamicOutSz = 0;
+        inbuf = (byte *)inma.ensureCapacity(max);
+        initCommon();
+    }
+
+    virtual void open(MemoryBuffer &mb, size32_t initialSize)
+    {
+        if (!initialSize)
+            initialSize = FCMP_BUFFER_SIZE; // 1MB
+        if (initialSize<1024)
+            throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", initialSize);
+        wrmax = initialSize;
+        if (bufalloc)
+        {
+            free(outbuf);
+            bufalloc = 0;
+        }
+        inbuf = (byte *)inma.ensureCapacity(initialSize);
+        outBufMb = &mb;
+        outBufStart = mb.length();
+        outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
+        dynamicOutSz = outBufMb->capacity();
+        initCommon();
+    }
+
+    virtual void close()
+    {
+        if (inlenblk!=COMMITTED)
+        {
+            inlen = inlenblk; // transaction failed
+            inlenblk = COMMITTED;
+        }
+        flushcommitted();
+        size32_t totlen = outlen+sizeof(size32_t)+inlen;
+        assertex(blksz>=totlen);
+        size32_t *tsize = (size32_t *)(outbuf+outlen);
+        *tsize = inlen;
+        memcpy(tsize+1,inbuf,inlen);
+        outlen = totlen;
+        *(size32_t *)outbuf += inlen;
+        inbuf = NULL;
+        if (outBufMb)
+        {
+            outBufMb->setWritePos(outBufStart+outlen);
+            outBufMb = NULL;
+        }
+    }
+
+    size32_t write(const void *buf,size32_t len)
+    {
+        // no more than wrmax per write (unless dynamically sizing)
+        size32_t lenb = wrmax;
+        byte *b = (byte *)buf;
+        size32_t written = 0;
+        while (len)
+        {
+            if (len < lenb)
+                lenb = len;
+            if (lenb+inlen>inmax)
+            {
+                if (trailing)
+                    return written;
+                flushcommitted();
+                if (lenb+inlen>inmax)
+                {
+                    if (outBufMb) // sizing input buffer, but outBufMb!=NULL is condition of whether in use or not
+                    {
+                        blksz += len > FCMP_BUFFER_SIZE ? len : FCMP_BUFFER_SIZE;
+                        verifyex(inma.ensureCapacity(blksz));
+                        blksz = inma.capacity();
+                        inbuf = (byte *)inma.bufferBase();
+                        wrmax = blksz;
+                        setinmax();
+                    }
+                    lenb = inmax-inlen;
+                    if (len < lenb)
+                        lenb = len;
+                }
+            }
+            if (lenb == 0)
+                return written;
+            memcpy(inbuf+inlen,b,lenb);
+            b += lenb;
+            inlen += lenb;
+            len -= lenb;
+            written += lenb;
+        }
+        return written;
+    }
+
+    void * bufptr()
+    {
+        assertex(!inbuf);  // i.e. closed
+        return outbuf;
+    }
+
+    size32_t buflen()
+    {
+        assertex(!inbuf);  // i.e. closed
+        return outlen;
+    }
+
+    void startblock()
+    {
+        inlenblk = inlen;
+    }
+
+    void commitblock()
+    {
+        inlenblk = COMMITTED;
+    }
+
+};
+
+
+class jlib_decl CFcmpExpander : public CSimpleInterfaceOf<IExpander>
+{
+protected:
+    byte *outbuf;
+    size32_t outlen;
+    size32_t bufalloc;
+    const size32_t *in;
+
+public:
+    CFcmpExpander()
+    {
+        outbuf = NULL;
+        outlen = 0;
+        bufalloc = 0;
+    }
+
+    virtual ~CFcmpExpander()
+    {
+        if (bufalloc)
+            free(outbuf);
+    }
+
+    virtual size32_t init(const void *blk)
+    {
+        const size32_t *expsz = (const size32_t *)blk;
+        outlen = *expsz;
+        in = (expsz+1);
+        return outlen;
+    }
+
+    virtual void expand(void *buf)
+    {
+        if (!outlen)
+            return;
+        if (buf)
+        {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = 0;
+            outbuf = (unsigned char *)buf;
+        }
+        else if (outlen>bufalloc)
+        {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = outlen;
+            outbuf = (unsigned char *)malloc(bufalloc);
+            if (!outbuf)
+                throw MakeStringException(MSGAUD_operator,0, "Out of memory in FcmpExpander::expand, requesting %d bytes", bufalloc);
+        }
+        size32_t done = 0;
+        loop
+        {
+            const size32_t szchunk = *in;
+            in++;
+            if (szchunk+done<outlen)
+            {
+                memcpy((byte *)buf+done, in, szchunk);
+                size32_t written = szchunk;
+                done += written;
+                if (!written||(done>outlen))
+                    throw MakeStringException(0, "FcmpExpander - corrupt data(1) %d %d",written,szchunk);
+            }
+            else
+            {
+                if (szchunk+done!=outlen)
+                    throw MakeStringException(0, "FcmpExpander - corrupt data(2) %d %d",szchunk,outlen);
+                memcpy((byte *)buf+done,in,szchunk);
+                break;
+            }
+            in = (const size32_t *)(((const byte *)in)+szchunk);
+        }
+    }
+
+    virtual void *bufptr() { return outbuf;}
+    virtual size32_t buflen() { return outlen;}
+};
+
+struct FcmpCompressedFileTrailer
+{
+    offset_t        zfill1;             // must be first
+    offset_t        expandedSize;
+    __int64         compressedType;
+    unsigned        zfill2;             // must be last
+};
+
+class CFcmpStream : public CSimpleInterfaceOf<IFileIOStream>
+{
+protected:
+    Linked<IFileIO> baseio;
+    offset_t expOffset;     // expanded offset
+    offset_t cmpOffset;     // compressed offset in file
+    bool reading;
+    MemoryAttr ma;
+    size32_t bufsize;
+    size32_t bufpos;        // reading only
+    offset_t expSize;
+    __int64 compType;
+
+public:
+    CFcmpStream()
+    {
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = true;
+        bufpos = 0;
+        bufsize = 0;
+    }
+
+    virtual ~CFcmpStream() { flush(); }
+
+    virtual bool load()
+    {
+        bufpos = 0;
+        bufsize = 0;
+        if (expOffset==expSize)
+            return false;
+        size32_t sz[2];
+        if (baseio->read(cmpOffset,sizeof(size32_t)*2,&sz)!=sizeof(size32_t)*2)
+            return false;
+        bufsize = sz[0];
+        if (!bufsize)
+            return false;
+        cmpOffset += sizeof(size32_t)*2;
+        if (ma.length()<bufsize)
+            ma.allocate(bufsize);
+        MemoryAttr cmpma;
+        byte *cmpbuf = (byte *)cmpma.allocate(sz[1]);
+        if (baseio->read(cmpOffset,sz[1],cmpbuf)!=sz[1])
+            throw MakeStringException(-1,"CFcmpStream: file corrupt.1");
+        memcpy(ma.bufferBase(), cmpbuf, sz[1]);
+        size32_t amnt = sz[1];
+        if (amnt!=bufsize)
+            throw MakeStringException(-1,"CFcmpStream: file corrupt.2");
+        cmpOffset += sz[1];
+        return true;
+    }
+
+    virtual void save()
+    {
+        if (bufsize)
+        {
+            MemoryAttr dstma;
+            byte *dst = (byte *)dstma.allocate(sizeof(size32_t)*2+bufsize);
+            memcpy((sizeof(size32_t)*2+dst), ma.get(), bufsize);
+            size32_t sz = bufsize;
+            memcpy(dst,&bufsize,sizeof(size32_t));
+            memcpy(dst+sizeof(size32_t),&sz,sizeof(size32_t));
+            baseio->write(cmpOffset,sz+sizeof(size32_t)*2,dst);
+            cmpOffset += sz+sizeof(size32_t)*2;
+        }
+        bufsize = 0;
+    }
+
+    virtual bool attach(IFileIO *_baseio)
+    {
+        baseio.set(_baseio);
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = true;
+        bufpos = 0;
+        bufsize = 0;
+
+        FcmpCompressedFileTrailer trailer;
+        offset_t filesize = baseio->size();
+        if (filesize<sizeof(trailer))
+            return false;
+        baseio->read(filesize-sizeof(trailer),sizeof(trailer),&trailer);
+        expSize = trailer.expandedSize;
+        return trailer.compressedType==compType;
+    }
+
+    virtual void create(IFileIO *_baseio)
+    {
+        baseio.set(_baseio);
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = false;
+        bufpos = 0;
+        bufsize = 0;
+        ma.allocate(FCMP_BUFFER_SIZE);
+        expSize = (offset_t)-1;
+    }
+
+    virtual void seek(offset_t pos, IFSmode origin)
+    {
+        if ((origin==IFScurrent)&&(pos==0))
+            return;
+        if ((origin==IFSbegin)||(pos!=0))
+            throw MakeStringException(-1,"CFcmpStream seek not supported");
+        expOffset = 0;
+        bufpos = 0;
+        bufsize = 0;
+    }
+
+    virtual offset_t size()
+    {
+        return (expSize==(offset_t)-1)?0:expSize;
+    }
+
+    virtual offset_t tell()
+    {
+        return expOffset;
+    }
+
+    virtual size32_t read(size32_t len, void * data)
+    {
+        if (!reading)
+            throw MakeStringException(-1,"CFcmpStream read to stream being written");
+        size32_t ret=0;
+        while (len)
+        {
+            size32_t cpy = bufsize-bufpos;
+            if (!cpy)
+            {
+                if (!load())
+                    break;
+                cpy = bufsize-bufpos;
+            }
+            if (cpy>len)
+                cpy = len;
+            memcpy(data,(const byte *)ma.get()+bufpos,cpy);
+            bufpos += cpy;
+            len -= cpy;
+            ret += cpy;
+        }
+        expOffset += ret;
+        return ret;
+    }
+
+    virtual size32_t write(size32_t len, const void * data)
+    {
+        if (reading)
+            throw MakeStringException(-1,"CFcmpStream write to stream being read");
+        size32_t ret = len;
+        while (len+bufsize>FCMP_BUFFER_SIZE)
+        {
+            size32_t cpy = FCMP_BUFFER_SIZE-bufsize;
+            memcpy((byte *)ma.bufferBase()+bufsize,data,cpy);
+            data = (const byte *)data+cpy;
+            len -= cpy;
+            bufsize = FCMP_BUFFER_SIZE;
+            save();
+        }
+        memcpy((byte *)ma.bufferBase()+bufsize,data,len);
+        bufsize += len;
+        expOffset += len;
+        return ret;
+    }
+
+    virtual void flush()
+    {
+        if (!reading&&(expSize!=expOffset))
+        {
+            save();
+            FcmpCompressedFileTrailer trailer;
+            memset(&trailer,0,sizeof(trailer));
+            trailer.compressedType = compType;
+            trailer.expandedSize = expOffset;
+            baseio->write(cmpOffset,sizeof(trailer),&trailer);
+            expSize = expOffset;
+        }
+    }
+
+};

+ 14 - 368
system/jlib/jflz.cpp

@@ -47,9 +47,8 @@
 
 // adapted for jlib
 #include "platform.h"
-
+#include "jfcmp.hpp"
 #include "jflz.hpp"
-
 #include "jcrc.hpp"
 
 /*
@@ -593,46 +592,29 @@ static FASTLZ_INLINE int FASTLZ_DECOMPRESSOR(const void* input, int length, void
 
 #if defined(FASTLZ__JLIBCOMPRESSOR)
 
-
-#define COMMITTED ((size32_t)-1)
-
 /* Format:
     size32_t totalexpsize;
     { size32_t subcmpsize; bytes subcmpdata; }
     size32_t trailsize; bytes traildata;    // unexpanded
 */
 
-
-class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
+class jlib_decl CFastLZCompressor : public CFcmpCompressor
 {
     HTAB_T ht;
-    size32_t blksz;
-    size32_t bufalloc;
-    MemoryBuffer inma;      // equals blksize len
-    MemoryBuffer *outBufMb; // used when dynamic output buffer (when open() used)
-    size32_t outBufStart;
-    byte *inbuf;
-    size32_t inmax;         // remaining
-    size32_t inlen;
-    size32_t inlenblk;      // set to COMMITTED when so
-    bool trailing;
-    byte *outbuf;
-    size32_t outlen;
-    size32_t wrmax;
-    size32_t dynamicOutSz;
-
-    inline void setinmax()
+
+    virtual void setinmax()
     {
         inmax = blksz-outlen-sizeof(size32_t);
         if (inmax<256)
             trailing = true;    // too small to bother compressing
-        else {
+        else
+        {
             trailing = false;
             inmax -= (fastlzSlack(inmax) + sizeof(size32_t));
         }
     }
 
-    inline void flushcommitted()
+    virtual void flushcommitted()
     {
         // only does non trailing
         if (trailing)
@@ -672,203 +654,12 @@ class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
         trailing = true;
     }
 
-    void initCommon()
-    {
-        blksz = inma.capacity();
-        *(size32_t *)outbuf = 0;
-        outlen = sizeof(size32_t);
-        inlen = 0;
-        inlenblk = COMMITTED;
-        setinmax();
-    }
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CFastLZCompressor()
-    {
-        outlen = 0;
-        outbuf = NULL;      // only set on close
-        bufalloc = 0;
-        wrmax = 0;          // set at open
-        dynamicOutSz = 0;
-        outBufMb = NULL;
-        outBufStart = 0;
-        inbuf = NULL;
-    }
-
-    virtual ~CFastLZCompressor()
-    {
-        if (bufalloc)
-            free(outbuf);
-    }
-
-
-    virtual void open(void *buf,size32_t max)
-    {
-        if (max<1024)
-            throw MakeStringException(-1,"CFastLZCompressor::open - block size (%d) not large enough", blksz);
-        wrmax = max;
-        if (buf)
-        {
-            if (bufalloc)
-                free(outbuf);
-            bufalloc = 0;
-            outbuf = (byte *)buf;
-        }
-        else if (max>bufalloc)
-        {
-            if (bufalloc)
-                free(outbuf);
-            bufalloc = max;
-            outbuf = (byte *)malloc(bufalloc);
-        }
-        outBufMb = NULL;
-        outBufStart = 0;
-        dynamicOutSz = 0;
-        inbuf = (byte *)inma.ensureCapacity(max);
-        initCommon();
-    }
-
-    virtual void open(MemoryBuffer &mb, size32_t initialSize)
-    {
-        if (!initialSize)
-            initialSize = 0x100000; // 1MB
-        if (initialSize<1024)
-            throw MakeStringException(-1,"CFastLZCompressor::open - block size (%d) not large enough", initialSize);
-        wrmax = initialSize;
-        if (bufalloc)
-        {
-            free(outbuf);
-            bufalloc = 0;
-        }
-        inbuf = (byte *)inma.ensureCapacity(initialSize);
-        outBufMb = &mb;
-        outBufStart = mb.length();
-        outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
-        dynamicOutSz = outBufMb->capacity();
-        initCommon();
-    }
-
-    virtual void close()
-    {
-        if (inlenblk!=COMMITTED) {
-            inlen = inlenblk; // transaction failed
-            inlenblk = COMMITTED;
-        }
-        flushcommitted();
-        size32_t totlen = outlen+sizeof(size32_t)+inlen;
-        assertex(blksz>=totlen);
-        size32_t *tsize = (size32_t *)(outbuf+outlen);
-        *tsize = inlen;
-        memcpy(tsize+1,inbuf,inlen);
-        outlen = totlen;
-        *(size32_t *)outbuf += inlen;
-        inbuf = NULL;
-        if (outBufMb)
-        {
-            outBufMb->setWritePos(outBufStart+outlen);
-            outBufMb = NULL;
-        }
-    }
-
-
-    size32_t write(const void *buf,size32_t len)
-    {
-        // no more than wrmax per write (unless dynamically sizing)
-        size32_t lenb = wrmax;
-        byte *b = (byte *)buf;
-        size32_t written = 0;
-        while (len)
-        {
-            if (len < lenb)
-                lenb = len;
-            if (lenb+inlen>inmax)
-            {
-                if (trailing)
-                    return written;
-                flushcommitted();
-                if (lenb+inlen>inmax)
-                {
-                    if (outBufMb) // sizing input buffer, but outBufMb!=NULL is condition of whether in use or not
-                    {
-                        blksz += len > 0x100000 ? len : 0x100000;
-                        verifyex(inma.ensureCapacity(blksz));
-                        blksz = inma.capacity();
-                        inbuf = (byte *)inma.bufferBase();
-                        wrmax = blksz;
-                        setinmax();
-                    }
-                    lenb = inmax-inlen;
-                    if (len < lenb)
-                        lenb = len;
-                }
-            }
-            if (lenb == 0)
-                return written;
-            memcpy(inbuf+inlen,b,lenb);
-            b += lenb;
-            inlen += lenb;
-            len -= lenb;
-            written += lenb;
-        }
-        return written;
-    }
-
-    void *  bufptr() 
-    { 
-        assertex(!inbuf);  // i.e. closed
-        return outbuf;
-    }
-    size32_t    buflen() 
-    { 
-        assertex(!inbuf);  // i.e. closed
-        return outlen;
-    }
-    void    startblock()
-    {
-        inlenblk = inlen;
-    }
-    void commitblock()
-    {
-        inlenblk = COMMITTED;
-    }
-
-
 };
 
 
-class jlib_decl CFastLZExpander : public CInterface, public IExpander
+class jlib_decl CFastLZExpander : public CFcmpExpander
 {
-
-    byte *outbuf;
-    size32_t outlen;
-    size32_t bufalloc;
-    const size32_t *in;  
-
 public:
-    IMPLEMENT_IINTERFACE;
-
-    CFastLZExpander()
-    {
-        outbuf = NULL;
-        outlen = 0;
-        bufalloc = 0;
-    }
-    ~CFastLZExpander()
-    {
-        if (bufalloc)
-            free(outbuf);
-
-    }
-
-    virtual size32_t  init(const void *blk)
-    {
-        const size32_t *expsz = (const size32_t *)blk;
-        outlen = *expsz;
-        in = (expsz+1);
-        return outlen;
-    }
-
     virtual void expand(void *buf)
     {
         if (!outlen)
@@ -907,8 +698,6 @@ public:
         }
     }
 
-    virtual void *bufptr() { return outbuf;}
-    virtual size32_t   buflen() { return outlen;}
 };
 
 void fastLZCompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
@@ -918,7 +707,8 @@ void fastLZCompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
     *sz = len;
     sz++;
     *sz = (len>16)?fastlz_compress(src, (int)len, sz+1):16;
-    if (*sz>=len) {
+    if (*sz>=len)
+    {
         *sz = len;
         memcpy(sz+1,src,len);
     }
@@ -996,30 +786,10 @@ IExpander *createFastLZExpander()
     return new CFastLZExpander;
 }
 
-#define FLZ_BUFFER_SIZE (0x100000)
-
-static const __uint64 FLZCOMPRESSEDFILEFLAG = U64C(0xc3518de42f15da57);
+static const __uint64 FLZSTRMCOMPRESSEDFILEFLAG = I64C(0xc3518de42f15da57);
 
-struct FlzCompressedFileTrailer
+class CFastLZStream : public CFcmpStream
 {
-    offset_t        zfill1;             // must be first
-    offset_t        expandedSize;
-    __uint64        compressedType;
-    unsigned        zfill2;             // must be last
-};
-
-
-class CFastLZStream : public CInterface, implements IFileIOStream
-{
-    Linked<IFileIO> baseio;
-    offset_t expOffset;     // expanded offset
-    offset_t cmpOffset;     // compressed offset in file
-    bool reading;
-    MemoryAttr ma;
-    size32_t bufsize;
-    size32_t bufpos;        // reading only
-    offset_t expSize;
-
     bool load()
     {
         bufpos = 0;
@@ -1061,133 +831,12 @@ class CFastLZStream : public CInterface, implements IFileIOStream
 
 
 public:
-    IMPLEMENT_IINTERFACE;
-
-    CFastLZStream() 
-    {
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = true;
-        bufpos = 0;
-        bufsize = 0;
-    }
-
-    ~CFastLZStream()
-    {
-        flush();
-    }
-
-    bool attach(IFileIO *_baseio)
-    {
-        baseio.set(_baseio);
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = true;
-        bufpos = 0;
-        bufsize = 0;
-
-        FlzCompressedFileTrailer trailer;
-        offset_t filesize = baseio->size();
-        if (filesize<sizeof(trailer))
-            return false;
-        baseio->read(filesize-sizeof(trailer),sizeof(trailer),&trailer);
-        expSize = trailer.expandedSize;
-        return trailer.compressedType==FLZCOMPRESSEDFILEFLAG;
-    }
-
-    void create(IFileIO *_baseio)
-    {
-        baseio.set(_baseio);
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = false;
-        bufpos = 0;
-        bufsize = 0;
-        ma.allocate(FLZ_BUFFER_SIZE);
-        expSize = (offset_t)-1;
-    }
-
-    void seek(offset_t pos, IFSmode origin)
-    {
-        if ((origin==IFScurrent)&&(pos==0))
-            return;
-        if ((origin==IFSbegin)||(pos!=0))
-            throw MakeStringException(-1,"CFastLZStream seek not supported");
-        expOffset = 0;
-        bufpos = 0;
-        bufsize = 0;
-    }
-
-    offset_t size()
-    {
-        return (expSize==(offset_t)-1)?0:expSize;
-    }
-
-    offset_t tell()
-    {
-        return expOffset;
-    }
-
-
-    size32_t read(size32_t len, void * data)
-    {
-        if (!reading)
-            throw MakeStringException(-1,"CFastLZStream read to stream being written");
-        size32_t ret=0;
-        while (len) {
-            size32_t cpy = bufsize-bufpos;
-            if (!cpy) {
-                if (!load())
-                    break;
-                cpy = bufsize-bufpos;
-            }
-            if (cpy>len)
-                cpy = len;
-            memcpy(data,(const byte *)ma.get()+bufpos,cpy);
-            bufpos += cpy;
-            len -= cpy;
-            ret += cpy;
-        }
-        expOffset += ret;
-        return ret;
-    }
-
-    size32_t write(size32_t len, const void * data)
-    {
-        if (reading)
-            throw MakeStringException(-1,"CFastLZStream write to stream being read");
-        size32_t ret = len;
-        while (len+bufsize>FLZ_BUFFER_SIZE) {
-            size32_t cpy = FLZ_BUFFER_SIZE-bufsize;
-            memcpy((byte *)ma.bufferBase()+bufsize,data,cpy);
-            data = (const byte *)data+cpy;
-            len -= cpy;
-            bufsize = FLZ_BUFFER_SIZE;
-            save();
-        }
-        memcpy((byte *)ma.bufferBase()+bufsize,data,len);
-        bufsize += len;
-        expOffset += len;
-        return ret;
-    }
+    CFastLZStream() { compType = FLZSTRMCOMPRESSEDFILEFLAG; }
 
-    void flush()
-    {
-        if (!reading&&(expSize!=expOffset)) {
-            save();
-            FlzCompressedFileTrailer trailer;
-            memset(&trailer,0,sizeof(trailer));
-            trailer.compressedType = FLZCOMPRESSEDFILEFLAG;
-            trailer.expandedSize = expOffset;
-            baseio->write(cmpOffset,sizeof(trailer),&trailer);
-            expSize = expOffset;
-        }
-    }
+    virtual ~CFastLZStream() { flush(); }
 
 };
 
-
-
 IFileIOStream *createFastLZStreamRead(IFileIO *base)
 {
     Owned<CFastLZStream> strm = new CFastLZStream();
@@ -1203,7 +852,4 @@ IFileIOStream *createFastLZStreamWrite(IFileIO *base)
     return strm.getClear();
 }
 
-
 #endif
-
-

+ 83 - 363
system/jlib/jlz4.cpp

@@ -1,6 +1,6 @@
 /*##############################################################################
 
-    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -16,38 +16,36 @@
 ############################################################################## */
 
 #include "platform.h"
+#include "jfcmp.hpp"
 #include "jlz4.hpp"
+#include "lz4.h"
 
-#define COMMITTED ((size32_t)-1)
+/* Format:
+    size32_t totalexpsize;
+    { size32_t subcmpsize; bytes subcmpdata; }
+    size32_t trailsize; bytes traildata;    // unexpanded
+*/
 
-class jlib_decl CLZ4Compressor : public CInterface, public ICompressor
+class jlib_decl CLZ4Compressor : public CFcmpCompressor
 {
-    size32_t blksz;
-    size32_t bufalloc;
-    MemoryAttr inma;        // equals blksize len
-    byte *inbuf;
-    size32_t inmax;         // remaining
-    size32_t inlen;
-    size32_t inlenblk;      // set to COMMITTED when so
-    bool trailing;
-    byte *outbuf;
-    size32_t outlen;
-    size32_t wrmax;
-
-    inline void setinmax()
+    virtual void setinmax()
     {
         inmax = blksz-outlen-sizeof(size32_t);
         if (inmax<256)
             trailing = true;    // too small to bother compressing
-        else {
+        else
+        {
             trailing = false;
-            // inmax -= (fastlzSlack(inmax) + sizeof(size32_t));
             size32_t slack = LZ4_COMPRESSBOUND(inmax) - inmax;
-            inmax -= (slack + sizeof(size32_t));
+            int inmax2 = inmax - (slack + sizeof(size32_t));
+            if (inmax2<256)
+                trailing = true;
+            else
+                inmax = inmax2;
         }
     }
 
-    inline void flushcommitted()
+    virtual void flushcommitted()
     {
         // only does non trailing
         if (trailing)
@@ -56,21 +54,36 @@ class jlib_decl CLZ4Compressor : public CInterface, public ICompressor
         if (toflush == 0)
             return;
 
-        // printf("flushcommited() inlenblk=%d inlen=%d blksz=%d outlen=%d\n", inlenblk, inlen, blksz, outlen);
-
-        assertex(outlen+sizeof(size32_t)*2+LZ4_COMPRESSBOUND(toflush)<=blksz);
+        if (toflush < 256)
+        {
+            trailing = true;
+            return;
+        }
 
+        size32_t outSzRequired = outlen+sizeof(size32_t)*2+LZ4_COMPRESSBOUND(toflush);
+        if (!dynamicOutSz)
+            assertex(outSzRequired<=blksz);
+        else
+        {
+            if (outSzRequired>dynamicOutSz)
+            {
+                verifyex(outBufMb->ensureCapacity(outBufStart+outSzRequired));
+                dynamicOutSz = outBufMb->capacity();
+                outbuf = ((byte *)outBufMb->bufferBase()+outBufStart);
+            }
+        }
         size32_t *cmpsize = (size32_t *)(outbuf+outlen);
         byte *out = (byte *)(cmpsize+1);
 
-        *cmpsize = LZ4_compress((const char *)inbuf, (char *)out, toflush);
-
-        if (*cmpsize<toflush) {
+        *cmpsize = LZ4_compress_default((const char *)inbuf, (char *)out, toflush, LZ4_COMPRESSBOUND(toflush));
+        if (*cmpsize && *cmpsize<toflush)
+        {
             *(size32_t *)outbuf += toflush;
             outlen += *cmpsize+sizeof(size32_t);
             if (inlenblk==COMMITTED)
                 inlen = 0;
-            else {
+            else
+            {
                 inlen -= inlenblk;
                 memmove(inbuf,inbuf+toflush,inlen);
             }
@@ -80,178 +93,25 @@ class jlib_decl CLZ4Compressor : public CInterface, public ICompressor
         trailing = true;
     }
 
-
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CLZ4Compressor()
-    {
-        outlen = 0;
-        outbuf = NULL;      // only set on close
-        bufalloc = 0;
-        wrmax = 0;          // set at open
-    }
-
-    virtual ~CLZ4Compressor()
-    {
-        if (bufalloc)
-            free(outbuf);
-    }
-
-
-    virtual void open(void *buf,size32_t max)
-    {
-        if (buf) {
-            if (bufalloc) {
-                free(outbuf);
-            }
-            bufalloc = 0;
-            outbuf = (byte *)buf;
-        }
-        else if (max>bufalloc) {
-            if (bufalloc)
-                free(outbuf);
-            bufalloc = max;
-            outbuf = (byte *)malloc(bufalloc);
-        }
-        blksz = max;
-        if (blksz!=inma.length())
-            inbuf = (byte *)inma.allocate(blksz);
-        else
-            inbuf = (byte *)inma.bufferBase();
-        if (blksz<1024)
-            throw MakeStringException(-1,"CLZ4Compressor::open - block size (%d) not large enough", blksz);
-        *(size32_t *)outbuf = 0;
-        outlen = sizeof(size32_t);
-        inlen = 0;
-        inlenblk = COMMITTED;
-        setinmax();
-        wrmax = inmax;
-        // printf("open() inlenblk=%d inlen=%d trailing=%d blksz=%d outlen=%d\n", inlenblk, inlen, trailing, blksz, outlen);
-    }
-
-    virtual void close()
-    {
-        if (inlenblk!=COMMITTED) {
-            inlen = inlenblk; // transaction failed
-            inlenblk = COMMITTED;
-        }
-        flushcommitted();
-        // printf("close() inlenblk=%d inlen=%d trailing=%d blksz=%d outlen=%d\n", inlenblk, inlen, trailing, blksz, outlen);
-        size32_t totlen = outlen+sizeof(size32_t)+inlen;
-        assertex(blksz>=totlen);
-        size32_t *tsize = (size32_t *)(outbuf+outlen);
-        *tsize = inlen;
-        memcpy(tsize+1,inbuf,inlen);
-        outlen = totlen;
-        *(size32_t *)outbuf += inlen;
-        inbuf = NULL;
-    }
-
-
-    size32_t write(const void *buf,size32_t len)
-    {
-        // no more than wrmax per write
-        size32_t lenb = wrmax;
-        byte *b = (byte *)buf;
-        size32_t written = 0;
-        while (len)
-        {
-            if (len < lenb)
-                lenb = len;
-            if (lenb+inlen>inmax) {
-                if (trailing)
-                    return written;
-                size32_t lenb2 = inmax - inlen;
-                if (lenb2 >= 0x2000) {
-                    memcpy(inbuf+inlen,b,lenb2);
-                    b += lenb2;
-                    inlen += lenb2;
-                    len -= lenb2;
-                    written += lenb2;
-                    if (len < lenb)
-                        lenb = len;
-                }
-                flushcommitted();
-                if (lenb+inlen>inmax)
-                    lenb = inmax-inlen;
-            }
-            if (lenb == 0)
-                return written;
-            memcpy(inbuf+inlen,b,lenb);
-            b += lenb;
-            inlen += lenb;
-            len -= lenb;
-            written += lenb;
-        }
-        return written;
-    }
-
-    void *  bufptr()
-    {
-        assertex(!inbuf);  // i.e. closed
-        return outbuf;
-    }
-    size32_t    buflen()
-    {
-        assertex(!inbuf);  // i.e. closed
-        return outlen;
-    }
-    void    startblock()
-    {
-        inlenblk = inlen;
-    }
-    void commitblock()
-    {
-        inlenblk = COMMITTED;
-    }
-
-
 };
 
-class jlib_decl CLZ4Expander : public CInterface, public IExpander
-{
-
-    byte *outbuf;
-    size32_t outlen;
-    size32_t bufalloc;
-    const size32_t *in;
 
+class jlib_decl CLZ4Expander : public CFcmpExpander
+{
 public:
-    IMPLEMENT_IINTERFACE;
-
-    CLZ4Expander()
-    {
-        outbuf = NULL;
-        outlen = 0;
-        bufalloc = 0;
-    }
-    ~CLZ4Expander()
-    {
-        if (bufalloc)
-            free(outbuf);
-
-    }
-
-    virtual size32_t  init(const void *blk)
-    {
-        const size32_t *expsz = (const size32_t *)blk;
-        outlen = *expsz;
-        in = (expsz+1);
-        return outlen;
-    }
-
     virtual void expand(void *buf)
     {
         if (!outlen)
             return;
-        if (buf) {
+        if (buf)
+        {
             if (bufalloc)
                 free(outbuf);
             bufalloc = 0;
             outbuf = (unsigned char *)buf;
         }
-        else if (outlen>bufalloc) {
+        else if (outlen>bufalloc)
+        {
             if (bufalloc)
                 free(outbuf);
             bufalloc = outlen;
@@ -260,18 +120,19 @@ public:
                 throw MakeStringException(MSGAUD_operator,0, "Out of memory in LZ4Expander::expand, requesting %d bytes", bufalloc);
         }
         size32_t done = 0;
-        loop {
+        loop
+        {
             const size32_t szchunk = *in;
             in++;
-            if (szchunk+done<outlen) {
-
+            if (szchunk+done<outlen)
+            {
                 size32_t written = LZ4_decompress_safe((const char *)in, (char *)((byte *)buf+done), szchunk, outlen-done);
-
                 done += written;
                 if (!written||(done>outlen))
                     throw MakeStringException(0, "LZ4Expander - corrupt data(1) %d %d",written,szchunk);
             }
-            else {
+            else
+            {
                 if (szchunk+done!=outlen)
                     throw MakeStringException(0, "LZ4Expander - corrupt data(2) %d %d",szchunk,outlen);
                 memcpy((byte *)buf+done,in,szchunk);
@@ -281,25 +142,28 @@ public:
         }
     }
 
-    virtual void *bufptr() { return outbuf;}
-    virtual size32_t   buflen() { return outlen;}
 };
 
 void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
 {
     size32_t outbase = out.length();
-
     size32_t *sz = (size32_t *)out.reserve(LZ4_COMPRESSBOUND(len)+sizeof(size32_t)*2);
-
     *sz = len;
     sz++;
-
-    *sz = (len>16)?LZ4_compress((const char *)src, (char *)(sz+1), len):16;
-
-    if (*sz>=len) {
+    if (len < 64)
+    {
         *sz = len;
         memcpy(sz+1,src,len);
     }
+    else
+    {
+        *sz = LZ4_compress_default((const char *)src, (char *)(sz+1), len, LZ4_COMPRESSBOUND(len));
+        if (!*sz)
+        {
+            *sz = len;
+            memcpy(sz+1,src,len);
+        }
+    }
     out.setLength(outbase+*sz+sizeof(size32_t)*2);
 }
 
@@ -309,10 +173,9 @@ void LZ4DecompressToBuffer(MemoryBuffer & out, const void * src)
     size32_t expsz = *(sz++);
     size32_t cmpsz = *(sz++);
     void *o = out.reserve(expsz);
-    if (cmpsz!=expsz) {
-
+    if (cmpsz!=expsz)
+    {
         size32_t written = LZ4_decompress_safe((const char *)sz, (char *)o, cmpsz, expsz);
-
         if (written!=expsz)
             throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(1) %d %d",written,expsz);
     }
@@ -326,10 +189,9 @@ void LZ4DecompressToBuffer(MemoryBuffer & out, MemoryBuffer & in)
     size32_t cmpsz;
     in.read(expsz).read(cmpsz);
     void *o = out.reserve(expsz);
-    if (cmpsz!=expsz) {
-
+    if (cmpsz!=expsz)
+    {
         size32_t written = LZ4_decompress_safe((const char *)in.readDirect(cmpsz), (char *)o, cmpsz, expsz);
-
         if (written!=expsz)
             throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(3) %d %d",written,expsz);
     }
@@ -343,10 +205,9 @@ void LZ4DecompressToAttr(MemoryAttr & out, const void * src)
     size32_t expsz = *(sz++);
     size32_t cmpsz = *(sz++);
     void *o = out.allocate(expsz);
-    if (cmpsz!=expsz) {
-
+    if (cmpsz!=expsz)
+    {
         size32_t written = LZ4_decompress_safe((const char *)sz, (char *)o, cmpsz, expsz);
-
         if (written!=expsz)
             throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(2) %d %d",written,expsz);
     }
@@ -360,10 +221,9 @@ void LZ4DecompressToBuffer(MemoryAttr & out, MemoryBuffer & in)
     size32_t cmpsz;
     in.read(expsz).read(cmpsz);
     void *o = out.allocate(expsz);
-    if (cmpsz!=expsz) {
-
+    if (cmpsz!=expsz)
+    {
         size32_t written = LZ4_decompress_safe((const char *)in.readDirect(cmpsz), (char *)o, cmpsz, expsz);
-
         if (written!=expsz)
             throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(4) %d %d",written,expsz);
     }
@@ -382,31 +242,10 @@ IExpander *createLZ4Expander()
     return new CLZ4Expander;
 }
 
-#define LZ4_BUFFER_SIZE (0x100000)
-
-#define LZ4STRMCOMPRESSEDFILEFLAG (I64C(0xc3526de42f15da57)) // mck - what is an ok value ?
+#define LZ4STRMCOMPRESSEDFILEFLAG (I64C(0xc129b02d53545e91))
 
-
-struct LZ4CompressedFileTrailer
+class CLZ4Stream : public CFcmpStream
 {
-    offset_t        zfill1;             // must be first
-    offset_t        expandedSize;
-    __int64         compressedType;
-    unsigned        zfill2;             // must be last
-};
-
-
-class CLZ4Stream : public CInterface, implements IFileIOStream
-{
-    Linked<IFileIO> baseio;
-    offset_t expOffset;     // expanded offset
-    offset_t cmpOffset;     // compressed offset in file
-    bool reading;
-    MemoryAttr ma;
-    size32_t bufsize;
-    size32_t bufpos;        // reading only
-    offset_t expSize;
-
     bool load()
     {
         bufpos = 0;
@@ -426,25 +265,25 @@ class CLZ4Stream : public CInterface, implements IFileIOStream
         byte *cmpbuf = (byte *)cmpma.allocate(sz[1]);
         if (baseio->read(cmpOffset,sz[1],cmpbuf)!=sz[1])
             throw MakeStringException(-1,"CLZ4Stream: file corrupt.1");
-
         size32_t amnt = LZ4_decompress_safe((const char *)cmpbuf, (char *)ma.bufferBase(), sz[1], bufsize);
-
         if (amnt!=bufsize)
             throw MakeStringException(-1,"CLZ4Stream: file corrupt.2");
-
         cmpOffset += sz[1];
         return true;
     }
 
     void save()
     {
-        if (bufsize) {
+        if (bufsize)
+        {
             MemoryAttr dstma;
-
             byte *dst = (byte *)dstma.allocate(sizeof(size32_t)*2+LZ4_COMPRESSBOUND(bufsize));
-
-            size32_t sz = LZ4_compress((const char *)ma.get(), (char *)(sizeof(size32_t)*2+dst), bufsize);
-
+            size32_t sz = LZ4_compress_default((const char *)ma.get(), (char *)(sizeof(size32_t)*2+dst), bufsize, LZ4_COMPRESSBOUND(bufsize));
+            if (!sz)
+            {
+                sz = bufsize;
+                memcpy((sizeof(size32_t)*2+dst), ma.get(), bufsize);
+            }
             memcpy(dst,&bufsize,sizeof(size32_t));
             memcpy(dst+sizeof(size32_t),&sz,sizeof(size32_t));
             baseio->write(cmpOffset,sz+sizeof(size32_t)*2,dst);
@@ -455,128 +294,9 @@ class CLZ4Stream : public CInterface, implements IFileIOStream
 
 
 public:
-    IMPLEMENT_IINTERFACE;
+    CLZ4Stream() { compType = LZ4STRMCOMPRESSEDFILEFLAG; }
 
-    CLZ4Stream()
-    {
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = true;
-        bufpos = 0;
-        bufsize = 0;
-    }
-
-    ~CLZ4Stream()
-    {
-        flush();
-    }
-
-    bool attach(IFileIO *_baseio)
-    {
-        baseio.set(_baseio);
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = true;
-        bufpos = 0;
-        bufsize = 0;
-
-        LZ4CompressedFileTrailer trailer;
-        offset_t filesize = baseio->size();
-        if (filesize<sizeof(trailer))
-            return false;
-        baseio->read(filesize-sizeof(trailer),sizeof(trailer),&trailer);
-        expSize = trailer.expandedSize;
-        return trailer.compressedType==LZ4STRMCOMPRESSEDFILEFLAG;
-    }
-
-    void create(IFileIO *_baseio)
-    {
-        baseio.set(_baseio);
-        expOffset = 0;
-        cmpOffset = 0;
-        reading = false;
-        bufpos = 0;
-        bufsize = 0;
-        ma.allocate(LZ4_BUFFER_SIZE);
-        expSize = (offset_t)-1;
-    }
-
-    void seek(offset_t pos, IFSmode origin)
-    {
-        if ((origin==IFScurrent)&&(pos==0))
-            return;
-        if ((origin==IFSbegin)||(pos!=0))
-            throw MakeStringException(-1,"CLZ4Stream seek not supported");
-        expOffset = 0;
-        bufpos = 0;
-        bufsize = 0;
-    }
-
-    offset_t size()
-    {
-        return (expSize==(offset_t)-1)?0:expSize;
-    }
-
-    offset_t tell()
-    {
-        return expOffset;
-    }
-
-
-    size32_t read(size32_t len, void * data)
-    {
-        if (!reading)
-            throw MakeStringException(-1,"CLZ4Stream read to stream being written");
-        size32_t ret=0;
-        while (len) {
-            size32_t cpy = bufsize-bufpos;
-            if (!cpy) {
-                if (!load())
-                    break;
-                cpy = bufsize-bufpos;
-            }
-            if (cpy>len)
-                cpy = len;
-            memcpy(data,(const byte *)ma.get()+bufpos,cpy);
-            bufpos += cpy;
-            len -= cpy;
-            ret += cpy;
-        }
-        expOffset += ret;
-        return ret;
-    }
-
-    size32_t write(size32_t len, const void * data)
-    {
-        if (reading)
-            throw MakeStringException(-1,"CLZ4Stream write to stream being read");
-        size32_t ret = len;
-        while (len+bufsize>LZ4_BUFFER_SIZE) {
-            size32_t cpy = LZ4_BUFFER_SIZE-bufsize;
-            memcpy((byte *)ma.bufferBase()+bufsize,data,cpy);
-            data = (const byte *)data+cpy;
-            len -= cpy;
-            bufsize = LZ4_BUFFER_SIZE;
-            save();
-        }
-        memcpy((byte *)ma.bufferBase()+bufsize,data,len);
-        bufsize += len;
-        expOffset += len;
-        return ret;
-    }
-
-    void flush()
-    {
-        if (!reading&&(expSize!=expOffset)) {
-            save();
-            LZ4CompressedFileTrailer trailer;
-            memset(&trailer,0,sizeof(trailer));
-            trailer.compressedType = LZ4STRMCOMPRESSEDFILEFLAG;
-            trailer.expandedSize = expOffset;
-            baseio->write(cmpOffset,sizeof(trailer),&trailer);
-            expSize = expOffset;
-        }
-    }
+    virtual ~CLZ4Stream() { flush(); }
 
 };
 

+ 1 - 2
system/jlib/jlz4.hpp

@@ -19,9 +19,8 @@
 #define JLZ4_INCL
 
 #include "jlzw.hpp"
-#include "lz4.h"
 
-#define LZ4COMPRESSEDFILEBLOCKSIZE (0x10000)
+#define LZ4COMPRESSEDFILEBLOCKSIZE (0x100000)
 
 extern jlib_decl ICompressor *createLZ4Compressor();
 extern jlib_decl IExpander   *createLZ4Expander();

+ 64 - 49
system/jlib/jlzw.cpp

@@ -1841,20 +1841,23 @@ struct CompressedFileTrailer
     offset_t        expandedSize;
     offset_t        indexPos;       // end of blocks
     size32_t        blockSize;
-    size32_t        recordSize;     // 0 is lzw or fast (flz) or lz4 compressed
+    size32_t        recordSize;     // 0 is lzw or fastlz or lz4
     __int64         compressedType;
     unsigned        crc;                // must be last
     unsigned numBlocks() { return (unsigned)((indexPos+blockSize-1)/blockSize); }
     unsigned method()
     {
-        if (recordSize)
-            return COMPRESS_METHOD_ROWDIF;
-        if (compressedType==COMPRESSEDFILEFLAG)
-            return COMPRESS_METHOD_LZW;
         if (compressedType==FASTCOMPRESSEDFILEFLAG)
             return COMPRESS_METHOD_FASTLZ;
         if (compressedType==LZ4COMPRESSEDFILEFLAG)
             return COMPRESS_METHOD_LZ4;
+        if (compressedType==COMPRESSEDFILEFLAG)
+        {
+            if (recordSize)
+                return COMPRESS_METHOD_ROWDIF;
+            else
+                return COMPRESS_METHOD_LZW;
+        }
         return 0;
     }
 
@@ -1864,7 +1867,7 @@ struct CompressedFileTrailer
         tree.setPropInt64("@expandedSize",expandedSize);
         tree.setPropInt64("@indexPos",indexPos);
         tree.setPropInt("@blockSize",blockSize);
-        tree.setPropInt("@recordSize",recordSize);      // 0 is lzw compressed
+        tree.setPropInt("@recordSize",recordSize);      // 0 is lzw or fastlz or lz4
         tree.setPropInt64("@compressedType",compressedType);
         tree.setPropInt("@method",method());
         tree.setPropInt("@crc",crc);                
@@ -1880,7 +1883,7 @@ struct WinCompressedFileTrailer
     offset_t        expandedSize;
     offset_t        indexPos;       // end of blocks
     size32_t        blockSize;
-    size32_t        recordSize;     // 0 is lzw compressed
+    size32_t        recordSize;     // 0 is lzw or fastlz or lz4
     __int64         compressedType;
     unsigned        crc;            // must be last
     unsigned        filler2;
@@ -1928,7 +1931,7 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
     bool writeException;
     Owned<ICompressor> compressor;
     Owned<IExpander> expander;
-    __int64 compType;
+    unsigned compMethod;
 
     unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); }
 
@@ -2029,7 +2032,7 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
     virtual void expand(const void *compbuf,MemoryBuffer &expbuf,size32_t expsize)
     {
         size32_t rs = trailer.recordSize;
-        if (rs) { // diff compress
+        if (rs) { // diff expand
             const byte *src = (const byte *)compbuf;
             byte *dst = (byte *)expbuf.reserve(expsize);
             if (expsize) {
@@ -2046,7 +2049,7 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
                 }
             }
         }
-        else { // lzw or fastlz (flz) or lz4
+        else { // lzw or fastlz or lz4
             assertex(expander.get());
             size32_t exp = expander->init(compbuf);
             if (exp!=expsize) {
@@ -2106,14 +2109,16 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
                 src += len;
             }
         }
-        else // lzw or fastlz (flz) or lz4
+        else // lzw or fastlz or lz4
+        {
             src += compressor->write(src, len);
+        }
         return (size32_t)(src-(const byte *)expbuf);
     }
 public:
     IMPLEMENT_IINTERFACE;
 
-    CCompressedFile(IFileIO *_fileio,IMemoryMappedFile *_mmfile,CompressedFileTrailer &_trailer,ICFmode _mode, bool _setcrc,ICompressor *_compressor,IExpander *_expander, __int64 _compType)
+    CCompressedFile(IFileIO *_fileio,IMemoryMappedFile *_mmfile,CompressedFileTrailer &_trailer,ICFmode _mode, bool _setcrc,ICompressor *_compressor,IExpander *_expander, unsigned _compMethod)
         : fileio(_fileio), mmfile(_mmfile)
     {
         compressor.set(_compressor);
@@ -2124,7 +2129,7 @@ public:
         mode = _mode;
         curblockpos = 0;
         curblocknum = (unsigned)-1; // relies on wrap
-        compType = _compType;
+        compMethod = _compMethod;
         if (mode!=ICFread) {
             if (!_fileio&&_mmfile)
                 throw MakeStringException(-1,"Compressed Write not supported on memory mapped files");
@@ -2140,12 +2145,16 @@ public:
             if (trailer.recordSize==0) {
                 if (!compressor)
                 {
-                    if (compType == COMPRESS_METHOD_FASTLZ)
+                    if (compMethod == COMPRESS_METHOD_FASTLZ)
                         compressor.setown(createFastLZCompressor());
-                    else if (compType == COMPRESS_METHOD_LZ4)
+                    else if (compMethod == COMPRESS_METHOD_LZ4)
                         compressor.setown(createLZ4Compressor());
-                    else // COMPRESS_METHOD_LZW
+                    else // fallback
+                    {
+                        compMethod = COMPRESS_METHOD_LZW;
+                        trailer.compressedType = COMPRESSEDFILEFLAG;
                         compressor.setown(createLZWCompressor(true));
+                    }
                 }
                 compressor->open(compblkptr, trailer.blockSize);
             }
@@ -2170,12 +2179,15 @@ public:
             }
             if (trailer.recordSize==0) {
                 if (!expander) {
-                    if (compType == COMPRESS_METHOD_FASTLZ)
+                    if (compMethod == COMPRESS_METHOD_FASTLZ)
                         expander.setown(createFastLZExpander());
-                    else if (compType == COMPRESS_METHOD_LZ4)
+                    else if (compMethod == COMPRESS_METHOD_LZ4)
                         expander.setown(createLZ4Expander());
-                    else // COMPRESS_METHOD_LZW
+                    else // fallback
+                    {
+                        compMethod = COMPRESS_METHOD_LZW;
                         expander.setown(createLZWExpander(true));
+                    }
                 }
             }
         }
@@ -2333,12 +2345,12 @@ ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expande
                     if (expander&&(trailer.recordSize!=0)) {
                         throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
                     }
-                    __int64 compType1 = COMPRESS_METHOD_LZW;
+                    unsigned compMethod1 = COMPRESS_METHOD_LZW;
                     if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
-                        compType1 = COMPRESS_METHOD_FASTLZ;
+                        compMethod1 = COMPRESS_METHOD_FASTLZ;
                     else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
-                        compType1 = COMPRESS_METHOD_LZ4;
-                    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,ICFread,false,NULL,expander,compType1);
+                        compMethod1 = COMPRESS_METHOD_LZ4;
+                    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,ICFread,false,NULL,expander,compMethod1);
                     return cfile;
                 }
             }
@@ -2367,12 +2379,12 @@ ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander, b
                         if (expander&&(trailer.recordSize!=0)) {
                             throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
                         }
-                        __int64 compType1 = COMPRESS_METHOD_LZW;
+                        unsigned compMethod1 = COMPRESS_METHOD_LZW;
                         if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
-                            compType1 = COMPRESS_METHOD_FASTLZ;
+                            compMethod1 = COMPRESS_METHOD_FASTLZ;
                         else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
-                            compType1 = COMPRESS_METHOD_LZ4;
-                        CCompressedFile *cfile = new CCompressedFile(NULL,mmfile,trailer,ICFread,false,NULL,expander,compType1);
+                            compMethod1 = COMPRESS_METHOD_LZ4;
+                        CCompressedFile *cfile = new CCompressedFile(NULL,mmfile,trailer,ICFread,false,NULL,expander,compMethod1);
                         return cfile;
                     }
                 }
@@ -2388,7 +2400,7 @@ ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander, b
 
 
 
-ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool _setcrc,ICompressor *compressor, __int64 _compType)
+ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool _setcrc,ICompressor *compressor, unsigned _compMethod)
 {
     CompressedFileTrailer trailer;
     offset_t fsize = fileio->size();
@@ -2403,15 +2415,15 @@ ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsiz
                          (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
                          (trailer.compressedType==LZ4COMPRESSEDFILEFLAG) )
                     {
-                        // mck - check trailer.compressedType against _compType ?
-                        __int64 compType1 = 0;
+                        // check trailer.compressedType against _compMethod
+                        unsigned compMethod1 = 0;
                         if (trailer.compressedType == COMPRESSEDFILEFLAG)
-                            compType1 = COMPRESS_METHOD_LZW;
+                            compMethod1 = COMPRESS_METHOD_LZW;
                         else if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
-                            compType1 = COMPRESS_METHOD_FASTLZ;
+                            compMethod1 = COMPRESS_METHOD_FASTLZ;
                         else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
-                            compType1 = COMPRESS_METHOD_LZ4;
-                        if (_compType != compType1)
+                            compMethod1 = COMPRESS_METHOD_LZ4;
+                        if (_compMethod != compMethod1)
                             throw MakeStringException(-1,"Appending to file with different compression method");
                         if ((recordsize==trailer.recordSize)||!trailer.recordSize)
                             break;
@@ -2425,37 +2437,40 @@ ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsiz
     else {
         memset(&trailer,0,sizeof(trailer));
         trailer.crc = ~0U;
-        if (_compType == COMPRESS_METHOD_FASTLZ)
+        if (_compMethod == COMPRESS_METHOD_FASTLZ)
         {
             trailer.compressedType = FASTCOMPRESSEDFILEFLAG;
             trailer.blockSize = FASTCOMPRESSEDFILEBLOCKSIZE;
+            trailer.recordSize = 0;
         }
-        else if (_compType == COMPRESS_METHOD_LZ4)
+        else if (_compMethod == COMPRESS_METHOD_LZ4)
         {
             trailer.compressedType = LZ4COMPRESSEDFILEFLAG;
             trailer.blockSize = LZ4COMPRESSEDFILEBLOCKSIZE;
+            trailer.recordSize = 0;
         }
-        else // lzw
+        else // fallback
         {
             trailer.compressedType = COMPRESSEDFILEFLAG;
             trailer.blockSize = COMPRESSEDFILEBLOCKSIZE;
+            trailer.recordSize = recordsize;
         }
-        trailer.recordSize = recordsize;
     }
+    // MCK - may present compatibility issue if passing in compressor and wanting row comp
     if (compressor)
         trailer.recordSize = 0; // force not row compressed if compressor specified
-    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,fsize?ICFappend:ICFcreate,_setcrc,compressor,NULL,_compType);
+    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,fsize?ICFappend:ICFcreate,_setcrc,compressor,NULL,_compMethod);
     return cfile;
 }
 
-ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append,bool _setcrc,ICompressor *compressor, __int64 _compType, IFEflags extraFlags)
+ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append,bool _setcrc,ICompressor *compressor, unsigned _compMethod, IFEflags extraFlags)
 {
     if (file) {
         if (append&&!file->exists())
             append = false;
         Owned<IFileIO> fileio = file->open(append?IFOreadwrite:IFOcreate, extraFlags);
         if (fileio) 
-            return createCompressedFileWriter(fileio,recordsize,_setcrc,compressor,_compType);
+            return createCompressedFileWriter(fileio,recordsize,_setcrc,compressor,_compMethod);
     }
     return NULL;
 }
@@ -2655,9 +2670,10 @@ IPropertyTree *getBlockedFileDetails(IFile *file)
             CompressedFileTrailer trailer;
             if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
                 wintrailer.translate(trailer);
-                if ((trailer.compressedType==COMPRESSEDFILEFLAG) ||
-                    (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
-                    (trailer.compressedType==LZ4COMPRESSEDFILEFLAG)) {
+                if ( (trailer.compressedType==COMPRESSEDFILEFLAG) ||
+                     (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
+                     (trailer.compressedType==LZ4COMPRESSEDFILEFLAG) )
+                {
                     trailer.setDetails(*tree);
                     unsigned nb = trailer.numBlocks();
                     MemoryAttr indexbuf;
@@ -2771,14 +2787,13 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
         virtual ICompressor *getCompressor(const char *options) { return createLZWCompressor(true); }
         virtual IExpander *getExpander(const char *options) { return createLZWExpander(true); }
     };
-    addCompressorHandler(new CFLZCompressHandler());
+    ICompressHandler *flzCompressor = new CFLZCompressHandler();
+    addCompressorHandler(flzCompressor);
     addCompressorHandler(new CAESCompressHandler());
     addCompressorHandler(new CDiffCompressHandler());
     addCompressorHandler(new CLZWCompressHandler());
-    addCompressorHandler(new CDENCompressHandler());
-    ICompressHandler *lz4Compressor = new CLZ4CompressHandler();
-    addCompressorHandler(lz4Compressor);
-    defaultCompressor.set(lz4Compressor);
+    addCompressorHandler(new CLZ4CompressHandler());
+    defaultCompressor.set(flzCompressor);
     return true;
 }
 

+ 2 - 2
system/jlib/jlzw.hpp

@@ -112,8 +112,8 @@ interface ICompressedFileIO: extends IFileIO
 
 extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander=NULL, bool memorymapped=false, IFEflags extraFlags=IFEnone);
 extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expander=NULL);
-extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL, __int64 compType=COMPRESS_METHOD_LZW, IFEflags extraFlags=IFEnone);
-extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool setcrc=true,ICompressor *compressor=NULL, __int64 compType=COMPRESS_METHOD_LZW);
+extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL, unsigned compMethod=COMPRESS_METHOD_LZW, IFEflags extraFlags=IFEnone);
+extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool setcrc=true,ICompressor *compressor=NULL, unsigned compMethod=COMPRESS_METHOD_LZW);
 
 #define COMPRESSEDFILECRC (~0U)
 

+ 44 - 0
system/lz4_sm/CMakeLists.txt

@@ -0,0 +1,44 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# Component: lz4
+
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for lz4
+#####################################################
+
+
+project( lz4 )
+
+set ( SRCS
+        lz4/lib/lz4.c
+)
+
+include_directories (
+        lz4/lib
+)
+
+ADD_DEFINITIONS( -D_LIB )
+
+SET_SOURCE_FILES_PROPERTIES( ${SRCS} PROPERTIES LANGUAGE C )
+
+if (CMAKE_COMPILER_IS_GNUCC OR CMAKE_COMPILER_IS_CLANG)
+    set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -std=c99")
+endif()
+
+HPCC_ADD_LIBRARY( lz4 STATIC ${SRCS} )

+ 1 - 0
system/lz4_sm/lz4

@@ -0,0 +1 @@
+Subproject commit d86dc916771c126afb797637dda9f6421c0cb998

+ 9 - 3
thorlcr/activities/filter/thfilterslave.cpp

@@ -242,7 +242,7 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
     IHThorFilterGroupArg *helper;
     Owned<IThorRowLoader> groupLoader;
     Owned<IRowStream> groupStream;
-    bool compressSpills;
+    unsigned spillCompInfo;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -251,7 +251,13 @@ public:
     {
         groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
         helper = NULL;
-        compressSpills = getOptBool(THOROPT_COMPRESS_SPILLS, true);
+        spillCompInfo = 0x0;
+        if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        {
+            StringBuffer compType;
+            getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+            setCompFlag(compType, spillCompInfo);
+        }
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
@@ -299,7 +305,7 @@ public:
                 {
                     CThorSpillableRowArray spillableRows(*this, this);
                     spillableRows.transferFrom(rows);
-                    groupStream.setown(spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills));
+                    groupStream.setown(spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, spillCompInfo));
                 }
                 // else read next group
             }

+ 10 - 0
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2184,7 +2184,12 @@ public:
                 MemoryAttr ma;
                 activity->startInput(in);
                 if (activity->getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                {
                     rwFlags |= rw_compress;
+                    StringBuffer compType;
+                    activity->getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+                    setCompFlag(compType, rwFlags);
+                }
                 Owned<IExtRowWriter> out = createRowWriter(tempfile, activity, rwFlags);
                 if (!out)
                     throw MakeStringException(-1,"Could not created file %s",tempname.str());
@@ -2551,7 +2556,12 @@ public:
         OwnedIFile iFile = createIFile(tempname.str());
         spillFile.setown(new CFileOwner(iFile.getLink()));
         if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        {
             rwFlags |= rw_compress;
+            StringBuffer compType;
+            owner.getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+            setCompFlag(compType, rwFlags);
+        }
         writer = createRowWriter(iFile, rowIf, rwFlags);
     }
     IRowStream *getReader(rowcount_t *_count=NULL) // NB: also detatches ownership of 'fileOwner'

+ 24 - 8
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1574,6 +1574,7 @@ protected:
     using PARENT::doBroadcastStop;
     using PARENT::getGlobalRHSTotal;
     using PARENT::getOptBool;
+    using PARENT::getOpt;
     using PARENT::broadcaster;
     using PARENT::inputs;
     using PARENT::queryHelper;
@@ -1606,7 +1607,7 @@ protected:
     // Handling failover to a) hashed local lookupjoin b) hash distributed standard join
     bool smart;
     bool rhsCollated, rhsCompacted;
-    bool compressSpills;
+    unsigned spillCompInfo;
     Owned<IHashDistributor> lhsDistributor, rhsDistributor;
     ICompare *compareLeft;
     atomic_t failedOverToLocal, failedOverToStandard;
@@ -1714,11 +1715,14 @@ protected:
                     VStringBuffer spillPrefixStr("clearAllNonLocalRows(%d)", SPILL_PRIORITY_SPILLABLE_STREAM);
 
                     // 3rd param. is skipNulls = true, the row arrays may have had the non-local rows delete already.
-                    rows.save(file->queryIFile(), compressSpills, true, spillPrefixStr.str()); // saves committed rows
+                    rows.save(file->queryIFile(), spillCompInfo, true, spillPrefixStr.str()); // saves committed rows
 
                     unsigned rwFlags = DEFAULT_RWFLAGS;
-                    if (compressSpills)
+                    if (spillCompInfo)
+                    {
                         rwFlags |= rw_compress;
+                        rwFlags |= spillCompInfo;
+                    }
                     gatheredRHSNodeStreams.append(* createRowStream(&file->queryIFile(), queryRowInterfaces(rightITDL), rwFlags));
                     return true;
                 }
@@ -1781,7 +1785,7 @@ protected:
                  * fail over to standard join and it is better to 1st spill a smaller channel collection
                  * that this will feed, than this larger stream.
                  */
-                return spillableRHS.createRowStream(SPILL_PRIORITY_LOOKUPJOIN+10, compressSpills);
+                return spillableRHS.createRowStream(SPILL_PRIORITY_LOOKUPJOIN+10, spillCompInfo);
             }
         }
         else
@@ -1812,14 +1816,17 @@ protected:
                      * fail over to standard join and it is better to 1st spill a smaller channel collection
                      * that this will feed, than these larger stream.
                      */
-                    gatheredRHSNodeStreams.append(* rows.createRowStream(SPILL_PRIORITY_LOOKUPJOIN+10, compressSpills)); // NB: default SPILL_PRIORITY_SPILLABLE_STREAM is lower than SPILL_PRIORITY_LOOKUPJOIN
+                    gatheredRHSNodeStreams.append(* rows.createRowStream(SPILL_PRIORITY_LOOKUPJOIN+10, spillCompInfo)); // NB: default SPILL_PRIORITY_SPILLABLE_STREAM is lower than SPILL_PRIORITY_LOOKUPJOIN
                 }
             }
             if (overflowWriteFile)
             {
                 unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                if (spillCompInfo)
+                {
                     rwFlags |= rw_compress;
+                    rwFlags |= spillCompInfo;
+                }
                 ActPrintLog("Reading overflow RHS broadcast rows : %" RCPF "d", overflowWriteCount);
                 Owned<IRowStream> overflowStream = createRowStream(&overflowWriteFile->queryIFile(), queryRowInterfaces(rightITDL), rwFlags);
                 gatheredRHSNodeStreams.append(* overflowStream.getClear());
@@ -2389,7 +2396,13 @@ public:
                 break;
         }
         overflowWriteCount = 0;
-        compressSpills = getOptBool(THOROPT_COMPRESS_SPILLS, true);
+        spillCompInfo = 0x0;
+        if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        {
+            StringBuffer compType;
+            getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+            setCompFlag(compType, spillCompInfo);
+        }
         ActPrintLog("Smart join = %s", smart?"true":"false");
     }
     bool exceedsLimit(rowidx_t count, const void *left, const void *right, const void *&failRow)
@@ -2601,8 +2614,11 @@ public:
             if (!overflowWriteFile)
             {
                 unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                if (spillCompInfo)
+                {
                     rwFlags |= rw_compress;
+                    rwFlags |= spillCompInfo;
+                }
                 StringBuffer tempFilename;
                 GetTempName(tempFilename, "lookup_local", true);
                 ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());

+ 48 - 4
thorlcr/activities/thactivityutil.cpp

@@ -786,9 +786,29 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
     Owned<IFileIO> fileio;
     if (compress)
     {
-        if (activity->getOptBool(THOROPT_COMP_FORCELZW, false))
-            recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces LZW
-        fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp));
+        unsigned compMethod = COMPRESS_METHOD_LZW;
+        // rowdif used if recordSize > 0, else fallback to compMethod
+        if (!ecomp)
+        {
+            if (twFlags & TW_Temporary)
+            {
+                // if temp file then can use newer compressor
+                StringBuffer compType;
+                activity->getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+                compMethod = getCompMethod(compType);
+            }
+            // force
+            if (activity->getOptBool(THOROPT_COMP_FORCELZW, false))
+            {
+                recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces compMethod.
+                compMethod = COMPRESS_METHOD_LZW;
+            }
+            else if (activity->getOptBool(THOROPT_COMP_FORCEFLZ, false))
+                compMethod = COMPRESS_METHOD_FASTLZ;
+            else if (activity->getOptBool(THOROPT_COMP_FORCELZ4, false))
+                compMethod = COMPRESS_METHOD_LZ4;
+        }
+        fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp, compMethod));
         if (!fileio)
         {
             compress = false;
@@ -801,7 +821,31 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
         fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
     if (!fileio)
         throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
-    ActPrintLog(activity, "Writing to file: %s, compress=%s, rdiff=%s", file->queryFilename(), compress ? "true" : "false", (compress && recordSize) ? "true" : "false");
+    StringBuffer compStr;
+    if (compress)
+    {
+        ICompressedFileIO *icompfio = QUERYINTERFACE(fileio.get(), ICompressedFileIO);
+        if (icompfio)
+        {
+            unsigned compMeth2 = icompfio->method();
+            if (COMPRESS_METHOD_FASTLZ == compMeth2)
+                compStr.append("flz");
+            else if (COMPRESS_METHOD_LZ4 == compMeth2)
+                compStr.append("lz4");
+            else if (COMPRESS_METHOD_LZW == compMeth2)
+                compStr.append("lzw");
+            else if (COMPRESS_METHOD_ROWDIF == compMeth2)
+                compStr.append("rdiff");
+            else
+                compStr.append("unknown");
+        }
+        else
+            compStr.append("unknown");
+    }
+    else
+        compStr.append("false");
+
+    ActPrintLog(activity, "Writing to file: %s, compress=%s", file->queryFilename(), compStr.str());
     return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, twFlags, aborted);
 }
 

+ 1 - 0
thorlcr/activities/thactivityutil.ipp

@@ -195,6 +195,7 @@ void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc);
 #define TW_Direct 0x02
 #define TW_External 0x04
 #define TW_RenameToPrimary 0x08
+#define TW_Temporary 0x10
 interface IPartDescriptor;
 IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, unsigned twFlags, bool &compress, ICompressor *ecomp, ICopyFileProgress *iProgress, bool *aborted, StringBuffer *_locationName=NULL);
 

+ 3 - 1
thorlcr/activities/thdiskbaseslave.cpp

@@ -352,6 +352,8 @@ void CDiskWriteSlaveActivityBase::open()
         twFlags |= TW_RenameToPrimary;
     if (extend||(external&&!query))
         twFlags |= TW_Extend;
+    if (diskHelperBase->getFlags() & TDXtemporary)
+        twFlags |= TW_Temporary;
 
     {
         CriticalBlock block(statsCs);
@@ -360,7 +362,7 @@ void CDiskWriteSlaveActivityBase::open()
 
     if (compress)
     {
-        ActPrintLog("Performing row compression on output file: %s", fName.get());
+        ActPrintLog("Performing compression on output file: %s", fName.get());
         // NB: block compressed output has implicit crc of 0, no need to calculate in row  writer.
         calcFileCrc = false;
     }

+ 10 - 4
thorlcr/msort/tsorts.cpp

@@ -441,19 +441,25 @@ public:
         size32_t blksize = 0x100000;
 
         // JCSMORE - at the moment, the localsort set is already sorted
-        bool compressSpills = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
+        unsigned spillCompInfo = 0x0;
+        if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        {
+            StringBuffer compType;
+            activity.getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+            setCompFlag(compType, spillCompInfo);
+        }
         if (1 == activity.queryJob().querySlaves())
         {
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             rowCount = localRows.ordinality();
             spillableRows.transferFrom(localRows);
-            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
+            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, spillCompInfo);
         }
         if (partNo)
         {
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             spillableRows.transferFrom(localRows);
-            Owned<IRowStream> spillableStream = spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
+            Owned<IRowStream> spillableStream = spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, spillCompInfo);
 
             CMessageBuffer mb;
             loop
@@ -554,7 +560,7 @@ public:
             rowCount = globalRows.ordinality();
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             spillableRows.transferFrom(globalRows);
-            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
+            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, spillCompInfo);
         }
     }
 };

+ 32 - 17
thorlcr/thorutil/thmem.cpp

@@ -163,8 +163,9 @@ class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBuff
 protected:
     CActivityBase &activity;
     IRowInterfaces *rowIf;
-    bool preserveNulls, ownsRows, useCompression;
+    bool preserveNulls, ownsRows;
     unsigned spillPriority;
+    unsigned spillCompInfo;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
     bool mmRegistered;
@@ -182,7 +183,7 @@ protected:
         spillFile.setown(createIFile(tempName.str()));
 
         VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
-        rows.save(*spillFile, useCompression, false, spillPrefixStr.str()); // saves committed rows
+        rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
     }
@@ -210,9 +211,9 @@ public:
     {
         assertex(inRows.isFlushed());
         rows.swap(inRows);
-        useCompression = false;
         mmRegistered = false;
         ownsRows = false;
+        spillCompInfo = 0x0;
     }
     ~CSpillableStreamBase()
     {
@@ -356,10 +357,10 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority, bool _compressSpills)
+    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority, unsigned _spillCompInfo)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
-        useCompression = _compressSpills;
+        spillCompInfo = _spillCompInfo;
         pos = numReadRows = 0;
         granularity = 500; // JCSMORE - rows
 
@@ -386,10 +387,13 @@ public:
             {
                 block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (spillCompInfo)
+                {
+                    rwFlags |= rw_compress;
+                    rwFlags |= spillCompInfo;
+                }
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
-                if (useCompression)
-                    rwFlags |= rw_compress;
                 spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
                 return spillStream->nextRow();
             }
@@ -1277,19 +1281,22 @@ static int callbackSortRev(IInterface * const *cb2, IInterface * const *cb1)
     return 1;
 }
 
-rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression, bool skipNulls, const char *tracingPrefix)
+rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, bool skipNulls, const char *tracingPrefix)
 {
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
     ActPrintLog(&activity, "%s: CThorSpillableRowArray::save %" RIPF "d rows", tracingPrefix, n);
 
-    if (useCompression)
+    if (_spillCompInfo)
         assertex(0 == writeCallbacks.ordinality()); // incompatible
 
     unsigned rwFlags = DEFAULT_RWFLAGS;
-    if (useCompression)
+    if (_spillCompInfo)
+    {
         rwFlags |= rw_compress;
+        rwFlags |= _spillCompInfo;
+    }
     if (allowNulls)
         rwFlags |= rw_grouped;
 
@@ -1461,10 +1468,10 @@ void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwn
     }
 }
 
-IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, bool compressSpills)
+IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, unsigned spillCompInfo)
 {
     assertex(rowIf);
-    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority, compressSpills);
+    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority, spillCompInfo);
 }
 
 
@@ -1491,7 +1498,7 @@ protected:
     bool mmRegistered;
     Owned<CSharedSpillableRowSet> spillableRowSet;
     unsigned options;
-    bool compressSpills;
+    unsigned spillCompInfo;
     __uint64 spillCycles;
     __uint64 sortCycles;
 
@@ -1518,7 +1525,7 @@ protected:
         GetTempName(tempName, tempPrefix.str(), true);
         Owned<IFile> iFile = createIFile(tempName.str());
         VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
-        spillableRows.save(*iFile, compressSpills, false, spillPrefixStr.str()); // saves committed rows
+        spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
         spillFiles.append(new CFileOwner(iFile.getLink()));
         ++overflowCount;
         sizeSpill += iFile->size();
@@ -1598,8 +1605,11 @@ protected:
         // NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
         // which may be one of these streams or CThorRowCollectorBase itself
         unsigned rwFlags = DEFAULT_RWFLAGS;
-        if (compressSpills)
+        if (spillCompInfo)
+        {
             rwFlags |= rw_compress;
+            rwFlags |= spillCompInfo;
+        }
         if (preserveGrouping)
             rwFlags |= rw_grouped;
         IArrayOf<IRowStream> instrms;
@@ -1641,7 +1651,7 @@ protected:
                     return NULL;
                 }
                 if (!shared)
-                    instrms.append(*spillableRows.createRowStream(spillPriority, compressSpills)); // NB: stream will take ownership of rows in spillableRows
+                    instrms.append(*spillableRows.createRowStream(spillPriority, spillCompInfo)); // NB: stream will take ownership of rows in spillableRows
                 else
                 {
                     spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping, spillPriority));
@@ -1710,7 +1720,12 @@ public:
         maxCores = activity.queryMaxCores();
         options = 0;
         spillableRows.setup(rowIf, false, stableSort);
-        compressSpills = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
+        if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        {
+            StringBuffer compType;
+            activity.getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
+            setCompFlag(compType, spillCompInfo);
+        }
         spillCycles = 0;
         sortCycles = 0;
     }

+ 2 - 2
thorlcr/thorutil/thmem.hpp

@@ -458,7 +458,7 @@ public:
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
-    rowidx_t save(IFile &file, bool useCompression, bool skipNulls, const char *tracingPrefix);
+    rowidx_t save(IFile &file, unsigned _spillCompInfo, bool skipNulls, const char *tracingPrefix);
 
     inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
 
@@ -472,7 +472,7 @@ public:
     void transferFrom(CThorExpandingRowArray &src);
     void transferFrom(CThorSpillableRowArray &src);
 
-    IRowStream *createRowStream(unsigned spillPriority, bool compressSpills);
+    IRowStream *createRowStream(unsigned spillPriority, unsigned spillCompInfo);
 
     offset_t serializedSize()
     {

+ 4 - 1
thorlcr/thorutil/thormisc.hpp

@@ -48,6 +48,7 @@
 
 /// Thor options, that can be hints, workunit options, or global settings
 #define THOROPT_COMPRESS_SPILLS       "compressInternalSpills"  // Compress internal spills, e.g. spills created by lookahead or sort gathering  (default = true)
+#define THOROPT_COMPRESS_SPILL_TYPE   "spillCompressorType"     // Compress spill type, e.g. FLZ, LZ4 (or other to get previous)                 (default = LZ4)
 #define THOROPT_HDIST_SPILL           "hdistSpill"              // Allow distribute receiver to spill to disk, rather than blocking              (default = true)
 #define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize"       // Distribute send thread pool size                                              (default = 16)
 #define THOROPT_HDIST_BUCKET_SIZE     "hdOutBufferSize"         // Distribute target bucket send size                                            (default = 1MB)
@@ -55,7 +56,7 @@
 #define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize"        // Distribute pull buffer size (receiver side limit, before spilling)
 #define THOROPT_HDIST_CANDIDATELIMIT  "hdCandidateLimit"        // Limits # of buckets to push to the writers when send buffer is full           (default = is 50% largest)
 #define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit"          // Limit # of writer threads working on a single target                          (default = unbound, but picks round-robin)
-#define THOROPT_HDIST_COMP            "hdCompressorType"        // Distribute compressor to use                                                  (default = "LZ4")
+#define THOROPT_HDIST_COMP            "hdCompressorType"        // Distribute compressor to use                                                  (default = "FLZ")
 #define THOROPT_HDIST_COMPOPTIONS     "hdCompressorOptions"     // Distribute compressor options, e.g. AES key                                   (default = "")
 #define THOROPT_SPLITTER_SPILL        "splitterSpill"           // Force splitters to spill or not, default is to adhere to helper setting       (default = -1)
 #define THOROPT_LOOP_MAX_EMPTY        "loopMaxEmpty"            // Max # of iterations that LOOP can cycle through with 0 results before errors  (default = 1000)
@@ -70,6 +71,8 @@
 #define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover" // Force SMART to failover to hash join (for testing only)                     (default = false)
 #define THOROPT_MAX_KERNLOG           "max_kern_level"          // Max kernel logging level, to push to workunit, -1 to disable                  (default = 3)
 #define THOROPT_COMP_FORCELZW         "forceLZW"                // Forces file compression to use LZW                                            (default = false)
+#define THOROPT_COMP_FORCEFLZ         "forceFLZ"                // Forces file compression to use FLZ                                            (default = false)
+#define THOROPT_COMP_FORCELZ4         "forceLZ4"                // Forces file compression to use LZ4                                            (default = false)
 #define THOROPT_TRACE_ENABLED         "traceEnabled"            // Output from TRACE activity enabled                                            (default = false)
 #define THOROPT_TRACE_LIMIT           "traceLimit"              // Number of rows from TRACE activity                                            (default = 10)
 #define THOROPT_READ_CRC              "crcReadEnabled"          // Enabled CRC validation on disk reads if file CRC are available                (default = true)

+ 15 - 13
tools/copyexp/copyexp.cpp

@@ -40,18 +40,18 @@ void usage(bool isHelp)
     printf("   copyexp <file>                     -- returns compress type\n");
     printf("   copyexp <file> <destination>       -- copies file to destination\n");
     printf("                                         (expanding as needed)\n");
-    printf("   copyexp -z <file> <dest>           -- compresses file (LZW)\n");
-    printf("   copyexp -r <recsz> <file> <dest>   -- compresses file (RowDif)\n");
-    printf("   copyexp -f <file> <dest>           -- compresses file (FastLZ)\n");
+    printf("   copyexp -z  <file> <dest>          -- compresses file (LZW)\n");
+    printf("   copyexp -r  <recsz> <file> <dest>  -- compresses file (RowDif)\n");
+    printf("   copyexp -f  <file> <dest>          -- compresses file (FastLZ)\n");
     printf("   copyexp -fs <file> <dest>          -- compresses file (FastLZ stream)\n");
-    printf("   copyexp -l <file> <dest>           -- compresses file (LZ4)\n");
+    printf("   copyexp -l  <file> <dest>          -- compresses file (LZ4)\n");
     printf("   copyexp -ls <file> <dest>          -- compresses file (LZ4 stream)\n");
     printf("           -s                         -- timing stats\n");
     printf("           -d                         -- do not cache files in OS\n");
     doexit(isHelp ? 0 : 2);
 }
 
-#define BUFFERSIZE (0x10000)
+#define BUFFERSIZE (0x100000)
 
 void printCompDetails(const char *fname,IFileIO *baseio,ICompressedFileIO *cmpio,IFileIOStream *strm, bool flzstrm, bool lz4strm)
 {
@@ -77,8 +77,10 @@ void printCompDetails(const char *fname,IFileIO *baseio,ICompressedFileIO *cmpio
         expsize = cmpio->size();
     }
     printf("%s: is %s compressed, size= %" I64F "d, expanded= %" I64F "d",fname,method,baseio->size(),expsize);
+    if (!strm&&cmpio)
+        printf(", block size= %d",cmpio->blockSize());
     if (!strm&&cmpio&&cmpio->recordSize())
-        printf(", record size = %d",cmpio->recordSize());
+        printf(", record size= %d",cmpio->recordSize());
     printf("\n");
 }
 
@@ -112,7 +114,7 @@ static void printStats(offset_t filesize,unsigned start,unsigned startu)
     if (!elapsedu)
         elapsedu = 1;
     if (elapsed<1000)
-        printf("%" I64F "d Bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsedu)*1000000,formatTimeU(elapsedu,tmp));
+        printf("%" I64F "d bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsedu)*1000000,formatTimeU(elapsedu,tmp));
     else
         printf("%" I64F "d bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsed)*1000,formatTime(elapsed,tmp));
 }
@@ -219,7 +221,7 @@ int copyExpanded(const char *from, const char *to, bool stats)
     CDateTime createTime, modifiedTime;
     if (srcfile->getTime(&createTime, &modifiedTime, NULL))
         dstfile->setTime(&createTime, &modifiedTime, NULL);
-    printf("copied %s to %s%s\n",from,to,cmpio.get()?" expanding":"");
+    printf("copied %s to %s%s\n",from,to,(cmpio.get()||strmsrc)?" expanding":"");
     return 0;
 }
 
@@ -305,12 +307,12 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
     }
     else 
     {
-        __int64 compType = COMPRESS_METHOD_LZW;
+        unsigned compMethod = COMPRESS_METHOD_LZW;
         if (fast)
-            compType = COMPRESS_METHOD_FASTLZ;
-        else if(lz4)
-            compType = COMPRESS_METHOD_LZ4;
-        dstio.setown(createCompressedFileWriter(dstfile,rowsize,false,true,NULL,compType,extraFlags));
+            compMethod = COMPRESS_METHOD_FASTLZ;
+        else if (lz4)
+            compMethod = COMPRESS_METHOD_LZ4;
+        dstio.setown(createCompressedFileWriter(dstfile,rowsize,false,true,NULL,compMethod,extraFlags));
     }
 
     if (!dstio) {