Bläddra i källkod

HPCC-18515 Dafilesrv remote read streaming

Add support for dynamic remote disk read projects to dafilesrv
(no filtering yet), via JSON requests.

Output format is either binary, XML or JSON.
Continuation is served from the same instance when the cursor handle is known;
if the handle is unknown, a challenge response is returned so that the client
can resend the original request together with its cursor.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 år sedan
förälder
incheckning
98c1668457

+ 1 - 0
common/fileview2/fvresultset.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include "jliball.hpp"
 #include "rtlbcd.hpp"
+#include "rtlformat.hpp"
 #include "workunit.hpp"
 #include "seclib.hpp"
 #include "eclrtl.hpp"

+ 2 - 0
common/remote/CMakeLists.txt

@@ -59,6 +59,7 @@ include_directories (
          ./../../rtl/eclrtl
          ./../../system/security/securesocket
          ./../../testing/unittests
+         ./../../rtl/include
     )
 
 ADD_DEFINITIONS( -D_USRDLL -DREMOTE_EXPORTS )
@@ -67,6 +68,7 @@ HPCC_ADD_LIBRARY( remote SHARED ${SRCS}  )
 install ( TARGETS remote RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
 
 target_link_libraries ( remote 
+    eclrtl
     jlib
     jhtree 
     mp

+ 435 - 6
common/remote/sockfile.cpp

@@ -41,6 +41,16 @@
 #include "remoteerr.hpp"
 #include <atomic>
 
+#include "rtldynfield.hpp"
+#include "rtlds_imp.hpp"
+#include "rtlread_imp.hpp"
+#include "rtlrecord.hpp"
+#include "eclhelper_dyn.hpp"
+
+#include "rtlcommon.hpp"
+#include "rtlformat.hpp"
+
+
 #define SOCKET_CACHE_MAX 500
 
 #define MIN_KEYFILTSUPPORT_VERSION 20
@@ -53,6 +63,13 @@
 #define TREECOPYPOLLTIME  (60*1000*5)      // for tracing that delayed
 #define TREECOPYPRUNETIME (24*60*60*1000)  // 1 day
 
+// Defaults used for a stream-read request when the JSON activity node omits the corresponding property.
+static const unsigned __int64 defaultFileStreamChooseN = I64C(0x7fffffffffffffff); // constant should be moved to a common place (see eclhelper.hpp)
+static const unsigned __int64 defaultFileStreamSkipN = 0;
+static const unsigned __int64 defaultFileStreamRowLimit = (unsigned __int64) -1;
+static const unsigned __int64 defaultDaFSNumRecs = 100; // upper bound on the number of rows placed in a single stream-read reply
+enum OutputFormat { outFmt_Binary, outFmt_Xml, outFmt_Json }; // encoding of a stream-read reply
+
+
 #if SIMULATE_PACKETLOSS
 
 #define TESTING_FAILURE_RATE_LOST_SEND  10 // per 1000
@@ -326,6 +343,7 @@ enum {
     RFCreadfilteredindex,
     RFCreadfilteredindexcount,
     RFCreadfilteredindexblob,
+    RFCStreamRead = '{',
     RFCmax,
     RFCunknown = 255 // 0 would have been more sensible, but can't break backward compatibility
 };
@@ -568,12 +586,14 @@ inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff)
     return buff;
 }
 
-inline void sendBuffer(ISocket * socket, MemoryBuffer & src)
+inline void sendBuffer(ISocket * socket, MemoryBuffer & src, bool testSocketFlag=false)
 {
     unsigned length = src.length() - sizeof(unsigned);
     byte * buffer = (byte *)src.toByteArray();
     if (TF_TRACE_FULL)
         PROGLOG("sendBuffer size %d, data = %d %d %d %d",length, (int)buffer[4],(int)buffer[5],(int)buffer[6],(int)buffer[7]);
+    if (testSocketFlag)
+        length |= 0x80000000;
     _WINCPYREV(buffer, &length, sizeof(unsigned));
     SOCKWRITE(socket)(buffer, src.length());
 }
@@ -3679,6 +3699,7 @@ inline void appendCmdErr(MemoryBuffer &reply, RemoteFileCommandType e, int code,
 
 #define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
 #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
+#define MAPCOMMANDCLIENTTESTSOCKET(c,p,client) case c: { ret = this->p(msg, reply, client); testSocketFlag = true; break; }
 #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
 #define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
 #define MAPCOMMANDCLIENTSTATS(c,p,client,stats) case c: { ret = this->p(msg, reply, client, stats); break; }
@@ -3816,17 +3837,247 @@ public:
     }
 };
 
+// An activity that dafilesrv executes on behalf of a remote client and that produces a stream of rows.
+interface IRemoteActivity : extends IInterface
+{
+    // Returns the next output row, or nullptr when no row is available (callers treat that as end of input).
+    // sz receives the size of the returned row.
+    virtual const void *nextRow(size32_t &sz) = 0;
+    // Number of rows returned so far.
+    virtual unsigned __int64 queryProcessed() const = 0;
+    // Metadata describing the rows returned by nextRow().
+    virtual IOutputMetaData *queryOutputMeta() const = 0;
+    // Appends a short description of the activity to 'out' (used for tracing/debug names) and returns 'out'.
+    virtual StringBuffer &getInfoStr(StringBuffer &out) const = 0;
+    // Writes the current read position to 'tgt' so that a later request can resume from it.
+    virtual void serializeCursor(MemoryBuffer &tgt) const = 0;
+    // Restores a position previously written by serializeCursor().
+    virtual void restoreCursor(MemoryBuffer &src) = 0;
+};
+
 enum OpenFileFlag { of_null=0x0, of_key=0x01 };
 struct OpenFileInfo
 {
     OpenFileInfo() { }
     OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
+    OpenFileInfo(int _handle, IRemoteActivity *_activity, StringAttrItem *_filename) : handle(_handle), activity(_activity), filename(_filename) { }
     Linked<IFileIO> fileIO;
+    Linked<IRemoteActivity> activity;
     Linked<StringAttrItem> filename; // for debug
     int handle = 0;
     unsigned flags = 0;
 };
 
+
+
+// Builds the record metadata described by an activity node.
+// The type description is taken from the sub-tree named 'typePropName' when present,
+// otherwise from the binary property named '<typePropName>Bin'.
+// Throws an IException if neither form is present in the request.
+static IOutputMetaData *getTypeInfoOutputMetaData(IPropertyTree &actNode, const char *typePropName)
+{
+    IPropertyTree *inputJson = actNode.queryPropTree(typePropName);
+    if (inputJson)
+        return createTypeInfoOutputMetaData(*inputJson);
+
+    StringBuffer binTypePropName(typePropName);
+    binTypePropName.append("Bin");
+    MemoryBuffer mb;
+    // Do not hand an empty buffer to the deserializer: report the malformed request instead.
+    if (!actNode.getPropBin(binTypePropName.str(), mb))
+        throw MakeStringException(0, "Missing type information: expected '%s' or '%s'", typePropName, binTypePropName.str());
+    return createTypeInfoOutputMetaData(mb);
+}
+
+// Disk-read activity executed by dafilesrv on behalf of a remote client.
+// Rows are read from the file named by the helper, projected through helper->transform() into
+// resultBuffer and handed back one per nextRow() call.
+// Filtering is not implemented yet: segMonitorsMatch() accepts every row.
+class CRemoteDiskReadActivity : public CSimpleInterfaceOf<IRemoteActivity>
+{
+    StringAttr fileName;
+    Linked<IHThorDiskReadArg> helper;
+    MemoryBuffer resultBuffer;
+    MemoryBufferBuilder *outBuilder = nullptr;
+    CThorContiguousRowBuffer prefetchBuffer;
+    IArrayOf<IKeySegmentMonitor> segMonitors;
+    Owned<ISourceRowPrefetcher> prefetcher;
+    Owned<ISerialStream> inputStream;
+    Owned<IFileIO> iFileIO;
+    Linked<IOutputMetaData> outMeta;
+    unsigned __int64 chooseN = 0;
+    unsigned __int64 limit = 0;
+    unsigned __int64 processed = 0;
+    unsigned __int64 startPos = 0;
+    bool opened = false;
+    bool eofSeen = false;
+    bool cursorDirty = false;
+    bool canMatchAny = false;
+    bool needTransform = true;
+
+    // Makes sure the input is open and positioned at startPos before a row is read.
+    // If the file is already open, it is only repositioned when restoreCursor() has been called since
+    // the last read and the stream is not already at the requested offset.
+    void checkOpen()
+    {
+        if (opened)
+        {
+            if (!cursorDirty)
+                return;
+            if (prefetchBuffer.tell() != startPos)
+            {
+                inputStream->reset(startPos);
+                prefetchBuffer.clearStream();
+                prefetchBuffer.setStream(inputStream);
+                eofSeen = !canMatchAny;
+            }
+            cursorDirty = false;
+            return;
+        }
+        if (!canMatchAny)
+            eofSeen = true;
+        else
+        {
+            const char *fileName = helper->getFileName();
+
+            OwnedIFile iFile = createIFile(fileName);
+
+#if 0
+            bool compressed = false; // isCompressedFile(iFileIO); // Should be passed with JSON
+            StringBuffer encryptionkey;
+            if (compressed)
+            {
+                Owned<IExpander> eexp;
+                if (encryptionkey.length()!=0)
+                    eexp.setown(createAESExpander256((size32_t)encryptionkey.length(),encryptionkey.bufferBase()));
+                iFileIO.setown(createCompressedFileReader(iFile,eexp));
+                if(!iFileIO && !blockcompressed) //fall back to old decompression, unless dfs marked as new
+                {
+                    iFileIO.setown(iFile->open(IFOread));
+                    if(iFileIO)
+                        rowcompressed = true;
+                }
+            }
+            else
+#endif
+                iFileIO.setown(iFile->open(IFOread));
+            if (!iFileIO)
+                throw MakeStringException(0, "Failed to open: '%s'", fileName);
+
+            inputStream.setown(createFileSerialStream(iFileIO, startPos));
+            prefetchBuffer.setStream(inputStream);
+            prefetcher.setown(helper->queryDiskRecordSize()->createDiskPrefetcher(nullptr, 0));
+
+            // checkOpen() runs again after close(); reuse the existing builder rather than leaking it.
+            if (!outBuilder)
+                outBuilder = new MemoryBufferBuilder(resultBuffer, helper->queryOutputMeta()->getMinRecordSize());
+            chooseN = helper->getChooseNLimit();
+            limit = helper->getRowLimit();
+        }
+
+        // The stream has just been created at startPos, so any restored cursor is now honoured.
+        // Leaving the flag set would make the next call rewind to startPos and return rows twice.
+        cursorDirty = false;
+        opened = true;
+    }
+    // Releases the file handle; a later nextRow() call will go through checkOpen() again.
+    void close()
+    {
+        iFileIO.clear();
+        opened = false;
+    }
+    // Placeholder for key-segment filtering: every row currently matches.
+    bool segMonitorsMatch(const void *row) { return true; }
+public:
+    CRemoteDiskReadActivity(IHThorDiskReadArg &_helper)
+        : helper(&_helper), prefetchBuffer(nullptr)
+    {
+        outMeta.set(helper->queryOutputMeta());
+        canMatchAny = helper->canMatchAny();
+    }
+    ~CRemoteDiskReadActivity()
+    {
+        if (outBuilder)
+            delete outBuilder;
+    }
+    // Returns the next transformed row (owned by this activity's resultBuffer) and its size in retSz.
+    // Returns nullptr with retSz==0 at end of input, once chooseN rows have been returned, or after
+    // the row limit has been exceeded (in which case helper->onLimitExceeded() is invoked first).
+    virtual const void *nextRow(size32_t &retSz) override
+    {
+        checkOpen();
+        if (needTransform)
+        {
+            while (!eofSeen && ((chooseN == 0) || (processed < chooseN)))
+            {
+                while (!prefetchBuffer.eos())
+                {
+                    prefetcher->readAhead(prefetchBuffer);
+                    const byte * next = prefetchBuffer.queryRow();
+                    size32_t rowSz; // use local var instead of reference param for efficiency
+                    if (segMonitorsMatch(next))
+                        rowSz = helper->transform(*outBuilder, next);
+                    else
+                        rowSz = 0;
+                    prefetchBuffer.finishedRow();
+
+                    if (rowSz)
+                    {
+                        if (processed >=limit)
+                        {
+                            resultBuffer.clear();
+                            helper->onLimitExceeded();
+                            retSz = 0; // never leave the out-parameter uninitialised
+                            return nullptr;
+                        }
+                        retSz = rowSz;
+                        processed++;
+                        return resultBuffer.toByteArray();
+                    }
+                }
+                eofSeen = true;
+            }
+        }
+        close();
+        retSz = 0;
+        return nullptr;
+    }
+// IRemoteActivity impl.
+    // Cursor layout: current stream offset followed by the number of rows returned so far.
+    virtual void serializeCursor(MemoryBuffer &tgt) const override
+    {
+        tgt.append(prefetchBuffer.tell());
+        tgt.append(processed);
+    }
+    // Reads back a cursor written by serializeCursor(); the reposition itself is deferred to checkOpen().
+    virtual void restoreCursor(MemoryBuffer &src) override
+    {
+        cursorDirty = true;
+        src.read(startPos);
+        src.read(processed);
+    }
+    virtual unsigned __int64 queryProcessed() const override
+    {
+        return processed;
+    }
+    virtual IOutputMetaData *queryOutputMeta() const override
+    {
+        return outMeta;
+    }
+    virtual StringBuffer &getInfoStr(StringBuffer &out) const override
+    {
+        return out.appendf("diskread[%s]", helper->getFileName());
+    }
+};
+
+// Creates a disk-read activity from a JSON activity node.
+// Reads "fileName" (required), "choosen", "skipN" and "rowLimit" (optional, falling back to the
+// defaultFileStream* constants) plus the "input"/"output" record descriptions.
+// Throws an IException if "fileName" is missing or empty.
+IRemoteActivity *createRemoteDiskRead(IPropertyTree &actNode)
+{
+    const char *fileName = actNode.queryProp("fileName");
+    // Reject the request here instead of passing a null/empty name down to the file layer.
+    if (!fileName || !*fileName)
+        throw MakeStringException(0, "diskread request is missing 'fileName'");
+    unsigned __int64 chooseN = actNode.getPropInt64("choosen", defaultFileStreamChooseN);
+    unsigned __int64 skipN = actNode.getPropInt64("skipN", defaultFileStreamSkipN);
+    unsigned __int64 rowLimit = actNode.getPropInt64("rowLimit", defaultFileStreamRowLimit);
+    Owned<IOutputMetaData> inMeta = getTypeInfoOutputMetaData(actNode, "input");
+    Owned<IOutputMetaData> outMeta = getTypeInfoOutputMetaData(actNode, "output");
+    Owned<IHThorDiskReadArg> helper = createDiskReadArg(fileName, inMeta.getClear(), outMeta.getClear(), chooseN, skipN, rowLimit);
+    return new CRemoteDiskReadActivity(*helper);
+}
+
+// Creates the activity described by a JSON activity node, selected by its "kind" property.
+// Only "diskread" is supported; a missing or unrecognised kind ends in throwUnexpected().
+IRemoteActivity *createRemoteActivity(IPropertyTree &actNode)
+{
+    const char *kindStr = actNode.queryProp("kind");
+
+    ThorActivityKind kind = TAKnone;
+    // kindStr is null when the request has no "kind"; it must not reach the string comparison.
+    if (kindStr && strieq("diskread", kindStr))
+        kind = TAKdiskread;
+    Owned<IRemoteActivity> activity;
+    switch (kind)
+    {
+        case TAKdiskread:
+        {
+            activity.setown(createRemoteDiskRead(actNode));
+            break;
+        }
+        default:
+            throwUnexpected(); // for now
+
+    }
+    return activity.getClear();
+}
+
+// Creates the activity for a stream-read request from the request's "node" sub-tree.
+// The request must contain a "node" element (enforced with assertex).
+IRemoteActivity *createOutputActivity(IPropertyTree &requestTree)
+{
+    IPropertyTree *actNode = requestTree.queryPropTree("node");
+    assertex(actNode);
+    return createRemoteActivity(*actNode);
+}
+
 #define MAX_KEYDATA_SZ 0x10000
 
 class CRemoteFileServer : implements IRemoteFileServer, public CInterface
@@ -4037,8 +4288,8 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         void processCommand(RemoteFileCommandType cmd, MemoryBuffer &msg, CThrottler *throttler)
         {
             MemoryBuffer reply;
-            parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
-            sendBuffer(socket, reply);
+            bool testSocketFlag = parent->processCommand(cmd, msg, initSendBuffer(reply), this, throttler);
+            sendBuffer(socket, reply, testSocketFlag);
         }
 
         bool immediateCommand() // returns false if socket closed or failure
@@ -5642,6 +5893,179 @@ public:
         return false;
     }
 
+    // Wrapper around cmdStreamRead() that prefixes the reply with a one-byte marker:
+    // 'J' ahead of a normal reply, or '-' (replacing anything written so far) when an
+    // IException escapes, which is then re-thrown to the caller.
+    bool cmdStreamReadTestSocket(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    {
+        const unsigned rollbackPos = reply.length(); // where this command's output begins
+        bool result;
+        try
+        {
+            reply.append('J');
+            result = cmdStreamRead(msg, reply, client);
+        }
+        catch (IException *)
+        {
+            // Discard any partial output before flagging the failure.
+            reply.rewrite(rollbackPos);
+            reply.append('-');
+            throw;
+        }
+        return result;
+    }
+
+    // Handles a JSON stream-read request.
+    //
+    // msg    - the JSON request text (see the example below).
+    // reply  - receives the response: for binary output the cursor handle, a length-prefixed block of
+    //          rows and a length-prefixed serialized cursor; for xml/json output a serialized
+    //          "Response" tree holding "cursor", the "Row" elements and (unless at end of input) "cursorBin".
+    // client - the connection whose open-handle table owns any newly created activity.
+    //
+    // A request with no "cursor" handle creates a new activity. A request whose handle is not known to
+    // this server (or does not refer to an activity) is answered with cursor handle 0 and no rows, so
+    // that the client can resend the original request together with its serialized cursor.
+    // At most defaultDaFSNumRecs rows are returned per call. Always returns true; failures are thrown.
+    bool cmdStreamRead(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client)
+    {
+        if (TF_TRACE_CLIENT_STATS)
+        {
+            StringBuffer s(client.queryPeerName());
+            PROGLOG("Connect from %s",s.str());
+        }
+
+        Owned<IPropertyTree> requestTree = createPTreeFromJSONString(msg.length(), msg.toByteArray());
+
+        /* Example JSON request:
+         * {
+         *  "format" : "xml",
+         *  "cursor" : "1234",
+         *  "node" : {
+         *   "kind" : "diskread",
+         *   "fileName": "examplefilename",
+         *   "keyfilter" : "f1='1    '",
+         *   "choosen" : "5",
+         *   "cursor" : "12345", // cursor handle
+         *   "input" : {
+         *    "f1" : "string5",
+         *    "f2" : "string5"
+         *   },
+         *   "output" : {
+         *    "f2" : "string",
+         *    "f1" : "real"
+         *   }
+         *  }
+         * }
+         *
+         */
+
+        const char *outputFmtStr = requestTree->queryProp("format");
+        int cursorHandle = requestTree->getPropInt("cursor");
+        Owned<IPropertyTree> responseTree; // Used if xml/json
+        OutputFormat outputFormat;
+        // No format => xml; any value other than "xml"/"json" selects binary.
+        if (!outputFmtStr || strieq("xml", outputFmtStr))
+            outputFormat = outFmt_Xml;
+        else if (strieq("json", outputFmtStr))
+            outputFormat = outFmt_Json;
+        else
+            outputFormat = outFmt_Binary;
+        if (outFmt_Binary != outputFormat)
+            responseTree.setown(createPTree("Response"));
+
+        MemoryBuffer cursorMb;
+        if (requestTree->getPropBin("cursorBin", cursorMb))
+            cursorMb.setEndian(BIG_ENDIAN);
+
+        Owned<IRemoteActivity> outputActivity;
+        OpenFileInfo fileInfo;
+        if (!cursorHandle)
+        {
+            // In future this may be passed the request and build a chain of activities and return sink.
+            outputActivity.setown(createOutputActivity(*requestTree));
+
+            StringBuffer requestStr("jsonrequest:");
+            outputActivity->getInfoStr(requestStr);
+            Owned<StringAttrItem> name = new StringAttrItem(requestStr);
+
+            CriticalBlock block(sect);
+            cursorHandle = getNextHandle();
+            client.previdx = client.openFiles.ordinality();
+            client.openFiles.append(OpenFileInfo(cursorHandle, outputActivity, name));
+        }
+        else if (!lookupFileIOHandle(cursorHandle, fileInfo) || !fileInfo.activity)
+        {
+            // Unknown handle, or a handle that belongs to an ordinary open file rather than an activity:
+            // answer with the challenge response instead of dereferencing a null activity below.
+            cursorHandle = 0; // challenge response ..
+        }
+        else // known handle, continuation
+            outputActivity.set(fileInfo.activity);
+
+        if (outputActivity && cursorMb.length()) // use handle if one provided
+            outputActivity->restoreCursor(cursorMb);
+
+        if (outFmt_Binary != outputFormat)
+            responseTree->setPropInt("cursor", cursorHandle);
+        else
+            reply.append(cursorHandle);
+        if (cursorHandle)
+        {
+            IOutputMetaData *out = outputActivity->queryOutputMeta();
+            bool eoi=false;
+            if (outFmt_Binary == outputFormat)
+            {
+                DelayedSizeMarker dataLenMarker(reply); // data length
+                for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                {
+                    size32_t rowSz;
+                    const void *row = outputActivity->nextRow(rowSz);
+                    if (!row)
+                    {
+                        eoi = true;
+                        break;
+                    }
+                    reply.append(rowSz, row);
+                }
+                dataLenMarker.write();
+            }
+            else
+            {
+                CPropertyTreeWriter iptWriter;
+                for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                {
+                    size32_t rowSz;
+                    const void *row = outputActivity->nextRow(rowSz);
+                    if (!row)
+                    {
+                        eoi = true;
+                        break;
+                    }
+                    IPropertyTree *rowNode = responseTree->addPropTree("Row");
+                    iptWriter.setRoot(*rowNode);
+                    out->toXML((const byte *)row, iptWriter);
+                }
+            }
+            // A continuation cursor is only returned while more input may remain.
+            if (outFmt_Binary != outputFormat)
+            {
+                if (!eoi)
+                {
+                    MemoryBuffer nextCursorMb;
+                    nextCursorMb.setEndian(BIG_ENDIAN);
+                    outputActivity->serializeCursor(nextCursorMb);
+                    responseTree->setPropBin("cursorBin", nextCursorMb.length(), nextCursorMb.toByteArray());
+                }
+            }
+            else
+            {
+                DelayedSizeMarker cursorLenMarker(reply); // cursor length
+                if (!eoi)
+                    outputActivity->serializeCursor(reply);
+                cursorLenMarker.write();
+            }
+        }
+        switch (outputFormat)
+        {
+            case outFmt_Xml:
+            {
+                StringBuffer responseXmlStr;
+                toXML(responseTree, responseXmlStr);
+                reply.append(responseXmlStr.length(), responseXmlStr.str());
+                break;
+            }
+            case outFmt_Json:
+            {
+                StringBuffer responseJsonStr;
+                toJSON(responseTree, responseJsonStr);
+                reply.append(responseJsonStr.length(), responseJsonStr.str());
+                break;
+            }
+            default:
+                break;
+        }
+        return true;
+    }
+
     // legacy version
     bool cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply)
     {
@@ -5740,6 +6164,7 @@ public:
             case RFCgetinfo:
             case RFCfirewall:
             case RFCunlock:
+            case RFCStreamRead:
                 stdCmdThrottler.addCommand(cmd, msg, client);
                 return;
             // NB: The following commands are still bound by the the thread pool
@@ -5758,6 +6183,7 @@ public:
     {
         Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
         bool ret = true;
+        bool testSocketFlag = false;
         try
         {
             switch(cmd)
@@ -5797,6 +6223,7 @@ public:
                 MAPCOMMAND(RFCgetinfo, cmdGetInfo);
                 MAPCOMMAND(RFCfirewall, cmdFirewall);
                 MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
+                MAPCOMMANDCLIENTTESTSOCKET(RFCStreamRead, cmdStreamReadTestSocket, *client);
                 MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
@@ -5816,8 +6243,10 @@ public:
             e->Release();
         }
         if (!ret) // append error string
+        {
             appendError(cmd, client, cmd, reply);
-        return ret;
+        }
+        return testSocketFlag;
     }
 
     IPooledThread *createCommandProcessor()
@@ -6059,8 +6488,8 @@ public:
         if (cmd != RFCgetver)
             cmd = RFCinvalid;
         MemoryBuffer reply;
-        processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
-        sendBuffer(socket, reply);
+        bool testSocketFlag = processCommand(cmd, msg, initSendBuffer(reply), NULL, NULL);
+        sendBuffer(socket, reply, testSocketFlag);
     }
 
     bool checkAuthentication(ISocket *socket, IAuthenticatedUser *&ret)

+ 1 - 0
common/thorhelper/roxiehelper.hpp

@@ -18,6 +18,7 @@
 #ifndef ROXIEHELPER_HPP
 #define ROXIEHELPER_HPP
 
+#include "rtlformat.hpp"
 #include "thorherror.h"
 #include "thorxmlwrite.hpp"
 #include "roxiehelper.ipp"

+ 1 - 180
common/thorhelper/thorcommon.cpp

@@ -27,6 +27,7 @@
 #include "thorcommon.ipp"
 #include "eclrtl.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 #include <algorithm>
 #ifdef _USE_NUMA
 #include <numa.h>
@@ -875,186 +876,6 @@ extern bool isActivitySink(ThorActivityKind kind)
 //=====================================================================================================
 
 
-CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
-{
-    buffer = NULL;
-    maxOffset = 0;
-    readOffset = 0;
-}
-
-void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
-{
-    ensureAccessible(readOffset + len);
-    memcpy(ptr, buffer+readOffset, len);
-    readOffset += len;
-}
-
-
-size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
-{
-    doRead(len, ptr);
-    return len;
-}
-
-size32_t CThorContiguousRowBuffer::readSize()
-{
-    size32_t value;
-    doRead(sizeof(value), &value);
-    return value;
-}
-
-size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
-{
-    size32_t size = sizePackedInt();
-    doRead(size, ptr);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
-{
-    if (len == 0)
-        return 0;
-
-    size32_t size = sizeUtf8(len);
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
-{
-    size32_t size = sizeVStr();
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
-{
-    size32_t size = sizeVUni();
-    byte * self = target.ensureCapacity(fixedSize + size, NULL);
-    doRead(size, self+offset);
-    return size;
-}
-
-
-size32_t CThorContiguousRowBuffer::sizePackedInt()
-{
-    ensureAccessible(readOffset+1);
-    return rtlGetPackedSizeFromFirst(buffer[readOffset]);
-}
-
-size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
-{
-    if (len == 0)
-        return 0;
-
-    //The len is the number of utf characters, size depends on which characters are included.
-    size32_t nextOffset = readOffset;
-    while (len)
-    {
-        ensureAccessible(nextOffset+1);
-
-        for (;nextOffset < maxOffset;)
-        {
-            nextOffset += readUtf8Size(buffer+nextOffset);  // This function only accesses the first byte
-            if (--len == 0)
-                break;
-        }
-    }
-    return nextOffset - readOffset;
-}
-
-size32_t CThorContiguousRowBuffer::sizeVStr()
-{
-    size32_t nextOffset = readOffset;
-    for (;;)
-    {
-        ensureAccessible(nextOffset+1);
-
-        for (; nextOffset < maxOffset; nextOffset++)
-        {
-            if (buffer[nextOffset] == 0)
-                return (nextOffset + 1) - readOffset;
-        }
-    }
-}
-
-size32_t CThorContiguousRowBuffer::sizeVUni()
-{
-    size32_t nextOffset = readOffset;
-    const size32_t sizeOfUChar = 2;
-    for (;;)
-    {
-        ensureAccessible(nextOffset+sizeOfUChar);
-
-        for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
-        {
-            if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
-                return (nextOffset + sizeOfUChar) - readOffset;
-        }
-    }
-}
-
-
-void CThorContiguousRowBuffer::reportReadFail()
-{
-    throwUnexpected();
-}
-
-
-const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
-{
-    if (maxSize+readOffset > maxOffset)
-        doPeek(maxSize+readOffset);
-    return buffer + readOffset;
-}
-
-offset_t CThorContiguousRowBuffer::beginNested()
-{
-    size32_t len = readSize();
-    return len+readOffset;
-}
-
-bool CThorContiguousRowBuffer::finishedNested(offset_t & endPos)
-{
-    return readOffset >= endPos;
-}
-
-void CThorContiguousRowBuffer::skip(size32_t size)
-{ 
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipPackedInt()
-{
-    size32_t size = sizePackedInt();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipUtf8(size32_t len)
-{
-    size32_t size = sizeUtf8(len);
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipVStr()
-{
-    size32_t size = sizeVStr();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
-void CThorContiguousRowBuffer::skipVUni()
-{
-    size32_t size = sizeVUni();
-    ensureAccessible(readOffset+size);
-    readOffset += size;
-}
-
 // ===========================================
 
 IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context)

+ 0 - 88
common/thorhelper/thorcommon.ipp

@@ -693,94 +693,6 @@ class NullDiskCallback : public IThorDiskCallback, extends CInterface
 
 extern THORHELPER_API size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta);
 
-//The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
-//is in a contiguous block of memory.  The read() and skip() functions must be implemented
-class THORHELPER_API CThorContiguousRowBuffer : implements IRowDeserializerSource
-{
-public:
-    CThorContiguousRowBuffer(ISerialStream * _in);
-
-    inline void setStream(ISerialStream *_in) { in.set(_in); maxOffset = 0; readOffset = 0; }
-
-    virtual const byte * peek(size32_t maxSize);
-    virtual offset_t beginNested();
-    virtual bool finishedNested(offset_t & len);
-
-    virtual size32_t read(size32_t len, void * ptr);
-    virtual size32_t readSize();
-    virtual size32_t readPackedInt(void * ptr);
-    virtual size32_t readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len);
-    virtual size32_t readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize);
-    virtual size32_t readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize);
-
-    //These shouldn't really be called since this class is meant to be used for a deserialize.
-    //If we allowed padding/alignment fields in the input then the first function would make sense.
-    virtual void skip(size32_t size);
-    virtual void skipPackedInt();
-    virtual void skipUtf8(size32_t len);
-    virtual void skipVStr();
-    virtual void skipVUni();
-
-    inline bool eos()
-    {
-        return in->eos();
-    }
-
-    inline offset_t tell()
-    {
-        return in->tell();
-    }
-
-    inline void clearStream()
-    {
-        in.clear();
-        maxOffset = 0;
-        readOffset = 0;
-    }
-
-    inline const byte * queryRow() { return buffer; }
-    inline size32_t queryRowSize() { return readOffset; }
-    inline void finishedRow()
-    {
-        if (readOffset)
-            in->skip(readOffset);
-        maxOffset = 0;
-        readOffset = 0;
-    }
-
-
-protected:
-    size32_t sizePackedInt();
-    size32_t sizeUtf8(size32_t len);
-    size32_t sizeVStr();
-    size32_t sizeVUni();
-    void reportReadFail();
-
-private:
-    inline void doPeek(size32_t maxSize)
-    {
-        buffer = static_cast<const byte *>(in->peek(maxSize, maxOffset));
-    }
-
-    void doRead(size32_t len, void * ptr);
-
-    inline void ensureAccessible(size32_t required)
-    {
-        if (required > maxOffset)
-        {
-            doPeek(required);
-            assertex(required <= maxOffset);
-        }
-    }
-
-protected:
-    Linked<ISerialStream> in;
-    const byte * buffer;
-    size32_t maxOffset;
-    size32_t readOffset;
-};
-
-
 //=====================================================================================================
 
 class ChildRowLinkerWalker : implements IIndirectMemberVisitor

+ 1 - 0
common/thorhelper/thorpipe.cpp

@@ -23,6 +23,7 @@
 #include "csvsplitter.hpp"
 #include "rtlread_imp.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlformat.hpp"
 #include "roxiemem.hpp"
 
 using roxiemem::OwnedRoxieString;

+ 2 - 0
common/thorhelper/thorsoapcall.cpp

@@ -19,6 +19,8 @@
 #include "jqueue.tpp"
 #include "jisem.hpp"
 
+#include "rtlformat.hpp"
+
 #include "thorxmlread.hpp"
 #include "thorxmlwrite.hpp"
 #include "thorcommon.ipp"

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 1 - 1844
common/thorhelper/thorxmlwrite.cpp


+ 0 - 518
common/thorhelper/thorxmlwrite.hpp

@@ -28,303 +28,6 @@
 #include "jptree.hpp"
 #include "thorhelper.hpp"
 
-interface IXmlStreamFlusher
-{
-    virtual void flushXML(StringBuffer &current, bool isClose) = 0;
-};
-
-interface IXmlWriterExt : extends IXmlWriter
-{
-    virtual IXmlWriterExt & clear() = 0;
-    virtual size32_t length() const = 0;
-    virtual const char *str() const = 0;
-    virtual IInterface *saveLocation() const = 0;
-    virtual void rewindTo(IInterface *location) = 0;
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf) = 0;
-    virtual void outputNumericString(const char *field, const char *fieldname) = 0;
-    virtual void outputInline(const char* text) = 0;
-};
-
-class thorhelper_decl CommonXmlPosition : public CInterface, implements IInterface
-{
-public:
-    IMPLEMENT_IINTERFACE;
-
-    CommonXmlPosition(size32_t _pos, unsigned _indent, unsigned _nestLimit, bool _tagClosed, bool _needDelimiter) :
-        pos(_pos), indent(_indent), nestLimit(_nestLimit), tagClosed(_tagClosed), needDelimiter(_needDelimiter)
-    {}
-
-public:
-    size32_t pos = 0;
-    unsigned indent = 0;
-    unsigned nestLimit = 0;
-    bool tagClosed = false;
-    bool needDelimiter = false;
-};
-
-class thorhelper_decl CommonXmlWriter : implements IXmlWriterExt, public CInterface
-{
-public:
-    CommonXmlWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
-    ~CommonXmlWriter();
-    IMPLEMENT_IINTERFACE;
-
-    void outputBeginNested(const char *fieldname, bool nestChildren, bool doIndent);
-    void outputEndNested(const char *fieldname, bool doIndent);
-
-    virtual void outputInlineXml(const char *text){closeTag(); out.append(text); flush(false);} //for appending raw xml content
-    virtual void outputInline(const char* text) { outputInlineXml(text); }
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
-    virtual void outputEndDataset(const char *dsname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginArray(const char *fieldname){}; //repeated elements are inline for xml
-    virtual void outputEndArray(const char *fieldname){};
-    virtual void outputSetAll();
-    virtual void outputXmlns(const char *name, const char *uri);
-
-    //IXmlWriterExt
-    virtual IXmlWriterExt & clear();
-    virtual unsigned length() const                                 { return out.length(); }
-    virtual const char * str() const                                { return out.str(); }
-    virtual IInterface *saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-
-        return new CommonXmlPosition(length(), indent, nestLimit, tagClosed, false);
-    }
-    virtual void rewindTo(IInterface *saved)
-    {
-        if (flusher)
-            throwUnexpected();
-
-        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
-        if (!position)
-            return;
-        if (position->pos < out.length())
-        {
-            out.setLength(position->pos);
-            tagClosed = position->tagClosed;
-            indent = position->indent;
-            nestLimit = position->nestLimit;
-        }
-    }
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
-
-    virtual void outputNumericString(const char *field, const char *fieldname)
-    {
-        outputCString(field, fieldname);
-    }
-
-protected:
-    bool checkForAttribute(const char * fieldname);
-    void closeTag();
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-protected:
-    IXmlStreamFlusher *flusher;
-    StringBuffer out;
-    unsigned flags;
-    unsigned indent;
-    unsigned nestLimit;
-    bool tagClosed;
-};
-
-class thorhelper_decl CommonJsonWriter : implements IXmlWriterExt, public CInterface
-{
-public:
-    CommonJsonWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
-    ~CommonJsonWriter();
-    IMPLEMENT_IINTERFACE;
-
-    void checkDelimit(int inc=0);
-    void checkFormat(bool doDelimit, bool needDelimiter=true, int inc=0);
-    void prepareBeginArray(const char *fieldname);
-
-    virtual void outputInlineXml(const char *text) //for appending raw xml content
-    {
-        if (text && *text)
-            outputUtf8(strlen(text), text, "xml");
-    }
-    virtual void outputInline(const char* text) { out.append(text); }
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
-    virtual void outputEndDataset(const char *dsname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginArray(const char *fieldname);
-    virtual void outputEndArray(const char *fieldname);
-    virtual void outputSetAll();
-    virtual void outputXmlns(const char *name, const char *uri){}
-    virtual void outputNumericString(const char *field, const char *fieldname);
-
-    //IXmlWriterExt
-    virtual IXmlWriterExt & clear();
-    virtual unsigned length() const                                 { return out.length(); }
-    virtual const char * str() const                                { return out.str(); }
-    virtual void rewindTo(unsigned int prevlen)                     { if (prevlen < out.length()) out.setLength(prevlen); }
-    virtual IInterface *saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-
-        return new CommonXmlPosition(length(), indent, nestLimit, false, needDelimiter);
-    }
-    virtual void rewindTo(IInterface *saved)
-    {
-        if (flusher)
-            throwUnexpected();
-
-        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
-        if (!position)
-            return;
-        if (position->pos < out.length())
-        {
-            out.setLength(position->pos);
-            needDelimiter = position->needDelimiter;
-            indent = position->indent;
-            nestLimit = position->nestLimit;
-        }
-    }
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
-
-    void outputBeginRoot(){out.append('{');}
-    void outputEndRoot(){out.append('}');}
-
-protected:
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-    class CJsonWriterItem : public CInterface
-    {
-    public:
-        CJsonWriterItem(const char *_name) : name(_name), depth(0){}
-
-        StringAttr name;
-        unsigned depth;
-    };
-
-    const char *checkItemName(CJsonWriterItem *item, const char *name, bool simpleType=true);
-    const char *checkItemName(const char *name, bool simpleType=true);
-    const char *checkItemNameBeginNested(const char *name);
-    const char *checkItemNameEndNested(const char *name);
-    bool checkUnamedArrayItem(bool begin);
-
-
-    IXmlStreamFlusher *flusher;
-    CIArrayOf<CJsonWriterItem> arrays;
-    StringBuffer out;
-    unsigned flags;
-    unsigned indent;
-    unsigned nestLimit;
-    bool needDelimiter;
-};
-
-thorhelper_decl StringBuffer &buildJsonHeader(StringBuffer  &header, const char *suppliedHeader, const char *rowTag);
-thorhelper_decl StringBuffer &buildJsonFooter(StringBuffer  &footer, const char *suppliedFooter, const char *rowTag);
-
-//Writes type encoded XML strings  (xsi:type="xsd:string", xsi:type="xsd:boolean" etc)
-class thorhelper_decl CommonEncodedXmlWriter : public CommonXmlWriter
-{
-public:
-    CommonEncodedXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
-
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-};
-
-//Writes all encoded DATA fields as base64Binary
-class thorhelper_decl CommonEncoded64XmlWriter : public CommonEncodedXmlWriter
-{
-public:
-    CommonEncoded64XmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
-
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-};
-
-enum XMLWriterType{WTStandard, WTEncoding, WTEncodingData64, WTJSON} ;
-thorhelper_decl CommonXmlWriter * CreateCommonXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
-thorhelper_decl IXmlWriterExt * createIXmlWriterExt(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
-
-class thorhelper_decl SimpleOutputWriter : implements IXmlWriter, public CInterface
-{
-    void outputFieldSeparator();
-    bool separatorNeeded;
-public:
-    SimpleOutputWriter();
-    IMPLEMENT_IINTERFACE;
-
-    SimpleOutputWriter & clear();
-    unsigned length() const                                 { return out.length(); }
-    const char * str() const                                { return out.str(); }
-
-    virtual void outputQuoted(const char *text);
-    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputString(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBool(bool field, const char *fieldname);
-    virtual void outputData(unsigned len, const void *field, const char *fieldname);
-    virtual void outputReal(double field, const char *fieldname);
-    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
-    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
-    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
-    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
-    virtual void outputEndNested(const char *fieldname);
-    virtual void outputBeginDataset(const char *dsname, bool nestChildren){}
-    virtual void outputEndDataset(const char *dsname){}
-    virtual void outputBeginArray(const char *fieldname){}
-    virtual void outputEndArray(const char *fieldname){}
-    virtual void outputSetAll();
-    virtual void outputInlineXml(const char *text){} //for appending raw xml content
-    virtual void outputXmlns(const char *name, const char *uri){}
-
-    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
-
-    void newline();
-
-protected:
-    StringBuffer out;
-};
 
 class thorhelper_decl CommonFieldProcessor : implements IFieldProcessor, public CInterface
 {
@@ -359,226 +62,5 @@ extern thorhelper_decl void printKeyedValues(StringBuffer &out, IIndexReadContex
 extern thorhelper_decl void convertRowToXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags = (unsigned)-1);
 extern thorhelper_decl void convertRowToJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags = (unsigned)-1);
 
-struct CSVOptions
-{
-    StringAttr delimiter, terminator;
-    bool includeHeader;
-};
-
-class CCSVItem : public CInterface, implements IInterface
-{
-    unsigned columnID, nextRowID, rowCount, nestedLayer;
-    StringAttr name, type, value, parentXPath;
-    StringArray childNames;
-    MapStringTo<bool> childNameMap;
-    bool isNestedItem, simpleNested, currentRowEmpty, outputHeader;
-public:
-    CCSVItem() : columnID(0), nestedLayer(0), nextRowID(0), rowCount(0), isNestedItem(false),
-        simpleNested(false), currentRowEmpty(true) { };
-
-    IMPLEMENT_IINTERFACE;
-    inline const char* getName() const { return name.get(); };
-    inline void setName(const char* _name) { name.set(_name); };
-    inline const char* getValue() const { return value.get(); };
-    inline void setValue(const char* _value) { value.set(_value); };
-    inline unsigned getColumnID() const { return columnID; };
-    inline void setColumnID(unsigned _columnID) { columnID = _columnID; };
-
-    inline unsigned getNextRowID() const { return nextRowID; };
-    inline void setNextRowID(unsigned _rowID) { nextRowID = _rowID; };
-    inline void incrementNextRowID() { nextRowID++; };
-    inline unsigned getRowCount() const { return rowCount; };
-    inline void setRowCount(unsigned _rowCount) { rowCount = _rowCount; };
-    inline void incrementRowCount() { rowCount++; };
-    inline bool getCurrentRowEmpty() const { return currentRowEmpty; };
-    inline void setCurrentRowEmpty(bool _currentRowEmpty) { currentRowEmpty = _currentRowEmpty; };
-
-    inline unsigned getNestedLayer() const { return nestedLayer; };
-    inline void setNestedLayer(unsigned _nestedLayer) { nestedLayer = _nestedLayer; };
-    inline bool checkIsNestedItem() const { return isNestedItem; };
-    inline void setIsNestedItem(bool _isNestedItem) { isNestedItem = _isNestedItem; };
-    inline bool checkSimpleNested() const { return simpleNested; };
-    inline void setSimpleNested(bool _simpleNested) { simpleNested = _simpleNested; };
-    inline bool checkOutputHeader() const { return outputHeader; };
-    inline void setOutputHeader(bool _outputHeader) { outputHeader = _outputHeader; };
-    inline const char* getParentXPath() const { return parentXPath.str(); };
-    inline void setParentXPath(const char* _parentXPath) { parentXPath.set(_parentXPath); };
-    inline StringArray& getChildrenNames() { return childNames; };
-    inline void addChildName(const char* name)
-    {
-        if (hasChildName(name))
-            return;
-        childNameMap.setValue(name, true);
-        childNames.append(name);
-    };
-    inline bool hasChildName(const char* name)
-    {
-        bool* found = childNameMap.getValue(name);
-        return (found && *found);
-    };
-    inline void clearContentVariables()
-    {
-        nextRowID = rowCount = 0;
-        currentRowEmpty = true;
-    };
-};
-
-class CCSVRow : public CInterface, implements IInterface
-{
-    unsigned rowID;
-    CIArrayOf<CCSVItem> columns;
-public:
-    CCSVRow(unsigned _rowID) : rowID(_rowID) {};
-    IMPLEMENT_IINTERFACE;
-
-    inline unsigned getRowID() const { return rowID; };
-    inline void setRowID(unsigned _rowID) { rowID = _rowID; };
-    inline unsigned getColumnCount() const { return columns.length(); };
-
-    const char* getColumnValue(unsigned columnID) const;
-    void setColumn(unsigned columnID, const char* columnName, const char* columnValue);
-};
-
-//CommonCSVWriter is used to output a WU result in CSV format.
-//Read CSV header information;
-//If needed, output CSV headers into the 'out' buffer;
-//Read each row (a record) of the WU result and output into the 'out' buffer;
-//The 'out' buffer can be accessed through the str() method.
-class thorhelper_decl CommonCSVWriter: public CInterface, implements IXmlWriterExt
-{
-    class CXPathItem : public CInterface, implements IInterface
-    {
-        bool isArray;
-        StringAttr path;
-    public:
-        CXPathItem(const char* _path, bool _isArray) : path(_path), isArray(_isArray) { };
-
-        IMPLEMENT_IINTERFACE;
-        inline const char* getPath() const { return path.get(); };
-        inline bool getIsArray() const { return isArray; };
-    };
-    CSVOptions options;
-    bool readingCSVHeader, addingSimpleNestedContent;
-    unsigned recordCount, headerColumnID, nestedHeaderLayerID;
-    StringBuffer currentParentXPath, auditOut;
-    StringArray headerXPathList;
-    MapStringTo<bool> topHeaderNameMap;
-    MapStringToMyClass<CCSVItem> csvItems;
-    CIArrayOf<CCSVRow> contentRowsBuffer;
-    CIArrayOf<CXPathItem> dataXPath;//xpath in caller
-
-    void escapeQuoted(unsigned len, char const* in, StringBuffer& out);
-    bool checkHeaderName(const char* name);
-    CCSVItem* getParentCSVItem();
-    CCSVItem* getCSVItemByFieldName(const char* name);
-    void addColumnToRow(CIArrayOf<CCSVRow>& rows, unsigned rowID, unsigned colID, const char* columnValue, const char* columnName);
-    void addCSVHeader(const char* name, const char* type, bool isNested, bool simpleNested, bool outputHeader);
-    void addContentField(const char* field, const char* fieldName);
-    void addStringField(unsigned len, const char* field, const char* fieldName);
-    void setChildrenNextRowID(const char* path, unsigned rowID);
-    unsigned getChildrenMaxNextRowID(const char* path);
-    unsigned getChildrenMaxColumnID(CCSVItem* item, unsigned& maxColumnID);
-    void addChildNameToParentCSVItem(const char* name);
-    void setParentItemRowEmpty(CCSVItem* item, bool empty);
-    void addFieldToParentXPath(const char* fieldName);
-    void removeFieldFromCurrentParentXPath(const char* fieldName);
-    void appendDataXPathItem(const char* fieldName, bool isArray);
-    bool isDataRow(const char* fieldName);
-    void outputCSVRows(CIArrayOf<CCSVRow>& rows, bool isHeader);
-    void outputHeadersToBuffer();
-    void finishContentResultRow();
-
-    void auditHeaderInfo()
-    {
-        ForEachItemIn(i, headerXPathList)
-        {
-            const char* path = headerXPathList.item(i);
-            CCSVItem* item = csvItems.getValue(path);
-            if (!item)
-                continue;
-            if (!item->checkIsNestedItem())
-            {
-                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
-                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
-            }
-            else
-            {
-                auditOut.appendf("dumpHeaderInfo path<%s> next row<%d> col<%d>: name<%s> - value<%s>\n", path, item->getNextRowID(),
-                    item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
-            }
-        }
-    }
-
-public:
-    CommonCSVWriter(unsigned _flags, CSVOptions& _options, IXmlStreamFlusher* _flusher = NULL);
-    ~CommonCSVWriter();
-
-    IMPLEMENT_IINTERFACE;
-
-    inline void flush(bool isClose)
-    {
-        if (flusher)
-            flusher->flushXML(out, isClose);
-    }
-
-    virtual unsigned length() const { return out.length(); }
-    virtual const char* str() const { return out.str(); }
-    virtual void rewindTo(IInterface* location) { };
-    virtual void cutFrom(IInterface *location, StringBuffer& databuf) { };
-    virtual IInterface* saveLocation() const
-    {
-        if (flusher)
-            throwUnexpected();
-        return NULL;
-    };
-
-    //IXmlWriter
-    virtual void outputString(unsigned len, const char* field, const char* fieldName);
-    virtual void outputBool(bool field, const char* fieldName);
-    virtual void outputData(unsigned len, const void* field, const char* fieldName);
-    virtual void outputInt(__int64 field, unsigned size, const char* fieldName);
-    virtual void outputUInt(unsigned __int64 field, unsigned size, const char* fieldName);
-    virtual void outputReal(double field, const char *fieldName);
-    virtual void outputDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
-    virtual void outputUDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
-    virtual void outputUnicode(unsigned len, const UChar* field, const char* fieldName);
-    virtual void outputQString(unsigned len, const char* field, const char* fieldName);
-    virtual void outputUtf8(unsigned len, const char* field, const char* fieldName);
-    virtual void outputBeginNested(const char* fieldName, bool simpleNested);
-    virtual void outputEndNested(const char* fieldName);
-    virtual void outputBeginDataset(const char* dsname, bool nestChildren);
-    virtual void outputEndDataset(const char* dsname);
-    virtual void outputBeginArray(const char* fieldName);
-    virtual void outputEndArray(const char* fieldName);
-    virtual void outputSetAll() { };
-    virtual void outputXmlns(const char* name, const char* uri) { };
-    virtual void outputQuoted(const char* text)
-    {
-        //No fieldName. Is it valid for CSV?
-    };
-    virtual void outputInlineXml(const char* text)//for appending raw xml content
-    {
-        //Dynamically add a new header 'xml' and insert the header.
-        //But, not sure we want to do that for a big WU result.
-        //if (text && *text)
-          //outputUtf8(strlen(text), text, "xml");
-    };
-    virtual void outputInline(const char* text) { out.append(text); }
-
-    //IXmlWriterExt
-    virtual void outputNumericString(const char* field, const char* fieldName);
-    virtual IXmlWriterExt& clear();
-
-    void outputBeginNested(const char* fieldName, bool simpleNested, bool outputHeader);
-    void outputEndNested(const char* fieldName, bool outputHeader);
-    void outputCSVHeader(const char* name, const char* type);
-    void finishCSVHeaders();
-    const char* auditStr() const { return auditOut.str(); }
-
-protected:
-    IXmlStreamFlusher* flusher;
-    StringBuffer out;
-    unsigned flags;
-};
 
 #endif // THORXMLWRITE_HPP

+ 1 - 0
ecl/eclagent/eclagent.cpp

@@ -37,6 +37,7 @@
 #include "hqlplugins.hpp"
 #include "eclrtl_imp.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlcommon.hpp"
 #include "workunit.hpp"
 #include "eventqueue.hpp"
 #include "schedulectrl.hpp"

+ 1 - 0
ecl/hthor/hthor.ipp

@@ -38,6 +38,7 @@
 #include "thorcommon.ipp"
 #include "roxielmj.hpp"
 #include "eclrtl_imp.hpp"
+#include "rtlcommon.hpp"
 #include "rtlds_imp.hpp"
 #include "rtlread_imp.hpp"
 #include "rtlrecord.hpp"

+ 2 - 0
esp/esdllib/esdl_transformer2.cpp

@@ -21,6 +21,8 @@
 #include "xpp/xpputils.h"
 #include <memory>
 
+#include "rtlformat.hpp"
+
 #include "eclhelper.hpp"    //IXMLWriter
 #include "thorxmlwrite.hpp" //JSON WRITER
 #include "eclrtl.hpp"

+ 2 - 0
esp/services/esdl_svc_engine/esdl_binding.cpp

@@ -22,6 +22,8 @@
 #include "wsexcept.hpp"
 #include "httpclient.hpp"
 
+#include "rtlformat.hpp"
+
 //for dali communication
 #include "daclient.hpp"
 #include "dalienv.hpp"

+ 2 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -45,6 +45,8 @@
 #include "fvdatasource.hpp"
 #include "fvresultset.ipp"
 
+#include "rtlformat.hpp"
+
 #include "package.h"
 
 #ifdef _USE_ZLIB

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -30,6 +30,7 @@
 #include "roxie.hpp"
 #include "roxiedebug.ipp"
 #include "eclrtl.hpp"
+#include "rtlformat.hpp"
 #include "workunit.hpp"
 #include "layouttrans.hpp"
 

+ 1 - 0
roxie/ccd/ccdactivities.cpp

@@ -32,6 +32,7 @@
 #include "rtlkey.hpp"
 #include "eclrtl_imp.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 
 #include "jhtree.hpp"
 #include "jlog.hpp"

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -29,6 +29,8 @@
 #include "jutil.hpp"
 #include <build-config.h>
 
+#include "rtlformat.hpp"
+
 #include "dalienv.hpp"
 #include "rmtfile.hpp"
 #include "ccd.hpp"

+ 2 - 0
roxie/ccd/ccdprotocol.cpp

@@ -19,6 +19,8 @@
 #include "jlib.hpp"
 #include "jthread.hpp"
 
+#include "rtlcommon.hpp"
+
 #include "roxie.hpp"
 #include "roxiehelper.hpp"
 #include "ccdprotocol.hpp"

+ 1 - 0
roxie/ccd/ccdserver.cpp

@@ -41,6 +41,7 @@
 #include "rtlfield.hpp"
 #include "rtlds_imp.hpp"
 #include "rtlread_imp.hpp"
+#include "rtlcommon.hpp"
 
 #include <algorithm>
 

+ 3 - 0
rtl/eclrtl/CMakeLists.txt

@@ -44,6 +44,8 @@ set (    SRCS
          rtlrecord.cpp
          rtltype.cpp 
          rtlxml.cpp
+         rtlcommon.cpp
+         rtlformat.cpp
          
          eclinclude4.hpp
          eclrtl.hpp
@@ -66,6 +68,7 @@ set (    SRCS
 include_directories ( 
          ./../../rtl/include 
          ./../../rtl/nbcd 
+         ./../../rtl/eclrtl 
          ./../../system/include 
          ./../../system/jlib 
          ./../../roxie/roxiemem

+ 14 - 0
rtl/eclrtl/eclhelper_dyn.cpp

@@ -69,6 +69,20 @@ CDeserializedOutputMetaData::CDeserializedOutputMetaData(const char *json)
     typeInfo = deserializer->deserialize(json);
 }
 
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &binInfo) // Factory: builds output meta data from type info held in a MemoryBuffer.
+{
+    return new CDeserializedOutputMetaData(binInfo); // freshly allocated; presumably the caller owns the returned reference - confirm
+}
+
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonInfo) // Factory: builds output meta data from type info held in a property tree.
+{
+    return new CDeserializedOutputMetaData(jsonInfo); // freshly allocated; presumably the caller owns the returned reference - confirm
+}
+
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json) // Factory: builds output meta data from type info supplied as JSON text.
+{
+    return new CDeserializedOutputMetaData(json); // freshly allocated; presumably the caller owns the returned reference - confirm
+}
 //---------------------------------------------------------------------------------------------------------------------
 
 class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg

+ 5 - 0
rtl/eclrtl/eclhelper_dyn.hpp

@@ -22,7 +22,12 @@
 #include "eclrtl.hpp"
 #include "eclhelper.hpp"
 
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &mb);
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonTree);
+extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json);
+
 extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml);
+extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit);
 extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml);
 extern ECLRTL_API IEclProcess* createDynamicEclProcess();
 

+ 185 - 0
rtl/eclrtl/rtlcommon.cpp

@@ -0,0 +1,185 @@
+#include "jiface.hpp"
+#include "jbuff.hpp"
+#include "jstring.hpp"
+#include "junicode.hpp"
+#include "rtlcommon.hpp"
+
+CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in) // Stores the supplied stream in 'in'; no data is peeked here.
+{
+    buffer = NULL; // no peeked window yet
+    maxOffset = 0; // number of bytes known to be accessible through 'buffer'
+    readOffset = 0; // number of bytes of the current row consumed so far
+}
+
+void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr) // Copies 'len' bytes at the current read position into 'ptr' and advances the position.
+{
+    ensureAccessible(readOffset + len); // the bytes must be within the peeked window before they are copied
+    memcpy(ptr, buffer+readOffset, len);
+    readOffset += len; // only the offset moves here; the underlying stream is not skipped by this function
+}
+
+
+size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr) // Reads exactly 'len' bytes into 'ptr' via doRead().
+{
+    doRead(len, ptr);
+    return len; // always reports the requested length
+}
+
+size32_t CThorContiguousRowBuffer::readSize() // Reads a size32_t value from the current position and returns it.
+{
+    size32_t value;
+    doRead(sizeof(value), &value); // raw byte copy; no byte-order conversion is applied here
+    return value;
+}
+
+size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr) // Copies one packed integer (still in packed form) into 'ptr'; returns its size in bytes.
+{
+    size32_t size = sizePackedInt(); // length is derived from the first byte of the packed value
+    doRead(size, ptr);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len) // Reads 'len' UTF-8 characters into the row being built at 'offset'; returns the byte count.
+{
+    if (len == 0)
+        return 0; // nothing to read, and the target is not resized
+
+    size32_t size = sizeUtf8(len); // byte size of 'len' characters, found by scanning the input
+    byte * self = target.ensureCapacity(fixedSize + size, NULL); // grow the target row to hold the fixed part plus the string bytes
+    doRead(size, self+offset);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize) // Reads a zero-terminated string into the row being built at 'offset'; returns the byte count.
+{
+    size32_t size = sizeVStr(); // includes the terminating zero byte
+    byte * self = target.ensureCapacity(fixedSize + size, NULL); // grow the target row to hold the fixed part plus the string bytes
+    doRead(size, self+offset);
+    return size;
+}
+
+size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize) // Reads a string terminated by a two-byte zero into the row being built at 'offset'; returns the byte count.
+{
+    size32_t size = sizeVUni(); // includes the two-byte terminator
+    byte * self = target.ensureCapacity(fixedSize + size, NULL); // grow the target row to hold the fixed part plus the string bytes
+    doRead(size, self+offset);
+    return size;
+}
+
+
+size32_t CThorContiguousRowBuffer::sizePackedInt() // Returns the byte size of the packed integer at the current position without consuming it.
+{
+    ensureAccessible(readOffset+1); // only the first byte is needed to determine the size
+    return rtlGetPackedSizeFromFirst(buffer[readOffset]);
+}
+
+size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len) // Returns the number of bytes occupied by the next 'len' UTF-8 characters; readOffset is not changed.
+{
+    if (len == 0)
+        return 0;
+
+    //The len is the number of utf characters, size depends on which characters are included.
+    size32_t nextOffset = readOffset;
+    while (len)
+    {
+        ensureAccessible(nextOffset+1); // make sure the first byte of the next character is within the peeked window
+
+        for (;nextOffset < maxOffset;) // walk characters whose first byte is already accessible
+        {
+            nextOffset += readUtf8Size(buffer+nextOffset);  // This function only accesses the first byte
+            if (--len == 0)
+                break; // all requested characters measured; the outer loop then ends as well
+        }
+    }
+    return nextOffset - readOffset;
+}
+
+size32_t CThorContiguousRowBuffer::sizeVStr() // Returns the byte size of the zero-terminated string at the current position, terminator included.
+{
+    size32_t nextOffset = readOffset;
+    for (;;) // the only exit from this loop in this function is the return when a zero byte is found
+    {
+        ensureAccessible(nextOffset+1); // extend the peeked window when the scan reaches its end
+
+        for (; nextOffset < maxOffset; nextOffset++)
+        {
+            if (buffer[nextOffset] == 0)
+                return (nextOffset + 1) - readOffset; // +1 counts the terminating zero
+        }
+    }
+}
+
+size32_t CThorContiguousRowBuffer::sizeVUni() // Returns the byte size of the string at the current position that ends with a two-byte zero, terminator included.
+{
+    size32_t nextOffset = readOffset;
+    const size32_t sizeOfUChar = 2; // the scan advances in two-byte units
+    for (;;) // the only exit from this loop in this function is the return when a two-byte zero is found
+    {
+        ensureAccessible(nextOffset+sizeOfUChar); // a whole two-byte unit must be within the peeked window
+
+        for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar) // condition keeps both bytes of the unit inside the window
+        {
+            if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
+                return (nextOffset + sizeOfUChar) - readOffset; // count includes the terminator unit
+        }
+    }
+}
+
+
+void CThorContiguousRowBuffer::reportReadFail() // Failure hook: delegates to throwUnexpected().
+{
+    throwUnexpected();
+}
+
+
+const byte * CThorContiguousRowBuffer::peek(size32_t maxSize) // Returns a pointer to the data at the current read position without consuming it.
+{
+    if (maxSize+readOffset > maxOffset)
+        doPeek(maxSize+readOffset); // request a larger window; unlike ensureAccessible() no size check follows here
+    return buffer + readOffset;
+}
+
+offset_t CThorContiguousRowBuffer::beginNested() // Reads the size prefix of a nested block and returns the read offset at which that block ends.
+{
+    size32_t len = readSize(); // consumes the prefix, so readOffset now points at the nested data
+    return len+readOffset; // NOTE(review): both operands are size32_t, so the sum is formed in 32 bits before widening - confirm this is acceptable
+}
+
+bool CThorContiguousRowBuffer::finishedNested(offset_t & endPos) // True once the read offset has reached or passed 'endPos' (the value returned by beginNested()).
+{
+    return readOffset >= endPos; // endPos is only read here, not modified
+}
+
+void CThorContiguousRowBuffer::skip(size32_t size) // Advances the read offset by 'size' bytes without copying them.
+{
+    ensureAccessible(readOffset+size); // the skipped bytes must still be brought into the peeked window
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipPackedInt() // Advances the read offset past one packed integer.
+{
+    size32_t size = sizePackedInt(); // size comes from the first byte only
+    ensureAccessible(readOffset+size); // bring the remaining bytes of the value into the peeked window
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipUtf8(size32_t len) // Advances the read offset past 'len' UTF-8 characters.
+{
+    size32_t size = sizeUtf8(len); // byte size of 'len' characters (0 when len is 0)
+    ensureAccessible(readOffset+size); // the last character's trailing bytes may not have been peeked by sizeUtf8()
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipVStr() // Advances the read offset past a zero-terminated string, terminator included.
+{
+    size32_t size = sizeVStr(); // scan for the terminating zero byte
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}
+
+void CThorContiguousRowBuffer::skipVUni() // Advances the read offset past a string ending in a two-byte zero, terminator included.
+{
+    size32_t size = sizeVUni(); // scan for the two-byte zero terminator
+    ensureAccessible(readOffset+size);
+    readOffset += size;
+}

+ 97 - 0
rtl/eclrtl/rtlcommon.hpp

@@ -0,0 +1,97 @@
+#ifndef ECLCOMMON_HPP
+#define ECLCOMMON_HPP
+
+
+#include "jiface.hpp"
+#include "jfile.hpp"
+#include "eclrtl.hpp"
+#include "eclhelper.hpp"
+
+//The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
+//is in a contiguous block of memory.  The read() and skip() functions must be implemented
+//
+//State kept per row:
+//  buffer     - pointer returned by the last peek() on the underlying stream
+//  maxOffset  - number of bytes the stream reported as available at 'buffer'
+//  readOffset - number of bytes of the current row consumed so far
+//Nothing is consumed from the stream until finishedRow() is called.
+class ECLRTL_API CThorContiguousRowBuffer : implements IRowDeserializerSource
+{
+public:
+    CThorContiguousRowBuffer(ISerialStream * _in);
+
+    // Switch to a different input stream and reset both offsets.
+    inline void setStream(ISerialStream *_in) { in.set(_in); maxOffset = 0; readOffset = 0; }
+
+    virtual const byte * peek(size32_t maxSize);
+    // beginNested() returns the end offset of a nested block; finishedNested() tests the
+    // read position against that value.
+    virtual offset_t beginNested();
+    virtual bool finishedNested(offset_t & len);
+
+    virtual size32_t read(size32_t len, void * ptr);
+    virtual size32_t readSize();
+    virtual size32_t readPackedInt(void * ptr);
+    virtual size32_t readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len);
+    virtual size32_t readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize);
+    virtual size32_t readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize);
+
+    //These shouldn't really be called since this class is meant to be used for a deserialize.
+    //If we allowed padding/alignment fields in the input then the first function would make sense.
+    virtual void skip(size32_t size);
+    virtual void skipPackedInt();
+    virtual void skipUtf8(size32_t len);
+    virtual void skipVStr();
+    virtual void skipVUni();
+
+    // Forwarded directly to the underlying stream (which must be set).
+    inline bool eos()
+    {
+        return in->eos();
+    }
+
+    // Position reported by the underlying stream; bytes of the row in progress are
+    // only skipped on the stream in finishedRow().
+    inline offset_t tell() const
+    {
+        return in->tell();
+    }
+
+    // Release the stream reference and reset both offsets.
+    inline void clearStream()
+    {
+        in.clear();
+        maxOffset = 0;
+        readOffset = 0;
+    }
+
+    // Start of the current row and the number of bytes of it read so far.
+    inline const byte * queryRow() { return buffer; }
+    inline size32_t queryRowSize() { return readOffset; }
+    // Consume the bytes read for the current row from the stream and reset for the next row.
+    inline void finishedRow()
+    {
+        if (readOffset)
+            in->skip(readOffset);
+        maxOffset = 0;
+        readOffset = 0;
+    }
+
+
+protected:
+    // Size helpers: return the byte length of the value at the read position.
+    size32_t sizePackedInt();
+    size32_t sizeUtf8(size32_t len);
+    size32_t sizeVStr();
+    size32_t sizeVUni();
+    void reportReadFail();
+
+private:
+    // Ask the stream for a window of at least maxSize bytes; the stream reports the
+    // number actually available through maxOffset.
+    inline void doPeek(size32_t maxSize)
+    {
+        buffer = static_cast<const byte *>(in->peek(maxSize, maxOffset));
+    }
+
+    void doRead(size32_t len, void * ptr);
+
+    // Make sure 'required' bytes (measured from the row start) are available,
+    // asserting if the stream cannot provide them.
+    inline void ensureAccessible(size32_t required)
+    {
+        if (required > maxOffset)
+        {
+            doPeek(required);
+            assertex(required <= maxOffset);
+        }
+    }
+
+protected:
+    Linked<ISerialStream> in;
+    const byte * buffer;
+    size32_t maxOffset;
+    size32_t readOffset;
+};
+
+#endif

+ 0 - 1
rtl/eclrtl/rtldynfield.cpp

@@ -1352,4 +1352,3 @@ extern ECLRTL_API IRowStream * transformRecord(IEngineRowAllocator * resultAlloc
     else
         return stream.getClear();
 }
-

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 2043 - 0
rtl/eclrtl/rtlformat.cpp


+ 572 - 0
rtl/eclrtl/rtlformat.hpp

@@ -0,0 +1,572 @@
+#ifndef ECLFORMAT_HPP
+#define ECLFORMAT_HPP
+
+
+#include "jiface.hpp"
+#include "jfile.hpp"
+#include "eclrtl.hpp"
+#include "eclhelper.hpp"
+
+
+// IXmlWriter implementation that accumulates its output in the StringBuffer 'out',
+// which callers read back through length()/str().
+// NOTE(review): the exact text layout is defined by the out-of-line implementations,
+// which are not part of this header.
+class ECLRTL_API SimpleOutputWriter : implements IXmlWriter, public CInterface
+{
+    void outputFieldSeparator();
+    bool separatorNeeded;   // presumably tracks whether the next field must be preceded by a separator - confirm in the .cpp
+public:
+    SimpleOutputWriter();
+    IMPLEMENT_IINTERFACE;
+
+    SimpleOutputWriter & clear();
+    unsigned length() const                                 { return out.length(); }
+    const char * str() const                                { return out.str(); }
+
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    // Dataset, array, inline-xml and xmlns notifications are no-ops for this writer (empty bodies).
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren){}
+    virtual void outputEndDataset(const char *dsname){}
+    virtual void outputBeginArray(const char *fieldname){}
+    virtual void outputEndArray(const char *fieldname){}
+    virtual void outputSetAll();
+    virtual void outputInlineXml(const char *text){} //for appending raw xml content
+    virtual void outputXmlns(const char *name, const char *uri){}
+
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+
+    void newline();
+
+protected:
+    StringBuffer out;   // accumulated output
+};
+
+
+// Callback used by the writers in this header: their flush() hands the current output
+// buffer to flushXML() whenever a flusher has been supplied.
+//   current - the writer's output buffer, passed by non-const reference
+//   isClose - forwarded unchanged from the writer's flush(isClose) call
+interface IXmlStreamFlusher
+{
+    virtual void flushXML(StringBuffer &current, bool isClose) = 0;
+};
+
+// Extension of IXmlWriter that exposes the accumulated text (length()/str()/clear())
+// and lets a caller save a location in the output and later rewind to, or cut from, it.
+interface IXmlWriterExt : extends IXmlWriter
+{
+    virtual IXmlWriterExt & clear() = 0;
+    virtual size32_t length() const = 0;
+    virtual const char *str() const = 0;
+    // saveLocation() returns an opaque token; rewindTo()/cutFrom() accept such a token.
+    virtual IInterface *saveLocation() const = 0;
+    virtual void rewindTo(IInterface *location) = 0;
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf) = 0;
+    virtual void outputNumericString(const char *field, const char *fieldname) = 0;
+    // Output text supplied by the caller; how it is emitted is up to the implementation.
+    virtual void outputInline(const char* text) = 0;
+};
+
+// Snapshot of a writer's state, created by the saveLocation() methods of CommonXmlWriter
+// and CommonJsonWriter and read back by their rewindTo() methods.
+class ECLRTL_API CommonXmlPosition : public CInterface, implements IInterface
+{
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CommonXmlPosition(size32_t _pos, unsigned _indent, unsigned _nestLimit, bool _tagClosed, bool _needDelimiter) :
+        pos(_pos), indent(_indent), nestLimit(_nestLimit), tagClosed(_tagClosed), needDelimiter(_needDelimiter)
+    {}
+
+public:
+    size32_t pos = 0;            // length of the output buffer at the time of the snapshot
+    unsigned indent = 0;         // writer's indent value
+    unsigned nestLimit = 0;      // writer's nestLimit value
+    bool tagClosed = false;      // XML writer state (the JSON writer always stores false)
+    bool needDelimiter = false;  // JSON writer state (the XML writer always stores false)
+};
+
+// IXmlWriterExt implementation that builds its output in the StringBuffer 'out'.
+// An optional IXmlStreamFlusher receives the buffer whenever flush() is called; when a
+// flusher is attached, saveLocation()/rewindTo() are not supported and throw.
+class ECLRTL_API CommonXmlWriter : implements IXmlWriterExt, public CInterface
+{
+public:
+    CommonXmlWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
+    ~CommonXmlWriter();
+    IMPLEMENT_IINTERFACE;
+
+    // Non-virtual overloads taking an extra doIndent flag.
+    void outputBeginNested(const char *fieldname, bool nestChildren, bool doIndent);
+    void outputEndNested(const char *fieldname, bool doIndent);
+
+    virtual void outputInlineXml(const char *text){closeTag(); out.append(text); flush(false);} //for appending raw xml content
+    virtual void outputInline(const char* text) { outputInlineXml(text); }
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
+    virtual void outputEndDataset(const char *dsname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    virtual void outputBeginArray(const char *fieldname){}; //repeated elements are inline for xml
+    virtual void outputEndArray(const char *fieldname){};
+    virtual void outputSetAll();
+    virtual void outputXmlns(const char *name, const char *uri);
+
+    //IXmlWriterExt
+    virtual IXmlWriterExt & clear();
+    virtual unsigned length() const                                 { return out.length(); }
+    virtual const char * str() const                                { return out.str(); }
+    // Capture the current buffer length plus indent/nestLimit/tagClosed.
+    // Throws if a flusher is attached. The caller receives a newly allocated object.
+    virtual IInterface *saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+
+        return new CommonXmlPosition(length(), indent, nestLimit, tagClosed, false);
+    }
+    // Truncate the buffer back to a location from saveLocation() and restore the saved state.
+    // A location that is not a CommonXmlPosition is silently ignored, and nothing is
+    // restored unless the saved position lies before the current end of the buffer.
+    virtual void rewindTo(IInterface *saved)
+    {
+        if (flusher)
+            throwUnexpected();
+
+        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
+        if (!position)
+            return;
+        if (position->pos < out.length())
+        {
+            out.setLength(position->pos);
+            tagClosed = position->tagClosed;
+            indent = position->indent;
+            nestLimit = position->nestLimit;
+        }
+    }
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
+
+    // Numeric strings are written through outputCString (declared outside this class).
+    virtual void outputNumericString(const char *field, const char *fieldname)
+    {
+        outputCString(field, fieldname);
+    }
+
+protected:
+    bool checkForAttribute(const char * fieldname);
+    void closeTag();
+    // Hand the buffer to the flusher, if one was supplied; otherwise do nothing.
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+protected:
+    IXmlStreamFlusher *flusher;   // raw pointer - not reference counted here
+    StringBuffer out;
+    unsigned flags;
+    unsigned indent;
+    unsigned nestLimit;
+    bool tagClosed;
+};
+
+// IXmlWriterExt implementation producing JSON text in the StringBuffer 'out'.
+// As with CommonXmlWriter, an optional IXmlStreamFlusher receives the buffer on flush(),
+// and saveLocation()/rewindTo(IInterface *) throw when a flusher is attached.
+class ECLRTL_API CommonJsonWriter : implements IXmlWriterExt, public CInterface
+{
+public:
+    CommonJsonWriter(unsigned _flags, unsigned initialIndent=0,  IXmlStreamFlusher *_flusher=NULL);
+    ~CommonJsonWriter();
+    IMPLEMENT_IINTERFACE;
+
+    void checkDelimit(int inc=0);
+    void checkFormat(bool doDelimit, bool needDelimiter=true, int inc=0);
+    void prepareBeginArray(const char *fieldname);
+
+    // Raw xml is emitted as a UTF-8 field named "xml"; null or empty text is ignored.
+    virtual void outputInlineXml(const char *text) //for appending raw xml content
+    {
+        if (text && *text)
+            outputUtf8(strlen(text), text, "xml");
+    }
+    // Appends the text verbatim to the output buffer.
+    virtual void outputInline(const char* text) { out.append(text); }
+    virtual void outputQuoted(const char *text);
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren);
+    virtual void outputEndDataset(const char *dsname);
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren);
+    virtual void outputEndNested(const char *fieldname);
+    virtual void outputBeginArray(const char *fieldname);
+    virtual void outputEndArray(const char *fieldname);
+    virtual void outputSetAll();
+    virtual void outputXmlns(const char *name, const char *uri){}
+    virtual void outputNumericString(const char *field, const char *fieldname);
+
+    //IXmlWriterExt
+    virtual IXmlWriterExt & clear();
+    virtual unsigned length() const                                 { return out.length(); }
+    virtual const char * str() const                                { return out.str(); }
+    // Length-only rewind: truncates the buffer but restores no other writer state.
+    virtual void rewindTo(unsigned int prevlen)                     { if (prevlen < out.length()) out.setLength(prevlen); }
+    // Capture the current buffer length plus indent/nestLimit/needDelimiter.
+    // Throws if a flusher is attached. The caller receives a newly allocated object.
+    virtual IInterface *saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+
+        return new CommonXmlPosition(length(), indent, nestLimit, false, needDelimiter);
+    }
+    // Truncate the buffer back to a location from saveLocation() and restore the saved state.
+    // A location that is not a CommonXmlPosition is silently ignored, and nothing is
+    // restored unless the saved position lies before the current end of the buffer.
+    virtual void rewindTo(IInterface *saved)
+    {
+        if (flusher)
+            throwUnexpected();
+
+        CommonXmlPosition *position = dynamic_cast<CommonXmlPosition *>(saved);
+        if (!position)
+            return;
+        if (position->pos < out.length())
+        {
+            out.setLength(position->pos);
+            needDelimiter = position->needDelimiter;
+            indent = position->indent;
+            nestLimit = position->nestLimit;
+        }
+    }
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf);
+
+    // Append the opening/closing brace of the root object; no delimiter handling is done here.
+    void outputBeginRoot(){out.append('{');}
+    void outputEndRoot(){out.append('}');}
+
+protected:
+    // Hand the buffer to the flusher, if one was supplied; otherwise do nothing.
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+    // Entry of the 'arrays' list: a name plus a depth counter that starts at 0.
+    class CJsonWriterItem : public CInterface
+    {
+    public:
+        CJsonWriterItem(const char *_name) : name(_name), depth(0){}
+
+        StringAttr name;
+        unsigned depth;
+    };
+
+    const char *checkItemName(CJsonWriterItem *item, const char *name, bool simpleType=true);
+    const char *checkItemName(const char *name, bool simpleType=true);
+    const char *checkItemNameBeginNested(const char *name);
+    const char *checkItemNameEndNested(const char *name);
+    bool checkUnamedArrayItem(bool begin);
+
+
+    IXmlStreamFlusher *flusher;   // raw pointer - not reference counted here
+    CIArrayOf<CJsonWriterItem> arrays;
+    StringBuffer out;
+    unsigned flags;
+    unsigned indent;
+    unsigned nestLimit;
+    bool needDelimiter;
+};
+
+// IXmlWriter implementation whose output target is an IPropertyTree rather than text.
+// The tree is supplied either at construction or later through setRoot().
+// NOTE(review): how fields map onto tree nodes/attributes is defined by the out-of-line
+// implementations, which are not part of this header.
+class ECLRTL_API CPropertyTreeWriter : public CSimpleInterfaceOf<IXmlWriter>
+{
+public:
+    CPropertyTreeWriter(IPropertyTree *_root=nullptr, unsigned _flags=0);
+
+    void setRoot(IPropertyTree &_root);
+
+    virtual void outputInlineXml(const char *text) override;
+    virtual void outputQuoted(const char *text) override;
+    virtual void outputQString(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputString(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputBool(bool field, const char *fieldname) override;
+    virtual void outputData(unsigned len, const void *field, const char *fieldname) override;
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname) override;
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname) override;
+    virtual void outputReal(double field, const char *fieldname) override;
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname) override;
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname) override;
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname) override;
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname) override;
+    virtual void outputBeginDataset(const char *dsname, bool nestChildren) override;
+    virtual void outputEndDataset(const char *dsname) override;
+    virtual void outputBeginNested(const char *fieldname, bool nestChildren) override;
+    virtual void outputEndNested(const char *fieldname) override;
+    virtual void outputBeginArray(const char *fieldname)  override {} //repeated elements are inline for xml
+    virtual void outputEndArray(const char *fieldname) override {}
+    // Marked override like every other IXmlWriter method in this class, so a signature
+    // drift in the interface is caught at compile time.
+    virtual void outputSetAll() override;
+    virtual void outputXmlns(const char *name, const char *uri) override;
+
+protected:
+    bool checkForAttribute(const char * fieldname);
+
+protected:
+    Linked<IPropertyTree> root;
+    IPropertyTree *target = nullptr;
+    ICopyArrayOf<IPropertyTree> nestedLevels;
+    unsigned flags;
+};
+
+
+// Helpers for the text placed before/after JSON rows. Each takes an output buffer, an
+// optional caller-supplied header/footer and a row tag, and returns a StringBuffer reference.
+// NOTE(review): presumably the returned reference is the buffer passed in, and the supplied
+// text overrides a default built from rowTag - confirm against the implementation.
+ECLRTL_API StringBuffer &buildJsonHeader(StringBuffer  &header, const char *suppliedHeader, const char *rowTag);
+ECLRTL_API StringBuffer &buildJsonFooter(StringBuffer  &footer, const char *suppliedFooter, const char *rowTag);
+
+//Writes type encoded XML strings  (xsi:type="xsd:string", xsi:type="xsd:boolean" etc)
+//Overrides the scalar output methods of CommonXmlWriter; everything else (datasets, nesting,
+//location handling) is inherited unchanged.
+class ECLRTL_API CommonEncodedXmlWriter : public CommonXmlWriter
+{
+public:
+    CommonEncodedXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
+
+    virtual void outputString(unsigned len, const char *field, const char *fieldname);
+    virtual void outputBool(bool field, const char *fieldname);
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+    virtual void outputInt(__int64 field, unsigned size, const char *fieldname);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char *fieldname);
+    virtual void outputReal(double field, const char *fieldname);
+    virtual void outputDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUDecimal(const void *field, unsigned size, unsigned precision, const char *fieldname);
+    virtual void outputUnicode(unsigned len, const UChar *field, const char *fieldname);
+    virtual void outputUtf8(unsigned len, const char *field, const char *fieldname);
+};
+
+//Writes all encoded DATA fields as base64Binary
+//Only outputData() differs from CommonEncodedXmlWriter.
+class ECLRTL_API CommonEncoded64XmlWriter : public CommonEncodedXmlWriter
+{
+public:
+    CommonEncoded64XmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL);
+
+    virtual void outputData(unsigned len, const void *field, const char *fieldname);
+};
+
+// Selects which writer the factory functions below create.
+// NOTE(review): presumably WTStandard/WTEncoding/WTEncodingData64 map to CommonXmlWriter,
+// CommonEncodedXmlWriter and CommonEncoded64XmlWriter, and WTJSON to CommonJsonWriter - confirm
+// in the implementation. CommonJsonWriter does not derive from CommonXmlWriter, so a JSON
+// writer can only be returned through createIXmlWriterExt().
+enum XMLWriterType{WTStandard, WTEncoding, WTEncodingData64, WTJSON} ;
+ECLRTL_API CommonXmlWriter * CreateCommonXmlWriter(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
+ECLRTL_API IXmlWriterExt * createIXmlWriterExt(unsigned _flags, unsigned initialIndent=0, IXmlStreamFlusher *_flusher=NULL, XMLWriterType xmlType=WTStandard);
+
+// Options passed to CommonCSVWriter.
+// NOTE(review): includeHeader has no default value here, so callers must set it explicitly
+// before handing the struct to the writer.
+struct CSVOptions
+{
+    StringAttr delimiter, terminator;   // presumably field delimiter and row terminator text - confirm
+    bool includeHeader;
+};
+
+// Bookkeeping record for one CSV header entry: its name/value, column and row counters,
+// nesting information and the names of its children.
+class CCSVItem : public CInterface, implements IInterface
+{
+    unsigned columnID, nextRowID, rowCount, nestedLayer;
+    StringAttr name, type, value, parentXPath;
+    StringArray childNames;
+    MapStringTo<bool> childNameMap;   // mirrors childNames for quick duplicate checks
+    bool isNestedItem, simpleNested, currentRowEmpty, outputHeader;
+public:
+    // Initialisers are listed in declaration order, and outputHeader now receives a defined
+    // value (false) so checkOutputHeader() never reads an indeterminate bool when
+    // setOutputHeader() has not been called yet.
+    CCSVItem() : columnID(0), nextRowID(0), rowCount(0), nestedLayer(0), isNestedItem(false),
+        simpleNested(false), currentRowEmpty(true), outputHeader(false) { };
+
+    IMPLEMENT_IINTERFACE;
+    inline const char* getName() const { return name.get(); };
+    inline void setName(const char* _name) { name.set(_name); };
+    inline const char* getValue() const { return value.get(); };
+    inline void setValue(const char* _value) { value.set(_value); };
+    inline unsigned getColumnID() const { return columnID; };
+    inline void setColumnID(unsigned _columnID) { columnID = _columnID; };
+
+    inline unsigned getNextRowID() const { return nextRowID; };
+    inline void setNextRowID(unsigned _rowID) { nextRowID = _rowID; };
+    inline void incrementNextRowID() { nextRowID++; };
+    inline unsigned getRowCount() const { return rowCount; };
+    inline void setRowCount(unsigned _rowCount) { rowCount = _rowCount; };
+    inline void incrementRowCount() { rowCount++; };
+    inline bool getCurrentRowEmpty() const { return currentRowEmpty; };
+    inline void setCurrentRowEmpty(bool _currentRowEmpty) { currentRowEmpty = _currentRowEmpty; };
+
+    inline unsigned getNestedLayer() const { return nestedLayer; };
+    inline void setNestedLayer(unsigned _nestedLayer) { nestedLayer = _nestedLayer; };
+    inline bool checkIsNestedItem() const { return isNestedItem; };
+    inline void setIsNestedItem(bool _isNestedItem) { isNestedItem = _isNestedItem; };
+    inline bool checkSimpleNested() const { return simpleNested; };
+    inline void setSimpleNested(bool _simpleNested) { simpleNested = _simpleNested; };
+    inline bool checkOutputHeader() const { return outputHeader; };
+    inline void setOutputHeader(bool _outputHeader) { outputHeader = _outputHeader; };
+    inline const char* getParentXPath() const { return parentXPath.str(); };
+    inline void setParentXPath(const char* _parentXPath) { parentXPath.set(_parentXPath); };
+    inline StringArray& getChildrenNames() { return childNames; };
+    // Record a child name once; repeated names are ignored.
+    inline void addChildName(const char* name)
+    {
+        if (hasChildName(name))
+            return;
+        childNameMap.setValue(name, true);
+        childNames.append(name);
+    };
+    inline bool hasChildName(const char* name)
+    {
+        bool* found = childNameMap.getValue(name);
+        return (found && *found);
+    };
+    // Reset the per-result counters, leaving the header description untouched.
+    inline void clearContentVariables()
+    {
+        nextRowID = rowCount = 0;
+        currentRowEmpty = true;
+    };
+};
+
+// One buffered output row: a row id plus the CCSVItem columns that belong to it.
+class CCSVRow : public CInterface, implements IInterface
+{
+    unsigned rowID;
+    CIArrayOf<CCSVItem> columns;
+public:
+    CCSVRow(unsigned _rowID) : rowID(_rowID) {};
+    IMPLEMENT_IINTERFACE;
+
+    inline unsigned getRowID() const { return rowID; };
+    inline void setRowID(unsigned _rowID) { rowID = _rowID; };
+    inline unsigned getColumnCount() const { return columns.length(); };
+
+    // Column access by id; implemented out of line.
+    const char* getColumnValue(unsigned columnID) const;
+    void setColumn(unsigned columnID, const char* columnName, const char* columnValue);
+};
+
+//CommonCSVWriter is used to output a WU result in CSV format.
+//Read CSV header information;
+//If needed, output CSV headers into the 'out' buffer;
+//Read each row (a record) of the WU result and output into the 'out' buffer;
+//The 'out' buffer can be accessed through the str() method.
+//
+//Location handling (saveLocation/rewindTo/cutFrom) is not supported: rewindTo() and cutFrom()
+//do nothing and saveLocation() returns NULL (or throws when a flusher is attached).
+class ECLRTL_API CommonCSVWriter: public CInterface, implements IXmlWriterExt
+{
+    // A path in the caller's data together with a flag saying whether it denotes an array.
+    class CXPathItem : public CInterface, implements IInterface
+    {
+        bool isArray;
+        StringAttr path;
+    public:
+        // Initialisers follow the declaration order of the members.
+        CXPathItem(const char* _path, bool _isArray) : isArray(_isArray), path(_path) { };
+
+        IMPLEMENT_IINTERFACE;
+        inline const char* getPath() const { return path.get(); };
+        inline bool getIsArray() const { return isArray; };
+    };
+    CSVOptions options;
+    bool readingCSVHeader, addingSimpleNestedContent;
+    unsigned recordCount, headerColumnID, nestedHeaderLayerID;
+    StringBuffer currentParentXPath, auditOut;
+    StringArray headerXPathList;
+    MapStringTo<bool> topHeaderNameMap;
+    MapStringToMyClass<CCSVItem> csvItems;
+    CIArrayOf<CCSVRow> contentRowsBuffer;
+    CIArrayOf<CXPathItem> dataXPath;//xpath in caller
+
+    void escapeQuoted(unsigned len, char const* in, StringBuffer& out);
+    bool checkHeaderName(const char* name);
+    CCSVItem* getParentCSVItem();
+    CCSVItem* getCSVItemByFieldName(const char* name);
+    void addColumnToRow(CIArrayOf<CCSVRow>& rows, unsigned rowID, unsigned colID, const char* columnValue, const char* columnName);
+    void addCSVHeader(const char* name, const char* type, bool isNested, bool simpleNested, bool outputHeader);
+    void addContentField(const char* field, const char* fieldName);
+    void addStringField(unsigned len, const char* field, const char* fieldName);
+    void setChildrenNextRowID(const char* path, unsigned rowID);
+    unsigned getChildrenMaxNextRowID(const char* path);
+    unsigned getChildrenMaxColumnID(CCSVItem* item, unsigned& maxColumnID);
+    void addChildNameToParentCSVItem(const char* name);
+    void setParentItemRowEmpty(CCSVItem* item, bool empty);
+    void addFieldToParentXPath(const char* fieldName);
+    void removeFieldFromCurrentParentXPath(const char* fieldName);
+    void appendDataXPathItem(const char* fieldName, bool isArray);
+    bool isDataRow(const char* fieldName);
+    void outputCSVRows(CIArrayOf<CCSVRow>& rows, bool isHeader);
+    void outputHeadersToBuffer();
+    void finishContentResultRow();
+
+    // Appends one diagnostic line per known header path to auditOut (readable via auditStr()).
+    // Paths without a matching item are skipped.
+    void auditHeaderInfo()
+    {
+        ForEachItemIn(i, headerXPathList)
+        {
+            const char* path = headerXPathList.item(i);
+            CCSVItem* item = csvItems.getValue(path);
+            if (!item)
+                continue;
+            // The same line is written for nested and non-nested items; %u matches the
+            // unsigned row and column ids.
+            auditOut.appendf("dumpHeaderInfo path<%s> next row<%u> col<%u>: name<%s> - value<%s>\n", path, item->getNextRowID(),
+                item->getColumnID(), item->getName() ? item->getName() : "", item->getValue() ? item->getValue() : "");
+        }
+    }
+
+public:
+    CommonCSVWriter(unsigned _flags, CSVOptions& _options, IXmlStreamFlusher* _flusher = NULL);
+    ~CommonCSVWriter();
+
+    IMPLEMENT_IINTERFACE;
+
+    // Hand the buffer to the flusher, if one was supplied; otherwise do nothing.
+    inline void flush(bool isClose)
+    {
+        if (flusher)
+            flusher->flushXML(out, isClose);
+    }
+
+    virtual unsigned length() const { return out.length(); }
+    virtual const char* str() const { return out.str(); }
+    virtual void rewindTo(IInterface* location) { };
+    virtual void cutFrom(IInterface *location, StringBuffer& databuf) { };
+    virtual IInterface* saveLocation() const
+    {
+        if (flusher)
+            throwUnexpected();
+        return NULL;
+    };
+
+    //IXmlWriter
+    virtual void outputString(unsigned len, const char* field, const char* fieldName);
+    virtual void outputBool(bool field, const char* fieldName);
+    virtual void outputData(unsigned len, const void* field, const char* fieldName);
+    virtual void outputInt(__int64 field, unsigned size, const char* fieldName);
+    virtual void outputUInt(unsigned __int64 field, unsigned size, const char* fieldName);
+    virtual void outputReal(double field, const char *fieldName);
+    virtual void outputDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
+    virtual void outputUDecimal(const void* field, unsigned size, unsigned precision, const char* fieldName);
+    virtual void outputUnicode(unsigned len, const UChar* field, const char* fieldName);
+    virtual void outputQString(unsigned len, const char* field, const char* fieldName);
+    virtual void outputUtf8(unsigned len, const char* field, const char* fieldName);
+    virtual void outputBeginNested(const char* fieldName, bool simpleNested);
+    virtual void outputEndNested(const char* fieldName);
+    virtual void outputBeginDataset(const char* dsname, bool nestChildren);
+    virtual void outputEndDataset(const char* dsname);
+    virtual void outputBeginArray(const char* fieldName);
+    virtual void outputEndArray(const char* fieldName);
+    virtual void outputSetAll() { };
+    virtual void outputXmlns(const char* name, const char* uri) { };
+    virtual void outputQuoted(const char* text)
+    {
+        //No fieldName. Is it valid for CSV?
+    };
+    virtual void outputInlineXml(const char* text)//for appending raw xml content
+    {
+        //Dynamically add a new header 'xml' and insert the header.
+        //But, not sure we want to do that for a big WU result.
+        //if (text && *text)
+          //outputUtf8(strlen(text), text, "xml");
+    };
+    // Appends the text verbatim to the output buffer.
+    virtual void outputInline(const char* text) { out.append(text); }
+
+    //IXmlWriterExt
+    virtual void outputNumericString(const char* field, const char* fieldName);
+    virtual IXmlWriterExt& clear();
+
+    // Non-virtual overloads taking an extra outputHeader flag.
+    void outputBeginNested(const char* fieldName, bool simpleNested, bool outputHeader);
+    void outputEndNested(const char* fieldName, bool outputHeader);
+    void outputCSVHeader(const char* name, const char* type);
+    void finishCSVHeaders();
+    const char* auditStr() const { return auditOut.str(); }
+
+protected:
+    IXmlStreamFlusher* flusher;   // raw pointer - not reference counted here
+    StringBuffer out;
+    unsigned flags;
+};
+
+#endif

+ 1 - 0
thorlcr/activities/xmlwrite/thxmlwrite.cpp

@@ -17,6 +17,7 @@
 
 #include "jiface.hpp"
 #include "jtime.hpp"
+#include "rtlformat.hpp"
 #include "dadfs.hpp"
 #include "thexception.hpp"
 #include "thmfilemanager.hpp"

+ 1 - 0
thorlcr/graph/thgraph.cpp

@@ -25,6 +25,7 @@
 #include "thormisc.hpp"
 #include "thbufdef.hpp"
 #include "thmem.hpp"
+#include "rtlformat.hpp"
 
 
 PointerArray createFuncs;

+ 1 - 0
thorlcr/slave/traceslave.hpp

@@ -31,6 +31,7 @@
 #include <stddef.h>
 #include <thmem.hpp>
 #include <thorxmlwrite.hpp>
+#include <rtlformat.hpp>
 
 interface IThorDebug : extends IInterface
 {

+ 1 - 0
thorlcr/thorutil/thormisc.cpp

@@ -48,6 +48,7 @@
 #include "rtlfield.hpp"
 #include "rtlrecord.hpp"
 #include "rtlds_imp.hpp"
+#include "rtlformat.hpp"
 #include "rmtfile.hpp"
 
 

+ 152 - 14
tools/testsocket/testsocket.cpp

@@ -44,6 +44,10 @@ bool sendFileAfterQuery = false;
 bool doLock = false;
 bool roxieLogMode = false;
 bool rawOnly = false;
+bool rawSend = false;
+bool remoteStreamForceResend = false;
+bool remoteStreamSendCursor = false;
+
 
 StringBuffer sendFileName;
 StringAttr queryNameOverride;
@@ -182,11 +186,12 @@ void sendFileChunk(const char * filename, offset_t offset, ISocket * socket)
     free(buff);
 }
 
-int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result)
+int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result, const char *query, size32_t queryLen)
 {
     if (readBlocked)
         socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000);
 
+    MemoryBuffer remoteReadCursorMb;
     unsigned len;
     bool is_status;
     bool isBlockedResult;
@@ -231,6 +236,7 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         bool isSpecial = false;
         bool pluginRequest = false;
         bool dataBlockRequest = false;
+        bool remoteReadRequest = false;
         if (len & 0x80000000)
         {
             unsigned char flag;
@@ -266,14 +272,19 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
             case 'R':
                 isBlockedResult = true;
                 break;
+            case 'J':
+                remoteReadRequest = true;
+                break;
             }
             len &= 0x7FFFFFFF;
             len--;      // flag already read
         }
 
-        char * mem = (char*) malloc(len+1);
+        MemoryBuffer mb;
+        mb.setEndian(BIG_ENDIAN);
+        char *mem = (char *)mb.reserveTruncate(len+1);
         char * t = mem;
-        unsigned sendlen = len;
+        size32_t sendlen = len;
         t[len]=0;
         try
         {
@@ -301,6 +312,7 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         catch(IException * e)
         {
             pexception("failed to read data", e);
+            e->Release();
             return 1;
         }
         if (pluginRequest)
@@ -318,9 +330,112 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
         {
             //Not very robust!  A poor man's implementation for testing...
             offset_t offset;
-            memcpy(&offset, t, sizeof(offset));
-            _WINREV(offset);
-            sendFileChunk(t+sizeof(offset), offset, socket);
+            mb.read(offset);
+            sendFileChunk((const char *)mb.readDirect(offset), offset, socket);
+        }
+        else if (remoteReadRequest)
+        {
+            Owned<IPropertyTree> requestTree = createPTreeFromJSONString(queryLen, query);
+            Owned<IPropertyTree> responseTree; // used if response is xml or json
+            const char *outputFmtStr = requestTree->queryProp("format");
+            const char *response = nullptr;
+            if (!outputFmtStr || strieq("xml", outputFmtStr))
+            {
+                response = (const char *)mb.readDirect(len);
+                responseTree.setown(createPTreeFromXMLString(len, response));
+            }
+            else if (strieq("json", outputFmtStr))
+            {
+                response = (const char *)mb.readDirect(len);
+                responseTree.setown(createPTreeFromJSONString(len, response));
+            }
+            unsigned cursorHandle;
+            if (responseTree)
+                cursorHandle = responseTree->getPropInt("cursor");
+            else
+                mb.read(cursorHandle);
+            bool retrySend = false;
+            if (cursorHandle)
+            {
+                PROGLOG("Got handle back: %u; len=%u", cursorHandle, len);
+                StringBuffer xml;
+                if (responseTree)
+                {
+                    if (echoResults)
+                    {
+                        fputs(response, stdout);
+                        fflush(stdout);
+                    }
+                    if (!responseTree->getPropBin("cursorBin", remoteReadCursorMb.clear()))
+                        break;
+                }
+                else
+                {
+                    size32_t dataLen;
+                    mb.read(dataLen);
+                    if (!dataLen)
+                        break;
+                    const void *rowData = mb.readDirect(dataLen);
+                    // JCSMORE - output binary row data?
+
+                    // cursor
+                    size32_t cursorLen;
+                    mb.read(cursorLen);
+                    if (!cursorLen)
+                        break;
+                    const void *cursor = mb.readDirect(cursorLen);
+                    memcpy(remoteReadCursorMb.clear().reserveTruncate(cursorLen), cursor, cursorLen);
+                }
+
+                if (remoteStreamForceResend)
+                    cursorHandle = NotFound; // fake that it's a handle dafilesrv doesn't know about
+
+                Owned<IPropertyTree> requestTree = createPTree();
+                requestTree->setPropInt("cursor", cursorHandle);
+
+                // Only the handle is needed for continuation, but this tests the behaviour of some clients which may send cursor per request (e.g. to refresh)
+                if (remoteStreamSendCursor)
+                    requestTree->setPropBin("cursorBin", remoteReadCursorMb.length(), remoteReadCursorMb.toByteArray());
+
+                requestTree->setProp("format", outputFmtStr);
+                StringBuffer requestStr;
+                toJSON(requestTree, requestStr);
+#ifdef _DEBUG
+                fputs(requestStr, stdout);
+#endif
+
+                sendlen = requestStr.length();
+                _WINREV(sendlen);
+
+                try
+                {
+                    if (!rawSend && !useHTTP)
+                        socket->write(&sendlen, sizeof(sendlen));
+                    socket->write(requestStr.str(), requestStr.length());
+                }
+                catch (IJSOCK_Exception *e)
+                {
+                    retrySend = true;
+                    EXCLOG(e, nullptr);
+                    e->Release();
+                }
+            }
+            else // dafilesrv didn't know who I was, resend query + serialized cursor
+                retrySend = true;
+            if (retrySend)
+            {
+                PROGLOG("Retry send for handle: %u", cursorHandle);
+                requestTree->setPropBin("cursorBin", remoteReadCursorMb.length(), remoteReadCursorMb.toByteArray());
+                StringBuffer requestStr;
+                toJSON(requestTree, requestStr);
+
+                PROGLOG("requestStr = %s", requestStr.str());
+                sendlen = requestStr.length();
+                _WINREV(sendlen);
+                if (!rawSend && !useHTTP)
+                    socket->write(&sendlen, sizeof(sendlen));
+                socket->write(requestStr.str(), requestStr.length());
+            }
         }
         else
         {
@@ -339,7 +454,6 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &
                 result.append(sendlen, t);
         }
 
-        free(mem);
         if (abortAfterFirst)
             return 0;
     }
@@ -358,7 +472,7 @@ int ReceiveThread::run()
     ISocket * socket = ISocket::create(3456);
     ISocket * client = socket->accept();
     StringBuffer result;
-    readResults(client, parallelBlocked, false, result);
+    readResults(client, parallelBlocked, false, result, nullptr, 0);
     client->Release();
     socket->Release();
     finishedReading.signal();
@@ -431,7 +545,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
         else
         {
             SocketEndpoint ep(ip,port);
-            socket.setown(ISocket::connect_timeout(ep, 1000));
+            socket.setown(ISocket::connect_timeout(ep, 100000));
             if (useSSL)
             {
 #ifdef _USE_OPENSSL
@@ -492,7 +606,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
             socket->write(&locklen, sizeof(locklen));
             socket->write(lock, strlen(lock));
             StringBuffer lockResult;
-            readResults(socket, false, false, lockResult);
+            readResults(socket, false, false, lockResult, nullptr, 0);
         }
         if (queryNameOverride.length())
         {
@@ -515,17 +629,22 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
     }
 
     const char * query = fullQuery.str();
-    int len=strlen(query);
-    int sendlen = len;
+    size32_t queryLen=(size32_t)strlen(query);
+    size32_t len = queryLen;
+    size32_t sendlen = len;
     if (persistConnections)
         sendlen |= 0x80000000;
     _WINREV(sendlen);                    
 
     try
     {
-        if (!useHTTP)
+        if (!rawSend && !useHTTP)
             socket->write(&sendlen, sizeof(sendlen));
+
+        fprintf(stdout, "about to write %u <%s>\n", len, query);
+
         socket->write(query, len);
+
         if (sendFileAfterQuery)
         {
             FILE *in = fopen(sendFileName.str(), "rb");
@@ -560,7 +679,7 @@ int doSendQuery(const char * ip, unsigned port, const char * base)
     // back-end does some processing.....
 
     StringBuffer result;
-    int ret = readResults(socket, false, useHTTP, result);
+    int ret = readResults(socket, false, useHTTP, result, query, queryLen);
 
     if ((ret == 0) && !justResults)
     {
@@ -649,6 +768,8 @@ void usage(int exitCode)
     printf("  -qname xx Use xx as queryname in place of the xml root element name\n");
     printf("  -r <n>    repeat the query several times\n");
     printf("  -rl       roxie logfile mode\n");
+    printf("  -rsr      force remote stream resend per continuation request\n");
+    printf("  -rssc     send cursor per continuation request\n");
     printf("  -s        add stars to indicate transfer packets\n");
     printf("  -ss       suppress XML Status messages to screen (always suppressed from tracefile)\n");
     printf("  -ssl      use ssl\n");
@@ -658,6 +779,7 @@ void usage(int exitCode)
     printf("  -u<max>   run queries on separate threads\n");
     printf("  -cascade  cascade query (to all roxie nodes)\n");
     printf("  -lock     locked cascade query (to all roxie nodes)\n");
+    printf("  -x        raw send\n");
     
     exit(exitCode);
 }
@@ -665,6 +787,7 @@ void usage(int exitCode)
 int main(int argc, char **argv) 
 {
     InitModuleObjects();
+
     StringAttr outputName("result.txt");
 
     bool fromFile = false;
@@ -729,6 +852,11 @@ int main(int argc, char **argv)
             fromFile = true;
             ++arg;
         }
+        else if (stricmp(argv[arg], "-x") == 0)
+        {
+            rawSend = true;
+            ++arg;
+        }
         else if (stricmp(argv[arg], "-ff") == 0)
         {
             fromMultiFile = true;
@@ -838,6 +966,16 @@ int main(int argc, char **argv)
             }
             arg+=2;
         }
+        else if (strieq(argv[arg], "-rsr"))
+        {
+            remoteStreamForceResend = true;
+            ++arg;
+        }
+        else if (strieq(argv[arg], "-rssc"))
+        {
+            remoteStreamSendCursor = true;
+            ++arg;
+        }
         else
         {
             printf("Unknown argument %s, ignored\n", argv[arg]);