Ver código fonte

HPCC-18576 Add compressed support to dafilesrv remote read

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 7 anos atrás
pai
commit
67fad1a098
1 arquivos alterados com 13 adições e 20 exclusões
  1. 13 20
      common/remote/sockfile.cpp

+ 13 - 20
common/remote/sockfile.cpp

@@ -35,6 +35,7 @@
 #include "portlist.h"
 #include "jsocket.hpp"
 #include "jencrypt.hpp"
+#include "jlzw.hpp"
 #include "jset.hpp"
 #include "jhtree.hpp"
 
@@ -3518,6 +3519,7 @@ class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
     unsigned __int64 limit = 0;
     unsigned __int64 processed = 0;
     unsigned __int64 startPos = 0;
+    bool compressed = false;
     bool opened = false;
     bool eofSeen = false;
     bool cursorDirty = false;
@@ -3547,28 +3549,18 @@ class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
             const char *fileName = helper->getFileName();
 
             OwnedIFile iFile = createIFile(fileName);
-
-#if 0
-            bool compressed = false; // isCompressedFile(iFileIO); // Should be passed with JSON
-            StringBuffer encryptionkey;
             if (compressed)
             {
-                Owned<IExpander> eexp;
-                if (encryptionkey.length()!=0)
-                    eexp.setown(createAESExpander256((size32_t)encryptionkey.length(),encryptionkey.bufferBase()));
-                iFileIO.setown(createCompressedFileReader(iFile,eexp));
-                if(!iFileIO && !blockcompressed) //fall back to old decompression, unless dfs marked as new
-                {
-                    iFileIO.setown(iFile->open(IFOread));
-                    if(iFileIO)
-                        rowcompressed = true;
-                }
+                iFileIO.setown(createCompressedFileReader(iFile));
+                if (!iFileIO)
+                    throw MakeStringException(0, "Failed to open compressed file: '%s'", fileName);
             }
             else
-#endif
+            {
                 iFileIO.setown(iFile->open(IFOread));
-            if (!iFileIO)
-                throw MakeStringException(0, "Failed to open: '%s'", fileName);
+                if (!iFileIO)
+                    throw MakeStringException(0, "Failed to open: '%s'", fileName);
+            }
 
             inputStream.setown(createFileSerialStream(iFileIO, startPos));
             prefetchBuffer.setStream(inputStream);
@@ -3588,8 +3580,8 @@ class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
     }
     bool segMonitorsMatch(const void *row) { return true; }
 public:
-    CRemoteDiskReadActivity(IHThorDiskReadArg &_helper)
-        : helper(&_helper), prefetchBuffer(nullptr)
+    CRemoteDiskReadActivity(IHThorDiskReadArg &_helper, bool _compressed)
+        : compressed(_compressed), helper(&_helper), prefetchBuffer(nullptr)
     {
         outMeta.set(helper->queryOutputMeta());
         canMatchAny = helper->canMatchAny();
@@ -3669,10 +3661,11 @@ IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
     unsigned __int64 chooseN = actNode.getPropInt64("choosen", defaultFileStreamChooseN);
     unsigned __int64 skipN = actNode.getPropInt64("skipN", defaultFileStreamSkipN);
     unsigned __int64 rowLimit = actNode.getPropInt64("rowLimit", defaultFileStreamRowLimit);
+    bool compressed = actNode.getPropBool("compressed");
     Owned<IOutputMetaData> inMeta = getTypeInfoOutputMetaData(actNode, "input");
     Owned<IOutputMetaData> outMeta = getTypeInfoOutputMetaData(actNode, "output");
     Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, inMeta.getClear(), outMeta.getClear(), chooseN, skipN, rowLimit);
-    return new CRemoteDiskReadActivity(*helper);
+    return new CRemoteDiskReadActivity(*helper, compressed);
 }
 
 IRemoteActivity *createRemoteActivity(IPropertyTree &actNode)