浏览代码

HPCC-14764 Roxie protocol encapsulation

Encapsulate roxie protocol handling behind generic interfaces.  Remove
all references to internal ccd code from roxie protocol component.

This is a major step toward allowing pluggable protocols, although more
work and testing will be needed to load additional protocol
implementations.

Signed-off-by: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Anthony Fishbeck 9 年之前
父节点
当前提交
bad2522746

+ 1 - 3
common/thorhelper/roxiedebug.cpp

@@ -46,10 +46,9 @@ bool CDebugCommandHandler::checkCommand(IXmlWriter &out, const char *&supplied,
     }
 }
 
-void CDebugCommandHandler::doDebugCommand(IPropertyTree *query, IDebuggerContext *debugContext, FlushingStringBuffer &output)
+void CDebugCommandHandler::doDebugCommand(IPropertyTree *query, IDebuggerContext *debugContext, IXmlWriter &out)
 {
     const char *commandName = query->queryName();
-    CommonXmlWriter out(0, 1);
     if (strnicmp(commandName, "b", 1)==0 && checkCommand(out, commandName, "breakpoint"))
     {
         const char *mode = query->queryProp("@mode");
@@ -207,7 +206,6 @@ void CDebugCommandHandler::doDebugCommand(IPropertyTree *query, IDebuggerContext
     else
         throw MakeStringException(THORHELPER_DEBUG_ERROR, "Unknown command %s", commandName);
     out.outputEndNested(commandName);
-    output.append(out.str());
 }
 
 //=======================================================================================

+ 7 - 1
common/thorhelper/roxiedebug.hpp

@@ -39,7 +39,13 @@ class THORHELPER_API CDebugCommandHandler : public CInterface, implements IInter
 public:
     IMPLEMENT_IINTERFACE;
     static bool checkCommand(IXmlWriter &out, const char *&supplied, const char *expected);
-    void doDebugCommand(IPropertyTree *query, IDebuggerContext *debugContext, FlushingStringBuffer &output);
+    void doDebugCommand(IPropertyTree *query, IDebuggerContext *debugContext, IXmlWriter &out);
+    void doDebugCommand(IPropertyTree *query, IDebuggerContext *debugContext, FlushingStringBuffer &output)
+    {
+        CommonXmlWriter out(0, 1);
+        doDebugCommand(query, debugContext, out);
+        output.append(out.str());
+    }
 };
 
 //=======================================================================================

+ 2 - 1
common/thorhelper/roxiehelper.hpp

@@ -231,7 +231,6 @@ protected:
     UnsignedArray lengths;
 
     bool needsFlush(bool closing);
-    void startBlock();
 public:
     TextMarkupFormat mlFmt;      // controls whether xml/json elements are output
     bool isRaw;      // controls whether output as binary or ascii
@@ -260,11 +259,13 @@ public:
     virtual void flush(bool closing) ;
     virtual void addPayload(StringBuffer &s, unsigned int reserve=0);
     virtual void *getPayload(size32_t &length);
+    virtual void startBlock();
     virtual void startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend = false, const IProperties *xmlns=NULL);
     virtual void startScalar(const char *resultName, unsigned sequence);
     virtual void setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size);
     virtual void setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size);
     virtual void incrementRowCount();
+    void setTail(const char *value){tail.set(value);}
 };
 
 class THORHELPER_API FlushingJsonBuffer : public FlushingStringBuffer

+ 3 - 0
roxie/ccd/CMakeLists.txt

@@ -36,6 +36,7 @@ set (   SRCS
         ccdkey.cpp 
         ccdlistener.cpp
         ccdmain.cpp
+        ccdprotocol.cpp
         ccdquery.cpp
         ccdqueue.cpp
         ccdsnmp.cpp 
@@ -50,10 +51,12 @@ set (   SRCS
         ccdfile.hpp
         ccdkey.hpp
         ccdlistener.hpp
+        ccdprotocol.hpp
         ccdquery.hpp
         ccdqueue.ipp
         ccdsnmp.hpp
         ccdstate.hpp 
+        hpccprotocol.hpp
         
                 sourcedoc.xml
     )

+ 70 - 341
roxie/ccd/ccdcontext.cpp

@@ -1123,8 +1123,6 @@ protected:
     const IRoxieContextLogger &logctx;
 
 protected:
-    CriticalSection resultsCrit;
-    IPointerArrayOf<FlushingStringBuffer> resultMap;
     bool exceptionLogged;
     bool aborted;
     CriticalSection abortLock; // NOTE: we don't bother to get lock when just reading to see whether to abort
@@ -2546,10 +2544,12 @@ public:
 class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext
 {
     const IQueryFactory *serverQueryFactory;
+    IHpccProtocolResponse *protocol;
+    IHpccProtocolResultsWriter *results;
+    IHpccNativeProtocolResponse *nativeProtocol;
     CriticalSection daliUpdateCrit;
     StringAttr querySetName;
 
-    TextMarkupFormat mlFmt;
     bool isRaw;
     bool sendHeartBeats;
     unsigned lastSocketCheckTime;
@@ -2559,34 +2559,23 @@ protected:
     Owned<CRoxieWorkflowMachine> workflow;
     Owned<ITimeReporter> myTimer;
     mutable MapStringToMyClass<IResolvedFile> fileCache;
-    SafeSocket *client;
     StringArray clusterNames;
     int clusterWidth = -1;
 
     bool isBlocked;
-    bool isHttp;
+    bool isNative;
     bool trim;
 
     void doPostProcess()
     {
-        CriticalBlock b(resultsCrit); // Probably not needed
+        if (!protocol)
+            return;
+
         if (!isRaw && !isBlocked)
-        {
-            ForEachItemIn(seq, resultMap)
-            {
-                FlushingStringBuffer *result = resultMap.item(seq);
-                if (result)
-                    result->flush(true);
-            }
-        }
+            protocol->flush();
 
         if (probeQuery)
         {
-            FlushingStringBuffer response(client, isBlocked, MarkupFmt_XML, false, isHttp, *this);
-
-            // create output stream
-            response.startDataset("_Probe", NULL, (unsigned) -1);  // initialize it
-
             // loop through all of the graphs and create a _Probe to output each xgmml
             Owned<IPropertyTreeIterator> graphs = probeQuery->getElements("Graph");
             ForEach(*graphs)
@@ -2595,8 +2584,7 @@ protected:
 
                 StringBuffer xgmml;
                 _toXML(&graph, xgmml, 0);
-                response.append("\n");
-                response.append(xgmml.str());
+                protocol->appendProbeGraph(xgmml.str());
             }
         }
     }
@@ -2608,13 +2596,10 @@ protected:
 
     void init()
     {
-        client = NULL;
         totSlavesReplyLen = 0;
-        mlFmt = MarkupFmt_XML;
         isRaw = false;
         isBlocked = false;
-        isHttp = false;
-        trim = false;
+        isNative = true;
         sendHeartBeats = false;
 
         lastSocketCheckTime = startTime;
@@ -2628,7 +2613,7 @@ protected:
         wu->subscribe(SubscribeOptionAbort);
         addTimeStamp(wu, SSTglobal, NULL, StWhenQueryStarted);
         if (!context->getPropBool("@outputToSocket", false))
-            client = NULL;
+            protocol = NULL;
         updateSuppliedXmlParams(wu);
         SCMStringBuffer wuParams;
         if (workUnit->getXmlParams(wuParams, false).length())
@@ -2657,9 +2642,9 @@ protected:
 
     void initDebugMode(bool breakAtStart, const char *debugUID)
     {
-        if (!debugPermitted || !ownEP.port)
+        if (!debugPermitted || !ownEP.port || !nativeProtocol)
             throw MakeStringException(ROXIE_ACCESS_ERROR, "Debug queries are not permitted on this system");
-        debugContext.setown(new CRoxieServerDebugContext(this, logctx, factory->cloneQueryXGMML(), *client));
+        debugContext.setown(new CRoxieServerDebugContext(this, logctx, factory->cloneQueryXGMML(), *nativeProtocol->querySafeSocket()));
         debugContext->debugInitialize(debugUID, factory->queryQueryName(), breakAtStart);
         if (workUnit)
         {
@@ -2677,7 +2662,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
-        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), results(NULL)
     {
         init();
         rowManager->setMemoryLimit(options.memoryLimit);
@@ -2686,7 +2671,7 @@ public:
     }
 
     CRoxieServerContext(IConstWorkUnit *_workUnit, const IQueryFactory *_factory, const ContextLogger &_logctx)
-        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), results(NULL)
     {
         init();
         workUnit.set(_workUnit);
@@ -2700,18 +2685,20 @@ public:
         startWorkUnit();
     }
 
-    CRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, TextMarkupFormat _mlFmt, bool _isRaw, bool _isBlocked, HttpHelper &httpHelper, bool _trim, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
-        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), querySetName(_querySetName)
+    CRoxieServerContext(IPropertyTree *_context, IHpccProtocolResponse *_protocol, const IQueryFactory *_factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), querySetName(_querySetName), protocol(_protocol), results(NULL)
     {
         init();
+        if (protocol)
+        {
+            nativeProtocol = dynamic_cast<IHpccNativeProtocolResponse*>(protocol);
+            results = protocol->queryHpccResultsSection();
+        }
         context.set(_context);
         options.setFromContext(context);
-        client = &_client;
-        mlFmt = _mlFmt;
-        isRaw = _isRaw;
-        isBlocked = _isBlocked;
-        isHttp = httpHelper.isHttp();
-        trim = _trim;
+        isNative = (flags & HPCC_PROTOCOL_NATIVE);
+        isRaw = (flags & HPCC_PROTOCOL_NATIVE_RAW);
+        isBlocked = (flags & HPCC_PROTOCOL_BLOCKED);
         xmlStoredDatasetReadFlags = _xmlReadFlags;
         sendHeartBeats = enableHeartBeat && isRaw && isBlocked && options.priority==0;
 
@@ -2738,7 +2725,7 @@ public:
         // MORE some of these might be appropriate in wu case too?
         rowManager->setActivityTracking(context->getPropBool("_TraceMemory", false));
         rowManager->setMemoryLimit(options.memoryLimit);
-        authToken.append(httpHelper.queryAuthToken());
+
         workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
     }
 
@@ -2805,14 +2792,14 @@ public:
                 }
             }
         }
-        if (client)
+        if (protocol)
         {
             if (socketCheckInterval)
             {
                 if (ticksNow - lastSocketCheckTime > socketCheckInterval)
                 {
                     CriticalBlock b(abortLock);
-                    if (!client->checkConnection())
+                    if (!protocol->checkConnection())
                         throw MakeStringException(ROXIE_CLIENT_CLOSED, "Client socket closed");
                     lastSocketCheckTime = ticksNow;
                 }
@@ -2823,7 +2810,7 @@ public:
                 if (hb > 30000)
                 {
                     lastHeartBeat = msTick();
-                    client->sendHeartBeat(*this);
+                    protocol->sendHeartBeat();
                 }
             }
         }
@@ -2945,31 +2932,15 @@ public:
         return this;
     }
 
-    // interface ICodeContext
-    virtual FlushingStringBuffer *queryResult(unsigned sequence)
-    {
-        if (!client && workUnit)
-            return NULL;    // when outputting to workunit only, don't output anything to stdout
-        CriticalBlock procedure(resultsCrit);
-        while (!resultMap.isItem(sequence))
-            resultMap.append(NULL);
-        FlushingStringBuffer *result = resultMap.item(sequence);
-        if (!result)
-        {
-            result = new FlushingStringBuffer(client, isBlocked, mlFmt, isRaw, isHttp, *this);
-            result->isSoap = isHttp;
-            result->trim = trim;
-            result->queryName.set(context->queryName());
-            resultMap.replace(result, sequence);
-        }
-        return result;
-    }
-
     virtual char *getDaliServers()
     {
         //MORE: Should this now be implemented using IRoxieDaliHelper?
         throwUnexpected();
     }
+    virtual IHpccProtocolResponse *queryProtocol()
+    {
+        return protocol;
+    }
 
     virtual void setResultBool(const char *name, unsigned sequence, bool value)
     {
@@ -2978,18 +2949,11 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropBool(name, value);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (isRaw)
-                    r->append(sizeof(value), (char *)&value);
-                else
-                    r->append(value ? "true" : "false");
-            }
+            results->setResultBool(name, sequence, value);
         }
+
         if (workUnit)
         {
             try
@@ -3024,14 +2988,9 @@ public:
             IPropertyTree &ctx = useContext(sequence);
             ctx.setProp(name, s.str());
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                r->encodeData(data, len);
-            }
+            results->setResultData(name, sequence, len, data);
         }
         if (workUnit)
         {
@@ -3125,17 +3084,9 @@ public:
             ctx.setPropBin(name, len, data);
             ctx.queryPropTree(name)->setProp("@format", "raw");
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (isRaw)
-                    r->append(len, (const char *) data);
-                else
-                    UNIMPLEMENTED;
-            }
+            results->setResultRaw(name, sequence, len, data);
         }
 
         if (workUnit)
@@ -3169,42 +3120,9 @@ public:
             ctx.queryPropTree(name)->setProp("@format", "raw");
             ctx.queryPropTree(name)->setPropBool("@isAll", isAll);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (isRaw)
-                    r->append(len, (char *)data);
-                else if (mlFmt==MarkupFmt_XML)
-                {
-                    assertex(transformer);
-                    CommonXmlWriter writer(getXmlFlags()|XWFnoindent, 0);
-                    transformer->toXML(isAll, len, (byte *)data, writer);
-                    r->append(writer.str());
-                }
-                else if (mlFmt==MarkupFmt_JSON)
-                {
-                    assertex(transformer);
-                    CommonJsonWriter writer(getXmlFlags()|XWFnoindent, 0);
-                    transformer->toXML(isAll, len, (byte *)data, writer);
-                    r->append(writer.str());
-                }
-                else
-                {
-                    assertex(transformer);
-                    r->append('[');
-                    if (isAll)
-                        r->appendf("*]");
-                    else
-                    {
-                        SimpleOutputWriter x;
-                        transformer->toXML(isAll, len, (const byte *) data, x);
-                        r->appendf("%s]", x.str());
-                    }
-                }
-            }
+            results->setResultSet(name, sequence, isAll, len, data, transformer);
         }
 
         if (workUnit)
@@ -3243,24 +3161,9 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropBin(name, m.length(), m.toByteArray());
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (isRaw)
-                    r->append(len, (char *)val);
-                else
-                {
-                    StringBuffer s;
-                    if (isSigned)
-                        outputXmlDecimal(val, len, precision, NULL, s);
-                    else
-                        outputXmlUDecimal(val, len, precision, NULL, s);
-                    r->append(s);
-                }
-            }
+            results->setResultDecimal(name, sequence, len, precision, isSigned, val);
         }
         if (workUnit)
         {
@@ -3290,21 +3193,10 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropInt64(name, value);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                if (isRaw)
-                {
-                    r->startScalar(name, sequence);
-                    r->append(sizeof(value), (char *)&value);
-                }
-                else
-                    r->setScalarInt(name, sequence, value, size);
-            }
+            results->setResultInt(name, sequence, value, size);
         }
-
         if (workUnit)
         {
             try
@@ -3334,19 +3226,9 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropInt64(name, value);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                if (isRaw)
-                {
-                    r->startScalar(name, sequence);
-                    r->append(sizeof(value), (char *)&value);
-                }
-                else
-                    r->setScalarUInt(name, sequence, value, size);
-            }
+            results->setResultUInt(name, sequence, value, size);
         }
 
         if (workUnit)
@@ -3378,14 +3260,9 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropBin(name, sizeof(value), &value);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                r->append(value);
-            }
+            results->setResultReal(name, sequence, value);
         }
         if (workUnit)
         {
@@ -3415,23 +3292,10 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropBin(name, len, str);
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (r->isRaw)
-                {
-                    r->append(len, str);
-                }
-                else
-                {
-                    r->encodeString(str, len);
-                }
-            }
+            results->setResultString(name, sequence, len, str);
         }
-
         if (workUnit)
         {
             try
@@ -3463,26 +3327,10 @@ public:
             CriticalBlock b(contextCrit);
             useContext(sequence).setPropBin(name, bufflen, buff.getstr());
         }
-        else
+        else if (results)
         {
-            FlushingStringBuffer *r = queryResult(sequence);
-            if (r)
-            {
-                r->startScalar(name, sequence);
-                if (r->isRaw)
-                {
-                    r->append(len*2, (const char *) str);
-                }
-                else
-                {
-                    rtlDataAttr buff;
-                    unsigned bufflen = 0;
-                    rtlUnicodeToCodepageX(bufflen, buff.refstr(), len, str, "utf-8");
-                    r->encodeString(buff.getstr(), bufflen, true); // output as UTF-8
-                }
-            }
+            results->setResultUnicode(name, sequence, len, str);
         }
-
         if (workUnit)
         {
             try
@@ -3519,7 +3367,9 @@ public:
 
     virtual IWorkUnitRowReader *createStreamedRawRowReader(IEngineRowAllocator *rowAllocator, bool isGrouped, const char *id)
     {
-        return new StreamedRawDataReader(this, rowAllocator, isGrouped, logctx, *client, id);
+        if (!nativeProtocol)
+            throwUnexpected();
+        return new StreamedRawDataReader(this, rowAllocator, isGrouped, logctx, *nativeProtocol->querySafeSocket(), id);
     }
 
     virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence)
@@ -3885,10 +3735,15 @@ public:
             superfileTransaction.setown(createDistributedFileTransaction(queryUserDescriptor(), queryCodeContext()));
         return superfileTransaction.get();
     }
-    virtual void flush(unsigned seqNo) { throwUnexpected(); }
+    virtual void finalize(unsigned seqNo)
+    {
+        if (!protocol)
+            throwUnexpected();
+        protocol->finalize(seqNo);
+    }
     virtual unsigned getPriority() const { return options.priority; }
     virtual IConstWorkUnit *queryWorkUnit() const { return workUnit; }
-    virtual bool outputResultsToSocket() const { return client != NULL; }
+    virtual bool outputResultsToSocket() const { return protocol != NULL; }
 
     virtual void selectCluster(const char * newCluster)
     {
@@ -3927,67 +3782,8 @@ private:
     StringAttr queryName;
 
 public:
-    CSoapRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, HttpHelper &httpHelper, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *_querySetName)
-        : CRoxieServerContext(_context, _factory, _client, MarkupFmt_XML, false, false, httpHelper, httpHelper.getTrim(), _logctx, xmlReadFlags, _querySetName)
-    {
-        queryName.set(_context->queryName());
-    }
-
-    virtual void process()
-    {
-        EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
-        Owned<IEclProcess> p = pf();
-        if (workflow)
-            workflow->perform(this, p);
-        else
-            p->perform(this, 0);
-    }
-
-    virtual void flush(unsigned seqNo)
-    {
-        CriticalBlock b(resultsCrit);
-        CriticalBlock b1(client->queryCrit());
-
-        StringBuffer responseHead, responseTail;
-        responseHead.append("<").append(queryName).append("Response");
-        responseHead.append(" sequence=\"").append(seqNo).append("\"");
-        responseHead.append(" xmlns=\"urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.str()).append("\">");
-        responseHead.append("<Results><Result>");
-        unsigned len = responseHead.length();
-        client->write(responseHead.detach(), len, true);
-
-        ForEachItemIn(seq, resultMap)
-        {
-            FlushingStringBuffer *result = resultMap.item(seq);
-            if (result)
-            {
-                result->flush(true);
-                for(;;)
-                {
-                    size32_t length;
-                    void *payload = result->getPayload(length);
-                    if (!length)
-                        break;
-                    client->write(payload, length, true);
-                }
-            }
-        }
-
-        responseTail.append("</Result></Results>");
-        responseTail.append("</").append(queryName).append("Response>");
-        len = responseTail.length();
-        client->write(responseTail.detach(), len, true);
-    }
-};
-
-class CJsonRoxieServerContext : public CRoxieServerContext
-{
-private:
-    StringAttr queryName;
-
-public:
-    CJsonRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, HttpHelper &httpHelper, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *_querySetName)
-        : CRoxieServerContext(_context, _factory, _client, MarkupFmt_JSON, false, false, httpHelper, httpHelper.getTrim(), _logctx, xmlReadFlags, _querySetName)
+    CSoapRoxieServerContext(IPropertyTree *_context, IHpccProtocolResponse *_protocol, const IQueryFactory *_factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *_querySetName)
+        : CRoxieServerContext(_context, _protocol, _factory, flags, _logctx, xmlReadFlags, _querySetName)
     {
         queryName.set(_context->queryName());
     }
@@ -4001,80 +3797,13 @@ public:
         else
             p->perform(this, 0);
     }
-
-    virtual void flush(unsigned seqNo)
-    {
-        CriticalBlock b(resultsCrit);
-        CriticalBlock b1(client->queryCrit());
-
-        StringBuffer responseHead, responseTail;
-        appendfJSONName(responseHead, "%sResponse", queryName.get()).append(" {");
-        appendJSONValue(responseHead, "sequence", seqNo);
-        appendJSONName(responseHead, "Results").append(" {");
-
-        unsigned len = responseHead.length();
-        client->write(responseHead.detach(), len, true);
-
-        bool needDelimiter = false;
-        ForEachItemIn(seq, resultMap)
-        {
-            FlushingStringBuffer *result = resultMap.item(seq);
-            if (result)
-            {
-                result->flush(true);
-                for(;;)
-                {
-                    size32_t length;
-                    void *payload = result->getPayload(length);
-                    if (!length)
-                        break;
-                    if (needDelimiter)
-                    {
-                        StringAttr s(","); //write() will take ownership of buffer
-                        size32_t len = s.length();
-                        client->write((void *)s.detach(), len, true);
-                        needDelimiter=false;
-                    }
-                    client->write(payload, length, true);
-                }
-                needDelimiter=true;
-            }
-        }
-
-        responseTail.append("}}");
-        len = responseTail.length();
-        client->write(responseTail.detach(), len, true);
-    }
-
-    virtual FlushingStringBuffer *queryResult(unsigned sequence)
-    {
-        if (!client && workUnit)
-            return NULL;    // when outputting to workunit only, don't output anything to stdout
-        CriticalBlock procedure(resultsCrit);
-        while (!resultMap.isItem(sequence))
-            resultMap.append(NULL);
-        FlushingStringBuffer *result = resultMap.item(sequence);
-        if (!result)
-        {
-            result = new FlushingJsonBuffer(client, isBlocked, isHttp, *this);
-            result->trim = trim;
-            result->queryName.set(context->queryName());
-            resultMap.replace(result, sequence);
-        }
-        return result;
-    }
 };
 
-IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions readFlags, const char *querySetName)
+IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, IHpccProtocolResponse *protocol, const IQueryFactory *factory, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions readFlags, const char *querySetName)
 {
-    if (httpHelper.isHttp())
-    {
-        if (httpHelper.queryContentFormat()==MarkupFmt_JSON)
-            return new CJsonRoxieServerContext(context, factory, client, httpHelper, _logctx, readFlags, querySetName);
-        return new CSoapRoxieServerContext(context, factory, client, httpHelper, _logctx, readFlags, querySetName);
-    }
-    else
-        return new CRoxieServerContext(context, factory, client, isXml ? MarkupFmt_XML : MarkupFmt_Unknown, isRaw, isBlocked, httpHelper, trim, _logctx, readFlags, querySetName);
+    if (flags & HPCC_PROTOCOL_NATIVE)
+        return new CRoxieServerContext(context, protocol, factory, flags, _logctx, readFlags, querySetName);
+    return new CSoapRoxieServerContext(context, protocol, factory, flags, _logctx, readFlags, querySetName);
 }
 
 IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx)

+ 3 - 3
roxie/ccd/ccdcontext.hpp

@@ -76,7 +76,6 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
 interface IRoxieServerContext : extends IInterface
 {
     virtual IGlobalCodeContext *queryGlobalCodeContext() = 0;
-    virtual FlushingStringBuffer *queryResult(unsigned sequence) = 0;
     virtual void setResultXml(const char *name, unsigned sequence, const char *xml) = 0;
     virtual void appendResultDeserialized(const char *name, unsigned sequence, size32_t count, byte **data, bool extend, IOutputMetaData *meta) = 0;
     virtual void appendResultRawContext(const char *name, unsigned sequence, int len, const void * data, int numRows, bool extend, bool saveInContext) = 0;
@@ -84,7 +83,7 @@ interface IRoxieServerContext : extends IInterface
 
     virtual void process() = 0;
     virtual void done(bool failed) = 0;
-    virtual void flush(unsigned seqNo) = 0;
+    virtual void finalize(unsigned seqNo) = 0;
     virtual unsigned getMemoryUsage() = 0;
     virtual unsigned getSlavesReplyLen() = 0;
 
@@ -94,6 +93,7 @@ interface IRoxieServerContext : extends IInterface
 
     virtual IRoxieDaliHelper *checkDaliConnection() = 0;
     virtual const IProperties *queryXmlns(unsigned seqNo) = 0;
+    virtual IHpccProtocolResponse *queryProtocol() = 0;
 };
 
 interface IDeserializedResultStore : public IInterface
@@ -109,7 +109,7 @@ class CRoxieWorkflowMachine;
 
 extern IDeserializedResultStore *createDeserializedResultStore();
 extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasRemoteChildren);
-extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
+extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, IHpccProtocolResponse *protocol, const IQueryFactory *factory, unsigned flags, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
 extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx);
 extern IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &logctx);
 extern CRoxieWorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *wu, bool doOnce, const IRoxieContextLogger &_logctx);

文件差异内容过多而无法显示
+ 566 - 827
roxie/ccd/ccdlistener.cpp


+ 11 - 17
roxie/ccd/ccdlistener.hpp

@@ -19,25 +19,19 @@
 #define _CCDLISTENER_INCL
 
 #include <jlib.hpp>
+#include "ccdprotocol.hpp"
 
-interface IRoxieListener : extends IInterface
-{
-    virtual void start() = 0;
-    virtual bool stop(unsigned timeout) = 0;
-    virtual void stopListening() = 0;
-    virtual void disconnectQueue() = 0;
-    virtual void addAccess(bool allow, bool allowBlind, const char *ip, const char *mask, const char *query, const char *errMsg, int errCode) = 0;
-    virtual unsigned queryPort() const = 0;
-    virtual const SocketEndpoint &queryEndpoint() const = 0;
-    virtual bool suspend(bool suspendIt) = 0;
-
-    virtual void runOnce(const char *query) = 0;
-};
-
-extern IRoxieListener *createRoxieSocketListener(unsigned port, unsigned poolSize, unsigned listenQueue, bool suspended);
-extern IRoxieListener *createRoxieWorkUnitListener(unsigned poolSize, bool suspended);
+extern IHpccProtocolMsgSink *createRoxieProtocolMsgSink(const IpAddress &ip, unsigned short port, unsigned poolSize, bool suspended);
+
+extern IHpccProtocolListener *createRoxieWorkUnitListener(unsigned poolSize, bool suspended);
 extern bool suspendRoxieListener(unsigned port, bool suspended);
-extern IArrayOf<IRoxieListener> socketListeners;
+
+extern MapStringToMyClass<SharedObject> protocolDlls;
+extern MapStringToMyClass<IHpccProtocolPlugin> protocolPlugins;
+extern IArrayOf<IHpccProtocolListener> socketListeners;
+
+extern IHpccProtocolPlugin *ensureProtocolPlugin(IHpccProtocolPluginContext &protocolCtx, const char *soname);
+
 extern void disconnectRoxieQueues();
 extern void updateAffinity(unsigned __int64 affinity);
 

+ 33 - 5
roxie/ccd/ccdmain.cpp

@@ -409,6 +409,24 @@ void saveTopology()
     }
 }
 
+class CHpccProtocolPluginCtx : public CInterface, implements IHpccProtocolPluginContext
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    virtual int ctxGetPropInt(const char *propName, int defaultValue) const
+    {
+        return topology->getPropInt(propName, defaultValue);
+    }
+    virtual bool ctxGetPropBool(const char *propName, bool defaultValue) const
+    {
+        return topology->getPropBool(propName, defaultValue);
+    }
+    virtual const char *ctxQueryProp(const char *propName) const
+    {
+        return topology->queryProp(propName);
+    }
+};
+
 int STARTQUERY_API start_query(int argc, const char *argv[])
 {
     EnableSEHtoExceptionMapping();
@@ -1010,9 +1028,11 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
 #endif
         EnableSEHtoExceptionMapping();
         setSEHtoExceptionHandler(&abortHandler);
+        Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
         if (runOnce)
         {
-            Owned <IRoxieListener> roxieServer = createRoxieSocketListener(0, 1, 0, false);
+            Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
+            Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(getNodeAddress(myNodeIndex), 0, 1, false), 0, 0, NULL);
             try
             {
                 const char *format = globals->queryProp("format");
@@ -1048,18 +1068,26 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 unsigned numThreads = roxieFarm.getPropInt("@numThreads", numServerThreads);
                 unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
                 unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
+                const IpAddress &ip = getNodeAddress(myNodeIndex);
                 if (!roxiePort)
                 {
                     roxiePort = port;
-                    ownEP.set(roxiePort, getNodeAddress(myNodeIndex));
+                    ownEP.set(roxiePort, ip);
                 }
                 bool suspended = roxieFarm.getPropBool("@suspended", false);
-                Owned <IRoxieListener> roxieServer;
+                Owned <IHpccProtocolListener> roxieServer;
                 if (port)
-                    roxieServer.setown(createRoxieSocketListener(port, numThreads, listenQueue, suspended));
+                {
+                    const char *protocol = roxieFarm.queryProp("@protocol");
+                    const char *soname =  roxieFarm.queryProp("@so");
+                    const char *config  = roxieFarm.queryProp("@config");
+                    Owned<IHpccProtocolPlugin> protocolPlugin = ensureProtocolPlugin(*protocolCtx, soname);
+                    roxieServer.setown(protocolPlugin->createListener(protocol ? protocol : "native", createRoxieProtocolMsgSink(ip, port, numThreads, suspended), port, listenQueue, config));
+                }
                 else
                     roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
 
+                IHpccProtocolMsgSink *sink = roxieServer->queryMsgSink();
                 const char *aclName = roxieFarm.queryProp("@aclName");
                 if (aclName && *aclName)
                 {
@@ -1071,7 +1099,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                         IPropertyTree &access = accesses->query();
                         try
                         {
-                            roxieServer->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
+                            sink->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
                         }
                         catch (IException *E)
                         {

文件差异内容过多而无法显示
+ 1819 - 0
roxie/ccd/ccdprotocol.cpp


+ 46 - 0
roxie/ccd/ccdprotocol.hpp

@@ -0,0 +1,46 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef _CCDPROTOCOL_INCL
+#define _CCDPROTOCOL_INCL
+
+#include "hpccprotocol.hpp"
+
+#define HPCC_PROTOCOL_NATIVE           0x10000
+#define HPCC_PROTOCOL_NATIVE_RAW       0x20000
+#define HPCC_PROTOCOL_NATIVE_XML       0x40000
+#define HPCC_PROTOCOL_NATIVE_ASCII     0x80000
+
+interface IHpccNativeProtocolMsgSink : extends IHpccProtocolMsgSink
+{
+    virtual void onControlMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol) = 0;
+    virtual void onDebugMsg(IHpccProtocolMsgContext *msgctx, const char *uid, IPropertyTree *msg, IXmlWriter &out) = 0;
+};
+
+interface IHpccNativeProtocolResultsWriter : extends IHpccProtocolResultsWriter
+{
+    virtual void appendRawRow(unsigned sequence, unsigned len, const char *data) = 0;
+    virtual void appendSimpleRow(unsigned sequence, const char *str) = 0;
+    virtual void appendRaw(unsigned sequence, unsigned len, const char *data) = 0;
+};
+
+interface IHpccNativeProtocolResponse : extends IHpccProtocolResponse
+{
+    virtual SafeSocket *querySafeSocket() = 0; //still passed to debug context, and row streaming, for now.. tbd get rid of this
+};
+
+#endif

+ 3 - 3
roxie/ccd/ccdquery.cpp

@@ -1487,7 +1487,7 @@ public:
         throwUnexpected();   // only implemented in derived slave class
     }
 
-    virtual IRoxieServerContext *createContext(IPropertyTree *xml, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
+    virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
     {
         throwUnexpected();   // only implemented in derived server class
     }
@@ -1611,10 +1611,10 @@ public:
         return activities;
     }
 
-    virtual IRoxieServerContext *createContext(IPropertyTree *context, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
+    virtual IRoxieServerContext *createContext(IPropertyTree *context, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName) const
     {
         checkSuspended();
-        return createRoxieServerContext(context, this, client, mlFmt==MarkupFmt_XML, isRaw, isBlocked, httpHelper, trim, _logctx, _xmlReadFlags, _querySetName);
+        return createRoxieServerContext(context, protocol, this, flags, _logctx, _xmlReadFlags, _querySetName);
     }
 
     virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const

+ 2 - 1
roxie/ccd/ccdquery.hpp

@@ -22,6 +22,7 @@
 #include "ccdserver.hpp"
 #include "ccdkey.hpp"
 #include "ccdfile.hpp"
+#include "ccdprotocol.hpp"
 #include "jhtree.hpp"
 #include "jisem.hpp"
 #include "dllserver.hpp"
@@ -165,7 +166,7 @@ interface IQueryFactory : extends IInterface
     virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const = 0;
     virtual char *getEnv(const char *name, const char *defaultValue) const = 0;
 
-    virtual IRoxieServerContext *createContext(IPropertyTree *xml, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const = 0;
+    virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const = 0;
     virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const = 0;
     virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut) = 0;
     virtual IPropertyTree *getQueryStats(time_t from, time_t to) = 0;

+ 34 - 34
roxie/ccd/ccdserver.cpp

@@ -20124,29 +20124,25 @@ public:
             storedName = "Dataset";
 
         MemoryBuffer result;
-        FlushingStringBuffer *response = NULL;
         bool saveInContext = (int) sequence < 0 || isReread;
         if (!meta.queryOriginal()) // this is a bit of a hack - don't know why no meta on an output....
             meta.set(input->queryOutputMeta());
         Owned<IOutputRowSerializer> rowSerializer;
-        Owned<IXmlWriter> writer;
+        Owned<IXmlWriter> xmlwriter;
+        bool appendRaw = false;
+
+        IHpccProtocolResultsWriter *results = NULL;
+        IHpccNativeProtocolResultsWriter *nativeResults = NULL;
+        IHpccProtocolResponse *protocol = serverContext->queryProtocol();
+        unsigned protocolFlags = (protocol) ? protocol->getFlags() : 0;
         if ((int) sequence >= 0)
         {
-            response = serverContext->queryResult(sequence);
-            if (response)
+            results = (protocol) ? protocol->queryHpccResultsSection() : NULL;
+            if (results)
             {
-                const IProperties *xmlns = serverContext->queryXmlns(sequence);
-                response->startDataset("Dataset", helper.queryName(), sequence, (helper.getFlags() & POFextend) != 0, xmlns);
-                if (response->mlFmt==MarkupFmt_XML || response->mlFmt==MarkupFmt_JSON)
-                {
-                    unsigned int writeFlags = serverContext->getXmlFlags();
-                    if (response->mlFmt==MarkupFmt_JSON)
-                        writeFlags |= XWFnoindent;
-                    writer.setown(createIXmlWriterExt(writeFlags, 1, response, (response->mlFmt==MarkupFmt_JSON) ? WTJSON : WTStandard));
-                    writer->outputBeginArray(DEFAULTXMLROWTAG);
-                }
+                nativeResults = dynamic_cast<IHpccNativeProtocolResultsWriter*>(results);
+                xmlwriter.setown(results->addDataset(helper.queryName(), sequence, "Dataset", appendRaw, serverContext->getXmlFlags(), (helper.getFlags() & POFextend) != 0, serverContext->queryXmlns(sequence)));  //xmlwriter only returned if needed
             }
-
         }
         size32_t outputLimitBytes = 0;
         IConstWorkUnit *workunit = serverContext->queryWorkUnit();
@@ -20165,7 +20161,7 @@ public:
             assertex(outputLimit<=0x1000); // 32bit limit because MemoryBuffer/CMessageBuffers involved etc.
             outputLimitBytes = outputLimit * 0x100000;
         }
-        if (workunit != NULL || (response && response->isRaw))
+        if (workunit != NULL || (results && protocol->getFlags() & HPCC_PROTOCOL_NATIVE_RAW))
         {
             createRowAllocator();
             rowSerializer.setown(rowAllocator->createDiskSerializer(ctx->queryCodeContext()));
@@ -20184,13 +20180,18 @@ public:
             {
                 if (workunit)
                     result.append(row == NULL);
-                if (response)
+                if (results)
                 {
-                    if (response->isRaw)
-                        response->append((char)(row == NULL));
-                    else
+                    if (protocolFlags & HPCC_PROTOCOL_NATIVE_RAW && nativeResults)
                     {
-                        response->append("<Row __GroupBreak__=\"1\"/>");        // sensible, but need to handle on input
+                        char val = (char)(row == NULL);
+                        nativeResults->appendRaw(sequence, 1, &val);
+                    }
+                    else if (xmlwriter)
+                    {
+                        xmlwriter->outputBeginNested("Row", false);
+                        xmlwriter->outputCString("1", "@__GroupBreak__");
+                        xmlwriter->outputEndNested("Row");
                     }
                 }
             }
@@ -20208,31 +20209,30 @@ public:
                 CThorDemoRowSerializer serializerTarget(result);
                 rowSerializer->serialize(serializerTarget, (const byte *) row);
             }
-            if (response)
+            if ((int) sequence >= 0)
             {
-                if (response->isRaw)
+                if (appendRaw && nativeResults)
                 {
                     // MORE - should be able to serialize straight to the response...
                     MemoryBuffer rowbuff;
                     CThorDemoRowSerializer serializerTarget(rowbuff);
                     rowSerializer->serialize(serializerTarget, (const byte *) row);
-                    response->append(rowbuff.length(), rowbuff.toByteArray());
+                    nativeResults->appendRawRow(sequence, rowbuff.length(), rowbuff.toByteArray());
                 }
-                else if (writer)
+                else if (xmlwriter)
                 {
-                    writer->outputBeginNested(DEFAULTXMLROWTAG, false);
-                    helper.serializeXml((byte *) row, *writer);
-                    writer->outputEndNested(DEFAULTXMLROWTAG);
+                    xmlwriter->outputBeginNested(DEFAULTXMLROWTAG, false);
+                    helper.serializeXml((byte *) row, *xmlwriter);
+                    xmlwriter->outputEndNested(DEFAULTXMLROWTAG);
+                    results->finalizeXmlRow(sequence);
                 }
-                else
+                else if (nativeResults)
                 {
                     SimpleOutputWriter x;
                     helper.serializeXml((byte *) row, x);
                     x.newline();
-                    response->append(x.str());
+                    nativeResults->appendSimpleRow(sequence, x.str());
                 }
-                response->incrementRowCount();
-                response->flush(false);
             }
             ReleaseRoxieRow(row);
             if (outputLimitBytes && result.length() > outputLimitBytes)
@@ -20248,8 +20248,8 @@ public:
                 throw MakeStringExceptionDirect(0, errMsg.str());
             }
         }
-        if (writer)
-            writer->outputEndArray(DEFAULTXMLROWTAG);
+        if (xmlwriter)
+            xmlwriter->outputEndArray(DEFAULTXMLROWTAG);
         if (saveInContext)
             serverContext->appendResultDeserialized(storedName, sequence, builder.getcount(), builder.linkrows(), (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
         if (workunit)

+ 145 - 0
roxie/ccd/hpccprotocol.hpp

@@ -0,0 +1,145 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef _HPCCPROTOCOL_INCL
+#define _HPCCPROTOCOL_INCL
+
+#include <jlib.hpp>
+#include "roxiehelper.hpp"
+
+#define HPCC_PROTOCOL_BLOCKED          0x0001
+#define HPCC_PROTOCOL_TRIM             0x0002
+
+interface IHpccProtocolMsgContext : extends IInterface
+{
+    virtual bool initQuery(StringBuffer &target, const char *queryname) = 0;
+    virtual void noteQueryActive() = 0;
+    virtual unsigned getQueryPriority() = 0;
+    virtual IContextLogger *queryLogContext() = 0;
+    virtual bool checkSetBlind(bool blind) = 0;
+    virtual void verifyAllowDebug() = 0;
+    virtual bool logFullQueries() = 0;
+    virtual bool trapTooManyActiveQueries() = 0;
+    virtual bool getStripWhitespace() = 0;
+    virtual int getBindCores() = 0;
+    virtual void setTraceLevel(unsigned val) = 0;
+    virtual void setIntercept(bool val) = 0;
+    virtual bool getIntercept() = 0;
+    virtual void outputLogXML(IXmlStreamFlusher &out) = 0;
+};
+
+interface IHpccProtocolResultsWriter : extends IInterface
+{
+    virtual IXmlWriter *addDataset(const char *name, unsigned sequence, const char *elementName, bool &appendRawData, unsigned xmlFlags, bool _extend, const IProperties *xmlns) = 0;
+    virtual void finalizeXmlRow(unsigned sequence) = 0;
+
+    virtual void setResultBool(const char *name, unsigned sequence, bool value) = 0;
+    virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) = 0;
+    virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) = 0;
+    virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size) = 0;
+    virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) = 0;
+    virtual void setResultReal(const char * stepname, unsigned sequence, double value) = 0;
+    virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) = 0;
+    virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) = 0;
+    virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size) = 0;
+    virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) = 0;
+    virtual void setResultVarString(const char * name, unsigned sequence, const char * value) = 0;
+    virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) = 0;
+};
+
+interface IHpccProtocolResponse : extends IInterface
+{
+    virtual unsigned getFlags() = 0;
+    virtual IHpccProtocolResultsWriter *queryHpccResultsSection() = 0;
+
+    virtual void appendContent(TextMarkupFormat mlFmt, const char *content, const char *name=NULL) = 0; //will be transformed
+    virtual IXmlWriter *writeAppendContent(const char *name = NULL) = 0;
+    virtual void appendProbeGraph(const char *xml) = 0;
+
+    virtual void finalize(unsigned seqNo) = 0;
+
+    virtual bool checkConnection() = 0;
+    virtual void sendHeartBeat() = 0;
+    virtual void flush() = 0;
+};
+
+interface IHpccProtocolMsgSink : extends IInterface
+{
+    virtual CriticalSection &getActiveCrit() = 0;
+    virtual bool getIsSuspended() = 0;
+    virtual unsigned getActiveThreadCount() = 0;
+    virtual unsigned getPoolSize() = 0;
+    virtual unsigned getMaxActiveThreads() = 0;
+    virtual void setMaxActiveThreads(unsigned val) = 0;
+    virtual void incActiveThreadCount() = 0;
+    virtual void decActiveThreadCount() = 0;
+    virtual bool suspend(bool suspendIt) = 0;
+
+    virtual void addAccess(bool allow, bool allowBlind, const char *ip, const char *mask, const char *query, const char *errorMsg, int errorCode) = 0;
+    virtual void checkAccess(IpAddress &peer, const char *queryName, const char *queryText, bool isBlind) = 0;
+    virtual void queryAccessInfo(StringBuffer &info) = 0;
+
+    virtual IHpccProtocolMsgContext *createMsgContext(time_t startTime) = 0;
+    virtual StringArray &getTargetNames(StringArray &targets) = 0;
+
+    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned priority, unsigned memused, unsigned slavesReplyLen, bool continuationNeeded) = 0;
+    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions readFlags, const char *target, unsigned idx, unsigned &memused, unsigned &slaveReplyLen) = 0;
+};
+
+interface IHpccProtocolListener : extends IInterface
+{
+    virtual IHpccProtocolMsgSink *queryMsgSink() = 0;
+
+    virtual unsigned queryPort() const = 0;
+    virtual const SocketEndpoint &queryEndpoint() const = 0;
+
+    virtual void start() = 0;
+    virtual bool stop(unsigned timeout) = 0;
+    virtual void stopListening() = 0;
+    virtual void disconnectQueue() = 0;
+    virtual bool suspend(bool suspendIt) = 0;
+
+    virtual void runOnce(const char *query) = 0;
+};
+
+interface IHpccProtocolPluginContext : extends IInterface
+{
+    virtual int ctxGetPropInt(const char *propName, int defaultValue) const = 0;
+    virtual bool ctxGetPropBool(const char *propName, bool defaultValue) const = 0;
+    virtual const char *ctxQueryProp(const char *propName) const = 0;
+};
+
+interface IActiveQueryLimiter : extends IInterface
+{
+    virtual bool isAccepted() = 0;
+};
+
+interface IActiveQueryLimiterFactory : extends IInterface
+{
+    virtual IActiveQueryLimiter *create(IHpccProtocolListener *listener) = 0;
+};
+
+interface IHpccProtocolPlugin : extends IInterface
+{
+    virtual IHpccProtocolListener *createListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *config)=0;
+};
+
+extern IHpccProtocolPlugin *loadHpccProtocolPlugin(IHpccProtocolPluginContext *ctx, IActiveQueryLimiterFactory *limiterFactory);
+typedef IHpccProtocolPlugin *(HpccProtocolInstallFunction)(IHpccProtocolPluginContext *ctx, IActiveQueryLimiterFactory *limiterFactory);
+
+
+#endif