Browse Source

Merge pull request #10494 from jakesmith/hpcc-18515

HPCC-18515 Dafilesrv remote read streaming

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 7 years ago
parent
commit
b6a0769d41

+ 1 - 0
common/fileview2/fvresultset.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include "jliball.hpp"
 #include "rtlbcd.hpp"
+#include "rtlformat.hpp"
 #include "workunit.hpp"
 #include "seclib.hpp"
 #include "eclrtl.hpp"

+ 2 - 0
common/remote/CMakeLists.txt

@@ -59,6 +59,7 @@ include_directories (
          ./../../rtl/eclrtl
          ./../../system/security/securesocket
          ./../../testing/unittests
+         ./../../rtl/include
     )
 
 ADD_DEFINITIONS( -D_USRDLL -DREMOTE_EXPORTS )
@@ -67,6 +68,7 @@ HPCC_ADD_LIBRARY( remote SHARED ${SRCS}  )
 install ( TARGETS remote RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
 
 target_link_libraries ( remote 
+    eclrtl
     jlib
     jhtree 
     mp

+ 435 - 6
common/remote/sockfile.cpp

@@ -41,6 +41,16 @@
 #include "remoteerr.hpp"
 #include <atomic>
 
+#include "rtldynfield.hpp"
+#include "rtlds_imp.hpp"
+#include "rtlread_imp.hpp"
+#include "rtlrecord.hpp"
+#include "eclhelper_dyn.hpp"
+
+#include "rtlcommon.hpp"
+#include "rtlformat.hpp"
+
+
 #define SOCKET_CACHE_MAX 500
 
 #define MIN_KEYFILTSUPPORT_VERSION 20
@@ -53,6 +63,13 @@
 #define TREECOPYPOLLTIME  (60*1000*5)      // for tracing that delayed
 #define TREECOPYPRUNETIME (24*60*60*1000)  // 1 day
 
+static const unsigned __int64 defaultFileStreamChooseN = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
+static const unsigned __int64 defaultFileStreamSkipN = 0;
+static const unsigned __int64 defaultFileStreamRowLimit = (unsigned __int64) -1;
+static const unsigned __int64 defaultDaFSNumRecs = 100;
+enum OutputFormat { outFmt_Binary, outFmt_Xml, outFmt_Json };
+
+
 #if SIMULATE_PACKETLOSS
 
 #define TESTING_FAILURE_RATE_LOST_SEND  10 // per 1000
@@ -326,6 +343,7 @@ enum {
     RFCreadfilteredindex,
     RFCreadfilteredindexcount,
     RFCreadfilteredindexblob,
+    RFCStreamRead = '{',
     RFCmax,
     RFCunknown = 255 // 0 would have been more sensible, but can't break backward compatibility
 };
@@ -568,12 +586,14 @@ inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff)
     return buff;
 }
 
-inline void sendBuffer(ISocket * socket, MemoryBuffer & src)
+inline void sendBuffer(ISocket * socket, MemoryBuffer & src, bool testSocketFlag=false)
 {
     unsigned length = src.length() - sizeof(unsigned);
     byte * buffer = (byte *)src.toByteArray();
     if (TF_TRACE_FULL)
         PROGLOG("sendBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]);
+    if (testSocketFlag)
+        length |= 0x80000000;
     _WINCPYREV(buffer, &length, sizeof(unsigned));
     SOCKWRITE(socket)(buffer, src.length());
 }
@@ -3679,6 +3699,7 @@ inline void appendCmdErr(MemoryBuffer &reply, RemoteFileCommandType e, int code,
 
 #define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
 #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
+#define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { ret = this->p(msg, reply, client); testSocketFlag = true; break; }
 #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
 #define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
 #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { ret = this->p(msg, reply, client, stats); break; }
@@ -3816,17 +3837,247 @@ public:
     }
 };
 
+interface IRemoteActivity : extends IInterface
+{
+    virtual const void *nextRow(size32_t &sz) = 0;
+    virtual unsigned __int64 queryProcessed() const = 0;
+    virtual IOutputMetaData *queryOutputMeta() const = 0;
+    virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
+    virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
+    virtual void restoreCursor(MemoryBuffer &src) = 0;
+};
+
 enum OpenFileFlag { of_null=0x0, of_key=0x01 };
 struct OpenFileInfo
 {
     OpenFileInfo() { }
     OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
+    OpenFileInfo(int _handle, IRemoteActivity *_activity, StringAttrItem *_filename) : handle(_handle), activity(_activity), filename(_filename) { }
     Linked<IFileIO> fileIO;
+    Linked<IRemoteActivity> activity;
     Linked<StringAttrItem> filename; // for debug
     int handle = 0;
     unsigned flags = 0;
 };
 
+
+
+static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName)
+{
+    IPropertyTree *inputJson = actNode.queryPropTree(typePropName);
+    if (inputJson)
+        return createTypeInfoOutputMetaData(*inputJson);
+    else
+    {
+        StringBuffer binTypePropName(typePropName);
+        MemoryBuffer mb;
+        actNode.getPropBin(binTypePropName.append("Bin").str(), mb);
+        return createTypeInfoOutputMetaData(mb);
+    }
+}
+
+class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
+{
+    StringAttr fileName;
+    Linked<IHThorDiskReadArg> helper;
+    MemoryBuffer resultBuffer;
+    MemoryBufferBuilder *outBuilder = nullptr;
+    CThorContiguousRowBuffer prefetchBuffer;
+    IArrayOf<IKeySegmentMonitor> segMonitors;
+    Owned<ISourceRowPrefetcher> prefetcher;
+    Owned<ISerialStream> inputStream;
+    Owned<IFileIO> iFileIO;
+    Linked<IOutputMetaData> outMeta;
+    unsigned __int64 chooseN = 0;
+    unsigned __int64 limit = 0;
+    unsigned __int64 processed = 0;
+    unsigned __int64 startPos = 0;
+    bool opened = false;
+    bool eofSeen = false;
+    bool cursorDirty = false;
+    bool canMatchAny = false;
+    bool needTransform = true;
+
+    void checkOpen()
+    {
+        if (opened)
+        {
+            if (!cursorDirty)
+                return;
+            if (prefetchBuffer.tell() != startPos)
+            {
+                inputStream->reset(startPos);
+                prefetchBuffer.clearStream();
+                prefetchBuffer.setStream(inputStream);
+                eofSeen = !canMatchAny;
+            }
+            cursorDirty = false;
+            return;
+        }
+        if (!canMatchAny)
+            eofSeen = true;
+        else
+        {
+            const char *fileName = helper->getFileName();
+
+            OwnedIFile iFile = createIFile(fileName);
+
+#if 0
+            bool compressed = false; // isCompressedFile(iFileIO); // Should be passed with JSON
+            StringBuffer encryptionkey;
+            if (compressed)
+            {
+                Owned<IExpander> eexp;
+                if (encryptionkey.length()!=0)
+                    eexp.setown(createAESExpander256((size32_t)encryptionkey.length(),encryptionkey.bufferBase()));
+                iFileIO.setown(createCompressedFileReader(iFile,eexp));
+                if(!iFileIO && !blockcompressed) //fall back to old decompression, unless dfs marked as new
+                {
+                    iFileIO.setown(iFile->open(IFOread));
+                    if(iFileIO)
+                        rowcompressed = true;
+                }
+            }
+            else
+#endif
+                iFileIO.setown(iFile->open(IFOread));
+            if (!iFileIO)
+                throw MakeStringException(0, "Failed to open: '%s'", fileName);
+
+            inputStream.setown(createFileSerialStream(iFileIO, startPos));
+            prefetchBuffer.setStream(inputStream);
+            prefetcher.setown(helper->queryDiskRecordSize()->createDiskPrefetcher(nullptr, 0));
+
+            outBuilder = new MemoryBufferBuilder(resultBuffer, helper->queryOutputMeta()->getMinRecordSize());
+            chooseN = helper->getChooseNLimit();
+            limit = helper->getRowLimit();
+        }
+
+        opened = true;
+    }
+    void close()
+    {
+        iFileIO.clear();
+        opened = false;
+    }
+    bool segMonitorsMatch(const void *row) { return true; }
+public:
+    CRemoteDiskReadActivity(IHThorDiskReadArg &_helper)
+        : helper(&_helper), prefetchBuffer(nullptr)
+    {
+        outMeta.set(helper->queryOutputMeta());
+        canMatchAny = helper->canMatchAny();
+    }
+    ~CRemoteDiskReadActivity()
+    {
+        if (outBuilder)
+            delete outBuilder;
+    }
+    virtual const void *nextRow(size32_t &retSz) override
+    {
+        checkOpen();
+        if (needTransform)
+        {
+            while (!eofSeen && ((chooseN == 0) || (processed < chooseN)))
+            {
+                while (!prefetchBuffer.eos())
+                {
+                    prefetcher->readAhead(prefetchBuffer);
+                    const byte * next = prefetchBuffer.queryRow();
+                    size32_t rowSz; // use local var instead of reference param for efficiency
+                    if (segMonitorsMatch(next))
+                        rowSz = helper->transform(*outBuilder, next);
+                    else
+                        rowSz = 0;
+                    prefetchBuffer.finishedRow();
+
+                    if (rowSz)
+                    {
+                        if (processed >=limit)
+                        {
+                            resultBuffer.clear();
+                            helper->onLimitExceeded();
+                            return nullptr;
+                        }
+                        retSz = rowSz;
+                        processed++;
+                        return resultBuffer.toByteArray();
+                    }
+                }
+                eofSeen = true;
+            }
+        }
+        close();
+        retSz = 0;
+        return nullptr;
+    }
+// IRemoteActivity impl.
+    virtual void serializeCursor(MemoryBuffer &tgt) const override
+    {
+        tgt.append(prefetchBuffer.tell());
+        tgt.append(processed);
+    }
+    virtual void restoreCursor(MemoryBuffer &src) override
+    {
+        cursorDirty = true;
+        src.read(startPos);
+        src.read(processed);
+    }
+    virtual unsigned __int64 queryProcessed() const override
+    {
+        return processed;
+    }
+    virtual IOutputMetaData *queryOutputMeta() const override
+    {
+        return outMeta;
+    }
+    virtual StringBuffer &getInfoStr(StringBuffer &out) const override
+    {
+        return out.appendf("diskread[%s]", helper->getFileName());
+    }
+};
+
+IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
+{
+    const char *fileName = actNode.queryProp("fileName");
+    unsigned __int64 chooseN = actNode.getPropInt64("choosen", defaultFileStreamChooseN);
+    unsigned __int64 skipN = actNode.getPropInt64("skipN", defaultFileStreamSkipN);
+    unsigned __int64 rowLimit = actNode.getPropInt64("rowLimit", defaultFileStreamRowLimit);
+    Owned<IOutputMetaData> inMeta = getTypeInfoOutputMetaData(actNode, "input");
+    Owned<IOutputMetaData> outMeta = getTypeInfoOutputMetaData(actNode, "output");
+    Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, inMeta.getClear(), outMeta.getClear(), chooseN, skipN, rowLimit);
+    return new CRemoteDiskReadActivity(*helper);
+}
+
+IRemoteActivity *createRemoteActivity(IPropertyTree &actNode)
+{
+    const char *kindStr = actNode.queryProp("kind");
+
+    ThorActivityKind kind = TAKnone;
+    if (strieq("diskread", kindStr))
+        kind = TAKdiskread;
+    Owned<IRemoteActivity> activity;
+    switch (kind)
+    {
+        case TAKdiskread:
+        {
+            activity.setown(createRemoteDiskRead(actNode));
+            break;
+        }
+        default:
+            throwUnexpected(); // for now
+
+    }
+    return activity.getClear();
+}
+
+IRemoteActivity *createOutputActivity(IPropertyTree &requestTree)
+{
+    IPropertyTree *actNode = requestTree.queryPropTree("node");
+    assertex(actNode);
+    return createRemoteActivity(*actNode);
+}
+
 #define MAX_KEYDATA_SZ 0x10000
 
 class CRemoteFileServer : implements IRemoteFileServer, public CInterface
@@ -4037,8 +4288,8 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
         {
             MemoryBuffer reply;
-            parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
-            sendBuffer(socket, reply);
+            bool testSocketFlag = parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
+            sendBuffer(socket, reply, testSocketFlag);
         }
 
         bool immediateCommand() // returns false if socket closed or failure
@@ -5642,6 +5893,179 @@ public:
         return false;
     }
 
+    bool cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    {
+        unsigned replyPos = reply.length();
+        try
+        {
+            reply.append('J');
+            return cmdStreamRead(msg, reply, client);
+        }
+        catch (IException *)
+        {
+            reply.rewrite(replyPos);
+            reply.append('-');
+            throw;
+        }
+    }
+
+    bool cmdStreamRead(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    {
+        // this is an attempt to authenticate when we haven't got authentication turned on
+        if (TF_TRACE_CLIENT_STATS)
+        {
+            StringBuffer s(client.queryPeerName());
+            PROGLOG("Connect from %s",s.str());
+        }
+
+        Owned<IPropertyTree> requestTree = createPTreeFromJSONString(msg.length(), msg.toByteArray());
+
+        /* Example JSON request:
+         * {
+         *  "format" : "xml",
+         *  "cursor" : "1234",
+         *  "node" : {
+         *   "kind" : "diskread",
+         *   "fileName": "examplefilename",
+         *   "keyfilter" : "f1='1    '",
+         *   "choosen" : "5",
+         *   "cursor" : "12345", // cursor handle
+         *   "input" : {
+         *    "f1" : "string5",
+         *    "f2" : "string5"
+         *   },
+         *   "output" : {
+         *    "f2" : "string",
+         *    "f1" : "real"
+         *   }
+         *  }
+         * }
+         *
+         */
+
+        const char *outputFmtStr = requestTree->queryProp("format");
+        int cursorHandle = requestTree->getPropInt("cursor");
+        Owned<IPropertyTree> responseTree; // Used if xml/json
+        OutputFormat outputFormat;
+        if (!outputFmtStr || strieq("xml", outputFmtStr))
+            outputFormat = outFmt_Xml;
+        else if (strieq("json", outputFmtStr))
+            outputFormat = outFmt_Json;
+        else
+            outputFormat = outFmt_Binary;
+        if (outFmt_Binary != outputFormat)
+            responseTree.setown(createPTree("Response"));
+
+        MemoryBuffer cursorMb;
+        if (requestTree->getPropBin("cursorBin", cursorMb))
+            cursorMb.setEndian(BIG_ENDIAN);
+
+        Owned<IRemoteActivity> outputActivity;
+        OpenFileInfo fileInfo;
+        if (!cursorHandle)
+        {
+            // In future this may be passed the request and build a chain of activities and return sink.
+            outputActivity.setown(createOutputActivity(*requestTree));
+
+            StringBuffer requestStr("jsonrequest:");
+            outputActivity->getInfoStr(requestStr);
+            Owned<StringAttrItem> name = new StringAttrItem(requestStr);
+
+            CriticalBlock block(sect);
+            cursorHandle = getNextHandle();
+            client.previdx = client.openFiles.ordinality();
+            client.openFiles.append(OpenFileInfo(cursorHandle, outputActivity, name));
+        }
+        else if (!lookupFileIOHandle(cursorHandle, fileInfo))
+            cursorHandle = 0; // challenge response ..
+        else // known handle, continuation
+            outputActivity.set(fileInfo.activity);
+
+        if (outputActivity && cursorMb.length()) // use handle if one provided
+            outputActivity->restoreCursor(cursorMb);
+
+        if (outFmt_Binary != outputFormat)
+            responseTree->setPropInt("cursor", cursorHandle);
+        else
+            reply.append(cursorHandle);
+        if (cursorHandle)
+        {
+            IOutputMetaData *out = outputActivity->queryOutputMeta();
+            unsigned __int64 initProcessed = outputActivity->queryProcessed();
+            bool eoi=false;
+            if (outFmt_Binary == outputFormat)
+            {
+                DelayedSizeMarker dataLenMarker(reply); // data length
+                for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                {
+                    size32_t rowSz;
+                    const void *row = outputActivity->nextRow(rowSz);
+                    if (!row)
+                    {
+                        eoi = true;
+                        break;
+                    }
+                    reply.append(rowSz, row);
+                }
+                dataLenMarker.write();
+            }
+            else
+            {
+                CPropertyTreeWriter iptWriter;
+                for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                {
+                    size32_t rowSz;
+                    const void *row = outputActivity->nextRow(rowSz);
+                    if (!row)
+                    {
+                        eoi = true;
+                        break;
+                    }
+                    IPropertyTree *rowNode = responseTree->addPropTree("Row");
+                    iptWriter.setRoot(*rowNode);
+                    out->toXML((const byte *)row, iptWriter);
+                }
+            }
+            if (outFmt_Binary != outputFormat)
+            {
+                if (!eoi)
+                {
+                    MemoryBuffer cursorMb;
+                    cursorMb.setEndian(BIG_ENDIAN);
+                    outputActivity->serializeCursor(cursorMb);
+                    responseTree->setPropBin("cursorBin", cursorMb.length(), cursorMb.toByteArray());
+                }
+            }
+            else
+            {
+                DelayedSizeMarker cursorLenMarker(reply); // cursor length
+                if (!eoi)
+                    outputActivity->serializeCursor(reply);
+                cursorLenMarker.write();
+            }
+        }
+        switch (outputFormat)
+        {
+            case outFmt_Xml:
+            {
+                StringBuffer responseXmlStr;
+                toXML(responseTree, responseXmlStr);
+                reply.append(responseXmlStr.length(), responseXmlStr.str());
+                break;
+            }
+            case outFmt_Json:
+            {
+                StringBuffer responseJsonStr;
+                toJSON(responseTree, responseJsonStr);
+                reply.append(responseJsonStr.length(), responseJsonStr.str());
+                break;
+            }
+            default:
+                break;
+        }
+        return true;
+    }
+
     // legacy version
     bool cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
     {
@@ -5740,6 +6164,7 @@ public:
             case RFCgetinfo:
             case RFCfirewall:
             case RFCunlock:
+            case RFCStreamRead:
                 stdCmdThrottler.addCommand(cmd, msg, client);
                 return;
             // NB: The following commands are still bound by the the thread pool
@@ -5758,6 +6183,7 @@ public:
     {
         Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
         bool ret = true;
+        bool testSocketFlag = false;
         try
         {
             switch(cmd)
@@ -5797,6 +6223,7 @@ public:
                 MAPCOMMAND(RFCgetinfo, cmdGetInfo);
                 MAPCOMMAND(RFCfirewall, cmdFirewall);
                 MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
+                MAPCOMMANDCLIENTTESTSOCKET(RFCStreamRead, cmdStreamReadTestSocket, *client);
                 MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
@@ -5816,8 +6243,10 @@ public:
             e->Release();
         }
         if (!ret) // append error string
+        {
             appendError(cmd, client, cmd, reply);
-        return ret;
+        }
+        return testSocketFlag;
     }
 
     IPooledThread *createCommandProcessor()
@@ -6059,8 +6488,8 @@ public:
         if (cmd != RFCgetver)
             cmd = RFCinvalid;
         MemoryBuffer reply;
-        processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
-        sendBuffer(socket, reply);
+        bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
+        sendBuffer(socket, reply, testSocketFlag);
     }
 
     bool checkAuthentication(ISocket *socket, IAuthenticatedUser *&ret)

+ 1 - 0
common/thorhelper/roxiehelper.hpp

@@ -18,6 +18,7 @@
 #ifndef ROXIEHELPER_HPP
 #define ROXIEHELPER_HPP
 
+#include "rtlformat.hpp"
 #include "thorherror.h"
 #include "thorxmlwrite.hpp"
 #include "roxiehelper.ipp"

+ 1 - 180
common/thorhelper/thorcommon.cpp

@@ -27,6 +27,7 @@
 #include "thorcommon.ipp"
 #include "eclrtl.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 #include <algorithm>
 #ifdef _USE_NUMA
 #include <numa.h>
@@ -875,186 +876,6 @@ extern bool isActivitySink(ThorActivityKind kind)
 //=====================================================================================================
 
 
-CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
-{
-    buffer = NULL;
-    maxOffset = 0;
-    readOffset = 0;
-}
-
-void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
-{
-    ensureAccessible(readOffset + len);
-    memcpy(ptr, buffer+readOffset, len);
-    readOffset += len;
-}
-
-
-size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
-{
-    doRead(len, ptr);
-    return len;
-}
-
-size32_t CThorContiguousRowBuffer::readSize()
-{
-    size32_t value;
-    doRead(sizeof(value), &value);
-    return value;
-}
-
-size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
-{
-    size32_t size = sizePackedInt();
-    doRead(size, ptr);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
-{
-    if (len == 0)
-        return 0;
-
-    size32_t size = sizeUtf8(len);
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
-{
-    size32_t size = sizeVStr();
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
-{
-    size32_t size = sizeVUni();
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-
-size32_t CThorContiguousRowBuffer::sizePackedInt()
-{
-    ensureAccessible(readOffset+1);
-    return rtlGetPackedSizeFromFirst(buffer[readOffset]);
-}
-
-size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
-{
-    if (len == 0)
-        return 0;
-
-    //The len is the number of utf characters, size depends on which characters are included.
-    size32_t nextOffset = readOffset;
-    while (len)
-    {
-        ensureAccessible(nextOffset+1);
-
-        for (;nextOffset < maxOffset;)
-        {
-            nextOffset += readUtf8Size(buffer+nextOffset);  // This function only accesses the first byte
-            if (--len == 0)
-                break;
-        }
-    }
-    return nextOffset - readOffset;
-}
-
-size32_t CThorContiguousRowBuffer::sizeVStr()
-{
-    size32_t nextOffset = readOffset;
-    for (;;)
-    {
-        ensureAccessible(nextOffset+1);
-
-        for (; nextOffset < maxOffset; nextOffset++)
-        {
-            if (buffer[nextOffset] == 0)
-                return (nextOffset + 1) - readOffset;
-        }
-    }
-}
-
-size32_t CThorContiguousRowBuffer::sizeVUni()
-{
-    size32_t nextOffset = readOffset;
-    const size32_t sizeOfUChar = 2;
-    for (;;)
-    {
-        ensureAccessible(nextOffset+sizeOfUChar);
-
-        for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
-        {
-            if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
-                return (nextOffset + sizeOfUChar) - readOffset;
-        }
-    }
-}
-
-
-void CThorContiguousRowBuffer::reportReadFail()
-{
-    throwUnexpected();
-}
-
-
-const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
-{
-    if (maxSize+readOffset > maxOffset)
-        doPeek(maxSize+readOffset);
-    return buffer + readOffset;
-}
-
-offset_t CThorContiguousRowBuffer::beginNested()
-{
-    size32_t len = readSize();
-    return len+readOffset;
-}
-
-bool CThorContiguousRowBuffer::finishedNested(offset_t & endPos)
-{
-    return readOffset >= endPos;
-}
-
-void CThorContiguousRowBuffer::skip(size32_t size)
-{ 
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipPackedInt()
-{
-    size32_t size = sizePackedInt();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipUtf8(size32_t len)
-{
-    size32_t size = sizeUtf8(len);
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipVStr()
-{
-    size32_t size = sizeVStr();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipVUni()
-{
-    size32_t size = sizeVUni();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
 // ===========================================
 
 IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context)

+ 0 - 88
common/thorhelper/thorcommon.ipp

@@ -693,94 +693,6 @@ class NullDiskCallback : public IThorDiskCallback, extends CInterface
 
 extern THORHELPER_API size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta);
 
-//The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
-//is in a contiguous block of memory.  The read() and skip() functions must be implemented
-class THORHELPER_API CThorContiguousRowBuffer : implements IRowDeserializerSource
-{
-public:
-    CThorContiguousRowBuffer(ISerialStream * _in);
-
-    inline void setStream(ISerialStream *_in) { in.set(_in); maxOffset = 0; readOffset = 0; }
-
-    virtual const byte * peek(size32_t maxSize);
-    virtual offset_t beginNested();
-    virtual bool finishedNested(offset_t & len);
-
-    virtual size32_t read(size32_t len, void * ptr);
-    virtual size32_t readSize();
-    virtual size32_t readPackedInt(void * ptr);
-    virtual size32_t readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len);
-    virtual size32_t readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize);
-    virtual size32_t readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize);
-
-    //These shouldn't really be called since this class is meant to be used for a deserialize.
-    //If we allowed padding/alignment fields in the input then the first function would make sense.
-    virtual void skip(size32_t size);
-    virtual void skipPackedInt();
-    virtual void skipUtf8(size32_t len);
-    virtual void skipVStr();
-    virtual void skipVUni();
-
-    inline bool eos()
-    {
-        return in->eos();
-    }
-
-    inline offset_t tell()
-    {
-        return in->tell();
-    }
-
-    inline void clearStream()
-    {
-        in.clear();
-        maxOffset = 0;
-        readOffset = 0;
-    }
-
-    inline const byte * queryRow() { return buffer; }
-    inline size32_t queryRowSize() { return readOffset; }
-    inline void finishedRow()
-    {
-        if (readOffset)
-            in->skip(readOffset);
-        maxOffset = 0;
-        readOffset = 0;
-    }
-
-
-protected:
-    size32_t sizePackedInt();
-    size32_t sizeUtf8(size32_t len);
-    size32_t sizeVStr();
-    size32_t sizeVUni();
-    void reportReadFail();
-
-private:
-    inline void doPeek(size32_t maxSize)
-    {
-        buffer = static_cast<const byte *>(in->peek(maxSize, maxOffset));
-    }
-
-    void doRead(size32_t len, void * ptr);
-
-    inline void ensureAccessible(size32_t required)
-    {
-        if (required > maxOffset)
-        {
-            doPeek(required);
-            assertex(required <= maxOffset);
-        }
-    }
-
-protected:
-    Linked<ISerialStream> in;
-    const byte * buffer;
-    size32_t maxOffset;
-    size32_t readOffset;
-};
-
-
 //=====================================================================================================
 
 class ChildRowLinkerWalker : implements IIndirectMemberVisitor

+ 1 - 0
common/thorhelper/thorpipe.cpp

@@ -23,6 +23,7 @@
 #include "csvsplitter.hpp"
 #include "rtlread_imp.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlformat.hpp"
 #include "roxiemem.hpp"
 
 using roxiemem::OwnedRoxieString;

+ 2 - 0
common/thorhelper/thorsoapcall.cpp

@@ -19,6 +19,8 @@
 #include "jqueue.tpp"
 #include "jisem.hpp"
 
+#include "rtlformat.hpp"
+
 #include "thorxmlread.hpp"
 #include "thorxmlwrite.hpp"
 #include "thorcommon.ipp"

File diff suppressed because it is too large
+ 1 - 1844
common/thorhelper/thorxmlwrite.cpp


+ 0 - 518
common/thorhelper/thorxmlwrite.hpp

@@ -28,303 +28,6 @@
 #include "jptree.hpp"
 #include "thorhelper.hpp"
 
-interface IXmlStreamFlusher
-{
-    virtual void flushXML(StringBuffer &current, bool isClose) = 0;
-};
-
-interface IXmlWriterExt : extends IXmlWriter
-{
-    virtual IXmlWriterExt & clear() = 0;
-    virtual size32_t length() const = 0;
-    virtual const char *str() const = 0;
-    virtual IInterface *saveLocation() const = 0;
-    virtual void rewindTo(IInterface *location) = 0;
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf) = 0;
-    virtual void outputNumericString(const char *field, const char *fieldname) = 0;
-    virtual void outputInline(const char* text) = 0;
-};
-
-class thorhelper_decl CommonXmlPosition : public CInterface, implements IInterface
-{
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CommonXmlPosition(size32_t _pos, unsigned _indent, unsigned _nestLimit, bool _tagClosed, bool _needDelimiter) :
-        pos(_pos), indent(_indent), nestLimit(_nestLimit), tagClosed(_tagClosed), needDelimiter(_needDelimiter)
-    {}
-
-public:
-    size32_t pos = 0;
-    unsigned indent = 0;
-    unsigned nestLimit = 0;
-    bool tagClosed = false;
-    bool needDelimiter = false;
-};
-
-class thorhelper_decl CommonXmlWriter : implements IXmlWriterExt, public CInterface
-{
-public:
-    CommonXmlWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
-    ~CommonXmlWriter();
-    IMPLEMENT_IINTERFACE;
-
-    void outputBeginNested(const char *fieldname, bool nestChildren, bool doIndent);
-    void outputEndNested(const char *fieldname, bool doIndent);
-
-    virtual void outputInlineXml(const char *text){closeTag(); out.append(text); flush(false);} //for appending raw xml content
-    virtual void outputInline(const char* text) { outputInlineXml(text); }
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
-    virtual void outputEndDataset(const char *dsname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginArray(const char *fieldname){}; //repeated elements are inline for xml
-    virtual void outputEndArray(const char *fieldname){};
-    virtual void outputSetAll();
-    virtual void outputXmlns(const char *name, const char *uri);
-
-    //IXmlWriterExt
-    virtual IXmlWriterExt & clear();
-    virtual unsigned length() const                                 { return out.length(); }
-    virtual const char * str() const                                { return out.str(); }
-    virtual IInterface *saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-
-        return new CommonXmlPosition(length(), indent, nestLimit, tagClosed, false);
-    }
-    virtual void rewindTo(IInterface *saved)
-    {
-        if (flusher)
-            throwUnexpected();
-
-        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
-        if (!position)
-            return;
-        if (position->pos < out.length())
-        {
-            out.setLength(position->pos);
-            tagClosed = position->tagClosed;
-            indent = position->indent;
-            nestLimit = position->nestLimit;
-        }
-    }
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
-
-    virtual void outputNumericString(const char *field, const char *fieldname)
-    {
-        outputCString(field, fieldname);
-    }
-
-protected:
-    bool checkForAttribute(const char * fieldname);
-    void closeTag();
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-protected:
-    IXmlStreamFlusher *flusher;
-    StringBuffer out;
-    unsigned flags;
-    unsigned indent;
-    unsigned nestLimit;
-    bool tagClosed;
-};
-
-class thorhelper_decl CommonJsonWriter : implements IXmlWriterExt, public CInterface
-{
-public:
-    CommonJsonWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
-    ~CommonJsonWriter();
-    IMPLEMENT_IINTERFACE;
-
-    void checkDelimit(int inc=0);
-    void checkFormat(bool doDelimit, bool needDelimiter=true, int inc=0);
-    void prepareBeginArray(const char *fieldname);
-
-    virtual void outputInlineXml(const char *text) //for appending raw xml content
-    {
-        if (text && *text)
-            outputUtf8(strlen(text), text, "xml");
-    }
-    virtual void outputInline(const char* text) { out.append(text); }
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
-    virtual void outputEndDataset(const char *dsname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginArray(const char *fieldname);
-    virtual void outputEndArray(const char *fieldname);
-    virtual void outputSetAll();
-    virtual void outputXmlns(const char *name, const char *uri){}
-    virtual void outputNumericString(const char *field, const char *fieldname);
-
-    //IXmlWriterExt
-    virtual IXmlWriterExt & clear();
-    virtual unsigned length() const                                 { return out.length(); }
-    virtual const char * str() const                                { return out.str(); }
-    virtual void rewindTo(unsigned int prevlen)                     { if (prevlen < out.length()) out.setLength(prevlen); }
-    virtual IInterface *saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-
-        return new CommonXmlPosition(length(), indent, nestLimit, false, needDelimiter);
-    }
-    virtual void rewindTo(IInterface *saved)
-    {
-        if (flusher)
-            throwUnexpected();
-
-        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
-        if (!position)
-            return;
-        if (position->pos < out.length())
-        {
-            out.setLength(position->pos);
-            needDelimiter = position->needDelimiter;
-            indent = position->indent;
-            nestLimit = position->nestLimit;
-        }
-    }
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
-
-    void outputBeginRoot(){out.append('{');}
-    void outputEndRoot(){out.append('}');}
-
-protected:
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-    class CJsonWriterItem : public CInterface
-    {
-    public:
-        CJsonWriterItem(const char *_name) : name(_name), depth(0){}
-
-        StringAttr name;
-        unsigned depth;
-    };
-
-    const char *checkItemName(CJsonWriterItem *item, const char *name, bool simpleType=true);
-    const char *checkItemName(const char *name, bool simpleType=true);
-    const char *checkItemNameBeginNested(const char *name);
-    const char *checkItemNameEndNested(const char *name);
-    bool checkUnamedArrayItem(bool begin);
-
-
-    IXmlStreamFlusher *flusher;
-    CIArrayOf<CJsonWriterItem> arrays;
-    StringBuffer out;
-    unsigned flags;
-    unsigned indent;
-    unsigned nestLimit;
-    bool needDelimiter;
-};
-
-thorhelper_decl StringBuffer &buildJsonHeader(StringBuffer  &header, const char *suppliedHeader, const char *rowTag);
-thorhelper_decl StringBuffer &buildJsonFooter(StringBuffer  &footer, const char *suppliedFooter, const char *rowTag);
-
-//Writes type encoded XML strings  (xsi:type="xsd:string", xsi:type="xsd:boolean" etc)
-class thorhelper_decl CommonEncodedXmlWriter : public CommonXmlWriter
-{
-public:
-    CommonEncodedXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
-
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-};
-
-//Writes all encoded DATA fields as base64Binary
-class thorhelper_decl CommonEncoded64XmlWriter : public CommonEncodedXmlWriter
-{
-public:
-    CommonEncoded64XmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
-
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-};
-
-enum XMLWriterType{WTStandard, WTEncoding, WTEncodingData64, WTJSON} ;
-thorhelper_decl CommonXmlWriter * CreateCommonXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
-thorhelper_decl IXmlWriterExt * createIXmlWriterExt(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
-
-class thorhelper_decl SimpleOutputWriter : implements IXmlWriter, public CInterface
-{
-    void outputFieldSeparator();
-    bool separatorNeeded;
-public:
-    SimpleOutputWriter();
-    IMPLEMENT_IINTERFACE;
-
-    SimpleOutputWriter & clear();
-    unsigned length() const                                 { return out.length(); }
-    const char * str() const                                { return out.str(); }
-
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren){}
-    virtual void outputEndDataset(const char *dsname){}
-    virtual void outputBeginArray(const char *fieldname){}
-    virtual void outputEndArray(const char *fieldname){}
-    virtual void outputSetAll();
-    virtual void outputInlineXml(const char *text){} //for appending raw xml content
-    virtual void outputXmlns(const char *name, const char *uri){}
-
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-
-    void newline();
-
-protected:
-    StringBuffer out;
-};
 
 class thorhelper_decl CommonFieldProcessor : implements IFieldProcessor, public CInterface
 {
@@ -359,226 +62,5 @@ extern thorhelper_decl void printKeyedValues(StringBuffer &out, IIndexReadContex
 extern thorhelper_decl void convertRowToXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags = (unsigned)-1);
 extern thorhelper_decl void convertRowToJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags = (unsigned)-1);
 
-struct CSVOptions
-{
-    StringAttr delimiter, terminator;
-    bool includeHeader;
-};
-
-class CCSVItem : public CInterface, implements IInterface
-{
-    unsigned columnID, nextRowID, rowCount, nestedLayer;
-    StringAttr name, type, value, parentXPath;
-    StringArray childNames;
-    MapStringTo<bool> childNameMap;
-    bool isNestedItem, simpleNested, currentRowEmpty, outputHeader;
-public:
-    CCSVItem() : columnID(0), nestedLayer(0), nextRowID(0), rowCount(0), isNestedItem(false),
-        simpleNested(false), currentRowEmpty(true) { };
-
-    IMPLEMENT_IINTERFACE;
-    inline const char* getName() const { return name.get(); };
-    inline void setName(const char* _name) { name.set(_name); };
-    inline const char* getValue() const { return value.get(); };
-    inline void setValue(const char* _value) { value.set(_value); };
-    inline unsigned getColumnID() const { return columnID; };
-    inline void setColumnID(unsigned _columnID) { columnID = _columnID; };
-
-    inline unsigned getNextRowID() const { return nextRowID; };
-    inline void setNextRowID(unsigned _rowID) { nextRowID = _rowID; };
-    inline void incrementNextRowID() { nextRowID++; };
-    inline unsigned getRowCount() const { return rowCount; };
-    inline void setRowCount(unsigned _rowCount) { rowCount = _rowCount; };
-    inline void incrementRowCount() { rowCount++; };
-    inline bool getCurrentRowEmpty() const { return currentRowEmpty; };
-    inline void setCurrentRowEmpty(bool _currentRowEmpty) { currentRowEmpty = _currentRowEmpty; };
-
-    inline unsigned getNestedLayer() const { return nestedLayer; };
-    inline void setNestedLayer(unsigned _nestedLayer) { nestedLayer = _nestedLayer; };
-    inline bool checkIsNestedItem() const { return isNestedItem; };
-    inline void setIsNestedItem(bool _isNestedItem) { isNestedItem = _isNestedItem; };
-    inline bool checkSimpleNested() const { return simpleNested; };
-    inline void setSimpleNested(bool _simpleNested) { simpleNested = _simpleNested; };
-    inline bool checkOutputHeader() const { return outputHeader; };
-    inline void setOutputHeader(bool _outputHeader) { outputHeader = _outputHeader; };
-    inline const char* getParentXPath() const { return parentXPath.str(); };
-    inline void setParentXPath(const char* _parentXPath) { parentXPath.set(_parentXPath); };
-    inline StringArray& getChildrenNames() { return childNames; };
-    inline void addChildName(const char* name)
-    {
-        if (hasChildName(name))
-            return;
-        childNameMap.setValue(name, true);
-        childNames.append(name);
-    };
-    inline bool hasChildName(const char* name)
-    {
-        bool* found = childNameMap.getValue(name);
-        return (found && *found);
-    };
-    inline void clearContentVariables()
-    {
-        nextRowID = rowCount = 0;
-        currentRowEmpty = true;
-    };
-};
-
-class CCSVRow : public CInterface, implements IInterface
-{
-    unsigned rowID;
-    CIArrayOf<CCSVItem> columns;
-public:
-    CCSVRow(unsigned _rowID) : rowID(_rowID) {};
-    IMPLEMENT_IINTERFACE;
-
-    inline unsigned getRowID() const { return rowID; };
-    inline void setRowID(unsigned _rowID) { rowID = _rowID; };
-    inline unsigned getColumnCount() const { return columns.length(); };
-
-    const char* getColumnValue(unsigned columnID) const;
-    void setColumn(unsigned columnID, const char* columnName, const char* columnValue);
-};
-
-//CommonCSVWriter is used to output a WU result in CSV format.
-//Read CSV header information;
-//If needed, output CSV headers into the 'out' buffer;
-//Read each row (a record) of the WU result and output into the 'out' buffer;
-//The 'out' buffer can be accessed through the str() method.
-class thorhelper_decl CommonCSVWriter: public CInterface, implements IXmlWriterExt
-{
-    class CXPathItem : public CInterface, implements IInterface
-    {
-        bool isArray;
-        StringAttr path;
-    public:
-        CXPathItem(const char* _path, bool _isArray) : path(_path), isArray(_isArray) { };
-
-        IMPLEMENT_IINTERFACE;
-        inline const char* getPath() const { return path.get(); };
-        inline bool getIsArray() const { return isArray; };
-    };
-    CSVOptions options;
-    bool readingCSVHeader, addingSimpleNestedContent;
-    unsigned recordCount, headerColumnID, nestedHeaderLayerID;
-    StringBuffer currentParentXPath, auditOut;
-    StringArray headerXPathList;
-    MapStringTo<bool> topHeaderNameMap;
-    MapStringToMyClass<CCSVItem> csvItems;
-    CIArrayOf<CCSVRow> contentRowsBuffer;
-    CIArrayOf<CXPathItem> dataXPath;//xpath in caller
-
-    void escapeQuoted(unsigned len, char const* in, StringBuffer& out);
-    bool checkHeaderName(const char* name);
-    CCSVItem* getParentCSVItem();
-    CCSVItem* getCSVItemByFieldName(const char* name);
-    void addColumnToRow(CIArrayOf<CCSVRow>& rows, unsigned rowID, unsigned colID, const char* columnValue, const char* columnName);
-    void addCSVHeader(const char* name, const char* type, bool isNested, bool simpleNested, bool outputHeader);
-    void addContentField(const char* field, const char* fieldName);
-    void addStringField(unsigned len, const char* field, const char* fieldName);
-    void setChildrenNextRowID(const char* path, unsigned rowID);
-    unsigned getChildrenMaxNextRowID(const char* path);
-    unsigned getChildrenMaxColumnID(CCSVItem* item, unsigned& maxColumnID);
-    void addChildNameToParentCSVItem(const char* name);
-    void setParentItemRowEmpty(CCSVItem* item, bool empty);
-    void addFieldToParentXPath(const char* fieldName);
-    void removeFieldFromCurrentParentXPath(const char* fieldName);
-    void appendDataXPathItem(const char* fieldName, bool isArray);
-    bool isDataRow(const char* fieldName);
-    void outputCSVRows(CIArrayOf<CCSVRow>& rows, bool isHeader);
-    void outputHeadersToBuffer();
-    void finishContentResultRow();
-
-    void auditHeaderInfo()
-    {
-        ForEachItemIn(i, headerXPathList)
-        {
-            const char* path = headerXPathList.item(i);
-            CCSVItem* item = csvItems.getValue(path);
-            if (!item)
-                continue;
-            if (!item->checkIsNestedItem())
-            {
-                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
-                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
-            }
-            else
-            {
-                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
-                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
-            }
-        }
-    }
-
-public:
-    CommonCSVWriter(unsigned _flags, CSVOptions& _options, IXmlStreamFlusher* _flusher = NULL);
-    ~CommonCSVWriter();
-
-    IMPLEMENT_IINTERFACE;
-
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-    virtual unsigned length() const { return out.length(); }
-    virtual const char* str() const { return out.str(); }
-    virtual void rewindTo(IInterface* location) { };
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf) { };
-    virtual IInterface* saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-        return NULL;
-    };
-
-    //IXmlWriter
-    virtual void outputString(unsigned len, const char* field, const char* fieldName);
-    virtual void outputBool(bool field, const char* fieldName);
-    virtual void outputData(unsigned len, const void* field, const char* fieldName);
-    virtual void outputInt(__int64 field, unsigned size, const char* fieldName);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char* fieldName);
-    virtual void outputReal(double field, const char *fieldName);
-    virtual void outputDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
-    virtual void outputUDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
-    virtual void outputUnicode(unsigned len, const UChar* field, const char* fieldName);
-    virtual void outputQString(unsigned len, const char* field, const char* fieldName);
-    virtual void outputUtf8(unsigned len, const char* field, const char* fieldName);
-    virtual void outputBeginNested(const char* fieldName, bool simpleNested);
-    virtual void outputEndNested(const char* fieldName);
-    virtual void outputBeginDataset(const char* dsname, bool nestChildren);
-    virtual void outputEndDataset(const char* dsname);
-    virtual void outputBeginArray(const char* fieldName);
-    virtual void outputEndArray(const char* fieldName);
-    virtual void outputSetAll() { };
-    virtual void outputXmlns(const char* name, const char* uri) { };
-    virtual void outputQuoted(const char* text)
-    {
-        //No fieldName. Is it valid for CSV?
-    };
-    virtual void outputInlineXml(const char* text)//for appending raw xml content
-    {
-        //Dynamically add a new header 'xml' and insert the header.
-        //But, not sure we want to do that for a big WU result.
-        //if (text && *text)
-          //outputUtf8(strlen(text), text, "xml");
-    };
-    virtual void outputInline(const char* text) { out.append(text); }
-
-    //IXmlWriterExt
-    virtual void outputNumericString(const char* field, const char* fieldName);
-    virtual IXmlWriterExt& clear();
-
-    void outputBeginNested(const char* fieldName, bool simpleNested, bool outputHeader);
-    void outputEndNested(const char* fieldName, bool outputHeader);
-    void outputCSVHeader(const char* name, const char* type);
-    void finishCSVHeaders();
-    const char* auditStr() const { return auditOut.str(); }
-
-protected:
-    IXmlStreamFlusher* flusher;
-    StringBuffer out;
-    unsigned flags;
-};
 
 #endif // THORXMLWRITE_HPP

+ 1 - 0
ecl/eclagent/eclagent.cpp

@@ -37,6 +37,7 @@
 #include "hqlplugins.hpp"
 #include "eclrtl_imp.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlcommon.hpp"
 #include "workunit.hpp"
 #include "eventqueue.hpp"
 #include "schedulectrl.hpp"

+ 1 - 0
ecl/hthor/hthor.ipp

@@ -38,6 +38,7 @@
 #include "thorcommon.ipp"
 #include "roxielmj.hpp"
 #include "eclrtl_imp.hpp"
+#include "rtlcommon.hpp"
 #include "rtlds_imp.hpp"
 #include "rtlread_imp.hpp"
 #include "rtlrecord.hpp"

+ 2 - 0
esp/esdllib/esdl_transformer2.cpp

@@ -21,6 +21,8 @@
 #include "xpp/xpputils.h"
 #include <memory>
 
+#include "rtlformat.hpp"
+
 #include "eclhelper.hpp"    //IXMLWriter
 #include "thorxmlwrite.hpp" //JSON WRITER
 #include "eclrtl.hpp"

+ 2 - 0
esp/services/esdl_svc_engine/esdl_binding.cpp

@@ -22,6 +22,8 @@
 #include "wsexcept.hpp"
 #include "httpclient.hpp"
 
+#include "rtlformat.hpp"
+
 //for dali communication
 #include "daclient.hpp"
 #include "dalienv.hpp"

+ 2 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -45,6 +45,8 @@
 #include "fvdatasource.hpp"
 #include "fvresultset.ipp"
 
+#include "rtlformat.hpp"
+
 #include "package.h"
 
 #ifdef _USE_ZLIB

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -30,6 +30,7 @@
 #include "roxie.hpp"
 #include "roxiedebug.ipp"
 #include "eclrtl.hpp"
+#include "rtlformat.hpp"
 #include "workunit.hpp"
 #include "layouttrans.hpp"
 

+ 1 - 0
roxie/ccd/ccdactivities.cpp

@@ -32,6 +32,7 @@
 #include "rtlkey.hpp"
 #include "eclrtl_imp.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 
 #include "jhtree.hpp"
 #include "jlog.hpp"

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -29,6 +29,8 @@
 #include "jutil.hpp"
 #include <build-config.h>
 
+#include "rtlformat.hpp"
+
 #include "dalienv.hpp"
 #include "rmtfile.hpp"
 #include "ccd.hpp"

+ 2 - 0
roxie/ccd/ccdprotocol.cpp

@@ -19,6 +19,8 @@
 #include "jlib.hpp"
 #include "jthread.hpp"
 
+#include "rtlcommon.hpp"
+
 #include "roxie.hpp"
 #include "roxiehelper.hpp"
 #include "ccdprotocol.hpp"

+ 1 - 0
roxie/ccd/ccdserver.cpp

@@ -41,6 +41,7 @@
 #include "rtlfield.hpp"
 #include "rtlds_imp.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 
 #include <algorithm>
 

+ 3 - 0
rtl/eclrtl/CMakeLists.txt

@@ -44,6 +44,8 @@ set (    SRCS
          rtlrecord.cpp
          rtltype.cpp 
          rtlxml.cpp
+         rtlcommon.cpp
+         rtlformat.cpp
          
          eclinclude4.hpp
          eclrtl.hpp
@@ -66,6 +68,7 @@ set (    SRCS
 include_directories ( 
          ./../../rtl/include 
          ./../../rtl/nbcd 
+         ./../../rtl/eclrtl 
          ./../../system/include 
          ./../../system/jlib 
          ./../../roxie/roxiemem

+ 14 - 0
rtl/eclrtl/eclhelper_dyn.cpp

@@ -69,6 +69,20 @@ CDeserializedOutputMetaData::CDeserializedOutputMetaData(const char *json)
     typeInfo = deserializer->deserialize(json);
 }
 
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &binInfo)
+{
+    return new CDeserializedOutputMetaData(binInfo);
+}
+
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonInfo)
+{
+    return new CDeserializedOutputMetaData(jsonInfo);
+}
+
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json)
+{
+    return new CDeserializedOutputMetaData(json);
+}
 //---------------------------------------------------------------------------------------------------------------------
 
 class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg

+ 5 - 0
rtl/eclrtl/eclhelper_dyn.hpp

@@ -22,7 +22,12 @@
 #include "eclrtl.hpp"
 #include "eclhelper.hpp"
 
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &mb);
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonTree);
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json);
+
 extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml);
+extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit);
 extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml);
 extern ECLRTL_API IEclProcess* createDynamicEclProcess();
 

+ 185 - 0
rtl/eclrtl/rtlcommon.cpp

@@ -0,0 +1,185 @@
+#include "jiface.hpp"
+#include "jbuff.hpp"
+#include "jstring.hpp"
+#include "junicode.hpp"
+#include "rtlcommon.hpp"
+
+CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
+{
+    buffer = NULL;
+    maxOffset = 0;
+    readOffset = 0;
+}
+
+void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
+{
+    ensureAccessible(readOffset + len);
+    memcpy(ptr, buffer+readOffset, len);
+    readOffset += len;
+}
+
+
+size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
+{
+    doRead(len, ptr);
+    return len;
+}
+
+size32_t CThorContiguousRowBuffer::readSize()
+{
+    size32_t value;
+    doRead(sizeof(value), &value);
+    return value;
+}
+
+size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
+{
+    size32_t size = sizePackedInt();
+    doRead(size, ptr);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
+{
+    if (len == 0)
+        return 0;
+
+    size32_t size = sizeUtf8(len);
+    byte * self = target.ensureCapacity(fixedSize + size, NULL);
+    doRead(size, self+offset);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
+{
+    size32_t size = sizeVStr();
+    byte * self = target.ensureCapacity(fixedSize + size, NULL);
+    doRead(size, self+offset);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
+{
+    size32_t size = sizeVUni();
+    byte * self = target.ensureCapacity(fixedSize + size, NULL);
+    doRead(size, self+offset);
+    return size;
+}
+
+
+size32_t CThorContiguousRowBuffer::sizePackedInt()
+{
+    ensureAccessible(readOffset+1);
+    return rtlGetPackedSizeFromFirst(buffer[readOffset]);
+}
+
+size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
+{
+    if (len == 0)
+        return 0;
+
+    //The len is the number of utf characters, size depends on which characters are included.
+    size32_t nextOffset = readOffset;
+    while (len)
+    {
+        ensureAccessible(nextOffset+1);
+
+        for (;nextOffset < maxOffset;)
+        {
+            nextOffset += readUtf8Size(buffer+nextOffset);  // This function only accesses the first byte
+            if (--len == 0)
+                break;
+        }
+    }
+    return nextOffset - readOffset;
+}
+
+size32_t CThorContiguousRowBuffer::sizeVStr()
+{
+    size32_t nextOffset = readOffset;
+    for (;;)
+    {
+        ensureAccessible(nextOffset+1);
+
+        for (; nextOffset < maxOffset; nextOffset++)
+        {
+            if (buffer[nextOffset] == 0)
+                return (nextOffset + 1) - readOffset;
+        }
+    }
+}
+
+size32_t CThorContiguousRowBuffer::sizeVUni()
+{
+    size32_t nextOffset = readOffset;
+    const size32_t sizeOfUChar = 2;
+    for (;;)
+    {
+        ensureAccessible(nextOffset+sizeOfUChar);
+
+        for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
+        {
+            if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
+                return (nextOffset + sizeOfUChar) - readOffset;
+        }
+    }
+}
+
+
+void CThorContiguousRowBuffer::reportReadFail()
+{
+    throwUnexpected();
+}
+
+
+const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
+{
+    if (maxSize+readOffset > maxOffset)
+        doPeek(maxSize+readOffset);
+    return buffer + readOffset;
+}
+
+offset_t CThorContiguousRowBuffer::beginNested()
+{
+    size32_t len = readSize();
+    return len+readOffset;
+}
+
+bool CThorContiguousRowBuffer::finishedNested(offset_t & endPos)
+{
+    return readOffset >= endPos;
+}
+
+void CThorContiguousRowBuffer::skip(size32_t size)
+{
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipPackedInt()
+{
+    size32_t size = sizePackedInt();
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipUtf8(size32_t len)
+{
+    size32_t size = sizeUtf8(len);
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipVStr()
+{
+    size32_t size = sizeVStr();
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipVUni()
+{
+    size32_t size = sizeVUni();
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}

+ 97 - 0
rtl/eclrtl/rtlcommon.hpp

@@ -0,0 +1,97 @@
+#ifndef ECLCOMMON_HPP
+#define ECLCOMMON_HPP
+
+
+#include "jiface.hpp"
+#include "jfile.hpp"
+#include "eclrtl.hpp"
+#include "eclhelper.hpp"
+
+//The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
+//is in a contiguous block of memory.  The read() and skip() functions must be implemented
+class ECLRTL_API CThorContiguousRowBuffer : implements IRowDeserializerSource
+{
+public:
+    CThorContiguousRowBuffer(ISerialStream * _in);
+
+    inline void setStream(ISerialStream *_in) { in.set(_in); maxOffset = 0; readOffset = 0; }
+
+    virtual const byte * peek(size32_t maxSize);
+    virtual offset_t beginNested();
+    virtual bool finishedNested(offset_t & len);
+
+    virtual size32_t read(size32_t len, void * ptr);
+    virtual size32_t readSize();
+    virtual size32_t readPackedInt(void * ptr);
+    virtual size32_t readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len);
+    virtual size32_t readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize);
+    virtual size32_t readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize);
+
+    //These shouldn't really be called since this class is meant to be used for a deserialize.
+    //If we allowed padding/alignment fields in the input then the first function would make sense.
+    virtual void skip(size32_t size);
+    virtual void skipPackedInt();
+    virtual void skipUtf8(size32_t len);
+    virtual void skipVStr();
+    virtual void skipVUni();
+
+    inline bool eos()
+    {
+        return in->eos();
+    }
+
+    inline offset_t tell() const
+    {
+        return in->tell();
+    }
+
+    inline void clearStream()
+    {
+        in.clear();
+        maxOffset = 0;
+        readOffset = 0;
+    }
+
+    inline const byte * queryRow() { return buffer; }
+    inline size32_t queryRowSize() { return readOffset; }
+    inline void finishedRow()
+    {
+        if (readOffset)
+            in->skip(readOffset);
+        maxOffset = 0;
+        readOffset = 0;
+    }
+
+
+protected:
+    size32_t sizePackedInt();
+    size32_t sizeUtf8(size32_t len);
+    size32_t sizeVStr();
+    size32_t sizeVUni();
+    void reportReadFail();
+
+private:
+    inline void doPeek(size32_t maxSize)
+    {
+        buffer = static_cast<const byte *>(in->peek(maxSize, maxOffset));
+    }
+
+    void doRead(size32_t len, void * ptr);
+
+    inline void ensureAccessible(size32_t required)
+    {
+        if (required > maxOffset)
+        {
+            doPeek(required);
+            assertex(required <= maxOffset);
+        }
+    }
+
+protected:
+    Linked<ISerialStream> in;
+    const byte * buffer;
+    size32_t maxOffset;
+    size32_t readOffset;
+};
+
+#endif

+ 0 - 1
rtl/eclrtl/rtldynfield.cpp

@@ -1352,4 +1352,3 @@ extern ECLRTL_API IRowStream * transformRecord(IEngineRowAllocator * resultAlloc
     else
         return stream.getClear();
 }
-

File diff suppressed because it is too large
+ 2043 - 0
rtl/eclrtl/rtlformat.cpp


+ 572 - 0
rtl/eclrtl/rtlformat.hpp

@@ -0,0 +1,572 @@
+#ifndef ECLFORMAT_HPP
+#define ECLFORMAT_HPP
+
+
+#include "jiface.hpp"
+#include "jfile.hpp"
+#include "eclrtl.hpp"
+#include "eclhelper.hpp"
+
+
+class ECLRTL_API SimpleOutputWriter : implements IXmlWriter, public CInterface
+{
+    void outputFieldSeparator();
+    bool separatorNeeded;
+public:
+    SimpleOutputWriter();
+    IMPLEMENT_IINTERFACE;
+
+    SimpleOutputWriter & clear();
+    unsigned length() const                                 { return out.length(); }
+    const char * str() const                                { return out.str(); }
+
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren){}
+    virtual void outputEndDataset(const char *dsname){}
+    virtual void outputBeginArray(const char *fieldname){}
+    virtual void outputEndArray(const char *fieldname){}
+    virtual void outputSetAll();
+    virtual void outputInlineXml(const char *text){} //for appending raw xml content
+    virtual void outputXmlns(const char *name, const char *uri){}
+
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+
+    void newline();
+
+protected:
+    StringBuffer out;
+};
+
+
+interface IXmlStreamFlusher
+{
+    virtual void flushXML(StringBuffer &current, bool isClose) = 0;
+};
+
+interface IXmlWriterExt : extends IXmlWriter
+{
+    virtual IXmlWriterExt & clear() = 0;
+    virtual size32_t length() const = 0;
+    virtual const char *str() const = 0;
+    virtual IInterface *saveLocation() const = 0;
+    virtual void rewindTo(IInterface *location) = 0;
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf) = 0;
+    virtual void outputNumericString(const char *field, const char *fieldname) = 0;
+    virtual void outputInline(const char* text) = 0;
+};
+
+class ECLRTL_API CommonXmlPosition : public CInterface, implements IInterface
+{
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CommonXmlPosition(size32_t _pos, unsigned _indent, unsigned _nestLimit, bool _tagClosed, bool _needDelimiter) :
+        pos(_pos), indent(_indent), nestLimit(_nestLimit), tagClosed(_tagClosed), needDelimiter(_needDelimiter)
+    {}
+
+public:
+    size32_t pos = 0;
+    unsigned indent = 0;
+    unsigned nestLimit = 0;
+    bool tagClosed = false;
+    bool needDelimiter = false;
+};
+
+class ECLRTL_API CommonXmlWriter : implements IXmlWriterExt, public CInterface
+{
+public:
+    CommonXmlWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
+    ~CommonXmlWriter();
+    IMPLEMENT_IINTERFACE;
+
+    void outputBeginNested(const char *fieldname, bool nestChildren, bool doIndent);
+    void outputEndNested(const char *fieldname, bool doIndent);
+
+    virtual void outputInlineXml(const char *text){closeTag(); out.append(text); flush(false);} //for appending raw xml content
+    virtual void outputInline(const char* text) { outputInlineXml(text); }
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
+    virtual void outputEndDataset(const char *dsname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    virtual void outputBeginArray(const char *fieldname){}; //repeated elements are inline for xml
+    virtual void outputEndArray(const char *fieldname){};
+    virtual void outputSetAll();
+    virtual void outputXmlns(const char *name, const char *uri);
+
+    //IXmlWriterExt
+    virtual IXmlWriterExt & clear();
+    virtual unsigned length() const                                 { return out.length(); }
+    virtual const char * str() const                                { return out.str(); }
+    virtual IInterface *saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+
+        return new CommonXmlPosition(length(), indent, nestLimit, tagClosed, false);
+    }
+    virtual void rewindTo(IInterface *saved)
+    {
+        if (flusher)
+            throwUnexpected();
+
+        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
+        if (!position)
+            return;
+        if (position->pos < out.length())
+        {
+            out.setLength(position->pos);
+            tagClosed = position->tagClosed;
+            indent = position->indent;
+            nestLimit = position->nestLimit;
+        }
+    }
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
+
+    virtual void outputNumericString(const char *field, const char *fieldname)
+    {
+        outputCString(field, fieldname);
+    }
+
+protected:
+    bool checkForAttribute(const char * fieldname);
+    void closeTag();
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+protected:
+    IXmlStreamFlusher *flusher;
+    StringBuffer out;
+    unsigned flags;
+    unsigned indent;
+    unsigned nestLimit;
+    bool tagClosed;
+};
+
+class ECLRTL_API CommonJsonWriter : implements IXmlWriterExt, public CInterface
+{
+public:
+    CommonJsonWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
+    ~CommonJsonWriter();
+    IMPLEMENT_IINTERFACE;
+
+    void checkDelimit(int inc=0);
+    void checkFormat(bool doDelimit, bool needDelimiter=true, int inc=0);
+    void prepareBeginArray(const char *fieldname);
+
+    virtual void outputInlineXml(const char *text) //for appending raw xml content
+    {
+        if (text && *text)
+            outputUtf8(strlen(text), text, "xml");
+    }
+    virtual void outputInline(const char* text) { out.append(text); }
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
+    virtual void outputEndDataset(const char *dsname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    virtual void outputBeginArray(const char *fieldname);
+    virtual void outputEndArray(const char *fieldname);
+    virtual void outputSetAll();
+    virtual void outputXmlns(const char *name, const char *uri){}
+    virtual void outputNumericString(const char *field, const char *fieldname);
+
+    //IXmlWriterExt
+    virtual IXmlWriterExt & clear();
+    virtual unsigned length() const                                 { return out.length(); }
+    virtual const char * str() const                                { return out.str(); }
+    virtual void rewindTo(unsigned int prevlen)                     { if (prevlen < out.length()) out.setLength(prevlen); }
+    virtual IInterface *saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+
+        return new CommonXmlPosition(length(), indent, nestLimit, false, needDelimiter);
+    }
+    virtual void rewindTo(IInterface *saved)
+    {
+        if (flusher)
+            throwUnexpected();
+
+        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
+        if (!position)
+            return;
+        if (position->pos < out.length())
+        {
+            out.setLength(position->pos);
+            needDelimiter = position->needDelimiter;
+            indent = position->indent;
+            nestLimit = position->nestLimit;
+        }
+    }
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
+
+    void outputBeginRoot(){out.append('{');}
+    void outputEndRoot(){out.append('}');}
+
+protected:
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+    class CJsonWriterItem : public CInterface
+    {
+    public:
+        CJsonWriterItem(const char *_name) : name(_name), depth(0){}
+
+        StringAttr name;
+        unsigned depth;
+    };
+
+    const char *checkItemName(CJsonWriterItem *item, const char *name, bool simpleType=true);
+    const char *checkItemName(const char *name, bool simpleType=true);
+    const char *checkItemNameBeginNested(const char *name);
+    const char *checkItemNameEndNested(const char *name);
+    bool checkUnamedArrayItem(bool begin);
+
+
+    IXmlStreamFlusher *flusher;
+    CIArrayOf<CJsonWriterItem> arrays;
+    StringBuffer out;
+    unsigned flags;
+    unsigned indent;
+    unsigned nestLimit;
+    bool needDelimiter;
+};
+
+class ECLRTL_API CPropertyTreeWriter : public CSimpleInterfaceOf<IXmlWriter>
+{
+public:
+    CPropertyTreeWriter(IPropertyTree *_root=nullptr, unsigned _flags=0);
+
+    void setRoot(IPropertyTree &_root);
+
+    virtual void outputInlineXml(const char *text) override;
+    virtual void outputQuoted(const char *text) override;
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputString(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputBool(bool field, const char *fieldname) override;
+    virtual void outputData(unsigned len, const void *field, const char *fieldname) override;
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname) override;
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname) override;
+    virtual void outputReal(double field, const char *fieldname) override;
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname) override;
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname) override;
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname) override;
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren) override;
+    virtual void outputEndDataset(const char *dsname) override;
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren) override;
+    virtual void outputEndNested(const char *fieldname) override;
+    virtual void outputBeginArray(const char *fieldname)  override {}; //repeated elements are inline for xml
+    virtual void outputEndArray(const char *fieldname) override {};
+    virtual void outputSetAll();
+    virtual void outputXmlns(const char *name, const char *uri);
+
+protected:
+    bool checkForAttribute(const char * fieldname);
+
+protected:
+    Linked<IPropertyTree> root;
+    IPropertyTree *target = nullptr;
+    ICopyArrayOf<IPropertyTree> nestedLevels;
+    unsigned flags;
+};
+
+
+ECLRTL_API StringBuffer &buildJsonHeader(StringBuffer  &header, const char *suppliedHeader, const char *rowTag);
+ECLRTL_API StringBuffer &buildJsonFooter(StringBuffer  &footer, const char *suppliedFooter, const char *rowTag);
+
+//Writes type encoded XML strings  (xsi:type="xsd:string", xsi:type="xsd:boolean" etc)
+class ECLRTL_API CommonEncodedXmlWriter : public CommonXmlWriter
+{
+public:
+    CommonEncodedXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
+
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+};
+
+//Writes all encoded DATA fields as base64Binary
+class ECLRTL_API CommonEncoded64XmlWriter : public CommonEncodedXmlWriter
+{
+public:
+    CommonEncoded64XmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
+
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+};
+
+enum XMLWriterType{WTStandard, WTEncoding, WTEncodingData64, WTJSON} ;
+ECLRTL_API CommonXmlWriter * CreateCommonXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
+ECLRTL_API IXmlWriterExt * createIXmlWriterExt(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
+
+struct CSVOptions
+{
+    StringAttr delimiter, terminator;
+    bool includeHeader;
+};
+
+class CCSVItem : public CInterface, implements IInterface
+{
+    unsigned columnID, nextRowID, rowCount, nestedLayer;
+    StringAttr name, type, value, parentXPath;
+    StringArray childNames;
+    MapStringTo<bool> childNameMap;
+    bool isNestedItem, simpleNested, currentRowEmpty, outputHeader;
+public:
+    CCSVItem() : columnID(0), nestedLayer(0), nextRowID(0), rowCount(0), isNestedItem(false),
+        simpleNested(false), currentRowEmpty(true) { };
+
+    IMPLEMENT_IINTERFACE;
+    inline const char* getName() const { return name.get(); };
+    inline void setName(const char* _name) { name.set(_name); };
+    inline const char* getValue() const { return value.get(); };
+    inline void setValue(const char* _value) { value.set(_value); };
+    inline unsigned getColumnID() const { return columnID; };
+    inline void setColumnID(unsigned _columnID) { columnID = _columnID; };
+
+    inline unsigned getNextRowID() const { return nextRowID; };
+    inline void setNextRowID(unsigned _rowID) { nextRowID = _rowID; };
+    inline void incrementNextRowID() { nextRowID++; };
+    inline unsigned getRowCount() const { return rowCount; };
+    inline void setRowCount(unsigned _rowCount) { rowCount = _rowCount; };
+    inline void incrementRowCount() { rowCount++; };
+    inline bool getCurrentRowEmpty() const { return currentRowEmpty; };
+    inline void setCurrentRowEmpty(bool _currentRowEmpty) { currentRowEmpty = _currentRowEmpty; };
+
+    inline unsigned getNestedLayer() const { return nestedLayer; };
+    inline void setNestedLayer(unsigned _nestedLayer) { nestedLayer = _nestedLayer; };
+    inline bool checkIsNestedItem() const { return isNestedItem; };
+    inline void setIsNestedItem(bool _isNestedItem) { isNestedItem = _isNestedItem; };
+    inline bool checkSimpleNested() const { return simpleNested; };
+    inline void setSimpleNested(bool _simpleNested) { simpleNested = _simpleNested; };
+    inline bool checkOutputHeader() const { return outputHeader; };
+    inline void setOutputHeader(bool _outputHeader) { outputHeader = _outputHeader; };
+    inline const char* getParentXPath() const { return parentXPath.str(); };
+    inline void setParentXPath(const char* _parentXPath) { parentXPath.set(_parentXPath); };
+    inline StringArray& getChildrenNames() { return childNames; };
+    inline void addChildName(const char* name)
+    {
+        if (hasChildName(name))
+            return;
+        childNameMap.setValue(name, true);
+        childNames.append(name);
+    };
+    inline bool hasChildName(const char* name)
+    {
+        bool* found = childNameMap.getValue(name);
+        return (found && *found);
+    };
+    inline void clearContentVariables()
+    {
+        nextRowID = rowCount = 0;
+        currentRowEmpty = true;
+    };
+};
+
+class CCSVRow : public CInterface, implements IInterface
+{
+    unsigned rowID;
+    CIArrayOf<CCSVItem> columns;
+public:
+    CCSVRow(unsigned _rowID) : rowID(_rowID) {};
+    IMPLEMENT_IINTERFACE;
+
+    inline unsigned getRowID() const { return rowID; };
+    inline void setRowID(unsigned _rowID) { rowID = _rowID; };
+    inline unsigned getColumnCount() const { return columns.length(); };
+
+    const char* getColumnValue(unsigned columnID) const;
+    void setColumn(unsigned columnID, const char* columnName, const char* columnValue);
+};
+
+//CommonCSVWriter is used to output a WU result in CSV format.
+//Read CSV header information;
+//If needed, output CSV headers into the 'out' buffer;
+//Read each row (a record) of the WU result and output into the 'out' buffer;
+//The 'out' buffer can be accessed through the str() method.
+class ECLRTL_API CommonCSVWriter: public CInterface, implements IXmlWriterExt
+{
+    class CXPathItem : public CInterface, implements IInterface
+    {
+        bool isArray;
+        StringAttr path;
+    public:
+        CXPathItem(const char* _path, bool _isArray) : path(_path), isArray(_isArray) { };
+
+        IMPLEMENT_IINTERFACE;
+        inline const char* getPath() const { return path.get(); };
+        inline bool getIsArray() const { return isArray; };
+    };
+    CSVOptions options;
+    bool readingCSVHeader, addingSimpleNestedContent;
+    unsigned recordCount, headerColumnID, nestedHeaderLayerID;
+    StringBuffer currentParentXPath, auditOut;
+    StringArray headerXPathList;
+    MapStringTo<bool> topHeaderNameMap;
+    MapStringToMyClass<CCSVItem> csvItems;
+    CIArrayOf<CCSVRow> contentRowsBuffer;
+    CIArrayOf<CXPathItem> dataXPath;//xpath in caller
+
+    void escapeQuoted(unsigned len, char const* in, StringBuffer& out);
+    bool checkHeaderName(const char* name);
+    CCSVItem* getParentCSVItem();
+    CCSVItem* getCSVItemByFieldName(const char* name);
+    void addColumnToRow(CIArrayOf<CCSVRow>& rows, unsigned rowID, unsigned colID, const char* columnValue, const char* columnName);
+    void addCSVHeader(const char* name, const char* type, bool isNested, bool simpleNested, bool outputHeader);
+    void addContentField(const char* field, const char* fieldName);
+    void addStringField(unsigned len, const char* field, const char* fieldName);
+    void setChildrenNextRowID(const char* path, unsigned rowID);
+    unsigned getChildrenMaxNextRowID(const char* path);
+    unsigned getChildrenMaxColumnID(CCSVItem* item, unsigned& maxColumnID);
+    void addChildNameToParentCSVItem(const char* name);
+    void setParentItemRowEmpty(CCSVItem* item, bool empty);
+    void addFieldToParentXPath(const char* fieldName);
+    void removeFieldFromCurrentParentXPath(const char* fieldName);
+    void appendDataXPathItem(const char* fieldName, bool isArray);
+    bool isDataRow(const char* fieldName);
+    void outputCSVRows(CIArrayOf<CCSVRow>& rows, bool isHeader);
+    void outputHeadersToBuffer();
+    void finishContentResultRow();
+
+    void auditHeaderInfo()
+    {
+        ForEachItemIn(i, headerXPathList)
+        {
+            const char* path = headerXPathList.item(i);
+            CCSVItem* item = csvItems.getValue(path);
+            if (!item)
+                continue;
+            if (!item->checkIsNestedItem())
+            {
+                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
+                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
+            }
+            else
+            {
+                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
+                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
+            }
+        }
+    }
+
+public:
+    CommonCSVWriter(unsigned _flags, CSVOptions& _options, IXmlStreamFlusher* _flusher = NULL);
+    ~CommonCSVWriter();
+
+    IMPLEMENT_IINTERFACE;
+
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+    virtual unsigned length() const { return out.length(); }
+    virtual const char* str() const { return out.str(); }
+    virtual void rewindTo(IInterface* location) { };
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf) { };
+    virtual IInterface* saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+        return NULL;
+    };
+
+    //IXmlWriter
+    virtual void outputString(unsigned len, const char* field, const char* fieldName);
+    virtual void outputBool(bool field, const char* fieldName);
+    virtual void outputData(unsigned len, const void* field, const char* fieldName);
+    virtual void outputInt(__int64 field, unsigned size, const char* fieldName);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char* fieldName);
+    virtual void outputReal(double field, const char *fieldName);
+    virtual void outputDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
+    virtual void outputUDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
+    virtual void outputUnicode(unsigned len, const UChar* field, const char* fieldName);
+    virtual void outputQString(unsigned len, const char* field, const char* fieldName);
+    virtual void outputUtf8(unsigned len, const char* field, const char* fieldName);
+    virtual void outputBeginNested(const char* fieldName, bool simpleNested);
+    virtual void outputEndNested(const char* fieldName);
+    virtual void outputBeginDataset(const char* dsname, bool nestChildren);
+    virtual void outputEndDataset(const char* dsname);
+    virtual void outputBeginArray(const char* fieldName);
+    virtual void outputEndArray(const char* fieldName);
+    virtual void outputSetAll() { };
+    virtual void outputXmlns(const char* name, const char* uri) { };
+    virtual void outputQuoted(const char* text)
+    {
+        //No fieldName. Is it valid for CSV?
+    };
+    virtual void outputInlineXml(const char* text)//for appending raw xml content
+    {
+        //Dynamically add a new header 'xml' and insert the header.
+        //But, not sure we want to do that for a big WU result.
+        //if (text && *text)
+          //outputUtf8(strlen(text), text, "xml");
+    };
+    virtual void outputInline(const char* text) { out.append(text); }
+
+    //IXmlWriterExt
+    virtual void outputNumericString(const char* field, const char* fieldName);
+    virtual IXmlWriterExt& clear();
+
+    void outputBeginNested(const char* fieldName, bool simpleNested, bool outputHeader);
+    void outputEndNested(const char* fieldName, bool outputHeader);
+    void outputCSVHeader(const char* name, const char* type);
+    void finishCSVHeaders();
+    const char* auditStr() const { return auditOut.str(); }
+
+protected:
+    IXmlStreamFlusher* flusher;
+    StringBuffer out;
+    unsigned flags;
+};
+
+#endif

+ 1 - 0
thorlcr/activities/xmlwrite/thxmlwrite.cpp

@@ -17,6 +17,7 @@
 
 #include "jiface.hpp"
 #include "jtime.hpp"
+#include "rtlformat.hpp"
 #include "dadfs.hpp"
 #include "thexception.hpp"
 #include "thmfilemanager.hpp"

+ 1 - 0
thorlcr/graph/thgraph.cpp

@@ -25,6 +25,7 @@
 #include "thormisc.hpp"
 #include "thbufdef.hpp"
 #include "thmem.hpp"
+#include "rtlformat.hpp"
 
 
 PointerArray createFuncs;

+ 1 - 0
thorlcr/slave/traceslave.hpp

@@ -31,6 +31,7 @@
 #include <stddef.h>
 #include <thmem.hpp>
 #include <thorxmlwrite.hpp>
+#include <rtlformat.hpp>
 
 interface IThorDebug : extends IInterface
 {

+ 1 - 0
thorlcr/thorutil/thormisc.cpp

@@ -48,6 +48,7 @@
 #include "rtlfield.hpp"
 #include "rtlrecord.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlformat.hpp"
 #include "rmtfile.hpp"
 
 

+ 152 - 14
tools/testsocket/testsocket.cpp

@@ -44,6 +44,10 @@ bool sendFileAfterQuery = false;
 bool doLock = false;
 bool roxieLogMode = false;
 bool rawOnly = false;
+bool rawSend = false;
+bool remoteStreamForceResend = false;
+bool remoteStreamSendCursor = false;
+
 
 StringBuffer sendFileName;
 StringAttr queryNameOverride;
@@ -182,11 +186,12 @@ void sendFileChunk(const char * filename, offset_t offset, ISocket * socket)
     free(buff);
 }
 
-int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result)
+int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result, const char *query, size32_t queryLen)
 {
     if (readBlocked)
         socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000);
 
+    MemoryBuffer remoteReadCursorMb;
     unsigned len;
     bool is_status;
     bool isBlockedResult;
@@ -231,6 +236,7 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         bool isSpecial = false;
         bool pluginRequest = false;
         bool dataBlockRequest = false;
+        bool remoteReadRequest = false;
         if (len & 0x80000000)
         {
             unsigned char flag;
@@ -266,14 +272,19 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
             case 'R':
                 isBlockedResult = true;
                 break;
+            case 'J':
+                remoteReadRequest = true;
+                break;
             }
             len &= 0x7FFFFFFF;
             len--;      // flag already read
         }
 
-        char * mem = (char*) malloc(len+1);
+        MemoryBuffer mb;
+        mb.setEndian(BIG_ENDIAN);
+        char *mem = (char *)mb.reserveTruncate(len+1);
         char * t = mem;
-        unsigned sendlen = len;
+        size32_t sendlen = len;
         t[len]=0;
         try
         {
@@ -301,6 +312,7 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         catch(IException * e)
         {
             pexception("failed to read data", e);
+            e->Release();
             return 1;
         }
         if (pluginRequest)
@@ -318,9 +330,112 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         {
             //Not very robust!  A poor man's implementation for testing...
             offset_t offset;
-            memcpy(&offset, t, sizeof(offset));
-            _WINREV(offset);
-            sendFileChunk(t+sizeof(offset), offset, socket);
+            mb.read(offset);
+            sendFileChunk((const char *)mb.readDirect(offset), offset, socket);
+        }
+        else if (remoteReadRequest)
+        {
+            Owned<IPropertyTree> requestTree = createPTreeFromJSONString(queryLen, query);
+            Owned<IPropertyTree> responseTree; // used if response is xml or json
+            const char *outputFmtStr = requestTree->queryProp("format");
+            const char *response = nullptr;
+            if (!outputFmtStr || strieq("xml", outputFmtStr))
+            {
+                response = (const char *)mb.readDirect(len);
+                responseTree.setown(createPTreeFromXMLString(len, response));
+            }
+            else if (strieq("json", outputFmtStr))
+            {
+                response = (const char *)mb.readDirect(len);
+                responseTree.setown(createPTreeFromJSONString(len, response));
+            }
+            unsigned cursorHandle;
+            if (responseTree)
+                cursorHandle = responseTree->getPropInt("cursor");
+            else
+                mb.read(cursorHandle);
+            bool retrySend = false;
+            if (cursorHandle)
+            {
+                PROGLOG("Got handle back: %u; len=%u", cursorHandle, len);
+                StringBuffer xml;
+                if (responseTree)
+                {
+                    if (echoResults)
+                    {
+                        fputs(response, stdout);
+                        fflush(stdout);
+                    }
+                    if (!responseTree->getPropBin("cursorBin", remoteReadCursorMb.clear()))
+                        break;
+                }
+                else
+                {
+                    size32_t dataLen;
+                    mb.read(dataLen);
+                    if (!dataLen)
+                        break;
+                    const void *rowData = mb.readDirect(dataLen);
+                    // JCSMORE - output binary row data?
+
+                    // cursor
+                    size32_t cursorLen;
+                    mb.read(cursorLen);
+                    if (!cursorLen)
+                        break;
+                    const void *cursor = mb.readDirect(cursorLen);
+                    memcpy(remoteReadCursorMb.clear().reserveTruncate(cursorLen), cursor, cursorLen);
+                }
+
+                if (remoteStreamForceResend)
+                    cursorHandle = NotFound; // fake that it's a handle dafilesrv doesn't know about
+
+                Owned<IPropertyTree> requestTree = createPTree();
+                requestTree->setPropInt("cursor", cursorHandle);
+
+                // Only the handle is needed for continuation, but this tests the behaviour of some clients which may send cursor per request (e.g. to refresh)
+                if (remoteStreamSendCursor)
+                    requestTree->setPropBin("cursorBin", remoteReadCursorMb.length(), remoteReadCursorMb.toByteArray());
+
+                requestTree->setProp("format", outputFmtStr);
+                StringBuffer requestStr;
+                toJSON(requestTree, requestStr);
+#ifdef _DEBUG
+                fputs(requestStr, stdout);
+#endif
+
+                sendlen = requestStr.length();
+                _WINREV(sendlen);
+
+                try
+                {
+                    if (!rawSend && !useHTTP)
+                        socket->write(&sendlen, sizeof(sendlen));
+                    socket->write(requestStr.str(), requestStr.length());
+                }
+                catch (IJSOCK_Exception *e)
+                {
+                    retrySend = true;
+                    EXCLOG(e, nullptr);
+                    e->Release();
+                }
+            }
+            else // dafilesrv didn't know who I was, resent query + serialized cursor
+                retrySend = true;
+            if (retrySend)
+            {
+                PROGLOG("Retry send for handle: %u", cursorHandle);
+                requestTree->setPropBin("cursorBin", remoteReadCursorMb.length(), remoteReadCursorMb.toByteArray());
+                StringBuffer requestStr;
+                toJSON(requestTree, requestStr);
+
+                PROGLOG("requestStr = %s", requestStr.str());
+                sendlen = requestStr.length();
+                _WINREV(sendlen);
+                if (!rawSend && !useHTTP)
+                    socket->write(&sendlen, sizeof(sendlen));
+                socket->write(requestStr.str(), requestStr.length());
+            }
         }
         else
         {
@@ -339,7 +454,6 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
                 result.append(sendlen, t);
         }
 
-        free(mem);
         if (abortAfterFirst)
             return 0;
     }
@@ -358,7 +472,7 @@ int ReceiveThread::run()
     ISocket * socket = ISocket::create(3456);
     ISocket * client = socket->accept();
     StringBuffer result;
-    readResults(client, parallelBlocked, false, result);
+    readResults(client, parallelBlocked, false, result, nullptr, 0);
     client->Release();
     socket->Release();
     finishedReading.signal();
@@ -431,7 +545,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
         else
         {
             SocketEndpoint ep(ip,port);
-            socket.setown(ISocket::connect_timeout(ep, 1000));
+            socket.setown(ISocket::connect_timeout(ep, 100000));
             if (useSSL)
             {
 #ifdef _USE_OPENSSL
@@ -492,7 +606,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
             socket->write(&locklen, sizeof(locklen));
             socket->write(lock, strlen(lock));
             StringBuffer lockResult;
-            readResults(socket, false, false, lockResult);
+            readResults(socket, false, false, lockResult, nullptr, 0);
         }
         if (queryNameOverride.length())
         {
@@ -515,17 +629,22 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
     }
 
     const char * query = fullQuery.str();
-    int len=strlen(query);
-    int sendlen = len;
+    size32_t queryLen=(size32_t)strlen(query);
+    size32_t len = queryLen;
+    size32_t sendlen = len;
     if (persistConnections)
         sendlen |= 0x80000000;
     _WINREV(sendlen);                    
 
     try
     {
-        if (!useHTTP)
+        if (!rawSend && !useHTTP)
             socket->write(&sendlen, sizeof(sendlen));
+
+        fprintf(stdout, "about to write %u <%s>\n", len, query);
+
         socket->write(query, len);
+
         if (sendFileAfterQuery)
         {
             FILE *in = fopen(sendFileName.str(), "rb");
@@ -560,7 +679,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
     // back-end does some processing.....
 
     StringBuffer result;
-    int ret = readResults(socket, false, useHTTP, result);
+    int ret = readResults(socket, false, useHTTP, result, query, queryLen);
 
     if ((ret == 0) && !justResults)
     {
@@ -649,6 +768,8 @@ void usage(int exitCode)
     printf("  -qname xx Use xx as queryname in place of the xml root element name\n");
     printf("  -r <n>    repeat the query several times\n");
     printf("  -rl       roxie logfile mode\n");
+    printf("  -rsr      force remote stream resend per continuation request\n");
+    printf("  -rssc     send cursor per continuation request\n");
     printf("  -s        add stars to indicate transfer packets\n");
     printf("  -ss       suppress XML Status messages to screen (always suppressed from tracefile)\n");
     printf("  -ssl      use ssl\n");
@@ -658,6 +779,7 @@ void usage(int exitCode)
     printf("  -u<max>   run queries on separate threads\n");
     printf("  -cascade  cascade query (to all roxie nodes)\n");
     printf("  -lock     locked cascade query (to all roxie nodes)\n");
+    printf("  -x        raw send\n");
     
     exit(exitCode);
 }
@@ -665,6 +787,7 @@ void usage(int exitCode)
 int main(int argc, char **argv) 
 {
     InitModuleObjects();
+
     StringAttr outputName("result.txt");
 
     bool fromFile = false;
@@ -729,6 +852,11 @@ int main(int argc, char **argv)
             fromFile = true;
             ++arg;
         }
+        else if (stricmp(argv[arg], "-x") == 0)
+        {
+            rawSend = true;
+            ++arg;
+        }
         else if (stricmp(argv[arg], "-ff") == 0)
         {
             fromMultiFile = true;
@@ -838,6 +966,16 @@ int main(int argc, char **argv)
             }
             arg+=2;
         }
+        else if (strieq(argv[arg], "-rsr"))
+        {
+            remoteStreamForceResend = true;
+            ++arg;
+        }
+        else if (strieq(argv[arg], "-rssc"))
+        {
+            remoteStreamSendCursor = true;
+            ++arg;
+        }
         else
         {
             printf("Unknown argument %s, ignored\n", argv[arg]);