浏览代码

HPCC-25531 Roxie decryption should be moved off socket reader thread

Also disable by default, until performance impact is better understood and accepted.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 年之前
父节点
当前提交
4844998364

+ 1 - 1
roxie/ccd/ccdmain.cpp

@@ -656,7 +656,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
         saveTopology();
         localAgent = topology->getPropBool("@localAgent", topology->getPropBool("@localSlave", false));  // legacy name
-        encryptInTransit = topology->getPropBool("@encryptInTransit", true) && !localAgent;
+        encryptInTransit = topology->getPropBool("@encryptInTransit", false) && !localAgent;
         numChannels = topology->getPropInt("@numChannels", 0);
 #ifdef _CONTAINERIZED
         if (!numChannels)

+ 4 - 4
roxie/ccd/ccdqueue.cpp

@@ -2863,13 +2863,13 @@ public:
 class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager
 {
 public:
-    RoxieAeronSocketQueueManager(unsigned _numWorkers) : RoxieSocketQueueManager(_numWorkers)
+    RoxieAeronSocketQueueManager(unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers)
     {
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         SocketEndpoint ep(dataPort, myNode.getIpAddress());
-        receiveManager.setown(createAeronReceiveManager(ep));
+        receiveManager.setown(createAeronReceiveManager(ep, encryptionInTransit));
         assertex(!myNode.getIpAddress().isNull());
-        sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getIpAddress()));
+        sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getIpAddress(), encryptionInTransit));
     }
 
 };
@@ -3274,7 +3274,7 @@ extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned snifferChanne
     if (localAgent)
         return new RoxieLocalQueueManager(numWorkers);
     else if (useAeron)
-        return new RoxieAeronSocketQueueManager(numWorkers);
+        return new RoxieAeronSocketQueueManager(numWorkers, encrypted);
     else
         return new RoxieUdpSocketQueueManager(snifferChannel, numWorkers, encrypted);
 

+ 15 - 12
roxie/udplib/udpaeron.cpp

@@ -149,9 +149,10 @@ private:
     std::thread receiveThread;
     std::atomic<bool> running = { true };
     const std::chrono::duration<long, std::milli> idleSleepMs;
+    bool encrypted;
 public:
-    CRoxieAeronReceiveManager(const SocketEndpoint &myEndpoint)
-    : idleSleepMs(aeronIdleSleepMs)
+    CRoxieAeronReceiveManager(const SocketEndpoint &myEndpoint, bool _encrypted)
+    : idleSleepMs(aeronIdleSleepMs), encrypted(_encrypted)
     {
         if (useEmbeddedAeronDriver && !is_running())
         {
@@ -256,7 +257,7 @@ public:
     // Note - some of this code could be in a common base class with udpreceivemanager, but hope to kill that at some point
     virtual IMessageCollator *createMessageCollator(roxiemem::IRowManager *rowManager, ruid_t ruid) override
     {
-        CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid);
+        CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
         if (udpTraceLevel >= 2)
             DBGLOG("AeronReceiver: createMessageCollator %p %u", msgColl, ruid);
         {
@@ -377,6 +378,7 @@ class CRoxieAeronSendManager : public CInterfaceOf<ISendManager>
     const unsigned numQueues = 0;
     IpMapOf<UdpAeronReceiverEntry> receiversTable;
     const IpAddress myIP;
+    bool encrypted;
 
     std::atomic<unsigned> msgSeq{0};
 
@@ -390,11 +392,12 @@ class CRoxieAeronSendManager : public CInterfaceOf<ISendManager>
         return res;
     }
 public:
-    CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP)
+    CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP, bool _encrypted)
     : dataPort(_dataPort),
       numQueues(_numQueues),
       receiversTable([this](const ServerIdentifier ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}),
-      myIP(_myIP)
+      myIP(_myIP),
+      encrypted(_encrypted)
     {
         if (useEmbeddedAeronDriver && !is_running())
         {
@@ -426,17 +429,17 @@ public:
 IMessagePacker *CRoxieAeronSendManager::createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue)
 {
     const IpAddress dest = destNode.getIpAddress();
-    return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue);
+    return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue, encrypted);
 }
 
-extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep)
+extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted)
 {
-    return new CRoxieAeronReceiveManager(ep);
+    return new CRoxieAeronReceiveManager(ep, encrypted);
 }
 
-extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP)
+extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted)
 {
-    return new CRoxieAeronSendManager(dataPort, numQueues, myIP);
+    return new CRoxieAeronSendManager(dataPort, numQueues, myIP, encrypted);
 }
 
 #else
@@ -445,12 +448,12 @@ extern UDPLIB_API void setAeronProperties(const IPropertyTree *config)
 {
 }
 
-extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep)
+extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted)
 {
     UNIMPLEMENTED;
 }
 
-extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP)
+extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted)
 {
     UNIMPLEMENTED;
 }

+ 2 - 2
roxie/udplib/udplib.hpp

@@ -153,8 +153,8 @@ extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, in
 extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit);
 
 extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
-extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep);
-extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP);
+extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted);
+extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted);
 
 extern UDPLIB_API RelaxedAtomic<unsigned> unwantedDiscarded;
 

+ 20 - 8
roxie/udplib/udpmsgpk.cpp

@@ -28,6 +28,8 @@
 #include "jthread.hpp"
 #include "jlog.hpp"
 #include "jisem.hpp"
+#include "jencrypt.hpp"
+
 #include "udplib.hpp"
 #include "udptrr.hpp"
 #include "udptrs.hpp"
@@ -55,6 +57,10 @@ int g_sequence_compare(const void *arg1, const void *arg2 )
     return 0;
 }
 
+static byte key[32] = {
+    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
+    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
+};
 
 class PackageSequencer : public CInterface, implements IInterface
 {
@@ -73,12 +79,12 @@ class PackageSequencer : public CInterface, implements IInterface
 
     MemoryBuffer metadata;
     InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
-
+    bool encrypted = false;
 
 public:
     IMPLEMENT_IINTERFACE;
 
-    PackageSequencer() 
+    PackageSequencer(bool _encrypted) : encrypted(_encrypted)
     {
         if (checkTraceLevel(TRACE_MSGPACK, 3))
             DBGLOG("UdpCollator: PackageSequencer::PackageSequencer this=%p", this);
@@ -233,6 +239,16 @@ public:
 #ifdef _DEBUG
         numPackets++;
 #endif
+        // Now that we know we are keeping it, decrypt it
+        // MORE - could argue that we would prefer to wait even longer - until we know consumer wants it - but that might be complex
+        if (encrypted)
+        {
+            // MORE - This is decrypting in-place. Is that ok?? Seems to be with the code we currently use, but if that changed
+            // might need to rethink this
+            size_t decryptedSize = aesDecrypt(key, sizeof(key), pktHdr+1, pktHdr->length-sizeof(UdpPacketHeader), pktHdr+1, DATA_PAYLOAD-sizeof(UdpPacketHeader));
+            pktHdr->length = decryptedSize + sizeof(UdpPacketHeader);
+        }
+
         if (prev == lastContiguousPacket)
         {
             unsigned prevseq;
@@ -502,7 +518,7 @@ PUID GETPUID(DataBuffer *dataBuff)
     return (((PUID) ip4) << 32) | (PUID) pktHdr->msgSeq;
 }
 
-CMessageCollator::CMessageCollator(IRowManager *_rowMgr, unsigned _ruid) : rowMgr(_rowMgr), ruid(_ruid)
+CMessageCollator::CMessageCollator(IRowManager *_rowMgr, unsigned _ruid, bool _encrypted) : rowMgr(_rowMgr), ruid(_ruid), encrypted(_encrypted)
 {
     if (checkTraceLevel(TRACE_MSGPACK, 3))
         DBGLOG("UdpCollator: CMessageCollator::CMessageCollator rowMgr=%p this=%p ruid=" RUIDF "", _rowMgr, this, ruid);
@@ -598,7 +614,7 @@ void CMessageCollator::collate(DataBuffer *dataBuff)
     PackageSequencer *pkSqncr = mapping.getValue(puid);
     if (!pkSqncr)
     {
-        pkSqncr = new PackageSequencer;
+        pkSqncr = new PackageSequencer(encrypted);
         mapping.setValue(puid, pkSqncr);
         pkSqncr->Release();
     }
@@ -668,7 +684,3 @@ void CMessageCollator::interrupt(IException *E)
 // ====================================================================================
 //
 
-extern CMessageCollator *createCMessageCollator(IRowManager *rowManager, ruid_t ruid)
-{
-    return new CMessageCollator(rowManager, ruid);
-}

+ 2 - 1
roxie/udplib/udpmsgpk.hpp

@@ -30,6 +30,7 @@ private:
     msg_map             mapping;  // Note - only accessed from collator thread
     RelaxedAtomic<bool> activity;
     bool                memLimitExceeded;
+    bool                encrypted;
     CriticalSection     queueCrit;
     InterruptableSemaphore sem;
     Linked<roxiemem::IRowManager> rowMgr;
@@ -40,7 +41,7 @@ private:
 
     void collate(roxiemem::DataBuffer *dataBuff);
 public:
-    CMessageCollator(roxiemem::IRowManager *_rowMgr, unsigned _ruid);
+    CMessageCollator(roxiemem::IRowManager *_rowMgr, unsigned _ruid, bool encrypted);
     virtual ~CMessageCollator();
 
     virtual ruid_t queryRUID() const override

+ 2 - 22
roxie/udplib/udptrr.cpp

@@ -23,7 +23,6 @@
 #include "jlog.hpp"
 #include "jisem.hpp"
 #include "jsocket.hpp"
-#include "jencrypt.hpp"
 #include "udplib.hpp"
 #include "udptrr.hpp"
 #include "udptrs.hpp"
@@ -56,11 +55,6 @@ static unsigned lastFlowPermitsSent = 0;
 static unsigned lastFlowRequestsReceived = 0;
 static unsigned lastDataPacketsReceived = 0;
 
-static byte key[32] = {
-    0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
-    0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
-};
-
 class CReceiveManager : implements IReceiveManager, public CInterface
 {
     /*
@@ -622,14 +616,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         #endif
             DataBuffer *b = NULL;
             started.signal();
-            MemoryBuffer encryptData;
-            size32_t max_payload = DATA_PAYLOAD;
-            void *encryptedBuffer = nullptr;
-            if (parent.encrypted)
-            {
-                max_payload = DATA_PAYLOAD+16;  // AES function may add up to 16 bytes of padding
-                encryptedBuffer = encryptData.reserveTruncate(max_payload);
-            }
             unsigned lastOOOReport = 0;
             unsigned lastPacketsOOO = 0;
             while (running) 
@@ -638,13 +624,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 {
                     unsigned int res;
                     b = bufferManager->allocate();
-                    if (parent.encrypted)
-                    {
-                        receive_socket->read(encryptedBuffer, 1, max_payload, res, 5);
-                        res = aesDecrypt(key, sizeof(key), encryptedBuffer, res, b->data, DATA_PAYLOAD);
-                    }
-                    else
-                        receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                    receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
                     dataPacketsReceived++;
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
                     assert(hdr.length == res && hdr.length > sizeof(hdr));
@@ -884,7 +864,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
 
     virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
     {
-        CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid);
+        CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
         if (udpTraceLevel > 2)
             DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
         {

+ 24 - 16
roxie/udplib/udptrs.cpp

@@ -420,7 +420,16 @@ public:
 #endif
                 if (encrypted)
                 {
-                    aesEncrypt(key, sizeof(key), buffer->data, length, encryptBuffer.clear());
+                    encryptBuffer.clear();
+                    encryptBuffer.append(sizeof(UdpPacketHeader), header);    // We don't encrypt the header
+                    length -= sizeof(UdpPacketHeader);
+                    const char *data = buffer->data + sizeof(UdpPacketHeader);
+                    aesEncrypt(key, sizeof(key), data, length, encryptBuffer);
+                    header->length = encryptBuffer.length();
+                    encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header);   // Only really need length updating
+                    assert(length <= DATA_PAYLOAD);
+                    if (udpTraceLevel > 5)
+                        DBGLOG("ENCRYPT: Writing %u bytes to data socket", encryptBuffer.length());
                     data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
                 }
                 else
@@ -945,6 +954,7 @@ class CSendManager : implements ISendManager, public CInterface
     send_receive_flow *receive_flow;
     send_data         *data;
     Linked<TokenBucket> bucket;
+    bool encrypted;
     
     std::atomic<unsigned> msgSeq{0};
 
@@ -961,10 +971,11 @@ class CSendManager : implements ISendManager, public CInterface
 public:
     IMPLEMENT_IINTERFACE;
 
-    CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool encrypted)
+    CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool _encrypted)
         : bucket(_bucket),
           myIP(_myIP),
-          receiversTable([_numQueues, q_size, server_flow_port, data_port, encrypted](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, encrypted);})
+          receiversTable([_numQueues, q_size, server_flow_port, data_port, _encrypted](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, _encrypted);}),
+          encrypted(_encrypted)
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -3);
@@ -995,7 +1006,7 @@ public:
 
     virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
     {
-        return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[destNode], myIP, getNextMessageSequence(), queue);
+        return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[destNode], myIP, getNextMessageSequence(), queue, encrypted);
     }
 
     virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
@@ -1039,7 +1050,7 @@ class CMessagePacker : implements IMessagePacker, public CInterface
     IUdpReceiverEntry &receiver;
     UdpPacketHeader package_header;
     DataBuffer     *part_buffer;
-    unsigned data_buffer_size;
+    const unsigned data_buffer_size;
     unsigned data_used;
     void *mem_buffer;
     unsigned mem_buffer_size;
@@ -1052,8 +1063,9 @@ class CMessagePacker : implements IMessagePacker, public CInterface
 public:
     IMPLEMENT_IINTERFACE;
 
-    CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
-        : parent(_parent), receiver(_receiver)
+    CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue, bool _encrypted)
+        : parent(_parent), receiver(_receiver), data_buffer_size(DATA_PAYLOAD - sizeof(UdpPacketHeader) - (_encrypted ? 16 : 0))
+
     {
         queue_number = _queue;
 
@@ -1067,7 +1079,6 @@ public:
 
         packed_request = false;
         part_buffer = bufferManager->allocate();
-        data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
         assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
         *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
         memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
@@ -1089,7 +1100,7 @@ public:
     {
         if (variable)
             len += sizeof(RecordLengthType);
-        if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
+        if (data_buffer_size < len)
         {
             // Won't fit in one, so allocate temp location
             if (mem_buffer_size < len)
@@ -1111,7 +1122,6 @@ public:
         if (!part_buffer)
         {
             part_buffer = bufferManager->allocate();
-            data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
         }
         packed_request = true;
         if (variable)
@@ -1142,7 +1152,6 @@ public:
                 if (!part_buffer)
                 {
                     part_buffer = bufferManager->allocate();
-                    data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
                     data_used = 0;
                 }
                 unsigned chunkLen = data_buffer_size - data_used;
@@ -1179,7 +1188,7 @@ private:
                 part_buffer = bufferManager->allocate();
             const char *metaData = metaInfo.toByteArray();
             unsigned metaLength = metaInfo.length();
-            unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
+            unsigned maxMetaLength = data_buffer_size + data_used;
             while (metaLength > maxMetaLength)
             {
                 memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
@@ -1187,7 +1196,7 @@ private:
                 metaLength -= maxMetaLength;
                 metaData += maxMetaLength;
                 data_used = 0;
-                maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
+                maxMetaLength = data_buffer_size;
                 part_buffer = bufferManager->allocate();
             }
             if (metaLength)
@@ -1204,7 +1213,6 @@ private:
                 part_buffer->Release(); // If NO data in buffer, release buffer back to pool
         }
         part_buffer = 0;
-        data_buffer_size = 0;
         data_used = 0;
     }
 
@@ -1221,7 +1229,7 @@ private:
 };
 
 
-extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
+extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue, bool _encrypted)
 {
-    return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _receiver, _sourceNode, _msgSeq, _queue);
+    return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _receiver, _sourceNode, _msgSeq, _queue, _encrypted);
 }

+ 1 - 1
roxie/udplib/udptrs.hpp

@@ -19,6 +19,6 @@
 #include "udplib.hpp"
 #include "udpsha.hpp"
 
-extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue);
+extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue, bool _encrypted);
 
 

+ 2 - 2
roxie/udplib/uttest.cpp

@@ -180,7 +180,7 @@ public:
         if (useAeron)
         {
             SocketEndpoint myEP(7000, myNode.getIpAddress());
-            rcvMgr.setown(createAeronReceiveManager(myEP));
+            rcvMgr.setown(createAeronReceiveManager(myEP, false));
         }
         else
             rcvMgr.setown(createReceiveManager(7000, 7001, 7002, 7003, multicastIP, udpQueueSize, maxPacketsPerSender, false));
@@ -291,7 +291,7 @@ void testNxN()
         maxPacketsPerSender = udpQueueSize;
     Owned <ISendManager> sendMgr;
     if (useAeron)
-        sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress()));
+        sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress(), false));
     else
         sendMgr.setown(createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL, false));
     Receiver receiver;