Parcourir la source

HPCC-18485 Compressed [binary] output support

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith il y a 6 ans
Parent
commit
832fc56e98

+ 144 - 8
common/remote/sockfile.cpp

@@ -1796,6 +1796,18 @@ public:
         // NB: inputGrouped == outputGrouped for now, but may want output to be ungrouped
 
         openRequest();
+        if (queryOutputCompressionDefault())
+        {
+            expander.setown(getExpander(queryOutputCompressionDefault()));
+            if (expander)
+            {
+                expandMb.setEndian(__BIG_ENDIAN);
+                request.appendf("\"outputCompression\" : \"%s\",\n", queryOutputCompressionDefault());
+            }
+            else
+                WARNLOG("Failed to created compression decompressor for: %s", queryOutputCompressionDefault());
+        }
+
         request.appendf("\"format\" : \"binary\",\n"
             "\"node\" : {\n"
             " \"fileName\" : \"%s\"", filename);
@@ -1951,6 +1963,13 @@ protected:
             reply.swapWith(newReply);
             reply.read(bufRemaining);
             eof = (bufRemaining == 0);
+            if (expander)
+            {
+                size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
+                expandMb.clear().reserve(expandedSz);
+                expander->expand(expandMb.bufferBase());
+                expandMb.swapWith(reply);
+            }
         }
         else
         {
@@ -1979,6 +1998,13 @@ protected:
         reply.read(handle);
         reply.read(bufRemaining);
         eof = (bufRemaining == 0);
+        if (expander)
+        {
+            size32_t expandedSz = expander->init(reply.bytes()+reply.getPos());
+            expandMb.clear().reserve(expandedSz);
+            expander->expand(expandMb.bufferBase());
+            expandMb.swapWith(reply);
+        }
     }
     StringBuffer request;
     MemoryBuffer reply;
@@ -1989,6 +2015,8 @@ protected:
 
     bool firstRequest = true;
     std::unordered_map<std::string, std::string> virtualFields;
+    Owned<IExpander> expander;
+    MemoryBuffer expandMb;
 };
 
 class CRemoteFilteredFileIO : public CRemoteFilteredFileIOBase
@@ -2034,6 +2062,14 @@ protected:
     const RtlRecord &recInfo;
 };
 
+static StringAttr remoteOutputCompressionDefault; // file-scope default output compression type name; only written by setRemoteOutputCompressionDefault()
+void setRemoteOutputCompressionDefault(const char *type) // stores 'type' as the default compression type name used for remote reads
+{
+    if (!isEmptyString(type)) // input rejected by isEmptyString() is ignored, leaving any earlier setting in place
+        remoteOutputCompressionDefault.set(type);
+}
+const char *queryOutputCompressionDefault() { return remoteOutputCompressionDefault; } // returns the stored default type name; callers in this file test the result for truthiness before using it
+
 extern IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, const char * filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
 {
     try
@@ -3924,14 +3960,16 @@ class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
     OutputFormat format;
     unsigned __int64 replyLimit;
     Linked<IRemoteActivity> activity;
+    Linked<ICompressor> compressor;
 public:
-    CRemoteRequest(OutputFormat _format, unsigned __int64 _replyLimit, IRemoteActivity *_activity)
-        : format(_format), replyLimit(_replyLimit), activity(_activity)
+    CRemoteRequest(OutputFormat _format, ICompressor *_compressor, unsigned __int64 _replyLimit, IRemoteActivity *_activity) // captures the per-request settings; _compressor is whatever the caller holds (an Owned<> that is only populated when compression was requested)
+        : format(_format), replyLimit(_replyLimit), activity(_activity), compressor(_compressor) // listed in member declaration order, which is the order C++ actually initializes them in (avoids -Wreorder)
+    {
+    }
     OutputFormat queryFormat() const { return format; } // output format captured at construction
     unsigned __int64 queryReplyLimit() const { return replyLimit; } // reply limit captured at construction (the caller computes it as KB * 1024)
     IRemoteActivity *queryActivity() const { return activity; } // activity captured at construction; reused by continuation requests
+    ICompressor *queryCompressor() const { return compressor; } // compressor captured at construction; reused by continuation requests
 };
 
 enum OpenFileFlag { of_null=0x0, of_key=0x01 };
@@ -5286,6 +5324,22 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
 
 #define IMPERSONATE_USER(client) cImpersonateBlock ublock(client)
 
+    bool handleFull(MemoryBuffer &inMb, size32_t inPos, MemoryBuffer &compressMb, ICompressor *compressor, size32_t replyLimit, size32_t &totalSz) // returns true when the row loop should stop; with a compressor it also drains the pending bytes of inMb into it. NOTE(review): callers in this change pass an unsigned __int64 replyLimit, which narrows to size32_t here - confirm that is intended
+    {
+        size32_t sz = inMb.length()-inPos; // bytes appended to inMb since inPos
+        if (sz < replyLimit)
+            return false; // pending data is still below the limit: keep accumulating
+
+        if (!compressor)
+            return true; // uncompressed reply: pending data has reached replyLimit
+
+        // consumes data from inMb into compressor
+        totalSz += sz; // running count of uncompressed bytes handed to the compressor
+        const void *data = inMb.bytes()+inPos;
+        assertex(compressor->write(data, sz) == sz); // the write itself happens inside the assertion expression; anything other than a full write fails the assert
+        inMb.setLength(inPos); // drop the bytes just handed over so inMb is refilled from inPos
+        return compressMb.capacity() > replyLimit; // stop once compressMb.capacity() exceeds the limit (note: capacity(), not length())
+    }
 public:
 
     IMPLEMENT_IINTERFACE
@@ -6185,6 +6239,27 @@ public:
 
         /* Example JSON request:
          * {
+         *  "format" : "binary",
+         *  "handle" : "1234",
+         *  "replyLimit" : "64",
+         *  "outputCompression" : "LZ4",
+         *  "node" : {
+         *   "kind" : "diskread",
+         *   "fileName": "examplefilename",
+         *   "keyFilter" : "f1='1    '",
+         *   "chooseN" : 5,
+         *   "compressed" : "false",
+         *   "input" : {
+         *    "f1" : "string5",
+         *    "f2" : "string5"
+         *   },
+         *   "output" : {
+         *    "f2" : "string",
+         *    "f1" : "real"
+         *   }
+         *  }
+         * }
+         * {
          *  "format" : "xml",
          *  "handle" : "1234",
          *  "replyLimit" : "64",
@@ -6243,6 +6318,8 @@ public:
         int cursorHandle = requestTree->getPropInt("handle");
         OutputFormat outputFormat = outFmt_Xml;
         unsigned __int64 replyLimit = 0;
+        Owned<ICompressor> compressor;
+        MemoryBuffer compressMb;
 
         Owned<IRemoteActivity> outputActivity;
         OpenFileInfo fileInfo;
@@ -6262,10 +6339,26 @@ public:
 
             replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
 
+            if (requestTree->hasProp("outputCompression"))
+            {
+                const char *outputCompressionType = requestTree->queryProp("outputCompression");
+                if (isEmptyString(outputCompressionType))
+                    compressor.setown(queryDefaultCompressHandler()->getCompressor());
+                else if (outFmt_Binary == outputFormat)
+                {
+                    compressor.setown(getCompressor(outputCompressionType));
+                    if (!compressor)
+                        WARNLOG("Unknown compressor type specified: %s", outputCompressionType);
+                }
+                else
+                    WARNLOG("Output compression not supported for format: %s", outputFmtStr);
+            }
+
+
             // In future this may be passed the request and build a chain of activities and return sink.
             outputActivity.setown(createOutputActivity(*requestTree));
 
-            Owned<CRemoteRequest> remoteRequest = new CRemoteRequest(outputFormat, replyLimit, outputActivity);
+            Owned<CRemoteRequest> remoteRequest = new CRemoteRequest(outputFormat, compressor, replyLimit, outputActivity);
 
             StringBuffer requestStr("jsonrequest:");
             outputActivity->getInfoStr(requestStr);
@@ -6281,6 +6374,7 @@ public:
         else // known handle, continuation
         {
             outputActivity.set(fileInfo.remoteRequest->queryActivity());
+            compressor.set(fileInfo.remoteRequest->queryCompressor());
             outputFormat = fileInfo.remoteRequest->queryFormat();
             replyLimit = fileInfo.remoteRequest->queryReplyLimit();
         }
@@ -6314,12 +6408,28 @@ public:
             MemoryBufferBuilder outBuilder(resultBuffer, out->getMinRecordSize());
             if (outFmt_Binary == outputFormat)
             {
-                DelayedSizeMarker dataLenMarker(reply); // data length
+                if (compressor)
+                {
+                    compressMb.setEndian(__BIG_ENDIAN);
+                    compressMb.append(reply);
+                }
+
+                DelayedMarker<size32_t> dataLenMarker(compressor ? compressMb : reply); // data length
+
+                if (compressor)
+                {
+                    size32_t initialSz = replyLimit >= 0x10000 ? 0x10000 : replyLimit;
+                    compressor->open(compressMb, initialSz);
+                }
+
                 outBuilder.setBuffer(reply); // write direct to reply buffer for efficiency
+                unsigned __int64 numProcessedStart = outputActivity->queryProcessed();
+                size32_t totalDataSz = 0;
+                size32_t dataStartPos = reply.length();
 
                 if (grouped)
                 {
-                    bool pastFirstRow = outputActivity->queryProcessed()>0;
+                    bool pastFirstRow = numProcessedStart>0;
                     do
                     {
                         size32_t eogPos = 0;
@@ -6354,7 +6464,7 @@ public:
                         }
                         pastFirstRow = true;
                     }
-                    while (reply.length() < replyLimit);
+                    while (!handleFull(reply, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
                 }
                 else
                 {
@@ -6368,13 +6478,39 @@ public:
                             break;
                         }
                     }
-                    while (reply.length() < replyLimit);
+                    while (!handleFull(reply, dataStartPos, compressMb, compressor, replyLimit, totalDataSz));
                 }
-                dataLenMarker.write();
+
+                // Consume any trailing data remaining
+                if (compressor)
+                {
+                    size32_t sz = reply.length()-dataStartPos;
+                    if (sz)
+                    {
+                        // consumes data built up in reply buffer into compressor
+                        totalDataSz += sz;
+                        const void *data = reply.bytes()+dataStartPos;
+                        assertex(compressor->write(data, sz) == sz);
+                        reply.setLength(dataStartPos);
+                    }
+                }
+
+                // finalize reply
+                dataLenMarker.write(compressor ? totalDataSz : reply.length()-dataStartPos);
                 DelayedSizeMarker cursorLenMarker(reply); // cursor length
                 if (!eoi)
                     outputActivity->serializeCursor(reply);
                 cursorLenMarker.write();
+                if (compressor)
+                {
+                    // consume cursor into compressor
+                    size32_t sz = reply.length()-dataStartPos;
+                    const void *data = reply.bytes()+dataStartPos;
+                    assertex(compressor->write(data, sz) == sz);
+                    compressor->close();
+                    // now ready to swap compressed output into reply
+                    reply.swapWith(compressMb);
+                }
             }
             else
             {

+ 5 - 0
common/remote/sockfile.hpp

@@ -78,6 +78,11 @@ extern REMOTE_API void setDafsEndpointPort(SocketEndpoint &ep);
 extern REMOTE_API void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir);
 extern REMOTE_API ISocket *connectDafs(SocketEndpoint &ep, unsigned timeoutms); // NOTE: might alter ep.port if configured for multiple ports ...
 extern REMOTE_API ISocket *checkSocketSecure(ISocket *socket);
+
+
+extern REMOTE_API void setRemoteOutputCompressionDefault(const char *type); // sets the default output compression type name; empty input is ignored by the implementation
+extern REMOTE_API const char *queryOutputCompressionDefault(); // returns the default output compression type name previously set
+
 interface IOutputMetaData;
 class RowFilter;
 interface IRemoteFileIO : extends IFileIO

+ 10 - 3
testing/regress/ecl/key/remoteread.xml

@@ -5,22 +5,29 @@
 <Dataset name='Result 3'>
 </Dataset>
 <Dataset name='Result 4'>
+</Dataset>
+<Dataset name='Result 5'>
+</Dataset>
+<Dataset name='Result 6'>
  <Row><fname>John      </fname><lname>Smith     </lname><age>42</age><logicalfile>regress::single::remoteread                       </logicalfile><fpos>0</fpos></Row>
  <Row><fname>Bob       </fname><lname>Brown     </lname><age>29</age><logicalfile>regress::single::remoteread                       </logicalfile><fpos>28</fpos></Row>
  <Row><fname>Samuel    </fname><lname>Jackson   </lname><age>58</age><logicalfile>regress::single::remoteread                       </logicalfile><fpos>56</fpos></Row>
 </Dataset>
-<Dataset name='Result 5'>
+<Dataset name='Result 7'>
  <Row><fname>John      </fname><lname>Smith     </lname><age>42</age><logicalfile>regress::single::remoteread_comp                  </logicalfile><fpos>0</fpos></Row>
  <Row><fname>Bob       </fname><lname>Brown     </lname><age>29</age><logicalfile>regress::single::remoteread_comp                  </logicalfile><fpos>28</fpos></Row>
  <Row><fname>Samuel    </fname><lname>Jackson   </lname><age>58</age><logicalfile>regress::single::remoteread_comp                  </logicalfile><fpos>56</fpos></Row>
 </Dataset>
-<Dataset name='Result 6'>
+<Dataset name='Result 8'>
  <Row><fname>Bob       </fname><lname>Brown     </lname><age>29</age></Row>
  <Row><fname>John      </fname><lname>Smith     </lname><age>42</age></Row>
  <Row><fname>Samuel    </fname><lname>Jackson   </lname><age>58</age></Row>
 </Dataset>
-<Dataset name='Result 7'>
+<Dataset name='Result 9'>
  <Row><lname>Smith     </lname><fname>John      </fname><fpos>0</fpos><logicalfile>regress::single::remoteread</logicalfile></Row>
  <Row><lname>Brown     </lname><fname>Bob       </fname><fpos>28</fpos><logicalfile>regress::single::remoteread</logicalfile></Row>
  <Row><lname>Jackson   </lname><fname>Samuel    </fname><fpos>56</fpos><logicalfile>regress::single::remoteread</logicalfile></Row>
 </Dataset>
+<Dataset name='Result 10'>
+ <Row><Result_10>500043499986</Result_10></Row>
+</Dataset>

+ 19 - 0
testing/regress/ecl/remoteread.ecl

@@ -17,13 +17,16 @@
 
 //version optRemoteRead=false
 //version optRemoteRead=true
+//version optRemoteRead=true,optCompression='LZ4'
 
 #option('layoutTranslation', true);
 
 import Std.File AS FileServices;
 import ^ as root;
 optRemoteRead := #IFDEFINED(root.optRemoteRead, false);
+optCompression := #IFDEFINED(root.optCompression, '');
 #option('forceRemoteRead', optRemoteRead);
+#option('remoteCompressedOutput', optCompression);
 
 
 #onwarning(4523, ignore);
@@ -37,6 +40,8 @@ suffix := '-' + WORKUNIT;
 fname := prefix + 'remoteread' + suffix;
 fname_comp := prefix + 'remoteread_comp' + suffix;
 fname_index := prefix + 'remoteread_index' + suffix;
+fname_large := prefix + 'remoteread_large' + suffix;
+fname_large_out := prefix + 'remoteread_largeout' + suffix;
 
 rec := RECORD
  string10 fname;
@@ -69,20 +74,34 @@ ds := project(DATASET(fname, rec_vf, FLAT), transform(rec_vf, SELF.logicalFile :
 ds_comp := project(DATASET(fname_comp, rec_vf, FLAT), transform(rec_vf, SELF.logicalFile := trimmedLogicalFilename(LEFT.logicalFile); SELF := LEFT;));
 ds_trans := project(DATASET(fname, rec_vf_trans, FLAT), transform(rec_vf_trans, SELF.logicalFile := trimmedLogicalFilename(LEFT.logicalFile); SELF := LEFT;));
 
+// big enough to test default buffer limits in dafilesrv
+largeds := DATASET(1000000, TRANSFORM(rec, SELF.fname := TRIM(inds[(COUNTER%3)+1].fname)+(string)COUNTER; SELF.lname := TRIM(inds[(COUNTER%3)+1].lname)+(string)COUNTER; SELF.age := inds[(COUNTER%3)+1].age+COUNTER));
+largeoutrec := RECORD // shuffles order; NOTE(review): not referenced anywhere in the visible part of this test - confirm it is still needed
+ unsigned age;
+ string10 lname;
+ string10 fname;
+END;
+
+
+
 SEQUENTIAL( // runs in order: write the test files, read them back, then delete them
  OUTPUT(inds, , fname, OVERWRITE);
  OUTPUT(inds, , fname_comp, COMPRESSED, OVERWRITE);
  BUILDINDEX(i, OVERWRITE);
+ OUTPUT(largeds, , fname_large, OVERWRITE); // 1,000,000-row file, sized to exercise the default buffer limits in dafilesrv
+ OUTPUT(largeds, , fname_large, OVERWRITE); // NOTE(review): identical to the previous line, and fname_large_out is never written - confirm whether a different target was intended or this line is redundant
 
  OUTPUT(ds);
  OUTPUT(ds_comp);
  OUTPUT(i);
  OUTPUT(ds_trans);
+ SUM(NOFOLD(DATASET(fname_large, rec, FLAT)), age); // reads the large file back and sums age
 
  // Clean-up
  FileServices.DeleteLogicalFile(fname),
  FileServices.DeleteLogicalFile(fname_comp),
  FileServices.DeleteLogicalFile(fname_index),
+ FileServices.DeleteLogicalFile(fname_large),
 );
 
 

+ 6 - 0
thorlcr/graph/thgraphslave.cpp

@@ -24,6 +24,7 @@
 #include "thcodectx.hpp"
 #include "thmem.hpp"
 #include "thorport.hpp"
+#include "sockfile.hpp"
 #include "slwatchdog.hpp"
 #include "thgraphslave.hpp"
 #include "thcompressutil.hpp"
@@ -1653,6 +1654,11 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
     }
     tmpHandler.setown(createTempHandler(true));
     sharedAllocator.setown(::createThorAllocator(globalMemoryMB, sharedMemoryMB, numChannels, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
+
+    StringBuffer remoteCompressedOutput;
+    getOpt("remoteCompressedOutput", remoteCompressedOutput);
+    if (remoteCompressedOutput.length())
+        setRemoteOutputCompressionDefault(remoteCompressedOutput);
 }
 
 void CJobSlave::addChannel(IMPServer *mpServer)