Jelajahi Sumber

Merge branch 'candidate-5.4.0'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 tahun lalu
induk
melakukan
5967c5d92f

+ 5 - 2
docs/ECLStandardLibraryReference/SLR-Mods/MonitorLogicalFileName.xml

@@ -49,8 +49,7 @@
           <entry><emphasis>filename</emphasis></entry>
 
           <entry>A null-terminated string containing the name of the logical
-          file in the DFU to monitor. This may contain wildcard characters ( *
-          and ?).</entry>
+          file in the DFU to monitor.</entry>
         </row>
 
         <row>
@@ -98,6 +97,10 @@
   <emphasis>event</emphasis> with the name of the triggering object as the
   event subtype (see the EVENT function).</para>
 
+  <para>This function does not support wildcard characters. To monitor
+  physical files or directories using wildcards, use the <link
+  linkend="MonitorFile">MonitorFile</link> function.</para>
+
   <para>This process continues until either:</para>
 
   <para>1) The <emphasis>shotcount</emphasis> number of events have been

+ 2 - 0
ecl/eclccserver/eclccserver.cpp

@@ -274,6 +274,8 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
             }
             else if (stricmp(optName, "compileOption") == 0)
                 eclccCmd.appendf(" -Wc,%s", value);
+            else if (stricmp(optName, "linkOption") == 0)
+                eclccCmd.appendf(" -Wl,%s", value);
             else if (stricmp(optName, "includeLibraryPath") == 0)
                 eclccCmd.appendf(" -I%s", value);
             else if (stricmp(optName, "libraryPath") == 0)

+ 20 - 4
ecl/hql/hqlutil.cpp

@@ -1620,11 +1620,27 @@ IHqlExpression * getExpandSelectExpr(IHqlExpression * expr)
 
 IHqlExpression * replaceChildDataset(IHqlExpression * expr, IHqlExpression * newChild, unsigned whichChild)
 {
-    HqlMapTransformer mapper;
+    if (!(getChildDatasetType(expr) & childdataset_hasdataset))
+    {
+        HqlExprArray args;
+        unwindChildren(args, expr);
+        args.replace(*LINK(newChild), whichChild);
+        return expr->clone(args);
+    }
+
     IHqlExpression * oldChild = expr->queryChild(whichChild);
-    mapper.setMapping(oldChild, newChild);
-    mapper.setSelectorMapping(oldChild, newChild);
-    return mapper.transformRoot(expr);
+    HqlMapSelectorTransformer mapper(oldChild, newChild);
+    HqlExprArray args;
+    ForEachChild(i, expr)
+    {
+        IHqlExpression * cur = expr->queryChild(i);
+        if (i == whichChild)
+            args.append(*LINK(newChild));
+        else
+            args.append(*mapper.transformRoot(cur));
+    }
+
+    return expr->clone(args);
 }
 
 IHqlExpression * insertChildDataset(IHqlExpression * expr, IHqlExpression * newChild, unsigned whichChild)

+ 7 - 0
ecl/hthor/hthor.cpp

@@ -454,6 +454,13 @@ void CHThorDiskWriteActivity::resolve()
                         throw MakeStringException(99, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", full.str());
                     file->remove();
                 }
+
+                //Ensure target folder exists
+                if (!recursiveCreateDirectoryForFile(filename.get()))
+                {
+                    throw MakeStringException(99, "Cannot create file folder for %s", filename.str());
+                }
+
                 PROGLOG("Writing to file %s", filename.get());
             }
             f.clear();

+ 1 - 0
esp/src/eclwatch/nls/hpcc.js

@@ -344,6 +344,7 @@ define({root:
     Port: "Port",
     Prefix: "Prefix",
     PrefixPlaceholder: "filename{:length}, filesize{:[B|L][1-8]}",
+    PreserveCompression: "Preserve Compression",
     Preview: "Preview",
     Priority: "Priority",
     Process: "Process",

+ 2 - 0
esp/src/eclwatch/templates/DFUQueryWidget.html

@@ -66,6 +66,7 @@
                                         <input id="${id}CopyTargetCompress" title="${i18n.Compress}:" name="compress" data-dojo-type="dijit.form.CheckBox" />
                                         <input id="${id}CopyTargetWrap" title="${i18n.Wrap}:" name="Wrap" data-dojo-type="dijit.form.CheckBox" />
                                         <input id="${id}CopyTargetRetainSuperfileStructure" title="${i18n.RetainSuperfileStructure}:" name="superCopy" data-dojo-type="dijit.form.CheckBox" />
+                                        <input id="${id}CopyPreserveCompression" title="${i18n.PreserveCompression}:" checked="true" name="preserveCompression" data-dojo-type="dijit.form.CheckBox" />
                                     </div>
                                 </div>
                                 <div class="dijitDialogPaneActionBar">
@@ -152,6 +153,7 @@
                         <input id="${id}Name" title="${i18n.Name}:" name="LogicalName" colspan="2" style="width:100%;" data-dojo-props="trim: true, placeHolder:'${i18n.somefile}'" data-dojo-type="dijit.form.TextBox" />
                         <input id="${id}Description" title="${i18n.Description}:" name="Description" colspan="2" style="width:100%;" data-dojo-props="trim: true, placeHolder:'${i18n.SomeDescription}'" data-dojo-type="dijit.form.TextBox" />
                         <input id="${id}Owner" title="${i18n.Owner}:" name="Owner" colspan="2" data-dojo-props="trim: true, placeHolder:'${i18n.JSmith}'" data-dojo-type="dijit.form.TextBox" />
+                        <input id="${id}Index" title="${i18n.Index}:" value="key" name="ContentType" colspan="2" data-dojo-props="trim: true" data-dojo-type="dijit.form.CheckBox" />
                         <input id="${id}ClusterTargetSelect" title="${i18n.Cluster}:" name="NodeGroup" colspan="2" style="display: inline-block; vertical-align: middle" data-dojo-type="TargetSelectWidget" />
                         <input id="${id}FromSize" title="${i18n.FromSizes}:" name="FileSizeFrom" colspan="2" data-dojo-props="trim: true, placeHolder:'4096'" data-dojo-type="dijit.form.TextBox" />
                         <input id="${id}ToSize" title="${i18n.ToSizes}:" name="FileSizeTo" colspan="2" data-dojo-props="trim: true, placeHolder:'16777216'" data-dojo-type="dijit.form.TextBox" />

+ 3 - 1
plugins/cassandra/CMakeLists.txt

@@ -65,7 +65,9 @@ if (USE_CASSANDRA)
     option(CASS_BUILD_EXAMPLES "Build examples" OFF)
     set_property(GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS FALSE)
     SET (_SAVE_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
-    SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format-nonliteral") # Work around cassandra build error
+    if (NOT WIN32)
+       SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format-nonliteral") # Work around cassandra build error
+    endif()
     add_subdirectory (cpp-driver ${PROJECT_BINARY_DIR}/cassandra)
     add_dependencies( cassandra libuv )
     SET (CMAKE_CXX_FLAGS "${_SAVE_CMAKE_CXX_FLAGS}")

+ 1 - 1
system/jlib/jbuff.hpp

@@ -214,7 +214,7 @@ public:
     void *          insertDirect(unsigned offset, size32_t len); // insert len bytes at offset returning address to area inserted
     inline void     Release() const                         { delete this; }    // for consistency even though not link counted
 
-    inline void *   bufferBase() const { return curLen ? buffer : NULL; }
+    inline void *   bufferBase() const { return buffer; }
 
 
 protected:

+ 0 - 5
system/jlib/jfile.cpp

@@ -3967,11 +3967,6 @@ void setDefaultUser(const char * username,const char *password)
 
 bool recursiveCreateDirectory(const char * path)
 {
-#ifndef _WIN32
-#ifdef USE_SAMBA
-    return localCreateDirectory(path);
-#endif
-#endif
     Owned<IFile> file = createIFile(path);
     return file->createDirectory();
 }

+ 2 - 2
system/jlib/jfile.hpp

@@ -245,8 +245,8 @@ extern jlib_decl void setPasswordProvider(IPasswordProvider * provider);
 extern jlib_decl size32_t read(IFileIO * in, offset_t pos, size32_t len, MemoryBuffer & buffer);
 extern jlib_decl void copyFile(const char *target, const char *source, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL,CFflags copyFlags=CFnone);
 extern jlib_decl void copyFile(IFile * target, IFile * source,size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL,CFflags copyFlags=CFnone);
-extern jlib_decl bool recursiveCreateDirectory(const char * path);              // only works locally, use IFile::createDirectory() for remote
-extern jlib_decl bool recursiveCreateDirectoryForFile(const char *filename);    // only works locally, use IFile::createDirectory() for remote
+extern jlib_decl bool recursiveCreateDirectory(const char * path);
+extern jlib_decl bool recursiveCreateDirectoryForFile(const char *filename);
 
 extern jlib_decl void splitFilename(const char * filename, StringBuffer * drive, StringBuffer * path, StringBuffer * tail, StringBuffer * ext, bool longExt = false);
 extern jlib_decl bool splitUNCFilename(const char * filename, StringBuffer * machine, StringBuffer * path, StringBuffer * tail, StringBuffer * ext);

+ 90 - 27
system/jlib/jflz.cpp

@@ -606,22 +606,25 @@ static FASTLZ_INLINE int FASTLZ_DECOMPRESSOR(const void* input, int length, void
 class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
 {
     HTAB_T ht;
-    size32_t blksz;         
+    size32_t blksz;
     size32_t bufalloc;
-    MemoryAttr inma;        // equals blksize len
+    MemoryBuffer inma;      // equals blksize len
+    MemoryBuffer *outBufMb; // used when dynamic output buffer (when open() used)
+    size32_t outBufStart;
     byte *inbuf;
     size32_t inmax;         // remaining
-    size32_t inlen;         
+    size32_t inlen;
     size32_t inlenblk;      // set to COMMITTED when so
     bool trailing;
     byte *outbuf;
     size32_t outlen;
     size32_t wrmax;
+    size32_t dynamicOutSz;
 
     inline void setinmax()
     {
         inmax = blksz-outlen-sizeof(size32_t);
-        if (inmax<256)      
+        if (inmax<256)
             trailing = true;    // too small to bother compressing
         else {
             trailing = false;
@@ -640,16 +643,29 @@ class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
         size32_t toflush = (inlenblk==COMMITTED)?inlen:inlenblk;
         if (toflush == 0)
             return;
-        assertex(outlen+sizeof(size32_t)*2+toflush+fastlzSlack(toflush)<=blksz);
+        size32_t outSzRequired = outlen+sizeof(size32_t)*2+toflush+fastlzSlack(toflush);
+        if (!dynamicOutSz)
+            assertex(outSzRequired<=blksz);
+        else
+        {
+            if (outSzRequired>dynamicOutSz)
+            {
+                verifyex(outBufMb->ensureCapacity(outBufStart+outSzRequired));
+                dynamicOutSz = outBufMb->capacity();
+                outbuf = ((byte *)outBufMb->bufferBase()+outBufStart);
+            }
+        }
         size32_t *cmpsize = (size32_t *)(outbuf+outlen);
         byte *out = (byte *)(cmpsize+1);
         *cmpsize = (size32_t)fastlz_compress(inbuf, (int)toflush, out, ht);
-        if (*cmpsize<toflush) {
+        if (*cmpsize<toflush)
+        {
             *(size32_t *)outbuf += toflush;
             outlen += *cmpsize+sizeof(size32_t);
             if (inlenblk==COMMITTED)
                 inlen = 0;
-            else {
+            else
+            {
                 inlen -= inlenblk;
                 memmove(inbuf,inbuf+toflush,inlen);
             }
@@ -659,7 +675,15 @@ class jlib_decl CFastLZCompressor : public CInterface, public ICompressor
         trailing = true;
     }
 
-
+    void initCommon()
+    {
+        blksz = inma.capacity();
+        *(size32_t *)outbuf = 0;
+        outlen = sizeof(size32_t);
+        inlen = 0;
+        inlenblk = COMMITTED;
+        setinmax();
+    }
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -669,6 +693,10 @@ public:
         outbuf = NULL;      // only set on close
         bufalloc = 0;
         wrmax = 0;          // set at open
+        dynamicOutSz = 0;
+        outBufMb = NULL;
+        outBufStart = 0;
+        inbuf = NULL;
     }
 
     virtual ~CFastLZCompressor()
@@ -680,34 +708,50 @@ public:
 
     virtual void open(void *buf,size32_t max)
     {
+        if (max<1024)
+            throw MakeStringException(-1,"CFastLZCompressor::open - block size (%d) not large enough", blksz);
         wrmax = max;
-        if (buf) {
-            if (bufalloc) {
+        if (buf)
+        {
+            if (bufalloc)
                 free(outbuf);
-            }
             bufalloc = 0;
             outbuf = (byte *)buf;
         }
-        else if (max>bufalloc) {
+        else if (max>bufalloc)
+        {
             if (bufalloc)
                 free(outbuf);
             bufalloc = max;
             outbuf = (byte *)malloc(bufalloc);
         }
-        blksz = max;
-        if (blksz!=inma.length())
-            inbuf = (byte *)inma.allocate(blksz);
-        else
-            inbuf = (byte *)inma.bufferBase();
-        if (blksz<1024)
-            throw MakeStringException(-1,"CFastLZCompressor::open - block size (%d) not large enough", blksz);
-        *(size32_t *)outbuf = 0;
-        outlen = sizeof(size32_t);
-        inlen = 0;
-        inlenblk = COMMITTED;
-        setinmax();
+        outBufMb = NULL;
+        outBufStart = 0;
+        dynamicOutSz = 0;
+        inbuf = (byte *)inma.ensureCapacity(max);
+        initCommon();
     }
-    
+
+    virtual void open(MemoryBuffer &mb, size32_t initialSize)
+    {
+        if (!initialSize)
+            initialSize = 0x100000; // 1MB
+        if (initialSize<1024)
+            throw MakeStringException(-1,"CFastLZCompressor::open - block size (%d) not large enough", initialSize);
+        wrmax = initialSize;
+        if (bufalloc)
+        {
+            free(outbuf);
+            bufalloc = 0;
+        }
+        inbuf = (byte *)inma.ensureCapacity(initialSize);
+        outBufMb = &mb;
+        outBufStart = mb.length();
+        outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
+        dynamicOutSz = outBufMb->capacity();
+        initCommon();
+    }
+
     virtual void close()
     {
         if (inlenblk!=COMMITTED) {
@@ -723,12 +767,17 @@ public:
         outlen = totlen;
         *(size32_t *)outbuf += inlen;
         inbuf = NULL;
+        if (outBufMb)
+        {
+            outBufMb->setWritePos(outBufStart+outlen);
+            outBufMb = NULL;
+        }
     }
 
 
     size32_t write(const void *buf,size32_t len)
     {
-        // no more than wrmax per write
+        // no more than wrmax per write (unless dynamically sizing)
         size32_t lenb = wrmax;
         byte *b = (byte *)buf;
         size32_t written = 0;
@@ -736,12 +785,26 @@ public:
         {
             if (len < lenb)
                 lenb = len;
-            if (lenb+inlen>inmax) {
+            if (lenb+inlen>inmax)
+            {
                 if (trailing)
                     return written;
                 flushcommitted();
                 if (lenb+inlen>inmax)
+                {
+                    if (outBufMb) // sizing input buffer, but outBufMb!=NULL is condition of whether in use or not
+                    {
+                        blksz += len > 0x100000 ? len : 0x100000;
+                        verifyex(inma.ensureCapacity(blksz));
+                        blksz = inma.capacity();
+                        inbuf = (byte *)inma.bufferBase();
+                        wrmax = blksz;
+                        setinmax();
+                    }
                     lenb = inmax-inlen;
+                    if (len < lenb)
+                        lenb = len;
+                }
             }
             if (lenb == 0)
                 return written;

+ 216 - 74
system/jlib/jlzw.cpp

@@ -102,6 +102,8 @@ CLZWCompressor::CLZWCompressor(bool _supportbigendian)
     bufalloc = 0;
     inuseflag=0xff;
     supportbigendian = _supportbigendian;
+    outBufStart = 0;
+    outBufMb = NULL;
 }
 
 CLZWCompressor::~CLZWCompressor()
@@ -223,6 +225,21 @@ static struct __initShiftArray {
   curShift = shift;                                          \
 }
 
+void CLZWCompressor::initCommon()
+{
+    ASSERT(dict.curbits==0);   // check for open called twice with no close
+    initdict();
+    curcode = -1;
+    inlen = 0;
+    inlenblk = COMMITTED;
+    memset(outbuf,0,sizeof(size32_t));
+    outlen = sizeof(size32_t)+dict.curbits;
+    outbytes = (unsigned char *)outbuf+sizeof(size32_t);
+    outbits = outbytes+8;
+    outnext = outbytes+dict.curbits;
+    curShift=0; //outmask = 0x80;
+    outbitbuf = 0;
+}
 
 void CLZWCompressor::flushbuf()
 {
@@ -237,43 +254,56 @@ void CLZWCompressor::flushbuf()
     } while (outbytes+(dict.curbits-8)!=outnext);
 }
 
+void CLZWCompressor::ensure(size32_t sz)
+{
+    dbgassertex(outBufMb);
+    size32_t outBytesOffset = outbytes-(byte *)outbuf;
+    size32_t outBitsOffset = outbits-(byte *)outbuf;
+    size32_t outNextOffset = outnext-(byte *)outbuf;
+    outbuf = (byte *)outBufMb->ensureCapacity(sz);
+    maxlen = outBufMb->capacity()-SAFETY_MARGIN;
+    outbytes = (byte *)outbuf+outBytesOffset;
+    outbits = (byte *)outbuf+outBitsOffset;
+    outnext = (byte *)outbuf+outNextOffset;
+}
 
+void CLZWCompressor::open(MemoryBuffer &mb, size32_t initialSize)
+{
+    if (bufalloc)
+        free(outbuf);
+    bufalloc = 0;
+    outBufMb = &mb;
+    outBufStart = mb.length();
+    outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
+    maxlen = outBufMb->capacity()-SAFETY_MARGIN;
+    initCommon();
+}
 
 void CLZWCompressor::open(void *buf,size32_t max)
 {
-
 #ifdef STATS
     st_thistime = msTick();
     st_thiswrites=0;
 #endif
 
-    if (buf) {
-        if (bufalloc) {
+    if (buf)
+    {
+        if (bufalloc)
             free(outbuf);
-        }
         bufalloc = 0;
         outbuf = buf;
     }
-    else if (max>bufalloc) {
+    else if (max>bufalloc)
+    {
         if (bufalloc)
             free(outbuf);
         bufalloc = max;
         outbuf = malloc(bufalloc);
     }
+    outBufMb = NULL;
     ASSERT(max>SAFETY_MARGIN+sizeof(size32_t)); // minimum required
     maxlen=max-SAFETY_MARGIN;
-    ASSERT(dict.curbits==0);   // check for open called twice with no close
-    initdict();
-    curcode = -1;
-    inlen = 0;
-    inlenblk = COMMITTED;
-    memset(outbuf,0,sizeof(size32_t));
-    outlen = sizeof(size32_t)+dict.curbits;
-    outbytes = (unsigned char *)outbuf+sizeof(size32_t);
-    outbits = outbytes+8;
-    outnext = outbytes+dict.curbits;
-    curShift=0; //outmask = 0x80;
-    outbitbuf = 0;
+    initCommon();
 }
 
 
@@ -295,40 +325,49 @@ size32_t CLZWCompressor::write(const void *buf,size32_t buflen)
 #endif
 
     size32_t len=buflen;
-    if (curcode==-1) {
+    if (curcode==-1)
+    {
         curcode = *(in++);
         len--;
     }
-    while (len--) {
-
+    while (len--)
+    {
         int ch = *(in++);
         int index = HASHC(curcode,ch);
-        for (;;) {
-            
-            if (dictinuse[index]!=inuseflag) {
+        for (;;)
+        {
+            if (dictinuse[index]!=inuseflag)
+            {
                 dictinuse[index] = inuseflag;
                 dictcode[index] = dict.nextcode++;
                 dict.dictparent[index] = curcode;
                 dict.dictchar[index] = (unsigned char) ch;
                 PUTCODE(curcode);
-                if ((outlen>=maxlen)) {
-                    size32_t ret;
-                    if (inlenblk==COMMITTED) {
-                        ret = in-(unsigned char *)buf-1;
-                        inlen += in-(unsigned char *)buf-1;
-                    }
+                if ((outlen>=maxlen))
+                {
+                    if (outBufMb)
+                        ensure(outlen+0x10000);
                     else
-                        ret = 0;
-                    close();
-                    return ret;
+                    {
+                        size32_t ret;
+                        if (inlenblk==COMMITTED)
+                        {
+                            ret = in-(unsigned char *)buf-1;
+                            inlen += in-(unsigned char *)buf-1;
+                        }
+                        else
+                            ret = 0;
+                        close();
+                        return ret;
+                    }
                 }
-                if (dict.nextcode == dict.nextbump) {
+                if (dict.nextcode == dict.nextbump)
+                {
                     PUTCODE(BUMP_CODE);
                     flushbuf();
                     bool eodict = !dict.bumpbits();
-                    if (eodict) {
+                    if (eodict)
                         initdict();
-                    }
                     outbytes = outnext;
                     outbits = outbytes+8;
                     outnext += dict.curbits;
@@ -340,14 +379,14 @@ size32_t CLZWCompressor::write(const void *buf,size32_t buflen)
                 break;
             }
             if (dict.dictparent[index] == curcode &&
-                dict.dictchar[index] == (unsigned char)ch) {
+                dict.dictchar[index] == (unsigned char)ch)
+            {
                 curcode = dictcode[index];
                 break;
             }
             index--;
-            if (index<0) {
+            if (index<0)
                 index = LZW_HASH_TABLE_SIZE-1;
-            }
         }
     }
     inlen += buflen;
@@ -366,7 +405,8 @@ void CLZWCompressor::commitblock()
 
 void CLZWCompressor::close()
 {
-    if (dict.curbits) {
+    if (dict.curbits)
+    {
         PUTCODE(curcode);
         flushbuf();
         dict.curbits = 0;
@@ -386,6 +426,11 @@ void CLZWCompressor::close()
         st_totread += (inlen+511)/1024;
         st_totblocks++;
 #endif
+        if (outBufMb)
+        {
+            outBufMb->setWritePos(outBufStart+outlen);
+            outBufMb = NULL;
+        }
     }
 }
 
@@ -1227,7 +1272,9 @@ class jlib_decl CRDiffCompressor : public CInterface, public ICompressor
     size32_t bufalloc;
     size32_t remaining;
     void *outbuf;
-    unsigned char *out; 
+    unsigned char *out;
+    MemoryBuffer *outBufMb;
+    size32_t outBufStart;
 
     size32_t recsize;       // assumed fixed length rows
     // assumes a transaction is a record
@@ -1235,6 +1282,25 @@ class jlib_decl CRDiffCompressor : public CInterface, public ICompressor
     size32_t maxrecsize;  // maximum size diff compress 
     unsigned char *prev;
 
+    void initCommon()
+    {
+        inlen = 0;
+        memset(outbuf, 0, sizeof(size32_t)*2);
+        outlen = sizeof(size32_t)*2;
+        out = (byte *)outbuf+outlen;
+        free(prev);
+        prev = NULL;
+    }
+    inline void ensure(size32_t sz)
+    {
+        if (NULL == outBufMb)
+            throw MakeStringException(-3,"CRDiffCompressor row doesn't fit in buffer!");
+        dbgassertex(remaining<sz);
+        verifyex(outBufMb->ensureCapacity(outBufMb->capacity()+(sz-remaining)));
+        outbuf = ((byte *)outBufMb->bufferBase())+outBufStart;
+        out = (byte *)outbuf+outlen;
+        remaining = outBufMb->capacity()-outlen;
+    }
 public:
     IMPLEMENT_IINTERFACE;
     CRDiffCompressor()
@@ -1245,8 +1311,9 @@ public:
         recsize = 0;
         bufalloc = 0;
         prev = NULL;
+        outBufMb = NULL;
     }
-        
+
     ~CRDiffCompressor()
     {
         free(prev);
@@ -1254,36 +1321,48 @@ public:
             free(outbuf);
     }
 
+    void open(MemoryBuffer &mb, size32_t initialSize)
+    {
+        outBufMb = &mb;
+        outBufStart = mb.length();
+        outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
+        bufalloc = 0;
+        initCommon();
+        remaining = outBufMb->capacity()-outlen;
+    }
+
     void open(void *buf,size32_t max)
     {
-        if (buf) {
-            if (bufalloc) {
+        if (buf)
+        {
+            if (bufalloc)
                 free(outbuf);
-            }
             bufalloc = 0;
             outbuf = buf;
         }
-        else if (max>bufalloc) {
+        else if (max>bufalloc)
+        {
             if (bufalloc)
                 free(outbuf);
             bufalloc = max;
             outbuf = malloc(bufalloc);
         }
+        outBufMb = NULL;
         ASSERT(max>2+sizeof(size32_t)*2); // minimum required (actually will need enough for recsize so only a guess)
-        inlen = 0;
-        memset(outbuf,0,sizeof(size32_t)*2);
-        outlen = sizeof(size32_t)*2;
+        initCommon();
         remaining = max-outlen;
-        out = (unsigned char *)outbuf+sizeof(size32_t)*2;
-        free(prev);
-        prev = NULL;
     }
 
     void close()
     {
         transbuf.clear();
         memcpy(outbuf,&inlen,sizeof(inlen));        // expanded size
-        memcpy((unsigned char *)outbuf+sizeof(inlen),&recsize,sizeof(recsize));
+        memcpy((byte *)outbuf+sizeof(inlen),&recsize,sizeof(recsize));
+        if (outBufMb)
+        {
+            outBufMb->setWritePos(outBufStart+outlen);
+            outBufMb = NULL;
+        }
     }
 
     inline size32_t maxcompsize(size32_t s) { return s+((s+254)/255)*2; }
@@ -1291,14 +1370,22 @@ public:
     size32_t write(const void *buf,size32_t buflen)
     {
         // assumes a transaction is a row and at least one row fits in
-        if (prev) {
-            if ((transbuf.length()==0)&&(remaining<maxrecsize))  // this is a bit odd because no incremental diffcomp
-                return 0;
+        if (prev)
+        {
+            if (transbuf.length()==0)
+            {
+                if (remaining<maxrecsize)  // this is a bit odd because no incremental diffcomp
+                {
+                    if (NULL == outBufMb)
+                        return 0;
+                }
+            }
             transbuf.append(buflen,buf);
         }
-        else { // first row
-            if (buflen>remaining)
-                throw MakeStringException(-3,"CRDiffCompressor row doesn't fit in buffer!");
+        else // first row
+        {
+            if (remaining<buflen)
+                ensure(buflen);
             memcpy(out,buf,buflen);
             out += buflen;
             outlen += buflen;
@@ -1316,19 +1403,23 @@ public:
 
     void commitblock()
     {
-        if (prev) {
+        if (prev)
+        {
             if (recsize!=transbuf.length())
                 throw MakeStringException(-1,"CRDiffCompressor used with variable sized row");
+            if (remaining<maxrecsize)
+                ensure(maxrecsize-remaining);
             size32_t sz = DiffCompress(transbuf.toByteArray(),out,prev,recsize);
             transbuf.clear();
             out += sz;
             outlen += sz;
             remaining -= sz;
         }
-        else {
+        else
+        {
             recsize = outlen-sizeof(size32_t)*2;
             maxrecsize = maxcompsize(recsize);
-            prev = (unsigned char *)malloc(recsize);
+            prev = (byte *)malloc(recsize);
             memcpy(prev,out-recsize,recsize);
             remaining -= recsize;
         }
@@ -1463,6 +1554,19 @@ class jlib_decl CRandRDiffCompressor : public CInterface, public ICompressor
     size32_t maxdiffsize;
     size32_t recsize;
     size32_t compsize;
+    size32_t outBufStart;
+    MemoryBuffer *outBufMb;
+
+    void initCommon()
+    {
+        header = (RRDheader *)outbuf;
+        inlen = 0;
+        memset(header,0,MIN_RRDHEADER_SIZE);
+        diffbuf.clear();
+        firstrec.clear();
+        firstrle.clear();
+        rowbuf.clear();
+    }
 public:
     IMPLEMENT_IINTERFACE;
     CRandRDiffCompressor()
@@ -1473,6 +1577,8 @@ public:
         max = 0;
         maxdiffsize = 0;
         recsize = 0;
+        outBufStart = 0;
+        outBufMb = NULL;
     }
         
     ~CRandRDiffCompressor()
@@ -1481,6 +1587,15 @@ public:
             free(outbuf);
     }
 
+    void open(MemoryBuffer &mb, size32_t initialSize)
+    {
+        outBufMb = &mb;
+        outBufStart = mb.length();
+        outbuf = (byte *)outBufMb->ensureCapacity(initialSize);
+        bufalloc = 0;
+        initCommon();
+    }
+
     void open(void *buf,size32_t _max)
     {
         max = _max;
@@ -1497,14 +1612,9 @@ public:
             bufalloc = max;
             outbuf = malloc(bufalloc);
         }
+        outBufMb = NULL;
         ASSERT(max>MIN_RRDHEADER_SIZE+sizeof(unsigned short)+3); // hopefully a lot bigger!
-        header = (RRDheader *)outbuf;
-        inlen = 0;
-        memset(header,0,MIN_RRDHEADER_SIZE);
-        diffbuf.clear();
-        firstrec.clear();
-        firstrle.clear();
-        rowbuf.clear();
+        initCommon();
     }
 
     void close()
@@ -1513,6 +1623,12 @@ public:
         ASSERT((size32_t)(header->totsize+header->firstrlesize)<=max);
         unsigned short hofs = header->hsize();
         ASSERT(header->totsize==hofs+diffbuf.length());
+        if (outBufMb)
+        {
+            outbuf = (byte *)outBufMb->ensureCapacity(header->totsize+header->firstrlesize);
+            outBufMb->setWritePos(outBufStart+header->totsize+header->firstrlesize);
+            outBufMb = NULL;
+        }
         byte *out = (byte *)outbuf+hofs;
         memcpy(out,diffbuf.toByteArray(),diffbuf.length());
         out += diffbuf.length();
@@ -2299,12 +2415,13 @@ ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bo
 class CAESCompressor : public CInterface, implements ICompressor
 {
     Owned<ICompressor> comp;    // base compressor
-    MemoryAttr compattr;        // compressed buffer
+    MemoryBuffer compattr;      // compressed buffer
     MemoryAttr outattr;         // compressed and encrypted (if outblk NULL)
     void *outbuf;               // dest
     size32_t outlen;
     size32_t outmax;
     MemoryAttr key;
+    MemoryBuffer *outBufMb;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -2313,6 +2430,17 @@ public:
     {
         comp.setown(createLZWCompressor(true));
         outlen = 0;
+        outmax = 0;
+        outBufMb = NULL;
+    }
+
+    void open(MemoryBuffer &mb, size32_t initialSize)
+    {
+        outlen = 0;
+        outmax = initialSize;
+        outbuf = NULL;
+        outBufMb = &mb;
+        comp->open(compattr, initialSize);
     }
 
     void open(void *blk,size32_t blksize)
@@ -2323,23 +2451,29 @@ public:
             outbuf = blk;
         else
             outbuf = outattr.allocate(blksize);
+        outBufMb = NULL;
         size32_t subsz = blksize-AES_PADDING_SIZE-sizeof(size32_t);
-        comp->open(compattr.allocate(subsz),subsz);
+        comp->open(compattr.reserveTruncate(subsz),subsz);
     }
 
     void close()
     {
         comp->close();
         // now encrypt
-        byte *p = (byte *)comp->bufptr();
-        size32_t bl = comp->buflen();
         MemoryBuffer buf;
         aesEncrypt(key.get(), key.length(), comp->bufptr(), comp->buflen(), buf);
         outlen = buf.length();
+        if (outBufMb)
+        {
+            outmax = sizeof(size32_t)+outlen;
+            outbuf = outBufMb->reserveTruncate(outmax);
+            outBufMb = NULL;
+        }
         memcpy(outbuf,&outlen,sizeof(size32_t));
         outlen += sizeof(size32_t);
         assertex(outlen<=outmax);
         memcpy((byte *)outbuf+sizeof(size32_t),buf.bufferBase(),buf.length());
+        outmax = 0;
     }
 
     size32_t write(const void *buf,size32_t len)
@@ -2347,15 +2481,15 @@ public:
         return comp->write(buf,len);
     }
 
-
-
     void * bufptr()
     {
+        assertex(0 == outmax); // i.e. closed
         return outbuf;
     }
 
     size32_t buflen()
     {
+        assertex(0 == outmax); // i.e. closed
         return outlen;
     }
 
@@ -2565,10 +2699,18 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
         virtual ICompressor *getCompressor(const char *options) { return createRDiffCompressor(); }
         virtual IExpander *getExpander(const char *options) { return createRDiffExpander(); }
     };
+    class CLZWCompressHandler : public CCompressHandlerBase
+    {
+    public:
+        CLZWCompressHandler() : CCompressHandlerBase("LZW") { }
+        virtual ICompressor *getCompressor(const char *options) { return createLZWCompressor(true); }
+        virtual IExpander *getExpander(const char *options) { return createLZWExpander(true); }
+    };
     ICompressHandler *flzCompressor = new CFLZCompressHandler();
     addCompressorHandler(flzCompressor);
     addCompressorHandler(new CAESCompressHandler());
     addCompressorHandler(new CDiffCompressHandler());
+    addCompressorHandler(new CLZWCompressHandler());
     defaultCompressor.set(flzCompressor);
     return true;
 }

+ 2 - 1
system/jlib/jlzw.hpp

@@ -26,7 +26,8 @@
 
 interface jlib_decl ICompressor : public IInterface
 {
-    virtual void   open(void *blk,size32_t blksize)=0;                   // fixed size output
+    virtual void   open(MemoryBuffer &mb, size32_t initialSize=0)=0; // variable internally sized buffer
+    virtual void   open(void *blk, size32_t blksize)=0;              // fixed size output
     virtual void   close()=0;
     virtual size32_t write(const void *buf,size32_t len)=0;             
                                                                             

+ 5 - 1
system/jlib/jlzw.ipp

@@ -46,6 +46,7 @@ public:
 
     CLZWCompressor(bool _supportbigendian);
     virtual         ~CLZWCompressor();
+    virtual void    open(MemoryBuffer &mb, size32_t initialSize);
     virtual void    open(void *blk,size32_t blksize);
     virtual void    close();
     virtual size32_t    write(const void *buf,size32_t len);
@@ -56,6 +57,8 @@ public:
 
 protected:
     void flushbuf();
+    void initCommon();
+    void ensure(size32_t sz);
     virtual void initdict();
     size32_t inlen;
     size32_t inlenblk;
@@ -67,10 +70,11 @@ protected:
     unsigned char *outbytes;  // byte output
     unsigned char *outbits;   // for trailing bits
     unsigned char *outnext;   // next block
-    unsigned char *inlast;    // for xoring
     unsigned char inuseflag;
     unsigned char outbitbuf;
     unsigned curShift;
+    size32_t outBufStart;
+    MemoryBuffer *outBufMb;
 
     LZWDictionary dict;
     unsigned char dictinuse[LZW_HASH_TABLE_SIZE];

+ 45 - 0
testing/regress/ecl/issue13863.ecl

@@ -0,0 +1,45 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+r := { unsigned id1, unsigned id2 };
+ro := { unsigned id, DATASET(r) child };
+
+
+
+ro t2(ro main) := TRANSFORM
+
+    ds := main.child;
+    r t(ds l) := TRANSFORM
+        oneIds := ds(id1 = 1);
+
+        SELF.id2 := oneIds[1].id2;
+        SELF := l;
+    END;
+
+    p := PROJECT(ds, t(LEFT));
+    f := p(id1 != 1);
+
+    SELF.child := f;
+    SELF := main;
+END;
+
+ds(unsigned base) := DATASET(10, TRANSFORM(r, SELF.id1 := COUNTER, SELF.id2 := ((base + COUNTER) % 4)));
+
+ds2 := DATASET(10, TRANSFORM(ro, SELF.id := COUNTER, SELF.child := ds(COUNTER)));
+
+output(PROJECT(ds2, t2(LEFT)));

+ 12 - 0
testing/regress/ecl/key/issue13863.xml

@@ -0,0 +1,12 @@
+<Dataset name='Result 1'>
+ <Row><id>1</id><child><Row><id1>2</id1><id2>2</id2></Row><Row><id1>3</id1><id2>2</id2></Row><Row><id1>4</id1><id2>2</id2></Row><Row><id1>5</id1><id2>2</id2></Row><Row><id1>6</id1><id2>2</id2></Row><Row><id1>7</id1><id2>2</id2></Row><Row><id1>8</id1><id2>2</id2></Row><Row><id1>9</id1><id2>2</id2></Row><Row><id1>10</id1><id2>2</id2></Row></child></Row>
+ <Row><id>2</id><child><Row><id1>2</id1><id2>3</id2></Row><Row><id1>3</id1><id2>3</id2></Row><Row><id1>4</id1><id2>3</id2></Row><Row><id1>5</id1><id2>3</id2></Row><Row><id1>6</id1><id2>3</id2></Row><Row><id1>7</id1><id2>3</id2></Row><Row><id1>8</id1><id2>3</id2></Row><Row><id1>9</id1><id2>3</id2></Row><Row><id1>10</id1><id2>3</id2></Row></child></Row>
+ <Row><id>3</id><child><Row><id1>2</id1><id2>0</id2></Row><Row><id1>3</id1><id2>0</id2></Row><Row><id1>4</id1><id2>0</id2></Row><Row><id1>5</id1><id2>0</id2></Row><Row><id1>6</id1><id2>0</id2></Row><Row><id1>7</id1><id2>0</id2></Row><Row><id1>8</id1><id2>0</id2></Row><Row><id1>9</id1><id2>0</id2></Row><Row><id1>10</id1><id2>0</id2></Row></child></Row>
+ <Row><id>4</id><child><Row><id1>2</id1><id2>1</id2></Row><Row><id1>3</id1><id2>1</id2></Row><Row><id1>4</id1><id2>1</id2></Row><Row><id1>5</id1><id2>1</id2></Row><Row><id1>6</id1><id2>1</id2></Row><Row><id1>7</id1><id2>1</id2></Row><Row><id1>8</id1><id2>1</id2></Row><Row><id1>9</id1><id2>1</id2></Row><Row><id1>10</id1><id2>1</id2></Row></child></Row>
+ <Row><id>5</id><child><Row><id1>2</id1><id2>2</id2></Row><Row><id1>3</id1><id2>2</id2></Row><Row><id1>4</id1><id2>2</id2></Row><Row><id1>5</id1><id2>2</id2></Row><Row><id1>6</id1><id2>2</id2></Row><Row><id1>7</id1><id2>2</id2></Row><Row><id1>8</id1><id2>2</id2></Row><Row><id1>9</id1><id2>2</id2></Row><Row><id1>10</id1><id2>2</id2></Row></child></Row>
+ <Row><id>6</id><child><Row><id1>2</id1><id2>3</id2></Row><Row><id1>3</id1><id2>3</id2></Row><Row><id1>4</id1><id2>3</id2></Row><Row><id1>5</id1><id2>3</id2></Row><Row><id1>6</id1><id2>3</id2></Row><Row><id1>7</id1><id2>3</id2></Row><Row><id1>8</id1><id2>3</id2></Row><Row><id1>9</id1><id2>3</id2></Row><Row><id1>10</id1><id2>3</id2></Row></child></Row>
+ <Row><id>7</id><child><Row><id1>2</id1><id2>0</id2></Row><Row><id1>3</id1><id2>0</id2></Row><Row><id1>4</id1><id2>0</id2></Row><Row><id1>5</id1><id2>0</id2></Row><Row><id1>6</id1><id2>0</id2></Row><Row><id1>7</id1><id2>0</id2></Row><Row><id1>8</id1><id2>0</id2></Row><Row><id1>9</id1><id2>0</id2></Row><Row><id1>10</id1><id2>0</id2></Row></child></Row>
+ <Row><id>8</id><child><Row><id1>2</id1><id2>1</id2></Row><Row><id1>3</id1><id2>1</id2></Row><Row><id1>4</id1><id2>1</id2></Row><Row><id1>5</id1><id2>1</id2></Row><Row><id1>6</id1><id2>1</id2></Row><Row><id1>7</id1><id2>1</id2></Row><Row><id1>8</id1><id2>1</id2></Row><Row><id1>9</id1><id2>1</id2></Row><Row><id1>10</id1><id2>1</id2></Row></child></Row>
+ <Row><id>9</id><child><Row><id1>2</id1><id2>2</id2></Row><Row><id1>3</id1><id2>2</id2></Row><Row><id1>4</id1><id2>2</id2></Row><Row><id1>5</id1><id2>2</id2></Row><Row><id1>6</id1><id2>2</id2></Row><Row><id1>7</id1><id2>2</id2></Row><Row><id1>8</id1><id2>2</id2></Row><Row><id1>9</id1><id2>2</id2></Row><Row><id1>10</id1><id2>2</id2></Row></child></Row>
+ <Row><id>10</id><child><Row><id1>2</id1><id2>3</id2></Row><Row><id1>3</id1><id2>3</id2></Row><Row><id1>4</id1><id2>3</id2></Row><Row><id1>5</id1><id2>3</id2></Row><Row><id1>6</id1><id2>3</id2></Row><Row><id1>7</id1><id2>3</id2></Row><Row><id1>8</id1><id2>3</id2></Row><Row><id1>9</id1><id2>3</id2></Row><Row><id1>10</id1><id2>3</id2></Row></child></Row>
+</Dataset>

+ 1 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -226,8 +226,7 @@ protected:
             size32_t compSz = 0;
             size32_t dstPos = dstMb.length();
             dstMb.append(compSz); // placeholder
-            void *dst = dstMb.reserve(owner.bucketSendSize * 2); // allow for worst case
-            compressor.open(dst, owner.bucketSendSize * 2);
+            compressor.open(dstMb, owner.bucketSendSize * 2);
             loop
             {
                 OwnedConstThorRow row = nextRow();

+ 0 - 3
thorlcr/thorutil/thbuf.cpp

@@ -478,9 +478,6 @@ public:
         size32_t sz = 0;
         if (row) {
             sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), row);
-#ifdef _DEBUG
-            assertex(sz<0x1000000);
-#endif
             assertex((int)sz>=0); // trap invalid sizes a bit earlier
 
 #ifdef _TRACE_SMART_PUTGET