|
@@ -440,214 +440,6 @@ public:
|
|
|
class CSendManager : public CInterface, implements ISendManager
|
|
|
{
|
|
|
friend class send_send_flow;
|
|
|
- class CMessagePacker : public CInterface, implements IMessagePacker
|
|
|
- {
|
|
|
- CSendManager &parent;
|
|
|
- unsigned destNodeIndex;
|
|
|
- UdpPacketHeader package_header;
|
|
|
- DataBuffer *part_buffer;
|
|
|
- unsigned data_buffer_size;
|
|
|
- unsigned data_used;
|
|
|
- void *mem_buffer;
|
|
|
- unsigned mem_buffer_size;
|
|
|
- unsigned totalSize;
|
|
|
- bool packed_request;
|
|
|
- unsigned requested_size;
|
|
|
- MemoryBuffer metaInfo;
|
|
|
- bool last_message_done;
|
|
|
- int queue_number;
|
|
|
-
|
|
|
- public:
|
|
|
- IMPLEMENT_IINTERFACE;
|
|
|
-
|
|
|
- CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, CSendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
- : parent(_parent)
|
|
|
- {
|
|
|
- queue_number = _queue;
|
|
|
- destNodeIndex = _destNode;
|
|
|
-
|
|
|
- package_header.length = 0; // filled in with proper value later
|
|
|
- package_header.metalength = 0;
|
|
|
- package_header.ruid = ruid;
|
|
|
- package_header.msgId = msgId;
|
|
|
- package_header.pktSeq = 0;
|
|
|
- package_header.nodeIndex = _sourceNode;
|
|
|
- package_header.msgSeq = _msgSeq;
|
|
|
- package_header.udpSequence = 0; // these are allocated when transmitted
|
|
|
-
|
|
|
- packed_request = false;
|
|
|
- part_buffer = bufferManager->allocate();
|
|
|
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
- assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
|
|
|
- *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
|
|
|
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
|
|
|
- data_used = headerSize + sizeof(unsigned short);
|
|
|
- mem_buffer = 0;
|
|
|
- mem_buffer_size = 0;
|
|
|
- last_message_done = false;
|
|
|
- totalSize = 0;
|
|
|
-
|
|
|
- if (udpTraceLevel >= 40)
|
|
|
- DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- ~CMessagePacker()
|
|
|
- {
|
|
|
- if (part_buffer)
|
|
|
- part_buffer->Release();
|
|
|
- if (mem_buffer) free (mem_buffer);
|
|
|
-
|
|
|
- if (udpTraceLevel >= 40)
|
|
|
- {
|
|
|
- DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
|
|
|
- package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- virtual void *getBuffer(unsigned len, bool variable)
|
|
|
- {
|
|
|
- if (variable)
|
|
|
- len += sizeof(RecordLengthType);
|
|
|
- if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
|
|
|
- {
|
|
|
- // Won't fit in one, so allocate temp location
|
|
|
- if (mem_buffer_size < len)
|
|
|
- {
|
|
|
- free(mem_buffer);
|
|
|
- mem_buffer = malloc(len);
|
|
|
- mem_buffer_size = len;
|
|
|
- }
|
|
|
- packed_request = false;
|
|
|
- if (variable)
|
|
|
- return ((char *) mem_buffer) + sizeof(RecordLengthType);
|
|
|
- else
|
|
|
- return mem_buffer;
|
|
|
- }
|
|
|
-
|
|
|
- if (part_buffer && ((data_buffer_size - data_used) < len))
|
|
|
- flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
|
|
|
-
|
|
|
- if (!part_buffer)
|
|
|
- {
|
|
|
- part_buffer = bufferManager->allocate();
|
|
|
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
- }
|
|
|
- packed_request = true;
|
|
|
- if (variable)
|
|
|
- return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
|
|
|
- else
|
|
|
- return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
|
|
|
- }
|
|
|
-
|
|
|
- virtual void putBuffer(const void *buf, unsigned len, bool variable)
|
|
|
- {
|
|
|
- if (variable)
|
|
|
- {
|
|
|
- assertex(len < MAX_RECORD_LENGTH);
|
|
|
- buf = ((char *) buf) - sizeof(RecordLengthType);
|
|
|
- *(RecordLengthType *) buf = len;
|
|
|
- len += sizeof(RecordLengthType);
|
|
|
- }
|
|
|
- totalSize += len;
|
|
|
- if (packed_request)
|
|
|
- {
|
|
|
- assert(len <= (data_buffer_size - data_used));
|
|
|
- data_used += len;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- while (len)
|
|
|
- {
|
|
|
- if (!part_buffer)
|
|
|
- {
|
|
|
- part_buffer = bufferManager->allocate();
|
|
|
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
- data_used = 0;
|
|
|
- }
|
|
|
- unsigned chunkLen = data_buffer_size - data_used;
|
|
|
- if (chunkLen > len)
|
|
|
- chunkLen = len;
|
|
|
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
|
|
|
- data_used += chunkLen;
|
|
|
- len -= chunkLen;
|
|
|
- buf = &(((char*)buf)[chunkLen]);
|
|
|
- if (len)
|
|
|
- flush(false);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- virtual void sendMetaInfo(const void *buf, unsigned len) {
|
|
|
- metaInfo.append(len, buf);
|
|
|
- }
|
|
|
-
|
|
|
- virtual void flush(bool last_msg = false)
|
|
|
- {
|
|
|
- if (!last_message_done && last_msg)
|
|
|
- {
|
|
|
- last_message_done = true;
|
|
|
- if (!part_buffer)
|
|
|
- part_buffer = bufferManager->allocate();
|
|
|
- const char *metaData = metaInfo.toByteArray();
|
|
|
- unsigned metaLength = metaInfo.length();
|
|
|
- unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
|
|
|
- while (metaLength > maxMetaLength)
|
|
|
- {
|
|
|
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
|
|
|
- put_package(part_buffer, data_used, maxMetaLength);
|
|
|
- metaLength -= maxMetaLength;
|
|
|
- metaData += maxMetaLength;
|
|
|
- data_used = 0;
|
|
|
- maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
- part_buffer = bufferManager->allocate();
|
|
|
- }
|
|
|
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
|
|
|
- package_header.pktSeq |= 0x80000000;
|
|
|
- put_package(part_buffer, data_used, metaLength);
|
|
|
- }
|
|
|
- else if (part_buffer)
|
|
|
- {
|
|
|
- // Just flush current - used when no room for current row
|
|
|
- if (data_used)
|
|
|
- put_package(part_buffer, data_used, 0); // buffer released in put_package
|
|
|
- else
|
|
|
- part_buffer->Release(); // If NO data in buffer, release buffer back to pool
|
|
|
- }
|
|
|
- part_buffer = 0;
|
|
|
- data_buffer_size = 0;
|
|
|
- data_used = 0;
|
|
|
- }
|
|
|
-
|
|
|
- void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
|
|
|
- {
|
|
|
- package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
|
|
|
- package_header.metalength = metalength;
|
|
|
- memcpy(dataBuff->data, &package_header, sizeof(package_header));
|
|
|
- parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
|
|
|
-
|
|
|
- if (udpTraceLevel >= 50)
|
|
|
- {
|
|
|
- if (package_header.length==991)
|
|
|
- DBGLOG("NEarly");
|
|
|
- DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
|
|
|
- package_header.ruid, package_header.msgId, package_header.msgSeq,
|
|
|
- package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
|
|
|
- }
|
|
|
- package_header.pktSeq++;
|
|
|
- }
|
|
|
-
|
|
|
- virtual bool dataQueued()
|
|
|
- {
|
|
|
- return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
|
|
|
- }
|
|
|
-
|
|
|
- virtual unsigned size() const
|
|
|
- {
|
|
|
- return totalSize;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
class StartedThread : public Thread
|
|
|
{
|
|
|
private:
|
|
@@ -1197,7 +989,7 @@ public:
|
|
|
{
|
|
|
if (destNodeIndex >= numNodes)
|
|
|
throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
|
|
|
- return new CMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
|
|
|
+ return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
|
|
|
}
|
|
|
|
|
|
virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
|
|
@@ -1233,4 +1025,216 @@ ISendManager *createSendManager(int server_flow_port, int data_port, int client_
|
|
|
return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, maxRetryData, myNodeIndex, rateLimiter);
|
|
|
}
|
|
|
|
|
|
+class CMessagePacker : public CInterface, implements IMessagePacker
|
|
|
+{
|
|
|
+ ISendManager &parent;
|
|
|
+ unsigned destNodeIndex;
|
|
|
+ UdpPacketHeader package_header;
|
|
|
+ DataBuffer *part_buffer;
|
|
|
+ unsigned data_buffer_size;
|
|
|
+ unsigned data_used;
|
|
|
+ void *mem_buffer;
|
|
|
+ unsigned mem_buffer_size;
|
|
|
+ unsigned totalSize;
|
|
|
+ bool packed_request;
|
|
|
+ unsigned requested_size;
|
|
|
+ MemoryBuffer metaInfo;
|
|
|
+ bool last_message_done;
|
|
|
+ int queue_number;
|
|
|
+
|
|
|
+public:
|
|
|
+ IMPLEMENT_IINTERFACE;
|
|
|
+
|
|
|
+ CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
+ : parent(_parent)
|
|
|
+ {
|
|
|
+ queue_number = _queue;
|
|
|
+ destNodeIndex = _destNode;
|
|
|
+
|
|
|
+ package_header.length = 0; // filled in with proper value later
|
|
|
+ package_header.metalength = 0;
|
|
|
+ package_header.ruid = ruid;
|
|
|
+ package_header.msgId = msgId;
|
|
|
+ package_header.pktSeq = 0;
|
|
|
+ package_header.nodeIndex = _sourceNode;
|
|
|
+ package_header.msgSeq = _msgSeq;
|
|
|
+ package_header.udpSequence = 0; // these are allocated when transmitted
|
|
|
+
|
|
|
+ packed_request = false;
|
|
|
+ part_buffer = bufferManager->allocate();
|
|
|
+ data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
+ assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
|
|
|
+ *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
|
|
|
+ memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
|
|
|
+ data_used = headerSize + sizeof(unsigned short);
|
|
|
+ mem_buffer = 0;
|
|
|
+ mem_buffer_size = 0;
|
|
|
+ last_message_done = false;
|
|
|
+ totalSize = 0;
|
|
|
+
|
|
|
+ if (udpTraceLevel >= 40)
|
|
|
+ DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ ~CMessagePacker()
|
|
|
+ {
|
|
|
+ if (part_buffer)
|
|
|
+ part_buffer->Release();
|
|
|
+ if (mem_buffer) free (mem_buffer);
|
|
|
+
|
|
|
+ if (udpTraceLevel >= 40)
|
|
|
+ {
|
|
|
+ DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
|
|
|
+ package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ virtual void *getBuffer(unsigned len, bool variable)
|
|
|
+ {
|
|
|
+ if (variable)
|
|
|
+ len += sizeof(RecordLengthType);
|
|
|
+ if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
|
|
|
+ {
|
|
|
+ // Won't fit in one, so allocate temp location
|
|
|
+ if (mem_buffer_size < len)
|
|
|
+ {
|
|
|
+ free(mem_buffer);
|
|
|
+ mem_buffer = malloc(len);
|
|
|
+ mem_buffer_size = len;
|
|
|
+ }
|
|
|
+ packed_request = false;
|
|
|
+ if (variable)
|
|
|
+ return ((char *) mem_buffer) + sizeof(RecordLengthType);
|
|
|
+ else
|
|
|
+ return mem_buffer;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (part_buffer && ((data_buffer_size - data_used) < len))
|
|
|
+ flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
|
|
|
+
|
|
|
+ if (!part_buffer)
|
|
|
+ {
|
|
|
+ part_buffer = bufferManager->allocate();
|
|
|
+ data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
+ }
|
|
|
+ packed_request = true;
|
|
|
+ if (variable)
|
|
|
+ return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
|
|
|
+ else
|
|
|
+ return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void putBuffer(const void *buf, unsigned len, bool variable)
|
|
|
+ {
|
|
|
+ if (variable)
|
|
|
+ {
|
|
|
+ assertex(len < MAX_RECORD_LENGTH);
|
|
|
+ buf = ((char *) buf) - sizeof(RecordLengthType);
|
|
|
+ *(RecordLengthType *) buf = len;
|
|
|
+ len += sizeof(RecordLengthType);
|
|
|
+ }
|
|
|
+ totalSize += len;
|
|
|
+ if (packed_request)
|
|
|
+ {
|
|
|
+ assert(len <= (data_buffer_size - data_used));
|
|
|
+ data_used += len;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ while (len)
|
|
|
+ {
|
|
|
+ if (!part_buffer)
|
|
|
+ {
|
|
|
+ part_buffer = bufferManager->allocate();
|
|
|
+ data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
+ data_used = 0;
|
|
|
+ }
|
|
|
+ unsigned chunkLen = data_buffer_size - data_used;
|
|
|
+ if (chunkLen > len)
|
|
|
+ chunkLen = len;
|
|
|
+ memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
|
|
|
+ data_used += chunkLen;
|
|
|
+ len -= chunkLen;
|
|
|
+ buf = &(((char*)buf)[chunkLen]);
|
|
|
+ if (len)
|
|
|
+ flush(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void sendMetaInfo(const void *buf, unsigned len) {
|
|
|
+ metaInfo.append(len, buf);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void flush(bool last_msg = false)
|
|
|
+ {
|
|
|
+ if (!last_message_done && last_msg)
|
|
|
+ {
|
|
|
+ last_message_done = true;
|
|
|
+ if (!part_buffer)
|
|
|
+ part_buffer = bufferManager->allocate();
|
|
|
+ const char *metaData = metaInfo.toByteArray();
|
|
|
+ unsigned metaLength = metaInfo.length();
|
|
|
+ unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
|
|
|
+ while (metaLength > maxMetaLength)
|
|
|
+ {
|
|
|
+ memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
|
|
|
+ put_package(part_buffer, data_used, maxMetaLength);
|
|
|
+ metaLength -= maxMetaLength;
|
|
|
+ metaData += maxMetaLength;
|
|
|
+ data_used = 0;
|
|
|
+ maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
|
|
|
+ part_buffer = bufferManager->allocate();
|
|
|
+ }
|
|
|
+ memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
|
|
|
+ package_header.pktSeq |= 0x80000000;
|
|
|
+ put_package(part_buffer, data_used, metaLength);
|
|
|
+ }
|
|
|
+ else if (part_buffer)
|
|
|
+ {
|
|
|
+ // Just flush current - used when no room for current row
|
|
|
+ if (data_used)
|
|
|
+ put_package(part_buffer, data_used, 0); // buffer released in put_package
|
|
|
+ else
|
|
|
+ part_buffer->Release(); // If NO data in buffer, release buffer back to pool
|
|
|
+ }
|
|
|
+ part_buffer = 0;
|
|
|
+ data_buffer_size = 0;
|
|
|
+ data_used = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
|
|
|
+ {
|
|
|
+ package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
|
|
|
+ package_header.metalength = metalength;
|
|
|
+ memcpy(dataBuff->data, &package_header, sizeof(package_header));
|
|
|
+ parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
|
|
|
+
|
|
|
+ if (udpTraceLevel >= 50)
|
|
|
+ {
|
|
|
+ if (package_header.length==991)
|
|
|
+ DBGLOG("NEarly");
|
|
|
+ DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
|
|
|
+ package_header.ruid, package_header.msgId, package_header.msgSeq,
|
|
|
+ package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
|
|
|
+ }
|
|
|
+ package_header.pktSeq++;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool dataQueued()
|
|
|
+ {
|
|
|
+ return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual unsigned size() const
|
|
|
+ {
|
|
|
+ return totalSize;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
+IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
+{
|
|
|
+ return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
|
|
|
+}
|