Browse Source

HPCC-24470 Add LZ4HC compressor support

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 4 years ago
parent
commit
9094a4cdbc

+ 12 - 4
common/thorhelper/thorcommon.hpp

@@ -96,13 +96,14 @@ enum RowReaderWriterFlags
     rw_buffered       = 0x80,
     rw_lzw            = 0x100, // if rw_compress
     rw_lz4            = 0x200, // if rw_compress
-    rw_sparse         = 0x400  // NB: mutually exclusive with rw_grouped
+    rw_sparse         = 0x400, // NB: mutually exclusive with rw_grouped
+    rw_lz4hc          = 0x800  // if rw_compress
 };
 #define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)
 inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag) { return 0 != (flags & flag); }
 
-#define COMP_MASK (rw_compress|rw_compressblkcrc|rw_fastlz|rw_lzw|rw_lz4)
-#define COMP_TYPE_MASK (rw_fastlz|rw_lzw|rw_lz4)
+#define COMP_MASK (rw_compress|rw_compressblkcrc|rw_fastlz|rw_lzw|rw_lz4|rw_lz4hc)
+#define COMP_TYPE_MASK (rw_fastlz|rw_lzw|rw_lz4|rw_lz4hc)
 inline void setCompFlag(const char *compStr, unsigned &flags)
 {
     flags &= ~COMP_TYPE_MASK;
@@ -112,7 +113,9 @@ inline void setCompFlag(const char *compStr, unsigned &flags)
             flags |= rw_fastlz;
         else if (0 == stricmp("LZW", compStr))
             flags |= rw_lzw;
-        else // not specifically FLZ or LZW so set to default LZ4
+        else if (0 == stricmp("LZ4HC", compStr))
+            flags |= rw_lz4hc;
+        else // not specifically FLZ, LZW, or LZ4HC so set to default LZ4
             flags |= rw_lz4;
     }
     else // default is LZ4
@@ -126,6 +129,9 @@ inline unsigned getCompMethod(unsigned flags)
         compMethod = COMPRESS_METHOD_LZW;
     else if (TestRwFlag(flags, rw_fastlz))
         compMethod = COMPRESS_METHOD_FASTLZ;
+    else if (TestRwFlag(flags, rw_lz4hc))
+        compMethod = COMPRESS_METHOD_LZ4HC;
+
     return compMethod;
 }
 
@@ -138,6 +144,8 @@ inline unsigned getCompMethod(const char *compStr)
             compMethod = COMPRESS_METHOD_FASTLZ;
         else if (0 == stricmp("LZW", compStr))
             compMethod = COMPRESS_METHOD_LZW;
+        else if (0 == stricmp("LZ4HC", compStr))
+            compMethod = COMPRESS_METHOD_LZ4HC;
     }
     return compMethod;
 }

+ 15 - 6
system/jlib/jlz4.cpp

@@ -19,6 +19,7 @@
 #include "jfcmp.hpp"
 #include "jlz4.hpp"
 #include "lz4.h"
+#include "lz4hc.h"
 
 /* Format:
     size32_t totalexpsize;
@@ -28,7 +29,9 @@
 
 class jlib_decl CLZ4Compressor : public CFcmpCompressor
 {
-    virtual void setinmax()
+    bool hc;
+protected:
+    virtual void setinmax() override
     {
         inmax = blksz-outlen-sizeof(size32_t);
         if (inmax<256)
@@ -45,7 +48,7 @@ class jlib_decl CLZ4Compressor : public CFcmpCompressor
         }
     }
 
-    virtual void flushcommitted()
+    virtual void flushcommitted() override
     {
         // only does non trailing
         if (trailing)
@@ -75,7 +78,10 @@ class jlib_decl CLZ4Compressor : public CFcmpCompressor
         size32_t *cmpsize = (size32_t *)(outbuf+outlen);
         byte *out = (byte *)(cmpsize+1);
 
-        *cmpsize = LZ4_compress_default((const char *)inbuf, (char *)out, toflush, LZ4_COMPRESSBOUND(toflush));
+        if (hc)
+            *cmpsize = LZ4_compress_HC((const char *)inbuf, (char *)out, toflush, LZ4_COMPRESSBOUND(toflush), LZ4HC_CLEVEL_DEFAULT);
+        else
+            *cmpsize = LZ4_compress_default((const char *)inbuf, (char *)out, toflush, LZ4_COMPRESSBOUND(toflush));
         if (*cmpsize && *cmpsize<toflush)
         {
             *(size32_t *)outbuf += toflush;
@@ -92,7 +98,10 @@ class jlib_decl CLZ4Compressor : public CFcmpCompressor
         }
         trailing = true;
     }
-
+public:
+    CLZ4Compressor(bool _hc) : hc(_hc)
+    {        
+    }
 };
 
 
@@ -229,9 +238,9 @@ void LZ4DecompressToBuffer(MemoryAttr & out, MemoryBuffer & in)
 }
 
 
-ICompressor *createLZ4Compressor()
+ICompressor *createLZ4Compressor(bool hc)  // hc=true makes CLZ4Compressor compress blocks with LZ4_compress_HC instead of LZ4_compress_default
 {
-    return new CLZ4Compressor;
+    return new CLZ4Compressor(hc);
 }
 
 IExpander *createLZ4Expander()

+ 1 - 1
system/jlib/jlz4.hpp

@@ -22,7 +22,7 @@
 
 #define LZ4COMPRESSEDFILEBLOCKSIZE (0x100000)
 
-extern jlib_decl ICompressor *createLZ4Compressor();
+extern jlib_decl ICompressor *createLZ4Compressor(bool hc=false);
 extern jlib_decl IExpander   *createLZ4Expander();
 
 extern jlib_decl void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src);

+ 53 - 20
system/jlib/jlzw.cpp

@@ -2130,10 +2130,12 @@ public:
         curblockpos = 0;
         curblocknum = (unsigned)-1; // relies on wrap
         compMethod = _compMethod;
-        if (mode!=ICFread) {
+        if (mode!=ICFread)
+        {
             if (!_fileio&&_mmfile)
                 throw MakeStringException(-1,"Compressed Write not supported on memory mapped files");
-            if (trailer.recordSize) {
+            if (trailer.recordSize)
+            {
                 if ((trailer.recordSize>trailer.blockSize/4) || // just too big
                     (trailer.recordSize<10))                    // or too small
                     trailer.recordSize = 0;
@@ -2142,43 +2144,58 @@ public:
             }
             compblkptr = (byte *)compblk.allocate(trailer.blockSize+trailer.recordSize*2+16); // over estimate!
             compblklen = 0;
-            if (trailer.recordSize==0) {
+            if (trailer.recordSize==0)
+            {
                 if (!compressor)
                 {
-                    if (compMethod == COMPRESS_METHOD_FASTLZ)
-                        compressor.setown(createFastLZCompressor());
-                    else if (compMethod == COMPRESS_METHOD_LZ4)
-                        compressor.setown(createLZ4Compressor());
-                    else // fallback
+                    switch (compMethod)
                     {
-                        compMethod = COMPRESS_METHOD_LZW;
-                        trailer.compressedType = COMPRESSEDFILEFLAG;
-                        compressor.setown(createLZWCompressor(true));
+                        case COMPRESS_METHOD_FASTLZ:
+                            compressor.setown(createFastLZCompressor());
+                            break;
+                        case COMPRESS_METHOD_LZ4:
+                            compressor.setown(createLZ4Compressor(false));
+                            break;
+                        case COMPRESS_METHOD_LZ4HC:
+                            compressor.setown(createLZ4Compressor(true));
+                            break;
+                        default:
+                            compMethod = COMPRESS_METHOD_LZW;
+                            trailer.compressedType = COMPRESSEDFILEFLAG;
+                            compressor.setown(createLZWCompressor(true));
+                            break;
                     }
                 }
                 compressor->open(compblkptr, trailer.blockSize);
             }
         }
-        if (mode!=ICFcreate) {
+        if (mode!=ICFcreate)
+        {
             unsigned nb = trailer.numBlocks();
             size32_t toread = sizeof(offset_t)*nb;
-            if (fileio) {
+            if (fileio)
+            {
                 size32_t r = fileio->read(trailer.indexPos,toread,indexbuf.reserveTruncate(toread));
                 assertex(r==toread);
             }
-            else {
+            else
+            {
                 assertex((memsize_t)trailer.indexPos==trailer.indexPos);
                 memcpy(indexbuf.reserveTruncate(toread),mmfile->base()+(memsize_t)trailer.indexPos,toread);
             }
-            if (mode==ICFappend) {
+            if (mode==ICFappend)
+            {
                 curblocknum = nb-1;
-                if (setcrc) {
+                if (setcrc)
+                {
                     trailer.crc = trailer.datacrc;
                     trailer.datacrc = ~0U;
                 }
             }
-            if (trailer.recordSize==0) {
-                if (!expander) {
+            if (trailer.recordSize==0)
+            {
+                if (!expander)
+                {
                     if (compMethod == COMPRESS_METHOD_FASTLZ)
                         expander.setown(createFastLZExpander());
                     else if (compMethod == COMPRESS_METHOD_LZ4)
@@ -2466,7 +2483,7 @@ ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio, bool append, size
             trailer.blockSize = FASTCOMPRESSEDFILEBLOCKSIZE;
             trailer.recordSize = 0;
         }
-        else if (_compMethod == COMPRESS_METHOD_LZ4)
+        else if ((_compMethod == COMPRESS_METHOD_LZ4) || (_compMethod == COMPRESS_METHOD_LZ4HC))
         {
             trailer.compressedType = LZ4COMPRESSEDFILEFLAG;
             trailer.blockSize = LZ4COMPRESSEDFILEBLOCKSIZE;
@@ -2735,6 +2752,14 @@ public:
     }
 } compressors;
 
+typedef IIteratorOf<ICompressHandler> ICompressHandlerIterator;
+
+ICompressHandlerIterator *getCompressHandlerIterator()  // returns a newly allocated iterator over the 'compressors' object declared just above
+{
+    return new ArrayIIteratorOf<IArrayOf<ICompressHandler>, ICompressHandler, ICompressHandlerIterator>(compressors);  // NOTE(review): caller presumably takes ownership (the unit test holds the result in Owned<>) - confirm
+}
+
+
 
 bool addCompressorHandler(ICompressHandler *handler)
 {
@@ -2776,7 +2801,14 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     {
     public:
         CLZ4CompressHandler() : CCompressHandlerBase("LZ4") { }
-        virtual ICompressor *getCompressor(const char *options) { return createLZ4Compressor(); }
+        virtual ICompressor *getCompressor(const char *options) { return createLZ4Compressor(false); }
+        virtual IExpander *getExpander(const char *options) { return createLZ4Expander(); }
+    };
+    class CLZ4HCCompressHandler : public CCompressHandlerBase  // compress handler whose base is constructed with the type string "LZ4HC"
+    {
+    public:
+        CLZ4HCCompressHandler() : CCompressHandlerBase("LZ4HC") { }
+        virtual ICompressor *getCompressor(const char *options) { return createLZ4Compressor(true); }  // 'options' is not used; true is passed as the hc argument
         virtual IExpander *getExpander(const char *options) { return createLZ4Expander(); }
     };
     class CAESCompressHandler : public CCompressHandlerBase
@@ -2812,6 +2844,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     addCompressorHandler(new CDiffCompressHandler());
     addCompressorHandler(new CLZWCompressHandler());
     addCompressorHandler(new CFLZCompressHandler());
+    addCompressorHandler(new CLZ4HCCompressHandler());    
     ICompressHandler *lz4Compressor = new CLZ4CompressHandler();
     addCompressorHandler(lz4Compressor);
     defaultCompressor.set(lz4Compressor);

+ 7 - 0
system/jlib/jlzw.hpp

@@ -99,6 +99,7 @@ extern jlib_decl void appendToBuffer(MemoryBuffer & out, size32_t len, const voi
 #define COMPRESS_METHOD_FASTLZ 3
 #define COMPRESS_METHOD_LZMA   4
 #define COMPRESS_METHOD_LZ4    5
+#define COMPRESS_METHOD_LZ4HC  6
 
 interface ICompressedFileIO: extends IFileIO
 {
@@ -135,6 +136,8 @@ interface ICompressHandler : extends IInterface
     virtual ICompressor *getCompressor(const char *options=NULL) = 0;
     virtual IExpander *getExpander(const char *options=NULL) = 0;
 };
+typedef IIteratorOf<ICompressHandler> ICompressHandlerIterator;
+extern jlib_decl ICompressHandlerIterator *getCompressHandlerIterator();
 extern jlib_decl void setDefaultCompressor(const char *type);
 extern jlib_decl ICompressHandler *queryCompressHandler(const char *type);
 extern jlib_decl ICompressHandler *queryDefaultCompressHandler();
@@ -157,6 +160,8 @@ inline unsigned translateToCompMethod(const char *compStr)
             compMethod = COMPRESS_METHOD_ROWDIF;
         else if (strieq("LZMA", compStr))
             compMethod = COMPRESS_METHOD_LZMA;
+        else if (strieq("LZ4HC", compStr))
+            compMethod = COMPRESS_METHOD_LZ4HC;
         //else // default is LZ4
     }
     return compMethod;
@@ -174,6 +179,8 @@ inline const char *translateFromCompMethod(unsigned compMethod)
             return "FLZ";
         case COMPRESS_METHOD_LZ4:
             return "LZ4";
+        case COMPRESS_METHOD_LZ4HC:
+            return "LZ4HC";
         case COMPRESS_METHOD_LZMA:
             return "LZMA";
         default:

+ 1 - 0
system/lz4_sm/CMakeLists.txt

@@ -27,6 +27,7 @@ project( lz4 )
 
 set ( SRCS
         lz4/lib/lz4.c
+        lz4/lib/lz4hc.c
 )
 
 include_directories (

+ 94 - 0
testing/unittests/jlibtests.cpp

@@ -27,6 +27,7 @@
 #include "jdebug.hpp"
 #include "jset.hpp"
 #include "rmtfile.hpp"
+#include "jlzw.hpp"
 #include "jqueue.hpp"
 #include "jregexp.hpp"
 
@@ -2282,6 +2283,99 @@ CPPUNIT_TEST_SUITE_REGISTRATION(JlibIOTest);
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(JlibIOTest, "JlibIOTest");
 
 
+class JlibCompressionTestsStress : public CppUnit::TestFixture  // round-trips one large generated buffer through every handler from getCompressHandlerIterator(), printing timings and ratio
+{
+    CPPUNIT_TEST_SUITE(JlibCompressionTestsStress);
+        CPPUNIT_TEST(test);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    void test()
+    {
+        try
+        {
+            size32_t sz = 100*0x100000; // 100MB
+            MemoryBuffer src;
+            src.ensureCapacity(sz);
+            MemoryBuffer compressed;
+            const char *aesKey = "012345678901234567890123";  // passed as the options string only to the handler whose type is "AES"
+            Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
+
+            StringBuffer tmp;
+            unsigned card = 0;
+            size32_t rowSz = 0;
+            while (true)  // build the source data: keep appending rows until the buffer length exceeds sz
+            {
+                size32_t cLen = src.length();
+                if (cLen > sz)
+                    break;
+                src.append(cLen);
+                tmp.clear().appendf("%10u", cLen);
+                src.append(tmp.length(), tmp.str());
+                src.append(++card % 52);
+                src.append(crc32((const char *)&cLen, sizeof(cLen), 0));
+                unsigned ccrc = crc32((const char *)&card, sizeof(card), 0);
+                tmp.clear().appendf("%10u", ccrc);
+                src.append(tmp.length(), tmp.str());
+                tmp.clear().appendf("%20u", (++card % 10));  // note: card is incremented a second time within the same row
+                src.append(tmp.length(), tmp.str());
+                if (0 == rowSz)  // the length after the first row defines the row size
+                    rowSz = src.length();
+                else
+                {
+                    dbgassertex(0 == (src.length() % rowSz));  // every later row must leave the buffer a whole multiple of rowSz
+                }
+            }
+
+            printf("\nAlgorithm || Compression Time (ms) || Decompression Time (ms) || Compression Ratio\n");
+
+            ForEach(*iter)
+            {
+                compressed.clear();
+                ICompressHandler &handler = iter->query();
+                Owned<ICompressor> compressor = handler.getCompressor(streq("AES", handler.queryType()) ? aesKey: nullptr);
+
+                CCycleTimer timer;  // created after the compressor, so compressor construction is not timed
+                compressor->open(compressed, sz);
+                compressor->startblock();
+                const byte *ptr = src.bytes();
+                const byte *ptrEnd = ptr + src.length();
+                while (ptr != ptrEnd)  // feed the source one row at a time; terminates because src.length() is a multiple of rowSz
+                {
+                    compressor->write(ptr, rowSz);
+                    ptr += rowSz;
+                }
+                compressor->commitblock();
+                compressor->close();
+                cycle_t compressCycles = timer.elapsedCycles();
+
+                Owned<IExpander> expander = handler.getExpander(streq("AES", handler.queryType()) ? aesKey: nullptr);
+
+                timer.reset();  // restart the timer so expander construction is excluded from the decompression time
+                size32_t required = expander->init(compressed.bytes());
+                MemoryBuffer tgt(required);
+                expander->expand(tgt.bufferBase());
+                tgt.setWritePos(required);
+                cycle_t decompressCycles = timer.elapsedCycles();
+
+                float ratio = (float)(src.length()) / compressed.length();  // source bytes per compressed byte
+
+                printf("%9s || %21u || %23u || %17.2f [ %u, %u ]\n", handler.queryType(), (unsigned)cycle_to_millisec(compressCycles), (unsigned)cycle_to_millisec(decompressCycles), ratio, src.length(), compressed.length());
+
+                CPPUNIT_ASSERT(tgt.length() >= sz);
+                CPPUNIT_ASSERT(0 == memcmp(src.bufferBase(), tgt.bufferBase(), sz));  // only the first sz bytes are compared, not the full src.length()
+           }
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e, nullptr);
+            throw;  // rethrow after logging
+        }
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionTestsStress );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionTestsStress, "JlibCompressionTestsStress" );
 
 
 #endif // _USE_CPPUNIT

+ 2 - 0
thorlcr/activities/thactivityutil.cpp

@@ -763,6 +763,8 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
                 compMethod = COMPRESS_METHOD_FASTLZ;
             else if (activity->getOptBool(THOROPT_COMP_FORCELZ4, false))
                 compMethod = COMPRESS_METHOD_LZ4;
+            else if (activity->getOptBool(THOROPT_COMP_FORCELZ4HC, false))
+                compMethod = COMPRESS_METHOD_LZ4HC;
         }
         fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp, compMethod));
         if (!fileio)

+ 2 - 0
thorlcr/activities/thdiskbase.cpp

@@ -242,6 +242,8 @@ void CWriteMasterBase::publish()
                 compMethod = COMPRESS_METHOD_FASTLZ;
             else if (getOptBool(THOROPT_COMP_FORCELZ4, false))
                 compMethod = COMPRESS_METHOD_LZ4;
+            else if (getOptBool(THOROPT_COMP_FORCELZ4HC, false))
+                compMethod = COMPRESS_METHOD_LZ4HC;
             bool blockCompressed;
             bool compressed = fileDesc->isCompressed(&blockCompressed);
             for (unsigned clusterIdx=0; clusterIdx<fileDesc->numClusters(); clusterIdx++)

+ 1 - 0
thorlcr/thorutil/thormisc.hpp

@@ -68,6 +68,7 @@
 #define THOROPT_COMP_FORCELZW         "forceLZW"                // Forces file compression to use LZW                                            (default = false)
 #define THOROPT_COMP_FORCEFLZ         "forceFLZ"                // Forces file compression to use FLZ                                            (default = false)
 #define THOROPT_COMP_FORCELZ4         "forceLZ4"                // Forces file compression to use LZ4                                            (default = false)
+#define THOROPT_COMP_FORCELZ4HC       "forceLZ4HC"              // Forces file compression to use LZ4HC                                          (default = false)
 #define THOROPT_TRACE_ENABLED         "traceEnabled"            // Output from TRACE activity enabled                                            (default = false)
 #define THOROPT_TRACE_LIMIT           "traceLimit"              // Number of rows from TRACE activity                                            (default = 10)
 #define THOROPT_READ_CRC              "crcReadEnabled"          // Enabled CRC validation on disk reads if file CRC are available                (default = true)