浏览代码

HPCC-9920 Add LZ4 compression method

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 10 年之前
父节点
当前提交
731a84f511

+ 4 - 0
system/jlib/CMakeLists.txt

@@ -48,6 +48,8 @@ set (    SRCS
          jexcept.cpp 
          jfile.cpp 
          jflz.cpp 
+         jlz4.cpp
+         lz4.c
          jhash.cpp 
          jiface.cpp 
          jio.cpp 
@@ -102,6 +104,8 @@ set (    INCLUDES
         jfile.hpp
         jfile.ipp
         jflz.hpp
+        jlz4.hpp
+        lz4.h
         jhash.hpp
         jhash.ipp
         jheap.hpp

+ 1 - 4
system/jlib/jflz.cpp

@@ -628,10 +628,7 @@ class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
             trailing = true;    // too small to bother compressing
         else {
             trailing = false;
-            size32_t slack = inmax/17;
-            if (slack<66)
-                slack = 66;
-            inmax -= slack+sizeof(size32_t);
+            inmax -= (fastlzSlack(inmax) + sizeof(size32_t));
         }
     }
 

+ 1 - 0
system/jlib/jflz.hpp

@@ -22,6 +22,7 @@
 
 #include "jlzw.hpp"
 
+#define FASTCOMPRESSEDFILEBLOCKSIZE (0x10000)
 
 extern jlib_decl ICompressor *createFastLZCompressor();
 extern jlib_decl IExpander *createFastLZExpander();

+ 596 - 0
system/jlib/jlz4.cpp

@@ -0,0 +1,596 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+#include "jlz4.hpp"
+
+#define COMMITTED ((size32_t)-1)
+
+class jlib_decl CLZ4Compressor : public CInterface, public ICompressor
+{
+    size32_t blksz;
+    size32_t bufalloc;
+    MemoryAttr inma;        // equals blksize len
+    byte *inbuf;
+    size32_t inmax;         // remaining
+    size32_t inlen;
+    size32_t inlenblk;      // set to COMMITTED when so
+    bool trailing;
+    byte *outbuf;
+    size32_t outlen;
+    size32_t wrmax;
+
+    inline void setinmax()
+    {
+        inmax = blksz-outlen-sizeof(size32_t);
+        if (inmax<256)
+            trailing = true;    // too small to bother compressing
+        else {
+            trailing = false;
+            // inmax -= (fastlzSlack(inmax) + sizeof(size32_t));
+            size32_t slack = LZ4_COMPRESSBOUND(inmax) - inmax;
+            inmax -= (slack + sizeof(size32_t));
+        }
+    }
+
+    inline void flushcommitted()
+    {
+        // only does non trailing
+        if (trailing)
+            return;
+        size32_t toflush = (inlenblk==COMMITTED)?inlen:inlenblk;
+        if (toflush == 0)
+            return;
+
+        // printf("flushcommited() inlenblk=%d inlen=%d blksz=%d outlen=%d\n", inlenblk, inlen, blksz, outlen);
+
+        assertex(outlen+sizeof(size32_t)*2+LZ4_COMPRESSBOUND(toflush)<=blksz);
+
+        size32_t *cmpsize = (size32_t *)(outbuf+outlen);
+        byte *out = (byte *)(cmpsize+1);
+
+        *cmpsize = LZ4_compress((const char *)inbuf, (char *)out, toflush);
+
+        if (*cmpsize<toflush) {
+            *(size32_t *)outbuf += toflush;
+            outlen += *cmpsize+sizeof(size32_t);
+            if (inlenblk==COMMITTED)
+                inlen = 0;
+            else {
+                inlen -= inlenblk;
+                memmove(inbuf,inbuf+toflush,inlen);
+            }
+            setinmax();
+            return;
+        }
+        trailing = true;
+    }
+
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CLZ4Compressor()
+    {
+        outlen = 0;
+        outbuf = NULL;      // only set on close
+        bufalloc = 0;
+        wrmax = 0;          // set at open
+    }
+
+    virtual ~CLZ4Compressor()
+    {
+        if (bufalloc)
+            free(outbuf);
+    }
+
+
+    virtual void open(void *buf,size32_t max)
+    {
+        if (buf) {
+            if (bufalloc) {
+                free(outbuf);
+            }
+            bufalloc = 0;
+            outbuf = (byte *)buf;
+        }
+        else if (max>bufalloc) {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = max;
+            outbuf = (byte *)malloc(bufalloc);
+        }
+        blksz = max;
+        if (blksz!=inma.length())
+            inbuf = (byte *)inma.allocate(blksz);
+        else
+            inbuf = (byte *)inma.bufferBase();
+        if (blksz<1024)
+            throw MakeStringException(-1,"CLZ4Compressor::open - block size (%d) not large enough", blksz);
+        *(size32_t *)outbuf = 0;
+        outlen = sizeof(size32_t);
+        inlen = 0;
+        inlenblk = COMMITTED;
+        setinmax();
+        wrmax = inmax;
+        // printf("open() inlenblk=%d inlen=%d trailing=%d blksz=%d outlen=%d\n", inlenblk, inlen, trailing, blksz, outlen);
+    }
+
+    virtual void close()
+    {
+        if (inlenblk!=COMMITTED) {
+            inlen = inlenblk; // transaction failed
+            inlenblk = COMMITTED;
+        }
+        flushcommitted();
+        // printf("close() inlenblk=%d inlen=%d trailing=%d blksz=%d outlen=%d\n", inlenblk, inlen, trailing, blksz, outlen);
+        size32_t totlen = outlen+sizeof(size32_t)+inlen;
+        assertex(blksz>=totlen);
+        size32_t *tsize = (size32_t *)(outbuf+outlen);
+        *tsize = inlen;
+        memcpy(tsize+1,inbuf,inlen);
+        outlen = totlen;
+        *(size32_t *)outbuf += inlen;
+        inbuf = NULL;
+    }
+
+
+    size32_t write(const void *buf,size32_t len)
+    {
+        // no more than wrmax per write
+        size32_t lenb = wrmax;
+        byte *b = (byte *)buf;
+        size32_t written = 0;
+        while (len)
+        {
+            if (len < lenb)
+                lenb = len;
+            if (lenb+inlen>inmax) {
+                if (trailing)
+                    return written;
+                size32_t lenb2 = inmax - inlen;
+                if (lenb2 >= 0x2000) {
+                    memcpy(inbuf+inlen,b,lenb2);
+                    b += lenb2;
+                    inlen += lenb2;
+                    len -= lenb2;
+                    written += lenb2;
+                    if (len < lenb)
+                        lenb = len;
+                }
+                flushcommitted();
+                if (lenb+inlen>inmax)
+                    lenb = inmax-inlen;
+            }
+            if (lenb == 0)
+                return written;
+            memcpy(inbuf+inlen,b,lenb);
+            b += lenb;
+            inlen += lenb;
+            len -= lenb;
+            written += lenb;
+        }
+        return written;
+    }
+
+    void *  bufptr()
+    {
+        assertex(!inbuf);  // i.e. closed
+        return outbuf;
+    }
+    size32_t    buflen()
+    {
+        assertex(!inbuf);  // i.e. closed
+        return outlen;
+    }
+    void    startblock()
+    {
+        inlenblk = inlen;
+    }
+    void commitblock()
+    {
+        inlenblk = COMMITTED;
+    }
+
+
+};
+
+class jlib_decl CLZ4Expander : public CInterface, public IExpander
+{
+
+    byte *outbuf;
+    size32_t outlen;
+    size32_t bufalloc;
+    const size32_t *in;
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CLZ4Expander()
+    {
+        outbuf = NULL;
+        outlen = 0;
+        bufalloc = 0;
+    }
+    ~CLZ4Expander()
+    {
+        if (bufalloc)
+            free(outbuf);
+
+    }
+
+    virtual size32_t  init(const void *blk)
+    {
+        const size32_t *expsz = (const size32_t *)blk;
+        outlen = *expsz;
+        in = (expsz+1);
+        return outlen;
+    }
+
+    virtual void expand(void *buf)
+    {
+        if (!outlen)
+            return;
+        if (buf) {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = 0;
+            outbuf = (unsigned char *)buf;
+        }
+        else if (outlen>bufalloc) {
+            if (bufalloc)
+                free(outbuf);
+            bufalloc = outlen;
+            outbuf = (unsigned char *)malloc(bufalloc);
+            if (!outbuf)
+                throw MakeStringException(MSGAUD_operator,0, "Out of memory in LZ4Expander::expand, requesting %d bytes", bufalloc);
+        }
+        size32_t done = 0;
+        loop {
+            const size32_t szchunk = *in;
+            in++;
+            if (szchunk+done<outlen) {
+
+                size32_t written = LZ4_decompress_safe((const char *)in, (char *)((byte *)buf+done), szchunk, outlen-done);
+
+                done += written;
+                if (!written||(done>outlen))
+                    throw MakeStringException(0, "LZ4Expander - corrupt data(1) %d %d",written,szchunk);
+            }
+            else {
+                if (szchunk+done!=outlen)
+                    throw MakeStringException(0, "LZ4Expander - corrupt data(2) %d %d",szchunk,outlen);
+                memcpy((byte *)buf+done,in,szchunk);
+                break;
+            }
+            in = (const size32_t *)(((const byte *)in)+szchunk);
+        }
+    }
+
+    virtual void *bufptr() { return outbuf;}
+    virtual size32_t   buflen() { return outlen;}
+};
+
+void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
+{
+    size32_t outbase = out.length();
+
+    size32_t *sz = (size32_t *)out.reserve(LZ4_COMPRESSBOUND(len)+sizeof(size32_t)*2);
+
+    *sz = len;
+    sz++;
+
+    *sz = (len>16)?LZ4_compress((const char *)src, (char *)(sz+1), len):16;
+
+    if (*sz>=len) {
+        *sz = len;
+        memcpy(sz+1,src,len);
+    }
+    out.setLength(outbase+*sz+sizeof(size32_t)*2);
+}
+
+void LZ4DecompressToBuffer(MemoryBuffer & out, const void * src)
+{
+    size32_t *sz = (size32_t *)src;
+    size32_t expsz = *(sz++);
+    size32_t cmpsz = *(sz++);
+    void *o = out.reserve(expsz);
+    if (cmpsz!=expsz) {
+
+        size32_t written = LZ4_decompress_safe((const char *)sz, (char *)o, cmpsz, expsz);
+
+        if (written!=expsz)
+            throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(1) %d %d",written,expsz);
+    }
+    else
+        memcpy(o,sz,expsz);
+}
+
+void LZ4DecompressToBuffer(MemoryBuffer & out, MemoryBuffer & in)
+{
+    size32_t expsz;
+    size32_t cmpsz;
+    in.read(expsz).read(cmpsz);
+    void *o = out.reserve(expsz);
+    if (cmpsz!=expsz) {
+
+        size32_t written = LZ4_decompress_safe((const char *)in.readDirect(cmpsz), (char *)o, cmpsz, expsz);
+
+        if (written!=expsz)
+            throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(3) %d %d",written,expsz);
+    }
+    else
+        memcpy(o,in.readDirect(cmpsz),expsz);
+}
+
+void LZ4DecompressToAttr(MemoryAttr & out, const void * src)
+{
+    size32_t *sz = (size32_t *)src;
+    size32_t expsz = *(sz++);
+    size32_t cmpsz = *(sz++);
+    void *o = out.allocate(expsz);
+    if (cmpsz!=expsz) {
+
+        size32_t written = LZ4_decompress_safe((const char *)sz, (char *)o, cmpsz, expsz);
+
+        if (written!=expsz)
+            throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(2) %d %d",written,expsz);
+    }
+    else
+        memcpy(o,sz,expsz);
+}
+
+void LZ4DecompressToBuffer(MemoryAttr & out, MemoryBuffer & in)
+{
+    size32_t expsz;
+    size32_t cmpsz;
+    in.read(expsz).read(cmpsz);
+    void *o = out.allocate(expsz);
+    if (cmpsz!=expsz) {
+
+        size32_t written = LZ4_decompress_safe((const char *)in.readDirect(cmpsz), (char *)o, cmpsz, expsz);
+
+        if (written!=expsz)
+            throw MakeStringException(0, "LZ4DecompressToBuffer - corrupt data(4) %d %d",written,expsz);
+    }
+    else
+        memcpy(o,in.readDirect(cmpsz),expsz);
+}
+
+
+ICompressor *createLZ4Compressor()
+{
+    return new CLZ4Compressor;
+}
+
+IExpander *createLZ4Expander()
+{
+    return new CLZ4Expander;
+}
+
+#define LZ4_BUFFER_SIZE (0x100000)
+
+#define LZ4STRMCOMPRESSEDFILEFLAG (I64C(0xc3526de42f15da57)) // mck - what is an ok value ?
+
+
+struct LZ4CompressedFileTrailer
+{
+    offset_t        zfill1;             // must be first
+    offset_t        expandedSize;
+    __int64         compressedType;
+    unsigned        zfill2;             // must be last
+};
+
+
+class CLZ4Stream : public CInterface, implements IFileIOStream
+{
+    Linked<IFileIO> baseio;
+    offset_t expOffset;     // expanded offset
+    offset_t cmpOffset;     // compressed offset in file
+    bool reading;
+    MemoryAttr ma;
+    size32_t bufsize;
+    size32_t bufpos;        // reading only
+    offset_t expSize;
+
+    bool load()
+    {
+        bufpos = 0;
+        bufsize = 0;
+        if (expOffset==expSize)
+            return false;
+        size32_t sz[2];
+        if (baseio->read(cmpOffset,sizeof(size32_t)*2,&sz)!=sizeof(size32_t)*2)
+            return false;
+        bufsize = sz[0];
+        if (!bufsize)
+            return false;
+        cmpOffset += sizeof(size32_t)*2;
+        if (ma.length()<bufsize)
+            ma.allocate(bufsize);
+        MemoryAttr cmpma;
+        byte *cmpbuf = (byte *)cmpma.allocate(sz[1]);
+        if (baseio->read(cmpOffset,sz[1],cmpbuf)!=sz[1])
+            throw MakeStringException(-1,"CLZ4Stream: file corrupt.1");
+
+        size32_t amnt = LZ4_decompress_safe((const char *)cmpbuf, (char *)ma.bufferBase(), sz[1], bufsize);
+
+        if (amnt!=bufsize)
+            throw MakeStringException(-1,"CLZ4Stream: file corrupt.2");
+
+        cmpOffset += sz[1];
+        return true;
+    }
+
+    void save()
+    {
+        if (bufsize) {
+            MemoryAttr dstma;
+
+            byte *dst = (byte *)dstma.allocate(sizeof(size32_t)*2+LZ4_COMPRESSBOUND(bufsize));
+
+            size32_t sz = LZ4_compress((const char *)ma.get(), (char *)(sizeof(size32_t)*2+dst), bufsize);
+
+            memcpy(dst,&bufsize,sizeof(size32_t));
+            memcpy(dst+sizeof(size32_t),&sz,sizeof(size32_t));
+            baseio->write(cmpOffset,sz+sizeof(size32_t)*2,dst);
+            cmpOffset += sz+sizeof(size32_t)*2;
+        }
+        bufsize = 0;
+    }
+
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CLZ4Stream()
+    {
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = true;
+        bufpos = 0;
+        bufsize = 0;
+    }
+
+    ~CLZ4Stream()
+    {
+        flush();
+    }
+
+    bool attach(IFileIO *_baseio)
+    {
+        baseio.set(_baseio);
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = true;
+        bufpos = 0;
+        bufsize = 0;
+
+        LZ4CompressedFileTrailer trailer;
+        offset_t filesize = baseio->size();
+        if (filesize<sizeof(trailer))
+            return false;
+        baseio->read(filesize-sizeof(trailer),sizeof(trailer),&trailer);
+        expSize = trailer.expandedSize;
+        return trailer.compressedType==LZ4STRMCOMPRESSEDFILEFLAG;
+    }
+
+    void create(IFileIO *_baseio)
+    {
+        baseio.set(_baseio);
+        expOffset = 0;
+        cmpOffset = 0;
+        reading = false;
+        bufpos = 0;
+        bufsize = 0;
+        ma.allocate(LZ4_BUFFER_SIZE);
+        expSize = (offset_t)-1;
+    }
+
+    void seek(offset_t pos, IFSmode origin)
+    {
+        if ((origin==IFScurrent)&&(pos==0))
+            return;
+        if ((origin==IFSbegin)||(pos!=0))
+            throw MakeStringException(-1,"CLZ4Stream seek not supported");
+        expOffset = 0;
+        bufpos = 0;
+        bufsize = 0;
+    }
+
+    offset_t size()
+    {
+        return (expSize==(offset_t)-1)?0:expSize;
+    }
+
+    offset_t tell()
+    {
+        return expOffset;
+    }
+
+
+    size32_t read(size32_t len, void * data)
+    {
+        if (!reading)
+            throw MakeStringException(-1,"CLZ4Stream read to stream being written");
+        size32_t ret=0;
+        while (len) {
+            size32_t cpy = bufsize-bufpos;
+            if (!cpy) {
+                if (!load())
+                    break;
+                cpy = bufsize-bufpos;
+            }
+            if (cpy>len)
+                cpy = len;
+            memcpy(data,(const byte *)ma.get()+bufpos,cpy);
+            bufpos += cpy;
+            len -= cpy;
+            ret += cpy;
+        }
+        expOffset += ret;
+        return ret;
+    }
+
+    size32_t write(size32_t len, const void * data)
+    {
+        if (reading)
+            throw MakeStringException(-1,"CLZ4Stream write to stream being read");
+        size32_t ret = len;
+        while (len+bufsize>LZ4_BUFFER_SIZE) {
+            size32_t cpy = LZ4_BUFFER_SIZE-bufsize;
+            memcpy((byte *)ma.bufferBase()+bufsize,data,cpy);
+            data = (const byte *)data+cpy;
+            len -= cpy;
+            bufsize = LZ4_BUFFER_SIZE;
+            save();
+        }
+        memcpy((byte *)ma.bufferBase()+bufsize,data,len);
+        bufsize += len;
+        expOffset += len;
+        return ret;
+    }
+
+    void flush()
+    {
+        if (!reading&&(expSize!=expOffset)) {
+            save();
+            LZ4CompressedFileTrailer trailer;
+            memset(&trailer,0,sizeof(trailer));
+            trailer.compressedType = LZ4STRMCOMPRESSEDFILEFLAG;
+            trailer.expandedSize = expOffset;
+            baseio->write(cmpOffset,sizeof(trailer),&trailer);
+            expSize = expOffset;
+        }
+    }
+
+};
+
+IFileIOStream *createLZ4StreamRead(IFileIO *base)
+{
+    Owned<CLZ4Stream> strm = new CLZ4Stream();
+    if (strm->attach(base))
+        return strm.getClear();
+    return NULL;
+}
+
+IFileIOStream *createLZ4StreamWrite(IFileIO *base)
+{
+    Owned<CLZ4Stream> strm = new CLZ4Stream();
+    strm->create(base);
+    return strm.getClear();
+}

+ 38 - 0
system/jlib/jlz4.hpp

@@ -0,0 +1,38 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef JLZ4_INCL
+#define JLZ4_INCL
+
+#include "jlzw.hpp"
+#include "lz4.h"
+
+#define LZ4COMPRESSEDFILEBLOCKSIZE (0x10000)
+
+extern jlib_decl ICompressor *createLZ4Compressor();
+extern jlib_decl IExpander   *createLZ4Expander();
+
+extern jlib_decl void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src);
+extern jlib_decl void LZ4DecompressToBuffer(MemoryBuffer & out, const void * src);
+extern jlib_decl void LZ4DecompressToBuffer(MemoryBuffer & out, MemoryBuffer & in);
+extern jlib_decl void LZ4DecompressToAttr(MemoryAttr & out, const void * src);
+extern jlib_decl void LZ4DecompressToBuffer(MemoryAttr & out, MemoryBuffer & in);
+
+extern jlib_decl IFileIOStream *createLZ4StreamRead(IFileIO *base);
+extern jlib_decl IFileIOStream *createLZ4StreamWrite(IFileIO *base);
+
+#endif

+ 87 - 24
system/jlib/jlzw.cpp

@@ -24,6 +24,7 @@
 #include "jfile.hpp"
 #include "jencrypt.hpp"
 #include "jflz.hpp"
+#include "jlz4.hpp"
 
 #ifdef _WIN32
 #include <io.h>
@@ -1830,6 +1831,7 @@ typedef enum { ICFcreate, ICFread, ICFappend } ICFmode;
 static const __int64 COMPRESSEDFILEFLAG = I64C(0xc0528ce99f10da55);
 #define COMPRESSEDFILEBLOCKSIZE (0x10000)
 static const __int64 FASTCOMPRESSEDFILEFLAG = I64C(0xc1518de99f10da55);
+static const __int64 LZ4COMPRESSEDFILEFLAG = I64C(0xc1200e0b71321c73);
 
 #pragma pack(push,1)
 
@@ -1839,8 +1841,8 @@ struct CompressedFileTrailer
     offset_t        expandedSize;
     offset_t        indexPos;       // end of blocks
     size32_t        blockSize;
-    size32_t        recordSize;     // 0 is lzw compressed
-    __int64        compressedType;
+    size32_t        recordSize;     // 0 is lzw or fast (flz) or lz4 compressed
+    __int64         compressedType;
     unsigned        crc;                // must be last
     unsigned numBlocks() { return (unsigned)((indexPos+blockSize-1)/blockSize); }
     unsigned method()
@@ -1851,6 +1853,8 @@ struct CompressedFileTrailer
             return COMPRESS_METHOD_LZW;
         if (compressedType==FASTCOMPRESSEDFILEFLAG)
             return COMPRESS_METHOD_FASTLZ;
+        if (compressedType==LZ4COMPRESSEDFILEFLAG)
+            return COMPRESS_METHOD_LZ4;
         return 0;
     }
 
@@ -1924,6 +1928,7 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
     bool writeException;
     Owned<ICompressor> compressor;
     Owned<IExpander> expander;
+    __int64 compType;
 
     unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); }
 
@@ -2041,7 +2046,7 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
                 }
             }
         }
-        else { // lzw 
+        else { // lzw or fastlz (flz) or lz4
             assertex(expander.get());
             size32_t exp = expander->init(compbuf);
             if (exp!=expsize) {
@@ -2101,14 +2106,14 @@ class CCompressedFile : public CInterface, implements ICompressedFileIO
                 src += len;
             }
         }
-        else 
+        else // lzw or fastlz (flz) or lz4
             src += compressor->write(src, len);
         return (size32_t)(src-(const byte *)expbuf);
     }
 public:
     IMPLEMENT_IINTERFACE;
 
-    CCompressedFile(IFileIO *_fileio,IMemoryMappedFile *_mmfile,CompressedFileTrailer &_trailer,ICFmode _mode, bool _setcrc,ICompressor *_compressor,IExpander *_expander, bool fast)
+    CCompressedFile(IFileIO *_fileio,IMemoryMappedFile *_mmfile,CompressedFileTrailer &_trailer,ICFmode _mode, bool _setcrc,ICompressor *_compressor,IExpander *_expander, __int64 _compType)
         : fileio(_fileio), mmfile(_mmfile)
     {
         compressor.set(_compressor);
@@ -2119,6 +2124,7 @@ public:
         mode = _mode;
         curblockpos = 0;
         curblocknum = (unsigned)-1; // relies on wrap
+        compType = _compType;
         if (mode!=ICFread) {
             if (!_fileio&&_mmfile)
                 throw MakeStringException(-1,"Compressed Write not supported on memory mapped files");
@@ -2134,9 +2140,11 @@ public:
             if (trailer.recordSize==0) {
                 if (!compressor)
                 {
-                    if (fast)
+                    if (compType == COMPRESS_METHOD_FASTLZ)
                         compressor.setown(createFastLZCompressor());
-                    else
+                    else if (compType == COMPRESS_METHOD_LZ4)
+                        compressor.setown(createLZ4Compressor());
+                    else // COMPRESS_METHOD_LZW
                         compressor.setown(createLZWCompressor(true));
                 }
                 compressor->open(compblkptr, trailer.blockSize);
@@ -2162,9 +2170,11 @@ public:
             }
             if (trailer.recordSize==0) {
                 if (!expander) {
-                    if (fast)
+                    if (compType == COMPRESS_METHOD_FASTLZ)
                         expander.setown(createFastLZExpander());
-                    else
+                    else if (compType == COMPRESS_METHOD_LZ4)
+                        expander.setown(createLZ4Expander());
+                    else // COMPRESS_METHOD_LZW
                         expander.setown(createLZWExpander(true));
                 }
             }
@@ -2316,11 +2326,19 @@ ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expande
             CompressedFileTrailer trailer;
             if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
                 wintrailer.translate(trailer);
-                if ((trailer.compressedType==COMPRESSEDFILEFLAG)||(trailer.compressedType==FASTCOMPRESSEDFILEFLAG)) {
+                if ( (trailer.compressedType==COMPRESSEDFILEFLAG) ||
+                     (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
+                     (trailer.compressedType==LZ4COMPRESSEDFILEFLAG) )
+                {
                     if (expander&&(trailer.recordSize!=0)) {
                         throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
                     }
-                    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,ICFread,false,NULL,expander,(trailer.compressedType==FASTCOMPRESSEDFILEFLAG));
+                    __int64 compType1 = COMPRESS_METHOD_LZW;
+                    if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
+                        compType1 = COMPRESS_METHOD_FASTLZ;
+                    else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
+                        compType1 = COMPRESS_METHOD_LZ4;
+                    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,ICFread,false,NULL,expander,compType1);
                     return cfile;
                 }
             }
@@ -2342,11 +2360,19 @@ ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander, b
                     CompressedFileTrailer trailer;
                     memcpy(&wintrailer,mmfile->base()+fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer));
                     wintrailer.translate(trailer);
-                    if ((trailer.compressedType==COMPRESSEDFILEFLAG)||(trailer.compressedType==FASTCOMPRESSEDFILEFLAG)) {
+                    if ( (trailer.compressedType==COMPRESSEDFILEFLAG) ||
+                         (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
+                         (trailer.compressedType==LZ4COMPRESSEDFILEFLAG) )
+                    {
                         if (expander&&(trailer.recordSize!=0)) {
                             throw MakeStringException(-1, "Compressed file format error(%d), Encrypted?",trailer.recordSize);
                         }
-                        CCompressedFile *cfile = new CCompressedFile(NULL,mmfile,trailer,ICFread,false,NULL,expander,(trailer.compressedType==FASTCOMPRESSEDFILEFLAG));
+                        __int64 compType1 = COMPRESS_METHOD_LZW;
+                        if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
+                            compType1 = COMPRESS_METHOD_FASTLZ;
+                        else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
+                            compType1 = COMPRESS_METHOD_LZ4;
+                        CCompressedFile *cfile = new CCompressedFile(NULL,mmfile,trailer,ICFread,false,NULL,expander,compType1);
                         return cfile;
                     }
                 }
@@ -2362,7 +2388,7 @@ ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander, b
 
 
 
-ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool _setcrc,ICompressor *compressor,bool fast)
+ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool _setcrc,ICompressor *compressor, __int64 _compType)
 {
     CompressedFileTrailer trailer;
     offset_t fsize = fileio->size();
@@ -2373,7 +2399,20 @@ ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsiz
                 CompressedFileTrailer trailer;
                 if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
                     wintrailer.translate(trailer);
-                    if ((trailer.compressedType==COMPRESSEDFILEFLAG)||(trailer.compressedType==FASTCOMPRESSEDFILEFLAG)) {
+                    if ( (trailer.compressedType==COMPRESSEDFILEFLAG) ||
+                         (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
+                         (trailer.compressedType==LZ4COMPRESSEDFILEFLAG) )
+                    {
+                        // mck - check trailer.compressedType against _compType ?
+                        __int64 compType1 = 0;
+                        if (trailer.compressedType == COMPRESSEDFILEFLAG)
+                            compType1 = COMPRESS_METHOD_LZW;
+                        else if (trailer.compressedType == FASTCOMPRESSEDFILEFLAG)
+                            compType1 = COMPRESS_METHOD_FASTLZ;
+                        else if (trailer.compressedType == LZ4COMPRESSEDFILEFLAG)
+                            compType1 = COMPRESS_METHOD_LZ4;
+                        if (_compType != compType1)
+                            throw MakeStringException(-1,"Appending to file with different compression method");
                         if ((recordsize==trailer.recordSize)||!trailer.recordSize)
                             break;
                         throw MakeStringException(-1,"Appending to file with different record size (%d,%d)",recordsize,trailer.recordSize);
@@ -2386,24 +2425,37 @@ ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsiz
     else {
         memset(&trailer,0,sizeof(trailer));
         trailer.crc = ~0U;
-        trailer.compressedType = fast?FASTCOMPRESSEDFILEFLAG:COMPRESSEDFILEFLAG;
-        trailer.blockSize = COMPRESSEDFILEBLOCKSIZE;
+        if (_compType == COMPRESS_METHOD_FASTLZ)
+        {
+            trailer.compressedType = FASTCOMPRESSEDFILEFLAG;
+            trailer.blockSize = FASTCOMPRESSEDFILEBLOCKSIZE;
+        }
+        else if (_compType == COMPRESS_METHOD_LZ4)
+        {
+            trailer.compressedType = LZ4COMPRESSEDFILEFLAG;
+            trailer.blockSize = LZ4COMPRESSEDFILEBLOCKSIZE;
+        }
+        else // lzw
+        {
+            trailer.compressedType = COMPRESSEDFILEFLAG;
+            trailer.blockSize = COMPRESSEDFILEBLOCKSIZE;
+        }
         trailer.recordSize = recordsize;
     }
     if (compressor)
         trailer.recordSize = 0; // force not row compressed if compressor specified
-    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,fsize?ICFappend:ICFcreate,_setcrc,compressor,NULL,fast);
+    CCompressedFile *cfile = new CCompressedFile(fileio,NULL,trailer,fsize?ICFappend:ICFcreate,_setcrc,compressor,NULL,_compType);
     return cfile;
 }
 
-ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append,bool _setcrc,ICompressor *compressor,bool fast, IFEflags extraFlags)
+ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append,bool _setcrc,ICompressor *compressor, __int64 _compType, IFEflags extraFlags)
 {
     if (file) {
         if (append&&!file->exists())
             append = false;
         Owned<IFileIO> fileio = file->open(append?IFOreadwrite:IFOcreate, extraFlags);
         if (fileio) 
-            return createCompressedFileWriter(fileio,recordsize,_setcrc,compressor,fast);
+            return createCompressedFileWriter(fileio,recordsize,_setcrc,compressor,_compType);
     }
     return NULL;
 }
@@ -2603,7 +2655,9 @@ IPropertyTree *getBlockedFileDetails(IFile *file)
             CompressedFileTrailer trailer;
             if (fileio->read(fsize-sizeof(WinCompressedFileTrailer),sizeof(WinCompressedFileTrailer),&wintrailer)==sizeof(WinCompressedFileTrailer)) {
                 wintrailer.translate(trailer);
-                if ((trailer.compressedType==COMPRESSEDFILEFLAG)||(trailer.compressedType==FASTCOMPRESSEDFILEFLAG)) {
+                if ((trailer.compressedType==COMPRESSEDFILEFLAG) ||
+                    (trailer.compressedType==FASTCOMPRESSEDFILEFLAG) ||
+                    (trailer.compressedType==LZ4COMPRESSEDFILEFLAG)) {
                     trailer.setDetails(*tree);
                     unsigned nb = trailer.numBlocks();
                     MemoryAttr indexbuf;
@@ -2681,6 +2735,13 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
         virtual ICompressor *getCompressor(const char *options) { return createFastLZCompressor(); }
         virtual IExpander *getExpander(const char *options) { return createFastLZExpander(); }
     };
+    class CLZ4CompressHandler : public CCompressHandlerBase
+    {
+    public:
+        CLZ4CompressHandler() : CCompressHandlerBase("LZ4") { }
+        virtual ICompressor *getCompressor(const char *options) { return createLZ4Compressor(); }
+        virtual IExpander *getExpander(const char *options) { return createLZ4Expander(); }
+    };
     class CAESCompressHandler : public CCompressHandlerBase
     {
     public:
@@ -2710,12 +2771,14 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
         virtual ICompressor *getCompressor(const char *options) { return createLZWCompressor(true); }
         virtual IExpander *getExpander(const char *options) { return createLZWExpander(true); }
     };
-    ICompressHandler *flzCompressor = new CFLZCompressHandler();
-    addCompressorHandler(flzCompressor);
+    addCompressorHandler(new CFLZCompressHandler());
     addCompressorHandler(new CAESCompressHandler());
     addCompressorHandler(new CDiffCompressHandler());
     addCompressorHandler(new CLZWCompressHandler());
-    defaultCompressor.set(flzCompressor);
+    addCompressorHandler(new CDENCompressHandler());
+    ICompressHandler *lz4Compressor = new CLZ4CompressHandler();
+    addCompressorHandler(lz4Compressor);
+    defaultCompressor.set(lz4Compressor);
     return true;
 }
 

+ 3 - 2
system/jlib/jlzw.hpp

@@ -98,6 +98,7 @@ extern jlib_decl void appendToBuffer(MemoryBuffer & out, size32_t len, const voi
 #define COMPRESS_METHOD_LZW    2
 #define COMPRESS_METHOD_FASTLZ 3
 #define COMPRESS_METHOD_LZMA   4
+#define COMPRESS_METHOD_LZ4    5
 
 interface ICompressedFileIO: extends IFileIO
 {
@@ -111,8 +112,8 @@ interface ICompressedFileIO: extends IFileIO
 
 extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander=NULL, bool memorymapped=false, IFEflags extraFlags=IFEnone);
 extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expander=NULL);
-extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL,bool fast=false, IFEflags extraFlags=IFEnone);
-extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool setcrc=true,ICompressor *compressor=NULL,bool fast=false);
+extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL, __int64 compType=COMPRESS_METHOD_LZW, IFEflags extraFlags=IFEnone);
+extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio,size32_t recordsize,bool setcrc=true,ICompressor *compressor=NULL, __int64 compType=COMPRESS_METHOD_LZW);
 
 #define COMPRESSEDFILECRC (~0U)
 

+ 1 - 1
thorlcr/thorutil/thormisc.hpp

@@ -55,7 +55,7 @@
 #define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize"        // Distribute pull buffer size (receiver side limit, before spilling)
 #define THOROPT_HDIST_CANDIDATELIMIT  "hdCandidateLimit"        // Limits # of buckets to push to the writers when send buffer is full           (default = is 50% largest)
 #define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit"          // Limit # of writer threads working on a single target                          (default = unbound, but picks round-robin)
-#define THOROPT_HDIST_COMP            "hdCompressorType"        // Distribute compressor to use                                                  (default = "FLZ")
+#define THOROPT_HDIST_COMP            "hdCompressorType"        // Distribute compressor to use                                                  (default = "LZ4")
 #define THOROPT_HDIST_COMPOPTIONS     "hdCompressorOptions"     // Distribute compressor options, e.g. AES key                                   (default = "")
 #define THOROPT_SPLITTER_SPILL        "splitterSpill"           // Force splitters to spill or not, default is to adhere to helper setting       (default = -1)
 #define THOROPT_LOOP_MAX_EMPTY        "loopMaxEmpty"            // Max # of iterations that LOOP can cycle through with 0 results before errors  (default = 1000)

+ 121 - 41
tools/copyexp/copyexp.cpp

@@ -24,6 +24,9 @@
 #include "jlzw.hpp"
 #include "jio.hpp"
 #include "jflz.hpp"
+#include "jlz4.hpp"
+
+IFEflags extraFlags = IFEnone;
 
 void doexit(int err)
 {
@@ -38,29 +41,43 @@ void usage(bool isHelp)
     printf("   copyexp <file> <destination>       -- copies file to destination\n");
     printf("                                         (expanding as needed)\n");
     printf("   copyexp -z <file> <dest>           -- compresses file (LZW)\n");
-    printf("   copyexp -f <file> <dest>           -- compresses file (FastLZ)\n");
     printf("   copyexp -r <recsz> <file> <dest>   -- compresses file (RowDif)\n");
+    printf("   copyexp -f <file> <dest>           -- compresses file (FastLZ)\n");
     printf("   copyexp -fs <file> <dest>          -- compresses file (FastLZ stream)\n");
+    printf("   copyexp -l <file> <dest>           -- compresses file (LZ4)\n");
+    printf("   copyexp -ls <file> <dest>          -- compresses file (LZ4 stream)\n");
     printf("           -s                         -- timing stats\n");
+    printf("           -d                         -- do not cache files in OS\n");
     doexit(isHelp ? 0 : 2);
 }
 
-#define BUFFERSIZE (0x100000)
+#define BUFFERSIZE (0x10000)
 
-void printCompDetails(const char *fname,IFileIO *baseio,ICompressedFileIO *cmpio,IFileIOStream *flzstrm)
+void printCompDetails(const char *fname,IFileIO *baseio,ICompressedFileIO *cmpio,IFileIOStream *strm, bool flzstrm, bool lz4strm)
 {
     const char *method = "Unknown Method";
-    if (flzstrm)
-        method = "FLZSTREAM";
-    else {
-        switch (cmpio->method()) {
-        case COMPRESS_METHOD_ROWDIF:  method = "ROWDIF"; break;
-        case COMPRESS_METHOD_LZW:     method = "LZW"; break;
-        case COMPRESS_METHOD_FASTLZ:  method = "FASTLZ"; break;
+    offset_t expsize = 0;
+    if (!cmpio&&strm)
+    {
+        if (flzstrm)
+            method = "FLZSTREAM";
+        else if (lz4strm)
+            method = "LZ4STREAM";
+        expsize = strm->size();
+    }
+    else
+    {
+        switch (cmpio->method())
+        {
+            case COMPRESS_METHOD_ROWDIF:  method = "ROWDIF"; break;
+            case COMPRESS_METHOD_LZW:     method = "LZW"; break;
+            case COMPRESS_METHOD_FASTLZ:  method = "FASTLZ"; break;
+            case COMPRESS_METHOD_LZ4:     method = "LZ4"; break;
         }
+        expsize = cmpio->size();
     }
-    printf("%s: is %s compressed, size= %" I64F "d, expanded= %" I64F "d",fname,method,baseio->size(),flzstrm?flzstrm->size():cmpio->size());
-    if (!flzstrm&&cmpio->recordSize())
+    printf("%s: is %s compressed, size= %" I64F "d, expanded= %" I64F "d",fname,method,baseio->size(),expsize);
+    if (!strm&&cmpio&&cmpio->recordSize())
         printf(", record size = %d",cmpio->recordSize());
     printf("\n");
 }
@@ -73,7 +90,6 @@ static const char *formatTime(unsigned t,StringBuffer &str)
     else
         str.appendf("%dms",t);
     return str.str();
-
 }
 
 static const char *formatTimeU(unsigned t,StringBuffer &str)
@@ -86,7 +102,6 @@ static const char *formatTimeU(unsigned t,StringBuffer &str)
     else
         str.appendf("%dus",t);
     return str.str();
-
 }
 
 static void printStats(offset_t filesize,unsigned start,unsigned startu)
@@ -97,7 +112,7 @@ static void printStats(offset_t filesize,unsigned start,unsigned startu)
     if (!elapsedu)
         elapsedu = 1;
     if (elapsed<1000)
-        printf("%" I64F "d bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsedu)*1000000,formatTimeU(elapsedu,tmp));
+        printf("%" I64F "d Bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsedu)*1000000,formatTimeU(elapsedu,tmp));
     else
         printf("%" I64F "d bytes copied, at %.2f MB/s in %s\n",filesize,((((double)filesize)/(1024*1024))/elapsed)*1000,formatTime(elapsed,tmp));
 }
@@ -105,16 +120,31 @@ static void printStats(offset_t filesize,unsigned start,unsigned startu)
 int copyExpanded(const char *from, const char *to, bool stats)
 {
     Owned<IFile> srcfile = createIFile(from);
-    Owned<IFileIO> srcio = srcfile->open(IFOread);
+    Owned<IFileIO> srcio = srcfile->open(IFOread, extraFlags);
     if (!srcio) {
         printf("ERROR: could not open '%s' for read\n",from);
         doexit(3);
     }
     Owned<ICompressedFileIO> cmpio = createCompressedFileReader(srcio);
-    Owned<IFileIOStream>  flzstrm = cmpio?NULL:createFastLZStreamRead(srcio);
+    Owned<IFileIOStream> strmsrc;
+    bool flzstrm = false;
+    bool lz4strm = false;
+    if (!cmpio)
+    {
+        strmsrc.setown(createFastLZStreamRead(srcio));
+        if (strmsrc)
+            flzstrm = true;
+        else
+        {
+            strmsrc.setown(createLZ4StreamRead(srcio));
+            if (strmsrc)
+                lz4strm = true;
+        }
+    }
+
     int ret = 0;
-    if (cmpio||flzstrm) 
-        printCompDetails(from,srcio,cmpio,flzstrm);
+    if (cmpio||strmsrc)
+        printCompDetails(from,srcio,cmpio,strmsrc,flzstrm,lz4strm);
     else {
         ret = 1;
         printf("%s is not compressed, size= %" I64F "d\n",from,srcio->size());
@@ -140,7 +170,7 @@ int copyExpanded(const char *from, const char *to, bool stats)
          start = msTick();
          startu = usTick();
     }
-    Owned<IFileIO> dstio = dstfile->open(IFOcreate);
+    Owned<IFileIO> dstio = dstfile->open(IFOcreate, extraFlags);
     if (!dstio) {
         printf("ERROR: could not open '%s' for write\n",to);
         doexit(5);
@@ -161,7 +191,7 @@ int copyExpanded(const char *from, const char *to, bool stats)
     {
         loop {
             size32_t got = cmpio.get()?cmpio->read(offset,BUFFERSIZE, buffer):
-                (flzstrm?flzstrm->read(BUFFERSIZE, buffer):
+                (strmsrc?strmsrc->read(BUFFERSIZE, buffer):
                     srcio->read(offset, BUFFERSIZE, buffer));
             if (got == 0)
                 break;
@@ -194,16 +224,24 @@ int copyExpanded(const char *from, const char *to, bool stats)
 }
 
 
-void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast, bool flzstrm, bool stats)
+void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast, bool flzstrm, bool lz4, bool lz4strm, bool stats)
 {
     Owned<IFile> srcfile = createIFile(from);
-    Owned<IFileIO> baseio = srcfile->open(IFOread);
+    Owned<IFileIO> baseio = srcfile->open(IFOread, extraFlags);
     if (!baseio) {
         printf("ERROR: could not open '%s' for read\n",from);
         doexit(3);
     }
+
     Owned<ICompressedFileIO> cmpio = createCompressedFileReader(baseio);
-    Owned<IFileIOStream>  flzstrmsrc = cmpio?NULL:createFastLZStreamRead(baseio);
+    Owned<IFileIOStream> strmsrc;
+    if (!cmpio)
+    {
+        strmsrc.setown(createFastLZStreamRead(baseio));
+        if (!strmsrc)
+            strmsrc.setown(createLZ4StreamRead(baseio));
+    }
+
     bool plaincopy = false;
     IFileIO *srcio = NULL;
     if (cmpio) {
@@ -215,18 +253,23 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
                 plaincopy = true;
             else if (!fast&&(cmpio->method()==COMPRESS_METHOD_LZW))
                 plaincopy = true;
+            else if (!fast&&(cmpio->method()==COMPRESS_METHOD_LZ4))
+                plaincopy = true;
         }
     }
-    else if (flzstrmsrc) {
-        if (flzstrm)
+    else if (strmsrc) {
+        if (flzstrm||lz4strm)
             plaincopy = true;
     }
     else
         srcio = baseio; 
+
     if (plaincopy) {
-        cmpio.clear();
+        if(cmpio)
+            cmpio.clear();
         srcio = baseio.get(); 
     }
+
     Owned<IFile> dstfile = createIFile(to);
     StringBuffer fulldst;
     if (dstfile->isDirectory()==foundYes) {
@@ -247,14 +290,28 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
          startu = usTick();
     }
     Owned<IFileIO> dstio;
-    Owned<IFileIOStream>  flzstrmdst;
-    if (plaincopy||flzstrm) {
-        dstio.setown(dstfile->open(IFOcreate));
+
+    Owned<IFileIOStream> strmdst;
+
+    if (plaincopy||flzstrm||lz4strm) {
+        dstio.setown(dstfile->open(IFOcreate, extraFlags));
         if (dstio&&!plaincopy)
-            flzstrmdst.setown(createFastLZStreamWrite(dstio));
+        {
+            if (flzstrm)
+                strmdst.setown(createFastLZStreamWrite(dstio));
+            else if (lz4strm)
+                strmdst.setown(createLZ4StreamWrite(dstio));
+        }
     }
     else 
-        dstio.setown(createCompressedFileWriter(dstfile,rowsize,false,true,NULL,fast));
+    {
+        __int64 compType = COMPRESS_METHOD_LZW;
+        if (fast)
+            compType = COMPRESS_METHOD_FASTLZ;
+        else if(lz4)
+            compType = COMPRESS_METHOD_LZ4;
+        dstio.setown(createCompressedFileWriter(dstfile,rowsize,false,true,NULL,compType,extraFlags));
+    }
 
     if (!dstio) {
         printf("ERROR: could not open '%s' for write\n",to);
@@ -278,8 +335,8 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
             size32_t got = cmpio.get()?cmpio->read(offset, BUFFERSIZE, buffer):srcio->read(offset, BUFFERSIZE, buffer);
             if (got == 0)
                 break;
-            if (flzstrmdst)
-                flzstrmdst->write(got,buffer);
+            if (strmdst)
+                strmdst->write(got,buffer);
             else
                 dstio->write(offset, got, buffer);
             offset += got;
@@ -299,7 +356,8 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
         }
         throw e;
     }
-    flzstrmdst.clear();
+    if (strmdst)
+        strmdst.clear();
     dstio.clear();
     if (stats) 
         printStats(offset,start,startu);
@@ -308,18 +366,25 @@ void copyCompress(const char *from, const char *to, size32_t rowsize, bool fast,
         dstfile->setTime(&createTime, &modifiedTime, NULL);
     printf("copied %s to %s%s\n",from,to,plaincopy?"":" compressing");
     { // print details 
-        dstio.setown(dstfile->open(IFOread));
+        dstio.setown(dstfile->open(IFOread, extraFlags));
         if (dstio) {
             Owned<ICompressedFileIO> cmpio = createCompressedFileReader(dstio);
-            Owned<IFileIOStream>  flzstrm = cmpio?NULL:createFastLZStreamRead(dstio);
-            if (cmpio||flzstrm) 
-                printCompDetails(to,dstio,cmpio,flzstrm);
+            Owned<IFileIOStream> strmchk;
+            if (!cmpio)
+            {
+                strmchk.setown(createFastLZStreamRead(dstio));
+                if (!strmchk)
+                    strmchk.setown(createLZ4StreamRead(dstio));
+            }
+            if (cmpio||strmchk)
+                printCompDetails(to,dstio,cmpio,strmchk,flzstrm,lz4strm);
             else 
                 printf("destination %s not compressed\n",to);
         }
         else
             printf("destination %s could not be read\n",to);
     }
+
 }
 
 
@@ -338,6 +403,8 @@ int main(int argc, char * const * argv)
         bool lzw = false;
         bool fast = false;
         bool flzstrm = false;
+        bool lz4 = false;
+        bool lz4strm = false;
         bool stats = false;
         size32_t rowsz = 0;
         for (int a = 1; a<argc; a++) {
@@ -353,6 +420,10 @@ int main(int argc, char * const * argv)
                     lzw = true;
                     continue;
                 }
+                else if(strcmp(arg, "-d") == 0) {
+                    extraFlags = IFEnocache;
+                    continue;
+                }
                 else if(strcmp(arg, "-s") == 0) {
                     stats = true;
                     continue;
@@ -365,6 +436,14 @@ int main(int argc, char * const * argv)
                     flzstrm = true;
                     continue;
                 }
+                else if(strcmp(arg, "-l") == 0) {
+                    lz4 = true;
+                    continue;
+                }
+                else if(strcmp(arg, "-ls") == 0) {
+                    lz4strm = true;
+                    continue;
+                }
                 else if(strcmp(arg, "-r") == 0) {
                     if (a+1<argc) {
                         rowsz = atoi(argv[a+1]);
@@ -392,10 +471,11 @@ int main(int argc, char * const * argv)
         }
         if (!fname1.length())
             usage(true);
-        if (!fast&&!lzw&&!rowsz&&!flzstrm)
+
+        if (!fast&&!lzw&&!rowsz&&!flzstrm&&!lz4&&!lz4strm)
             copyExpanded(fname1.str(),fname2.str(),stats);
         else
-            copyCompress(fname1.str(),fname2.str(),rowsz,fast,flzstrm,stats);
+            copyCompress(fname1.str(),fname2.str(),rowsz,fast,flzstrm,lz4,lz4strm,stats);
     }
     catch(IException * e)
     {