Browse Source

HPCC-13566 Encryption in transit for Roxie

Encrypt outbound traffic from server to agents. Using a fixed key at
present but should be enough for us to test the concept and potentially see
what the impact on performance might be.

Also encrypt return UDP traffic
Richard Chapman 4 years ago
parent
commit
830ef49ecf

+ 20 - 2
roxie/ccd/ccd.hpp

@@ -144,6 +144,7 @@ public:
 };
 
 extern bool localAgent;
+extern bool encryptInTransit;
 
 class RoxiePacketHeader
 {
@@ -234,6 +235,16 @@ public:
     }
 };
 
+
+interface ISerializedRoxieQueryPacket : extends IInterface
+{
+    virtual RoxiePacketHeader &queryHeader() const = 0;
+    virtual const byte *queryTraceInfo() const = 0;
+    virtual unsigned getTraceLength() const = 0;
+    virtual IRoxieQueryPacket *deserialize() const = 0;
+    virtual ISerializedRoxieQueryPacket *cloneSerializedPacket(unsigned channel) const = 0;
+};
+
 interface IRoxieQueryPacket : extends IInterface
 {
     virtual RoxiePacketHeader &queryHeader() const = 0;
@@ -248,6 +259,8 @@ interface IRoxieQueryPacket : extends IInterface
 
     virtual IRoxieQueryPacket *clonePacket(unsigned channel) const = 0;
     virtual IRoxieQueryPacket *insertSkipData(size32_t skipDataLen, const void *skipData) const = 0;
+
+    virtual ISerializedRoxieQueryPacket *serialize() const = 0;
 };
 
 interface IQueryDll;
@@ -413,6 +426,11 @@ extern void doUNIMPLEMENTED(unsigned line, const char *file);
 
 extern IRoxieQueryPacket *createRoxiePacket(void *data, unsigned length);
 extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &donor); // note: donor is empty after call
+// Direct deserialize callbeck packets from received network data
+extern IRoxieQueryPacket *deserializeCallbackPacket(MemoryBuffer &donor); // note: donor is empty after call
+// Delayed deserialize from received network data
+extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &donor); // note: donor is empty after call
+
 extern void dumpBuffer(const char *title, const void *buf, unsigned recSize);
 
 inline unsigned getBondedChannel(unsigned partNo)
@@ -733,8 +751,8 @@ class AgentContextLogger : public StringContextLogger
     StringAttr wuid;
 public:
     AgentContextLogger();
-    AgentContextLogger(IRoxieQueryPacket *packet);
-    void set(IRoxieQueryPacket *packet);
+    AgentContextLogger(ISerializedRoxieQueryPacket *packet);
+    void set(ISerializedRoxieQueryPacket *packet);
     void putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const;
     void putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const;
     void flush();

+ 1 - 1
roxie/ccd/ccdfile.cpp

@@ -1707,7 +1707,7 @@ public:
             // We are parsing lines that look like:
             // <channel>|<filename>|<pagelist>
             //
-            // Where pagelist is a space-separated list of page numers or (inclusive) ranges.
+            // Where pagelist is a space-separated list of page numbers or (inclusive) ranges.
             // A page number or range prefixed by a * means that the page(s) was found in the jhtree cache.
             //
             // For example,

+ 3 - 1
roxie/ccd/ccdmain.cpp

@@ -116,6 +116,7 @@ bool useRemoteResources;
 bool checkFileDate;
 bool lazyOpen;
 bool localAgent;
+bool encryptInTransit;
 bool useAeron;
 bool ignoreOrphans;
 bool doIbytiDelay = true; 
@@ -653,6 +654,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
         saveTopology();
         localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", false));  // legacy name
+        encryptInTransit = topology->getPropBool("@encryptInTransit", true) && !localAgent;
         numChannels = topology->getPropInt("@numChannels", 0);
 #ifdef _CONTAINERIZED
         if (!numChannels)
@@ -1224,7 +1226,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         createDelayedReleaser();
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
         globalPackageSetManager->load();
-        ROQ = createOutputQueueManager(snifferChannel, numAgentThreads);
+        ROQ = createOutputQueueManager(snifferChannel, numAgentThreads, encryptInTransit);
         ROQ->setHeadRegionSize(headRegionSize);
         ROQ->start();
         Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();

+ 312 - 131
roxie/ccd/ccdqueue.cpp

@@ -22,6 +22,8 @@
 #include <jsocket.hpp>
 #include <jlog.hpp>
 #include "jisem.hpp"
+#include "jencrypt.hpp"
+
 #include "udplib.hpp"
 #include "udptopo.hpp"
 #include "ccd.hpp"
@@ -434,23 +436,17 @@ static bool channelWrite(RoxiePacketHeader &buf, bool includeSelf)
 
 //============================================================================================
 
-class CRoxieQueryPacket : implements IRoxieQueryPacket, public CInterface
+class CRoxieQueryPacketBase : public CInterface
 {
 protected:
     RoxiePacketHeader *data;
-    const byte *continuationData; 
-    unsigned continuationLength;
-    const byte *smartStepInfoData; 
-    unsigned smartStepInfoLength;
-    const byte *contextData;
-    unsigned contextLength;
     const byte *traceInfo;
     unsigned traceLength;
 
 public:
     IMPLEMENT_IINTERFACE;
 
-    CRoxieQueryPacket(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
+    CRoxieQueryPacketBase(const void *_data, int lengthRemaining) : data((RoxiePacketHeader *) _data)
     {
         assertex(lengthRemaining >= (int) sizeof(RoxiePacketHeader));
         data->packetlength = lengthRemaining;
@@ -458,10 +454,6 @@ public:
         lengthRemaining -= sizeof(RoxiePacketHeader);
         if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
         {
-            continuationData = NULL;
-            continuationLength = 0;
-            smartStepInfoData = NULL;
-            smartStepInfoLength = 0;
             traceInfo = NULL;
             traceLength = 0;
         }
@@ -490,6 +482,49 @@ public:
                 finger++;
             }
             traceLength = finger - traceInfo;
+        }
+    }
+
+    ~CRoxieQueryPacketBase()
+    {
+        free(data);
+    }
+
+};
+
+// MORE - this is for TESTING ONLY - do not release with this key here like this!
+
+static byte key[32] = {
+    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
+    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
+};
+
+
+class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPacket
+{
+protected:
+    const byte *continuationData = nullptr;
+    unsigned continuationLength = 0;
+    const byte *smartStepInfoData = nullptr;
+    unsigned smartStepInfoLength = 0;
+    const byte *contextData = nullptr;
+    unsigned contextLength = 0;
+    
+public:
+    IMPLEMENT_IINTERFACE;
+    CRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacketBase(_data, length)
+    {
+        const byte *finger = (const byte *) (data + 1) + traceLength;
+        int lengthRemaining = length - sizeof(RoxiePacketHeader) - traceLength;
+        if (data->activityId == ROXIE_FILECALLBACK || data->activityId == ROXIE_DEBUGCALLBACK || data->retries == QUERY_ABORTED)
+        {
+            continuationData = NULL;
+            continuationLength = 0;
+            smartStepInfoData = NULL;
+            smartStepInfoLength = 0;
+        }
+        else
+        {
             if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
             {
                 assertex(lengthRemaining >= (int) sizeof(unsigned short));
@@ -498,11 +533,6 @@ public:
                 finger = continuationData + continuationLength;
                 lengthRemaining -= continuationLength + sizeof(unsigned short);
             }
-            else
-            {
-                continuationData = NULL;
-                continuationLength = 0;
-            }
             if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
             {
                 assertex(lengthRemaining >= (int) sizeof(unsigned short));
@@ -511,25 +541,25 @@ public:
                 finger = smartStepInfoData + smartStepInfoLength;
                 lengthRemaining -= smartStepInfoLength + sizeof(unsigned short);
             }
-            else
-            {
-                smartStepInfoData = NULL;
-                smartStepInfoLength = 0;
-            }
         }
         assertex(lengthRemaining >= 0);
         contextData = finger;
         contextLength = lengthRemaining;
     }
 
-    ~CRoxieQueryPacket()
+    virtual RoxiePacketHeader &queryHeader() const
     {
-        free(data);
+        return  *data;
     }
 
-    virtual RoxiePacketHeader &queryHeader() const
+    virtual const byte *queryTraceInfo() const
     {
-        return  *data;
+        return traceInfo;
+    }
+
+    virtual unsigned getTraceLength() const
+    {
+        return traceLength;
     }
 
     virtual const void *queryContinuationData() const
@@ -552,16 +582,6 @@ public:
         return smartStepInfoLength;
     }
 
-    virtual const byte *queryTraceInfo() const
-    {
-        return traceInfo;
-    }
-
-    virtual unsigned getTraceLength() const
-    {
-        return traceLength;
-    }
-
     virtual const void *queryContextData() const
     {
         return contextData;
@@ -601,11 +621,136 @@ public:
         return createRoxiePacket(newdata, newDataSize);
     }
 
+    virtual ISerializedRoxieQueryPacket *serialize() const override
+    {
+        unsigned length = data->packetlength;
+        MemoryBuffer mb;
+        if (encryptInTransit)
+        {
+            const byte *plainData = (const byte *) (data+1);
+            plainData += traceLength;
+            unsigned plainLen = length - sizeof(RoxiePacketHeader) - traceLength;
+            mb.append(sizeof(RoxiePacketHeader)+traceLength, data);  // Header and traceInfo are unencrypted
+            aesEncrypt(key, sizeof(key), plainData, plainLen, mb);   // Encrypt everything else
+            RoxiePacketHeader *newHeader = (RoxiePacketHeader *) mb.toByteArray();
+            newHeader->packetlength = mb.length();
+        }
+        else
+        {
+            mb.append(length, data);
+        }
+        return createSerializedRoxiePacket(mb);
+    }
+};
+
+// CNocryptRoxieQueryPacket implements both serialized and deserialized packet interfaces, to avoid additional copy operations when
+// using localAgent mode.
+
+class CNocryptRoxieQueryPacket: public CRoxieQueryPacket, implements ISerializedRoxieQueryPacket
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    CNocryptRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacket(_data, length)
+    {
+    }
+
+    virtual RoxiePacketHeader &queryHeader() const
+    {
+        return CRoxieQueryPacket::queryHeader();
+    }
+
+    virtual const byte *queryTraceInfo() const
+    {
+        return traceInfo;
+    }
+
+    virtual unsigned getTraceLength() const
+    {
+        return traceLength;
+    }
+
+    virtual ISerializedRoxieQueryPacket *cloneSerializedPacket(unsigned channel) const
+    {
+        unsigned length = data->packetlength;
+        RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
+        memcpy(newdata, data, length);
+        newdata->channel = channel;
+        newdata->retries |= ROXIE_BROADCAST;
+        return new CNocryptRoxieQueryPacket(newdata, length);
+    }
+
+    virtual ISerializedRoxieQueryPacket *serialize() const override
+    {
+        return const_cast<CNocryptRoxieQueryPacket *>(LINK(this));
+    }
+
+    virtual IRoxieQueryPacket *deserialize() const override
+    {
+        return const_cast<CNocryptRoxieQueryPacket *>(LINK(this));
+    }
+};
+
+class CSerializedRoxieQueryPacket : public CRoxieQueryPacketBase, implements ISerializedRoxieQueryPacket
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    CSerializedRoxieQueryPacket(const void *_data, int length) : CRoxieQueryPacketBase(_data, length)
+    {
+    }
+
+    virtual RoxiePacketHeader &queryHeader() const
+    {
+        return  *data;
+    }
+
+    virtual const byte *queryTraceInfo() const
+    {
+        return traceInfo;
+    }
+
+    virtual unsigned getTraceLength() const
+    {
+        return traceLength;
+    }
+
+    virtual ISerializedRoxieQueryPacket *cloneSerializedPacket(unsigned channel) const
+    {
+        unsigned length = data->packetlength;
+        RoxiePacketHeader *newdata = (RoxiePacketHeader *) malloc(length);
+        memcpy(newdata, data, length);
+        newdata->channel = channel;
+        newdata->retries |= ROXIE_BROADCAST;
+        return new CSerializedRoxieQueryPacket(newdata, length);
+    }
+
+    virtual IRoxieQueryPacket *deserialize() const override
+    {
+        unsigned length = data->packetlength;
+        MemoryBuffer mb;
+        if (encryptInTransit)
+        {
+            const byte *encryptedData = (const byte *) (data+1);
+            encryptedData += traceLength;
+            unsigned encryptedLen = length - sizeof(RoxiePacketHeader) - traceLength;
+            mb.append(sizeof(RoxiePacketHeader)+traceLength, data);         // Header and traceInfo are unencrypted
+            aesDecrypt(key, sizeof(key), encryptedData, encryptedLen, mb);  // Decrypt everything else
+            RoxiePacketHeader *newHeader = (RoxiePacketHeader *) mb.toByteArray();
+            newHeader->packetlength = mb.length();
+        }
+        else
+        {
+            mb.append(length, data);
+        }
+        return createRoxiePacket(mb);
+    }
+
 };
 
 extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
 {
-    if ((unsigned short)_len != _len && !localAgent)
+    if (!encryptInTransit)
+        return new CNocryptRoxieQueryPacket(_data, _len);
+    if ((unsigned short)_len != _len)
     {
         StringBuffer s;
         RoxiePacketHeader *header = (RoxiePacketHeader *) _data;
@@ -622,6 +767,39 @@ extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
     return createRoxiePacket(m.detachOwn(), length);
 }
 
+extern IRoxieQueryPacket *deserializeCallbackPacket(MemoryBuffer &m)
+{
+    // Direct decryption of special packets - others are only decrypted after being dequeued
+    if (encryptInTransit)
+    {
+        RoxiePacketHeader *header = (RoxiePacketHeader *) m.toByteArray();
+        assertex(header != nullptr);
+        assertex(header->activityId == ROXIE_FILECALLBACK || header->activityId == ROXIE_DEBUGCALLBACK);
+        assertex(m.length() >= header->packetlength);
+        unsigned encryptedLen = header->packetlength - sizeof(RoxiePacketHeader);
+        const void *encryptedData = (const void *)(header+1);
+        MemoryBuffer decrypted;
+        decrypted.append(sizeof(RoxiePacketHeader), header);
+        decrypted.ensureCapacity(encryptedLen);  // May be up to 16 bytes smaller...
+        aesDecrypt(key, sizeof(key), encryptedData, encryptedLen, decrypted);
+        unsigned length = decrypted.length();
+        RoxiePacketHeader *newHeader = (RoxiePacketHeader *) decrypted.detachOwn();
+        newHeader->packetlength = length;
+        return createRoxiePacket(newHeader, length);
+    }
+    else
+    {
+        unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
+        return createRoxiePacket(m.detachOwn(), length);
+    }
+}
+
+extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m)
+{
+    unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
+    return new CSerializedRoxieQueryPacket(m.detachOwn(), length);
+}
+
 //=================================================================================
 
 AgentContextLogger::AgentContextLogger()
@@ -630,13 +808,13 @@ AgentContextLogger::AgentContextLogger()
     set(NULL);
 }
 
-AgentContextLogger::AgentContextLogger(IRoxieQueryPacket *packet)
+AgentContextLogger::AgentContextLogger(ISerializedRoxieQueryPacket *packet)
 {
     GetHostIp(ip);
     set(packet);
 }
 
-void AgentContextLogger::set(IRoxieQueryPacket *packet)
+void AgentContextLogger::set(ISerializedRoxieQueryPacket *packet)
 {
     anyOutput = false;
     intercept = false;
@@ -647,50 +825,54 @@ void AgentContextLogger::set(IRoxieQueryPacket *packet)
     start = msTick();
     if (packet)
     {
-        CriticalBlock b(crit);
+        CriticalBlock b(crit); // Why?
         RoxiePacketHeader &header = packet->queryHeader();
         const byte *traceInfo = packet->queryTraceInfo();
-        unsigned traceLength = packet->getTraceLength();
-        unsigned char loggingFlags = *traceInfo;
-        if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL 
-        {
-            traceInfo++;
-            traceLength--;
-            if (loggingFlags & LOGGING_INTERCEPTED)
-                intercept = true;
-            if (loggingFlags & LOGGING_TRACELEVELSET)
+        StringBuffer s;
+        if (traceInfo)
+        {
+            unsigned traceLength = packet->getTraceLength();
+            unsigned char loggingFlags = *traceInfo;
+            if (loggingFlags & LOGGING_FLAGSPRESENT) // should always be true.... but this flag is handy to avoid flags byte ever being NULL
             {
-                ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
+                traceInfo++;
                 traceLength--;
-            }
-            if (loggingFlags & LOGGING_BLIND)
-                blind = true;
-            if (loggingFlags & LOGGING_CHECKINGHEAP)
-                checkingHeap = true;
-            if (loggingFlags & LOGGING_DEBUGGERACTIVE)
-            {
-                assertex(traceLength > sizeof(unsigned short));
-                debuggerActive = true;
-                unsigned short debugLen = *(unsigned short *) traceInfo;
-                traceInfo += debugLen + sizeof(unsigned short);
-                traceLength -= debugLen + sizeof(unsigned short);
-            }
-            // Passing the wuid via the logging context prefix is a lot of a hack...
-            if (loggingFlags & LOGGING_WUID)
-            {
-                unsigned wuidLen = 0;
-                while (wuidLen < traceLength)
+                if (loggingFlags & LOGGING_INTERCEPTED)
+                    intercept = true;
+                if (loggingFlags & LOGGING_TRACELEVELSET)
                 {
-                    if (traceInfo[wuidLen]=='@'||traceInfo[wuidLen]==':')
-                        break;
-                    wuidLen++;
+                    ctxTraceLevel = (*traceInfo++ - 1); // avoid null byte here in case anyone still thinks there's just a null-terminated string
+                    traceLength--;
+                }
+                if (loggingFlags & LOGGING_BLIND)
+                    blind = true;
+                if (loggingFlags & LOGGING_CHECKINGHEAP)
+                    checkingHeap = true;
+                if (loggingFlags & LOGGING_DEBUGGERACTIVE)
+                {
+                    assertex(traceLength > sizeof(unsigned short));
+                    debuggerActive = true;
+                    unsigned short debugLen = *(unsigned short *) traceInfo;
+                    traceInfo += debugLen + sizeof(unsigned short);
+                    traceLength -= debugLen + sizeof(unsigned short);
+                }
+                // Passing the wuid via the logging context prefix is a lot of a hack...
+                if (loggingFlags & LOGGING_WUID)
+                {
+                    unsigned wuidLen = 0;
+                    while (wuidLen < traceLength)
+                    {
+                        if (traceInfo[wuidLen]=='@'||traceInfo[wuidLen]==':')
+                            break;
+                        wuidLen++;
+                    }
+                    wuid.set((const char *) traceInfo, wuidLen);
                 }
-                wuid.set((const char *) traceInfo, wuidLen);
             }
+            s.append(traceLength, (const char *) traceInfo);
+            s.append("|");
         }
         channel = header.channel;
-        StringBuffer s(traceLength, (const char *) traceInfo);
-        s.append("|");
         ip.getIpText(s);
         s.append(':').append(channel);
         StringContextLogger::set(s.str());
@@ -785,19 +967,16 @@ static MapXToMyClass<hash64_t, hash64_t, IQueryFactory> onDemandQueryCache;
 
 void sendUnloadMessage(hash64_t hash, const char *id, const IRoxieContextLogger &logctx)
 {
-    unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen(id) + 1;
-    void *packetData = malloc(packetSize);
-    RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
     RemoteActivityId unloadId(ROXIE_UNLOAD, hash);
-    header->init(unloadId, 0, 0, 0);
+    RoxiePacketHeader header(unloadId, 0, 0, 0);
 
-    char *finger = (char *) (header + 1);
-    *finger++ = (char) LOGGING_FLAGSPRESENT;
-    strcpy(finger, id);
-    finger += strlen(id)+1;
+    MemoryBuffer mb;
+    mb.append(sizeof(RoxiePacketHeader), &header);
+    mb.append((char) LOGGING_FLAGSPRESENT);
+    mb.append(id);
     if (traceLevel > 1)
         DBGLOG("UNLOAD sent for query %s", id);
-    Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
+    Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
     ROQ->sendPacket(packet, logctx);
 }
 
@@ -910,7 +1089,7 @@ private:
 class RoxieQueue : public CInterface, implements IThreadFactory
 {
     Owned <IThreadPool> workers;
-    QueueOf<IRoxieQueryPacket, true> waiting;
+    QueueOf<ISerializedRoxieQueryPacket, true> waiting;
     Semaphore available;
     CriticalSection qcrit;
     unsigned headRegionSize;
@@ -985,7 +1164,7 @@ public:
         workers.clear();  // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
     }
 
-    void enqueue(IRoxieQueryPacket *x)
+    void enqueue(ISerializedRoxieQueryPacket *x)
     {
         {
 #ifdef TIME_PACKETS
@@ -998,7 +1177,7 @@ public:
         available.signal();
     }
 
-    void enqueueUnique(IRoxieQueryPacket *x, unsigned subChannel)
+    void enqueueUnique(ISerializedRoxieQueryPacket *x, unsigned subChannel)
     {
         RoxiePacketHeader &header = x->queryHeader();
 #ifdef TIME_PACKETS
@@ -1011,7 +1190,7 @@ public:
             unsigned i;
             for (i = 0; i < len; i++)
             {
-                IRoxieQueryPacket *queued = waiting.item(i);
+                ISerializedRoxieQueryPacket *queued = waiting.item(i);
                 if (queued && queued->queryHeader().matchPacket(header))
                 {
                     found = true;
@@ -1051,14 +1230,14 @@ public:
     bool remove(RoxiePacketHeader &x)
     {
         unsigned scanLength = 0;
-        IRoxieQueryPacket *found = nullptr;
+        ISerializedRoxieQueryPacket *found = nullptr;
         {
             CriticalBlock qc(qcrit);
             unsigned len = waiting.ordinality();
             unsigned i;
             for (i = 0; i < len; i++)
             {
-                IRoxieQueryPacket *queued = waiting.item(i);
+                ISerializedRoxieQueryPacket *queued = waiting.item(i);
                 if (queued)
                 {
                     scanLength++;
@@ -1103,7 +1282,7 @@ public:
         available.signal(num);
     }
 
-    IRoxieQueryPacket *dequeue()
+    ISerializedRoxieQueryPacket *dequeue()
     {
         CriticalBlock qc(qcrit);
         unsigned lim = waiting.ordinality();
@@ -1499,12 +1678,14 @@ public:
                     if (doIbytiDelay) 
                         ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
 #endif
-                    packet.setown(queue->dequeue());
-                    if (packet)
+                    Owned<ISerializedRoxieQueryPacket> next = queue->dequeue();
+                    if (next)
                     {
+                        logctx.set(next);
+                        packet.setown(next->deserialize());
+                        next.clear();
                         queueLength--;
                         RoxiePacketHeader &header = packet->queryHeader();
-                        logctx.set(packet);
 #ifdef TIME_PACKETS
                         {
                             unsigned now = msTick();
@@ -1808,14 +1989,13 @@ public:
             try
             {
                 Owned<IRoxieQueryPacket> packet = dequeue();
-                RoxiePacketHeader &header = packet->queryHeader();
                 unsigned length = packet->queryHeader().packetlength;
-
                 {
                     MTIME_SECTION(queryActiveTimer(), "bucket_wait");
                     bucket.wait((length / 1024) + 1);
                 }
-                if (!channelWrite(header, true))
+                Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
+                if (!channelWrite(serialized->queryHeader(), true))
                     DBGLOG("Roxie packet write wrote too little");
                 packetsSent++;
             }
@@ -1836,7 +2016,7 @@ public:
         return 0;
     }
 
-    virtual void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
+    void sendPacket(IRoxieQueryPacket *x, const IRoxieContextLogger &logctx)
     {
         RoxiePacketHeader &header = x->queryHeader();
 
@@ -1893,7 +2073,7 @@ class DelayedPacketQueue
         DelayedPacketEntry() = delete;
         DelayedPacketEntry(const DelayedPacketEntry&) = delete;
     public:
-        DelayedPacketEntry(IRoxieQueryPacket *_packet, unsigned _waitExpires)
+        DelayedPacketEntry(ISerializedRoxieQueryPacket *_packet, unsigned _waitExpires)
         : packet(_packet), waitExpires(_waitExpires)
         {
         }
@@ -1908,7 +2088,7 @@ class DelayedPacketQueue
         {
             return packet->queryHeader().matchPacket(ibyti);
         }
-        IRoxieQueryPacket *getClear()
+        ISerializedRoxieQueryPacket *getClear()
         {
             return packet.getClear();
         }
@@ -1917,7 +2097,7 @@ class DelayedPacketQueue
             return packet->queryHeader().toString(ret);
         }
 
-        Owned<IRoxieQueryPacket> packet;
+        Owned<ISerializedRoxieQueryPacket> packet;
         DelayedPacketEntry *next = nullptr;
         DelayedPacketEntry *prev = nullptr;
 
@@ -1953,7 +2133,7 @@ public:
         return false;
     }
 
-    void append(IRoxieQueryPacket *packet, unsigned expires)
+    void append(ISerializedRoxieQueryPacket *packet, unsigned expires)
     {
         // Goes on the end. But percolate the expiry time backwards
         assert(GetCurrentThreadId()==roxiePacketReaderThread);
@@ -1989,7 +2169,7 @@ public:
         {
             if (((int) (finger->waitExpires - now)) <= 0)   // Oddly coded to handle wrapping
             {
-                IRoxieQueryPacket *packet = finger->getClear();
+                ISerializedRoxieQueryPacket *packet = finger->getClear();
                 const RoxiePacketHeader &header = packet->queryHeader();
                 if (traceRoxiePackets)
                 {
@@ -2207,7 +2387,8 @@ public:
                 StringBuffer s;
                 throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
             }
-            if (!channelWrite(header, true))
+            Owned <ISerializedRoxieQueryPacket> serialized = x->serialize();
+            if (!channelWrite(serialized->queryHeader(), true))
                 logctx.CTXLOG("Roxie packet write wrote too little");
             packetsSent++;
         }
@@ -2256,7 +2437,9 @@ public:
         {
             StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
         }
-        if (!channelWrite(*(RoxiePacketHeader *) data.toByteArray(), true))
+        Owned<IRoxieQueryPacket> packet = createRoxiePacket(data);
+        Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
+        if (!channelWrite(serialized->queryHeader(), true))
             logctx.CTXLOG("sendAbortCallback wrote too little");
         abortsSent++;
     }
@@ -2413,9 +2596,9 @@ public:
 
                 Owned<const ITopologyServer> topology = getTopology();
                 const std::vector<unsigned> channels = topology->queryChannels();
-                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
                 for (unsigned i = 1; i < channels.size(); i++)
-                    queue.enqueue(packet->clonePacket(channels[i]));
+                    queue.enqueue(packet->cloneSerializedPacket(channels[i]));
                 header.channel = channels[0];
                 queue.enqueue(packet.getClear());
                 return;
@@ -2428,7 +2611,7 @@ public:
 #endif
             if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
             {
-                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                Owned<IRoxieQueryPacket> packet = deserializeCallbackPacket(mb);
                 if (traceLevel > 10)
                 {
                     StringBuffer s;
@@ -2448,7 +2631,7 @@ public:
             }
             else
             {
-                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
                 AgentContextLogger logctx(packet);
                 unsigned retries = header.thisChannelRetries(mySubchannel);
                 if (retries)
@@ -2647,7 +2830,7 @@ public:
 class RoxieUdpSocketQueueManager : public RoxieSocketQueueManager
 {
 public:
-    RoxieUdpSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers) : RoxieSocketQueueManager(_numWorkers)
+    RoxieUdpSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers)
     {
         int udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
         int udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
@@ -2668,8 +2851,8 @@ public:
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
         unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
-        receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient));
-        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket));
+        receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient, encryptionInTransit));
+        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
     }
 
 };
@@ -2974,6 +3157,7 @@ public:
                 StringBuffer s; 
                 DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
             }
+            // MORE - do we need to encrypt these?
             doFileCallback(packet);
         }
         else if (retries < SUBCHANNEL_MASK)
@@ -2994,19 +3178,19 @@ public:
             else
                 targetQueue = &loQueue;
 
+            Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
             if (header.channel)
             {
-                targetQueue->enqueue(LINK(packet));
+                targetQueue->enqueue(serialized.getClear());
             }
             else
             {
                 // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
                 // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
-                // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending, except for some control messages like PING or UNLOAD
-                for (unsigned i = 0; i < numChannels; i++)
-                {
-                    targetQueue->enqueue(packet->clonePacket(i+1));
-                }
+                for (unsigned i = 1; i < numChannels; i++)
+                    targetQueue->enqueue(serialized->cloneSerializedPacket(i+1));
+                header.channel = 1;
+                targetQueue->enqueue(serialized.getClear());
             }
         }
     }
@@ -3074,14 +3258,14 @@ public:
 
 IRoxieOutputQueueManager *ROQ;
 
-extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers)
+extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers, bool encrypted)
 {
     if (localAgent)
         return new RoxieLocalQueueManager(numWorkers);
     else if (useAeron)
         return new RoxieAeronSocketQueueManager(numWorkers);
     else
-        return new RoxieUdpSocketQueueManager(snifferChannel, numWorkers);
+        return new RoxieUdpSocketQueueManager(snifferChannel, numWorkers, encrypted);
 
 }
 
@@ -3214,24 +3398,21 @@ class PingTimer : public Thread
     {
         try
         {
-            unsigned packetSize = sizeof(RoxiePacketHeader) + sizeof(char) + strlen("PING") + 1 + sizeof(PingRecord);
-            void *packetData = malloc(packetSize);
-            RoxiePacketHeader *header = (RoxiePacketHeader *) packetData;
             RemoteActivityId pingId(ROXIE_PING | priorityMask, 0);
-            header->init(pingId, 0, 0, 0);
+            RoxiePacketHeader header(pingId, 0, 0, 0);
 
-            char *finger = (char *) (header + 1);
-            *finger++ = (char) LOGGING_FLAGSPRESENT;
-            strcpy(finger, "PING");
-            finger += strlen("PING")+1;
-            if (traceLevel > 1)
-                DBGLOG("PING sent");
+            MemoryBuffer mb;
+            mb.append(sizeof(RoxiePacketHeader), &header);
+            mb.append((char) LOGGING_FLAGSPRESENT);
+            mb.append("PING");
 
             PingRecord data;
             data.senderIP.ipset(myNode.getIpAddress());
             data.tick = usTick();
-            memcpy(finger, &data, sizeof(PingRecord));
-            Owned<IRoxieQueryPacket> packet = createRoxiePacket(packetData, packetSize);
+            mb.append(sizeof(PingRecord), &data);
+            if (traceLevel > 1)
+                DBGLOG("PING sent");
+            Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
             ROQ->sendPacket(packet, logctx);
         }
         catch (IException *E)

+ 1 - 1
roxie/ccd/ccdqueue.ipp

@@ -106,7 +106,7 @@ interface IPacketDiscarder : public IInterface
 };
 
 extern IRoxieOutputQueueManager *ROQ;
-extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers);
+extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChannel, unsigned numWorkers, bool encrypted);
 extern IReceiveManager *createLocalReceiveManager();
 extern IPacketDiscarder *createPacketDiscarder();
 extern void startPingTimer();

+ 2 - 2
roxie/udplib/udplib.hpp

@@ -147,8 +147,8 @@ interface ISendManager : extends IInterface
     virtual bool allDone() = 0;
 };
 
-extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender);
-extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter);
+extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender, bool encryptionInTransit);
+extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit);
 
 extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
 extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep);

+ 27 - 4
roxie/udplib/udptrr.cpp

@@ -23,6 +23,7 @@
 #include "jlog.hpp"
 #include "jisem.hpp"
 #include "jsocket.hpp"
+#include "jencrypt.hpp"
 #include "udplib.hpp"
 #include "udptrr.hpp"
 #include "udptrs.hpp"
@@ -47,6 +48,11 @@ using roxiemem::IRowManager;
 
 unsigned udpRetryBusySenders = 0; // seems faster with 0 than 1 in my testing on small clusters and sustained throughput
 
+static byte key[32] = {
+    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
+    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
+};
+
 class CReceiveManager : implements IReceiveManager, public CInterface
 {
     /*
@@ -601,13 +607,27 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         #endif
             DataBuffer *b = NULL;
             started.signal();
+            MemoryBuffer encryptData;
+            size32_t max_payload = DATA_PAYLOAD;
+            void *encryptedBuffer = nullptr;
+            if (parent.encrypted)
+            {
+                max_payload = DATA_PAYLOAD+16;  // AES function may add up to 16 bytes of padding
+                encryptedBuffer = encryptData.reserveTruncate(max_payload);
+            }
             while (running) 
             {
                 try 
                 {
                     unsigned int res;
                     b = bufferManager->allocate();
-                    receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                    if (parent.encrypted)
+                    {
+                        receive_socket->read(encryptedBuffer, 1, max_payload, res, 5);
+                        res = aesDecrypt(key, sizeof(key), encryptedBuffer, res, b->data, DATA_PAYLOAD);
+                    }
+                    else
+                        receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
                     parent.inflight--;
                     // MORE - reset it to zero if we fail to read data, or if avail_read returns 0.
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
@@ -681,6 +701,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     int                  data_port;
 
     std::atomic<bool> running = { false };
+    bool encrypted = false;
 
     typedef std::map<ruid_t, CMessageCollator*> uid_map;
     uid_map         collators;
@@ -717,12 +738,13 @@ class CReceiveManager : implements IReceiveManager, public CInterface
 
     public:
     IMPLEMENT_IINTERFACE;
-    CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client)
+    CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client, bool _encrypted)
         : collatorThread(*this), sendersTable([client_flow_port](const ServerIdentifier &ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -15);
 #endif
+        encrypted = _encrypted;
         receive_flow_port = server_flow_port;
         data_port = d_port;
         input_queue_size = queue_size;
@@ -838,10 +860,11 @@ class CReceiveManager : implements IReceiveManager, public CInterface
 
 IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port,
                                       int sniffer_port, const IpAddress &sniffer_multicast_ip,
-                                      int udpQueueSize, unsigned maxSlotsPerSender)
+                                      int udpQueueSize, unsigned maxSlotsPerSender,
+                                      bool encrypted)
 {
     assertex (maxSlotsPerSender <= (unsigned) udpQueueSize);
-    return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender);
+    return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender, encrypted);
 }
 
 /*

+ 23 - 7
roxie/udplib/udptrs.cpp

@@ -22,6 +22,7 @@
 
 #include "jsocket.hpp"
 #include "jlog.hpp"
+#include "jencrypt.hpp"
 #include "roxie.hpp"
 #ifdef _WIN32
 #include <winsock.h>
@@ -78,12 +79,20 @@ using roxiemem::DataBuffer;
  *    - resend rts if we send data but there is some remaining
  */
 
+static byte key[32] = {
+    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
+    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
+};
+
+
+
 class UdpReceiverEntry : public IUdpReceiverEntry
 {
 private:
     queue_t *output_queue = nullptr;
     bool    initialized = false;
     const bool isLocal = false;
+    const bool encrypted = false;
     ISocket *send_flow_socket = nullptr;
     ISocket *data_socket = nullptr;
     const unsigned numQueues;
@@ -181,6 +190,7 @@ public:
                 break;
 #endif
         }
+        MemoryBuffer encryptBuffer;
         for (DataBuffer *buffer: toSend)
         {
             UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
@@ -192,7 +202,13 @@ public:
             }
             try
             {
-                data_socket->write(buffer->data, length);
+                if (encrypted)
+                {
+                    aesEncrypt(key, sizeof(key), buffer->data, length, encryptBuffer.clear());
+                    data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
+                }
+                else
+                    data_socket->write(buffer->data, length);
             }
             catch(IException *e)
             {
@@ -307,8 +323,8 @@ public:
         return nullptr;
     }
 
-    UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort)
-    : ip (_ip), sourceIP(_sourceIP), numQueues(_numQueues), isLocal(_ip.isLocal())
+    UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort, bool _encrypted)
+    : ip (_ip), sourceIP(_sourceIP), numQueues(_numQueues), isLocal(_ip.isLocal()), encrypted(_encrypted)
     {
         assert(!initialized);
         assert(numQueues > 0);
@@ -688,10 +704,10 @@ class CSendManager : implements ISendManager, public CInterface
 public:
     IMPLEMENT_IINTERFACE;
 
-    CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket)
+    CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool encrypted)
         : bucket(_bucket),
           myIP(_myIP),
-          receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port](const ServerIdentifier &ip) { return new UdpReceiverEntry(ip.getIpAddress(), _myIP, _numQueues, q_size, server_flow_port, data_port);})
+          receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port, encrypted](const ServerIdentifier &ip) { return new UdpReceiverEntry(ip.getIpAddress(), _myIP, _numQueues, q_size, server_flow_port, data_port, encrypted);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -3);
@@ -753,10 +769,10 @@ public:
 
 };
 
-ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter)
+ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit)
 {
     assertex(!myNode.getIpAddress().isNull());
-    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter);
+    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter, encryptionInTransit);
 }
 
 class CMessagePacker : implements IMessagePacker, public CInterface

+ 2 - 2
roxie/udplib/uttest.cpp

@@ -183,7 +183,7 @@ public:
             rcvMgr.setown(createAeronReceiveManager(myEP));
         }
         else
-            rcvMgr.setown(createReceiveManager(7000, 7001, 7002, 7003, multicastIP, udpQueueSize, maxPacketsPerSender));
+            rcvMgr.setown(createReceiveManager(7000, 7001, 7002, 7003, multicastIP, udpQueueSize, maxPacketsPerSender, false));
         Owned<roxiemem::IRowManager> rowMgr = roxiemem::createRowManager(0, NULL, queryDummyContextLogger(), NULL, false);
         Owned<IMessageCollator> collator = rcvMgr->createMessageCollator(rowMgr, 1);
         unsigned lastReport = 0;
@@ -293,7 +293,7 @@ void testNxN()
     if (useAeron)
         sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress()));
     else
-        sendMgr.setown(createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL));
+        sendMgr.setown(createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL, false));
     Receiver receiver;
 
     IMessagePacker **packers = new IMessagePacker *[numNodes];

+ 35 - 10
system/jlib/jencrypt.cpp

@@ -100,6 +100,7 @@ typedef unsigned short UINT16;
 #define RIJNDAEL_NOT_INITIALIZED -5
 #define RIJNDAEL_BAD_DIRECTION -6
 #define RIJNDAEL_CORRUPTED_DATA -7
+#define RIJNDAEL_INSUFFICIENT_SPACE -8
 
 class Rijndael
 {   
@@ -164,9 +165,9 @@ public:
     int blockDecrypt(const UINT8 *input, int inputLen, UINT8 *outBuffer);
     // Decrypts the input vector
     // Input len is in BYTES!
-    // outBuffer must be at least inputLen bytes long
+    // outBuffer must be at least outputLen bytes long
     // Returns the decrypted buffer length in BYTES and an error code < 0 in case of error
-    int padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer);
+    int padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer, int outputLen);
 protected:
     void keySched(UINT8 key[_MAX_KEY_COLUMNS][4]);
     void keyEncToDec();
@@ -1415,7 +1416,7 @@ int Rijndael::blockDecrypt(const UINT8 *input, int inputLen, UINT8 *outBuffer)
     return 128*numBlocks;
 }
 
-int Rijndael::padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer)
+int Rijndael::padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer, int outlen)
 {
     int i, numBlocks, padLen;
     UINT8 block[16];
@@ -1427,7 +1428,8 @@ int Rijndael::padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer)
     if(input == 0 || inputOctets <= 0)return 0;
 
     if((inputOctets % 16) != 0)return RIJNDAEL_CORRUPTED_DATA;
-
+    if (inputOctets-16 > outlen)
+        return RIJNDAEL_INSUFFICIENT_SPACE;
     numBlocks = inputOctets/16;
 
     switch(m_mode){
@@ -1446,6 +1448,8 @@ int Rijndael::padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer)
             {
                 if(block[i] != padLen)return RIJNDAEL_CORRUPTED_DATA;
             }
+            if (outlen < inputOctets-padLen)
+                return RIJNDAEL_INSUFFICIENT_SPACE;
             memcpy(outBuffer, block, 16 - padLen);
         break;  
         case CBC:
@@ -1475,6 +1479,8 @@ int Rijndael::padDecrypt(const UINT8 *input, int inputOctets, UINT8 *outBuffer)
             {
                 if(block[i] != padLen)return RIJNDAEL_CORRUPTED_DATA;
             }
+            if (outlen < inputOctets-padLen)
+                return RIJNDAEL_INSUFFICIENT_SPACE;
             memcpy(outBuffer, block, 16 - padLen);
             break;
         
@@ -1753,6 +1759,7 @@ static const char *aesErrorText[] =
     "Not Initialized",
     "Bad Direction",
     "Corrupted Data"
+    "Output buffer too small"
 };
 
 inline const char *getAesErrorText(int err)
@@ -1782,10 +1789,10 @@ MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *input, size
     
     rin.init(Rijndael::CBC, Rijndael::Encrypt, (const UINT8 *)key, keyType);
     size32_t truncInLen = (size32_t)inlen; //MORE: Modify the padEncrypt function
-    int len = rin.padEncrypt((const UINT8 *)input, truncInLen, (UINT8 *) output.clear().reserveTruncate(truncInLen + 16));
-
+    size32_t origLen = output.length();
+    int len = rin.padEncrypt((const UINT8 *)input, truncInLen, (UINT8 *) output.reserveTruncate(truncInLen + 16));
     if(len >= 0)
-        output.setLength(len);
+        output.setLength(origLen+len);
     else 
         throw MakeStringException(-1,"AES Encryption error: %d, %s", len, getAesErrorText(len));
     return output;
@@ -1798,15 +1805,28 @@ MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *input, size
     
     rin.init(Rijndael::CBC, Rijndael::Decrypt, (const UINT8 *)key, keyType);
     size32_t truncInLen = (size32_t)inlen;
-    int len = rin.padDecrypt((const UINT8 *)input, truncInLen, (UINT8 *) output.clear().reserveTruncate(truncInLen));
-
+    size32_t origLen = output.length();
+    int len = rin.padDecrypt((const UINT8 *)input, truncInLen, (UINT8 *) output.reserveTruncate(truncInLen), truncInLen);
     if(len >= 0)
-        output.setLength(len);
+        output.setLength(origLen+len);
     else 
         throw MakeStringException(-1,"AES Decryption error: %d, %s", len, getAesErrorText(len));
     return output;
 }
 
+size_t aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, void *output, size_t outlen)
+{
+    Rijndael rin;
+    Rijndael::KeyLength keyType = getAesKeyType(keylen);
+
+    rin.init(Rijndael::CBC, Rijndael::Decrypt, (const UINT8 *)key, keyType);
+    size32_t truncInLen = (size32_t)inlen;
+    int len = rin.padDecrypt((const UINT8 *)input, truncInLen, (UINT8 *) output, outlen);
+    if(len < 0)
+        throw MakeStringException(-1,"AES Decryption error: %d, %s", len, getAesErrorText(len));
+    return len;
+}
+
 } // end of namespace jlib
 
 MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *input, size_t inlen, MemoryBuffer &output)
@@ -1819,6 +1839,11 @@ MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *input, size
     return jlib::aesDecrypt(key, keylen, input, inlen, output);
 }
 
+size_t aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, void *output, size_t outlen)
+{
+    return jlib::aesDecrypt(key, keylen, input, inlen, output, outlen);
+}
+
 #define CRYPTSIZE 32
 #define CRYPTKEY "Unable to find process %s version %s"
 

+ 3 - 0
system/jlib/jencrypt.hpp

@@ -30,11 +30,14 @@ namespace jlib
 {
     extern jlib_decl MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *input, size_t inlen, MemoryBuffer &output);
     extern jlib_decl MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, MemoryBuffer &output);
+    extern jlib_decl size_t aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, void *output, size_t outlen);
 } // end of namespace jlib;
 
 // NB: these are wrappers to either the openssl versions (if USE_OPENSSL) or the jlib version.
+// MORE - are they?? I see no evidence of that!
 extern jlib_decl MemoryBuffer &aesEncrypt(const void *key, size_t keylen, const void *input, size_t inlen, MemoryBuffer &output);
 extern jlib_decl MemoryBuffer &aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, MemoryBuffer &output);
+extern jlib_decl size_t aesDecrypt(const void *key, size_t keylen, const void *input, size_t inlen, void *output, size_t outlen);
 
 
 #define encrypt _LogProcessError12