|
@@ -18,6 +18,8 @@
|
|
|
#include "udplib.hpp"
|
|
|
#include "udpsha.hpp"
|
|
|
#include "udptrs.hpp"
|
|
|
+#include "udpipmap.hpp"
|
|
|
+
|
|
|
#include "jsocket.hpp"
|
|
|
#include "jlog.hpp"
|
|
|
#include "roxie.hpp"
|
|
@@ -34,73 +36,154 @@
|
|
|
unsigned udpOutQsPriority = 0;
|
|
|
unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
|
|
|
unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
|
|
|
+unsigned udpRequestToSendAckTimeout = 10; // value in milliseconds
|
|
|
bool udpSnifferEnabled = true;
|
|
|
|
|
|
-#ifdef _DEBUG
|
|
|
-//#define _SIMULATE_LOST_PACKETS
|
|
|
-#endif
|
|
|
-
|
|
|
using roxiemem::DataBuffer;
|
|
|
// MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
|
|
|
-
|
|
|
-class UdpReceiverEntry
|
|
|
+// But using them on this side means we guarantee that the packets fit into databuffers on the other side... But so would matching their size
|
|
|
+
|
|
|
+/*
|
|
|
+ *
|
|
|
+ * There are 3 threads running to manage the data transfer from slave back to server:
|
|
|
+ * send_resend_flow
|
|
|
+ * - checks periodically that nothing is waiting for a "request to send" that timed out
|
|
|
+ * send_receive_flow
|
|
|
+ * - waits on socket receiving "ok_to_send" packets from servers
|
|
|
+ * - updates state of relevant receivers
|
|
|
+ * - pushes permission tokens to a queue
|
|
|
+ * send_data
|
|
|
+ * - waits on queue of permission tokens
|
|
|
+ * - broadcasts "busy"
|
|
|
+ * - writes data to server
|
|
|
+ * - broadcasts "no longer "
|
|
|
+ * - sends "completed" or "completed but I want to send more" flow message to server
|
|
|
+ *
|
|
|
+ * Queueing up data packets is done by the slave worker threads.
|
|
|
+ * *
|
|
|
+
|
|
|
+ *
|
|
|
+ * Data races to watch for
|
|
|
+ * 1. Two slave threads add data at same time - only one should sent rts (use atomic_inc for the count)
|
|
|
+ * 2. We check for timeout and resend rts or fail just as permission comes in
|
|
|
+ * - resend rts is harmless ?
|
|
|
+ * - fail is acceptable
|
|
|
+ * 3. After sending data, we need to decide whether to set state to 'pending' (and send rts) or empty. If we read count, decide it's zero
|
|
|
+ * and then (before we set state) someone adds data (and sends rts), we must not set state to empty. CAS to set state empty only if
|
|
|
+ * it's sending_data perhaps?
|
|
|
+ * 4. While sending data, someone adds new data. They need to send rts and set state to pending whether empty or sending_data
|
|
|
+ * 5. Do we need sending_data state? Is it the same as empty, really? Is empty the same as 'count==0' ? Do we even need state?
|
|
|
+ * - send rts whenever incrementing count from zero
|
|
|
+ * - resend rts if count is non-zero and timed out
|
|
|
+ * - resend rts if we send data but there is some remaining
|
|
|
+ */
|
|
|
+
|
|
|
+class UdpReceiverEntry : public IUdpReceiverEntry
|
|
|
{
|
|
|
- queue_t *output_queue;
|
|
|
- bool initialized;
|
|
|
-
|
|
|
-public:
|
|
|
- ISocket *send_flow_socket;
|
|
|
- ISocket *data_socket;
|
|
|
- unsigned numQueues;
|
|
|
- int current_q;
|
|
|
- int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
|
|
|
- int *maxPktsPerQ; // to minimise power function re-calc for evey packet
|
|
|
-
|
|
|
- // MORE - consider where we need critsecs in here!
|
|
|
-
|
|
|
- void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
|
|
|
- {
|
|
|
- UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
|
|
|
- try
|
|
|
- {
|
|
|
- send_flow_socket->write(&msg, msg.length);
|
|
|
+private:
|
|
|
+ queue_t *output_queue = nullptr;
|
|
|
+ bool initialized = false;
|
|
|
+ const bool isLocal = false;
|
|
|
+ ISocket *send_flow_socket = nullptr;
|
|
|
+ ISocket *data_socket = nullptr;
|
|
|
+ const unsigned numQueues;
|
|
|
+ int current_q = 0;
|
|
|
+ int currentQNumPkts = 0; // Current Queue Number of Consecutive Processed Packets.
|
|
|
+ int *maxPktsPerQ = nullptr; // to minimise power function re-calc for every packet
|
|
|
+
|
|
|
+ void sendRequest(flowType::flowCmd cmd, unsigned packets )
|
|
|
+ {
|
|
|
+ UdpRequestToSendMsg msg = { cmd, static_cast<unsigned short>(packets), sourceIP };
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (udpTraceLevel > 3)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: sending flowType::%s msg to node=%s", flowType::name(cmd), ip.getIpText(s).str());
|
|
|
+ }
|
|
|
+ send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
|
|
|
}
|
|
|
- catch(IException *e)
|
|
|
+ catch(IException *e)
|
|
|
{
|
|
|
StringBuffer s;
|
|
|
DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
|
|
|
e->Release();
|
|
|
}
|
|
|
- catch (...)
|
|
|
+ catch (...)
|
|
|
{
|
|
|
DBGLOG("UdpSender: sendRequest write failed - unknown error");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
|
|
|
+ const IpAddress sourceIP;
|
|
|
+public:
|
|
|
+ const IpAddress ip;
|
|
|
+ unsigned timeouts = 0; // Number of consecutive timeouts
|
|
|
+ unsigned requestExpiryTime = 0;
|
|
|
+
|
|
|
+ static bool comparePacket(const void *pkData, const void *key)
|
|
|
+ {
|
|
|
+ UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
|
|
|
+ UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
|
|
|
+ return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
|
|
|
+ }
|
|
|
+
|
|
|
+ std::atomic<unsigned> packetsQueued = { 0 };
|
|
|
+
|
|
|
+ void sendDone(unsigned packets)
|
|
|
+ {
|
|
|
+ bool dataRemaining = packetsQueued.load(std::memory_order_relaxed);
|
|
|
+ // If dataRemaining says 0, but someone adds a row in this window, the request_to_send will be sent BEFORE the send_completed
|
|
|
+ // So long as receiver handles that, are we good?
|
|
|
+ if (dataRemaining)
|
|
|
+ {
|
|
|
+ requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
|
|
|
+ sendRequest(flowType::request_to_send_more, packets);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ requestExpiryTime = 0;
|
|
|
+ sendRequest(flowType::send_completed, packets);
|
|
|
+ }
|
|
|
+ timeouts = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ void requestToSend()
|
|
|
+ {
|
|
|
+ requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
|
|
|
+ sendRequest(flowType::request_to_send, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ void requestAcknowledged()
|
|
|
+ {
|
|
|
+ if (requestExpiryTime)
|
|
|
+ requestExpiryTime = msTick() + udpRequestToSendTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ // MORE - consider where/if we need critsecs in here!
|
|
|
+
|
|
|
+ unsigned sendData(const UdpPermitToSendMsg &permit, TokenBucket *bucket)
|
|
|
{
|
|
|
- moreRequested = false;
|
|
|
- maxPackets = permit.max_data;
|
|
|
- PointerArray toSend;
|
|
|
+ requestExpiryTime = 0;
|
|
|
+ unsigned maxPackets = permit.max_data;
|
|
|
+ std::vector<DataBuffer *> toSend;
|
|
|
unsigned totalSent = 0;
|
|
|
- while (toSend.length() < maxPackets && dataQueued())
|
|
|
+ while (toSend.size() < maxPackets && packetsQueued.load(std::memory_order_relaxed))
|
|
|
{
|
|
|
DataBuffer *buffer = popQueuedData();
|
|
|
if (buffer) // Aborted slave queries leave NULL records on queue
|
|
|
{
|
|
|
UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
|
|
|
- toSend.append(buffer);
|
|
|
+ toSend.push_back(buffer);
|
|
|
totalSent += header->length;
|
|
|
-#ifdef __linux__
|
|
|
- if (isLocal && (totalSent> 100000))
|
|
|
+#if defined(__linux__) || defined(__APPLE__)
|
|
|
+ if (isLocal && (totalSent> 100000)) // Avoids sending too fast to local node, for reasons lost in the mists of time
|
|
|
break;
|
|
|
#endif
|
|
|
}
|
|
|
}
|
|
|
- maxPackets = toSend.length();
|
|
|
- for (unsigned idx = 0; idx < maxPackets; idx++)
|
|
|
+ for (DataBuffer *buffer: toSend)
|
|
|
{
|
|
|
- DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
|
|
|
UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
|
|
|
unsigned length = header->length;
|
|
|
if (bucket)
|
|
@@ -124,43 +207,60 @@ public:
|
|
|
}
|
|
|
::Release(buffer);
|
|
|
}
|
|
|
+ sendDone(toSend.size());
|
|
|
return totalSent;
|
|
|
}
|
|
|
|
|
|
- bool dataQueued()
|
|
|
+ bool dataQueued(const UdpPacketHeader &key)
|
|
|
{
|
|
|
- for (unsigned i = 0; i < numQueues; i++)
|
|
|
+ // Used when a retry packet is received, to determine whether the query is in fact completed
|
|
|
+ // but just stuck in transit queues
|
|
|
+ if (packetsQueued.load(std::memory_order_relaxed))
|
|
|
{
|
|
|
- if (!output_queue[i].empty())
|
|
|
- return true;
|
|
|
+ for (unsigned i = 0; i < numQueues; i++)
|
|
|
+ {
|
|
|
+ if (output_queue[i].dataQueued(&key, &comparePacket))
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
|
|
|
+ bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
|
|
|
{
|
|
|
- for (unsigned i = 0; i < numQueues; i++)
|
|
|
+ // Used after receiving an abort, to avoid sending data that is no longer required
|
|
|
+ bool anyRemoved = false;
|
|
|
+ if (packetsQueued.load(std::memory_order_relaxed))
|
|
|
{
|
|
|
- if (output_queue[i].dataQueued(key, pkCmpFn))
|
|
|
- return true;
|
|
|
+ // NOTE - removeData replaces entries by null (so value of packetsQueued is not affected)
|
|
|
+ for (unsigned i = 0; i < numQueues; i++)
|
|
|
+ {
|
|
|
+ if (output_queue[i].removeData(key, pkCmpFn))
|
|
|
+ anyRemoved = true;
|
|
|
+ }
|
|
|
}
|
|
|
- return false;
|
|
|
+ return anyRemoved;
|
|
|
}
|
|
|
|
|
|
- bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
|
|
|
+ void abort()
|
|
|
{
|
|
|
- bool anyRemoved = false;
|
|
|
- for (unsigned i = 0; i < numQueues; i++)
|
|
|
+ // Called if too many timeouts on a request to send
|
|
|
+
|
|
|
+ if (udpTraceLevel > 3)
|
|
|
{
|
|
|
- if (output_queue[i].removeData(key, pkCmpFn))
|
|
|
- anyRemoved = true;
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: abort sending queued data to node=%s", ip.getIpText(s).str());
|
|
|
}
|
|
|
- return anyRemoved;
|
|
|
+ timeouts = 0;
|
|
|
+ requestExpiryTime = 0;
|
|
|
+ removeData(nullptr, nullptr);
|
|
|
}
|
|
|
|
|
|
inline void pushData(unsigned queue, DataBuffer *buffer)
|
|
|
{
|
|
|
output_queue[queue].pushOwn(buffer);
|
|
|
+ if (!packetsQueued++)
|
|
|
+ requestToSend();
|
|
|
}
|
|
|
|
|
|
DataBuffer *popQueuedData()
|
|
@@ -190,6 +290,7 @@ public:
|
|
|
currentQNumPkts = 0;
|
|
|
current_q = (current_q + 1) % numQueues;
|
|
|
}
|
|
|
+ packetsQueued--;
|
|
|
return buffer;
|
|
|
}
|
|
|
}
|
|
@@ -198,37 +299,29 @@ public:
|
|
|
current_q = (current_q + 1) % numQueues;
|
|
|
if (!output_queue[current_q].empty())
|
|
|
{
|
|
|
+ packetsQueued--;
|
|
|
return output_queue[current_q].pop();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ // If we get here, it suggests we were told to get a buffer but no queue has one
|
|
|
+ // Should never happen
|
|
|
MilliSleep(10);
|
|
|
DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- UdpReceiverEntry()
|
|
|
- {
|
|
|
- send_flow_socket = data_socket = NULL;
|
|
|
- numQueues = 0;
|
|
|
- current_q = 0;
|
|
|
- initialized = false;
|
|
|
- output_queue = 0;
|
|
|
- currentQNumPkts = 0;
|
|
|
- maxPktsPerQ = 0;
|
|
|
- }
|
|
|
-
|
|
|
- void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
|
|
|
+ UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort)
|
|
|
+ : ip (_ip), sourceIP(_sourceIP), numQueues(_numQueues), isLocal(_ip.isLocal())
|
|
|
{
|
|
|
assert(!initialized);
|
|
|
- numQueues = _numQueues;
|
|
|
- const IpAddress &ip = getNodeAddress(destNodeIndex);
|
|
|
+ assert(numQueues > 0);
|
|
|
if (!ip.isNull())
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- SocketEndpoint sendFlowEp(sendFlowPort, ip);
|
|
|
- SocketEndpoint dataEp(dataPort, ip);
|
|
|
+ SocketEndpoint sendFlowEp(_sendFlowPort, ip);
|
|
|
+ SocketEndpoint dataEp(_dataPort, ip);
|
|
|
send_flow_socket = ISocket::udp_connect(sendFlowEp);
|
|
|
data_socket = ISocket::udp_connect(dataEp);
|
|
|
if (isLocal)
|
|
@@ -254,14 +347,14 @@ public:
|
|
|
maxPktsPerQ = new int[numQueues];
|
|
|
for (unsigned j = 0; j < numQueues; j++)
|
|
|
{
|
|
|
- output_queue[j].set_queue_size(queueSize);
|
|
|
+ output_queue[j].set_queue_size(_queueSize);
|
|
|
maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
|
|
|
}
|
|
|
initialized = true;
|
|
|
if (udpTraceLevel > 0)
|
|
|
{
|
|
|
StringBuffer ipStr;
|
|
|
- DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
|
|
|
+ DBGLOG("UdpSender: added entry for ip=%s to receivers table - send_flow_port=%d", ip.getIpText(ipStr).str(), _sendFlowPort);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -278,7 +371,6 @@ public:
|
|
|
|
|
|
class CSendManager : implements ISendManager, public CInterface
|
|
|
{
|
|
|
- friend class send_send_flow;
|
|
|
class StartedThread : public Thread
|
|
|
{
|
|
|
private:
|
|
@@ -312,202 +404,63 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
virtual int doRun() = 0;
|
|
|
};
|
|
|
|
|
|
-
|
|
|
- class send_send_flow : public StartedThread
|
|
|
+ class send_resend_flow : public StartedThread
|
|
|
{
|
|
|
- /*
|
|
|
- I don't like this code much at all
|
|
|
- Looping round all every time status of any changes seems like a bad thing especially as scale goes up
|
|
|
- Even though these look like a bitmap they are not used as such presently
|
|
|
- - as a result, if data_added() is called while state is completed, we lose the request I think
|
|
|
- - probably get away with it because of the dataqueued check
|
|
|
-
|
|
|
- doRun() uses state bits without protection
|
|
|
-
|
|
|
- A count of number pending for each might be better than just a flag
|
|
|
-
|
|
|
- Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
|
|
|
- - normally MANY in pending (and order is interesting)
|
|
|
- - normally few in any other state (only 1 if thread keeping up), order not really very interesting
|
|
|
- - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
|
|
|
- - in particular don't lock while processing the chain
|
|
|
- - Never need to be in >1 chain
|
|
|
- msTick() probably better than time() for detecting timeouts
|
|
|
-
|
|
|
- */
|
|
|
- enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
|
|
|
-
|
|
|
- unsigned target_count;
|
|
|
-
|
|
|
- char *state;
|
|
|
- unsigned char *timeouts; // Number of consecutive timeouts
|
|
|
- unsigned *request_time;
|
|
|
-
|
|
|
- CriticalSection cr;
|
|
|
- Semaphore sem;
|
|
|
+ // Check if any senders have timed out
|
|
|
CSendManager &parent;
|
|
|
+ Semaphore terminated;
|
|
|
|
|
|
- virtual int doRun()
|
|
|
+ virtual int doRun() override
|
|
|
{
|
|
|
- // MORE - this is reading the state values unprotected
|
|
|
- // Not sure that this represents any issue in practice...
|
|
|
if (udpTraceLevel > 0)
|
|
|
- DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
|
|
|
-
|
|
|
- while (running)
|
|
|
+ DBGLOG("UdpSender: send_resend_flow started");
|
|
|
+ unsigned timeout = udpRequestToSendTimeout;
|
|
|
+ while (running)
|
|
|
{
|
|
|
- bool idle = false;
|
|
|
- if (sem.wait(1000))
|
|
|
- {
|
|
|
- if (udpTraceLevel > 4)
|
|
|
- DBGLOG("UdpSender: send_send_flow::doRun signal received");
|
|
|
- }
|
|
|
- else
|
|
|
- idle = true;
|
|
|
- if (!running) return 0;
|
|
|
+ if (terminated.wait(timeout) || !running)
|
|
|
+ break;
|
|
|
|
|
|
unsigned now = msTick();
|
|
|
-
|
|
|
- // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
|
|
|
- // In a typical heavy load scenario most will be pending
|
|
|
- // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
|
|
|
- // Separate lists for each state (don't need one for sending) ?
|
|
|
-
|
|
|
- for (unsigned i = 0; i < target_count; i++)
|
|
|
+ timeout = udpRequestToSendTimeout;
|
|
|
+ for (auto&& dest: parent.receiversTable)
|
|
|
{
|
|
|
- switch (state[i]) // MORE - should really protect it?
|
|
|
+ unsigned expireTime = dest.requestExpiryTime;
|
|
|
+ if (expireTime)
|
|
|
{
|
|
|
- case completed:
|
|
|
- done(i, false);
|
|
|
- break;
|
|
|
- case completed_more:
|
|
|
- done(i, true);
|
|
|
- break;
|
|
|
- case pending_request:
|
|
|
- if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
|
|
|
- break;
|
|
|
- timeouts[i]++;
|
|
|
- EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
|
|
|
- timeouts[i], udpMaxRetryTimedoutReqs,
|
|
|
- i, (int) (now - request_time[i]), udpRequestToSendTimeout);
|
|
|
- // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
|
|
|
- if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
|
|
|
- {
|
|
|
- abort(i);
|
|
|
- break;
|
|
|
- }
|
|
|
- // fall into...
|
|
|
-
|
|
|
- case new_request:
|
|
|
- sendRequest(i);
|
|
|
- break;
|
|
|
- default:
|
|
|
- if (idle && parent.dataQueued(i))
|
|
|
+ if (expireTime < now)
|
|
|
{
|
|
|
- EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
|
|
|
- data_added(i);
|
|
|
+ dest.timeouts++;
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%s",
|
|
|
+ dest.timeouts, udpMaxRetryTimedoutReqs, dest.ip.getIpText(s).str());
|
|
|
+ }
|
|
|
+ // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
|
|
|
+ if (udpMaxRetryTimedoutReqs && (dest.timeouts >= udpMaxRetryTimedoutReqs))
|
|
|
+ dest.abort();
|
|
|
+ else
|
|
|
+ dest.requestToSend();
|
|
|
}
|
|
|
+ else if (expireTime-now < timeout)
|
|
|
+ timeout = expireTime-now;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
- void done(unsigned index, bool moreRequested)
|
|
|
- {
|
|
|
- bool dataRemaining;
|
|
|
- {
|
|
|
- CriticalBlock b(cr);
|
|
|
- dataRemaining = parent.dataQueued(index);
|
|
|
- if (dataRemaining)
|
|
|
- {
|
|
|
- state[index] = pending_request;
|
|
|
- request_time[index] = msTick();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- state[index] = 0;
|
|
|
- timeouts[index] = 0;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (udpTraceLevel > 3)
|
|
|
- DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
|
|
|
- parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
|
|
|
- }
|
|
|
-
|
|
|
- void sendRequest(unsigned index)
|
|
|
- {
|
|
|
- if (udpTraceLevel > 3)
|
|
|
- DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
|
|
|
- CriticalBlock b(cr);
|
|
|
- parent.sendRequest(index, flow_t::request_to_send);
|
|
|
- state[index] = pending_request;
|
|
|
- request_time[index] = msTick();
|
|
|
- }
|
|
|
-
|
|
|
- void abort(unsigned index)
|
|
|
- {
|
|
|
- if (udpTraceLevel > 3)
|
|
|
- DBGLOG("UdpSender: abort sending queued data to node=%u", index);
|
|
|
-
|
|
|
- CriticalBlock b(cr);
|
|
|
- state[index] = 0;
|
|
|
- timeouts[index] = 0;
|
|
|
- parent.abortData(index);
|
|
|
- }
|
|
|
|
|
|
public:
|
|
|
- send_send_flow(CSendManager &_parent, unsigned numNodes)
|
|
|
- : StartedThread("UdpLib::send_send_flow"), parent(_parent)
|
|
|
+ send_resend_flow(CSendManager &_parent)
|
|
|
+ : StartedThread("UdpLib::send_resend_flow"), parent(_parent)
|
|
|
{
|
|
|
- target_count = numNodes;
|
|
|
- state = new char [target_count];
|
|
|
- memset(state, 0, target_count);
|
|
|
- timeouts = new unsigned char [target_count];
|
|
|
- memset(timeouts, 0, target_count);
|
|
|
- request_time = new unsigned [target_count];
|
|
|
- memset(request_time, 0, sizeof(unsigned) * target_count);
|
|
|
start();
|
|
|
}
|
|
|
|
|
|
- ~send_send_flow()
|
|
|
+ ~send_resend_flow()
|
|
|
{
|
|
|
running = false;
|
|
|
- sem.signal();
|
|
|
+ terminated.signal();
|
|
|
join();
|
|
|
- delete [] state;
|
|
|
- delete [] timeouts;
|
|
|
- delete [] request_time;
|
|
|
- }
|
|
|
-
|
|
|
- void clear_to_send_received(unsigned index)
|
|
|
- {
|
|
|
- CriticalBlock b(cr);
|
|
|
- state[index] = sending_data;
|
|
|
- }
|
|
|
-
|
|
|
- void send_done(unsigned index, bool moreRequested)
|
|
|
- {
|
|
|
- CriticalBlock b(cr);
|
|
|
- state[index] = moreRequested ? completed_more : completed;
|
|
|
- sem.signal();
|
|
|
- }
|
|
|
-
|
|
|
- void data_added(unsigned index)
|
|
|
- {
|
|
|
- CriticalBlock b(cr);
|
|
|
- // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
|
|
|
- // Because done() checks to see if any data pending and re-calls data_added, we get away with it
|
|
|
- // Using bits sounds more sensible though?
|
|
|
- // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
|
|
|
- if (!state[index]) // MORE - should just test the bit?
|
|
|
- {
|
|
|
- state[index] = new_request;
|
|
|
- if (udpTraceLevel > 3)
|
|
|
- DBGLOG("UdpSender: state set to new_request for node=%u", index);
|
|
|
- sem.signal();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
};
|
|
@@ -548,25 +501,34 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
#endif
|
|
|
while(running)
|
|
|
{
|
|
|
- UdpPermitToSendMsg f;
|
|
|
+ UdpPermitToSendMsg f = { flowType::ok_to_send, 0, { } };
|
|
|
while (running)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
unsigned int res ;
|
|
|
- flow_socket->read(&f, 1, sizeof(f), res, 5);
|
|
|
- assertex(res == f.length);
|
|
|
-#ifdef CRC_MESSAGES
|
|
|
- assertex(f.hdr.crc == f.calcCRC());
|
|
|
-#endif
|
|
|
+ flow_socket->read(&f, sizeof(f), sizeof(f), res, 5);
|
|
|
+ assert(res==sizeof(f));
|
|
|
switch (f.cmd)
|
|
|
{
|
|
|
- case flow_t::ok_to_send:
|
|
|
+ case flowType::ok_to_send:
|
|
|
if (udpTraceLevel > 1)
|
|
|
- DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%s", f.max_data, f.destNode.getTraceText(s).str());
|
|
|
+ }
|
|
|
parent.data->ok_to_send(f);
|
|
|
break;
|
|
|
|
|
|
+ case flowType::request_received:
|
|
|
+ if (udpTraceLevel > 1)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: received request_received msg from node=%s", f.destNode.getTraceText(s).str());
|
|
|
+ }
|
|
|
+ parent.receiversTable[f.destNode.getNodeAddress()].requestAcknowledged();
|
|
|
+ break;
|
|
|
+
|
|
|
default:
|
|
|
DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
|
|
|
}
|
|
@@ -600,10 +562,9 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
simple_queue<UdpPermitToSendMsg> send_queue;
|
|
|
Linked<TokenBucket> bucket;
|
|
|
|
|
|
- void send_sniff(bool busy)
|
|
|
+ void send_sniff(sniffType::sniffCmd busy)
|
|
|
{
|
|
|
- unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
|
|
|
- sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
|
|
|
+ sniff_msg msg = { busy, parent.myIP};
|
|
|
try
|
|
|
{
|
|
|
if (!sniffer_socket)
|
|
@@ -664,7 +625,8 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
return true;
|
|
|
else
|
|
|
{
|
|
|
- DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - node=%s, maxData=%u", msg.destNode.getTraceText(s).str(), msg.max_data);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -674,7 +636,7 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
if (udpTraceLevel > 0)
|
|
|
DBGLOG("UdpSender: send_data started");
|
|
|
#ifdef __linux__
|
|
|
- setLinuxThreadPriority(1); // MORE - windows?
|
|
|
+ setLinuxThreadPriority(1); // MORE - windows? Is this even a good idea? Must not send faster than receiver can pull off the socket
|
|
|
#endif
|
|
|
UdpPermitToSendMsg permit;
|
|
|
while (running)
|
|
@@ -684,19 +646,17 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
return 0;
|
|
|
|
|
|
if (udpSnifferEnabled)
|
|
|
- send_sniff(true);
|
|
|
- parent.send_flow->clear_to_send_received(permit.destNodeIndex);
|
|
|
- UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
|
|
|
- bool moreRequested;
|
|
|
- unsigned maxPackets;
|
|
|
- unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
|
|
|
- parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
|
|
|
+ send_sniff(sniffType::busy);
|
|
|
+ UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode.getNodeAddress()];
|
|
|
+ unsigned payload = receiverInfo.sendData(permit, bucket);
|
|
|
if (udpSnifferEnabled)
|
|
|
- send_sniff(false);
|
|
|
+ send_sniff(sniffType::idle);
|
|
|
|
|
|
if (udpTraceLevel > 1)
|
|
|
- DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
|
|
|
-
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("UdpSender: sent %u bytes to node=%s", payload, permit.destNode.getTraceText(s).str());
|
|
|
+ }
|
|
|
}
|
|
|
if (udpTraceLevel > 0)
|
|
|
DBGLOG("UdpSender: send_data stopped");
|
|
@@ -704,32 +664,21 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- friend class send_send_flow;
|
|
|
+ friend class send_resend_flow;
|
|
|
friend class send_receive_flow;
|
|
|
friend class send_data;
|
|
|
|
|
|
- unsigned numNodes;
|
|
|
- int receive_flow_port;
|
|
|
- int send_flow_port;
|
|
|
- int data_port;
|
|
|
- unsigned myNodeIndex;
|
|
|
unsigned numQueues;
|
|
|
|
|
|
- UdpReceiverEntry *receiversTable;
|
|
|
- send_send_flow *send_flow;
|
|
|
+ IpMapOf<UdpReceiverEntry> receiversTable;
|
|
|
+ send_resend_flow *resend_flow;
|
|
|
send_receive_flow *receive_flow;
|
|
|
send_data *data;
|
|
|
Linked<TokenBucket> bucket;
|
|
|
+ IpAddress myIP;
|
|
|
|
|
|
std::atomic<unsigned> msgSeq{0};
|
|
|
|
|
|
- static bool comparePacket(void *pkData, void *key)
|
|
|
- {
|
|
|
- UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
|
|
|
- UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
|
|
|
- return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
|
|
|
- }
|
|
|
-
|
|
|
inline unsigned getNextMessageSequence()
|
|
|
{
|
|
|
unsigned res;
|
|
@@ -743,90 +692,67 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
- CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
|
|
|
- : bucket(_bucket)
|
|
|
+ CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket)
|
|
|
+ : bucket(_bucket),
|
|
|
+ myIP(_myIP),
|
|
|
+ receiversTable([_myIP, _numQueues, q_size, server_flow_port, data_port](const IpAddress &ip) { return new UdpReceiverEntry(ip, _myIP, _numQueues, q_size, server_flow_port, data_port);})
|
|
|
{
|
|
|
#ifndef _WIN32
|
|
|
setpriority(PRIO_PROCESS, 0, -3);
|
|
|
#endif
|
|
|
- numNodes = getNumNodes();
|
|
|
- receive_flow_port = client_flow_port;
|
|
|
- send_flow_port = server_flow_port;
|
|
|
- data_port = d_port;
|
|
|
- myNodeIndex = _myNodeIndex;
|
|
|
numQueues = _numQueues;
|
|
|
- receiversTable = new UdpReceiverEntry[numNodes];
|
|
|
- for (unsigned i = 0; i < numNodes; i++)
|
|
|
- receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
|
|
|
-
|
|
|
data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
|
|
|
- send_flow = new send_send_flow(*this, numNodes);
|
|
|
+ resend_flow = new send_resend_flow(*this);
|
|
|
receive_flow = new send_receive_flow(*this, client_flow_port);
|
|
|
}
|
|
|
|
|
|
|
|
|
~CSendManager()
|
|
|
{
|
|
|
- delete []receiversTable;
|
|
|
- delete send_flow;
|
|
|
+ delete resend_flow;
|
|
|
delete receive_flow;
|
|
|
delete data;
|
|
|
}
|
|
|
|
|
|
- void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
|
|
|
+ // Interface ISendManager
|
|
|
+
|
|
|
+ virtual void writeOwn(IUdpReceiverEntry &receiver, DataBuffer *buffer, unsigned len, unsigned queue) override
|
|
|
{
|
|
|
// NOTE: takes ownership of the DataBuffer
|
|
|
assert(queue < numQueues);
|
|
|
- assert(destNodeIndex < numNodes);
|
|
|
- receiversTable[destNodeIndex].pushData(queue, buffer);
|
|
|
- send_flow->data_added(destNodeIndex);
|
|
|
+ static_cast<UdpReceiverEntry &>(receiver).pushData(queue, buffer);
|
|
|
}
|
|
|
|
|
|
- inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
|
|
|
- {
|
|
|
- receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
|
|
|
- }
|
|
|
-
|
|
|
- bool dataQueued(unsigned destIndex)
|
|
|
- {
|
|
|
- return receiversTable[destIndex].dataQueued();
|
|
|
- }
|
|
|
-
|
|
|
- bool abortData(unsigned destIndex)
|
|
|
- {
|
|
|
- return receiversTable[destIndex].removeData(NULL, NULL);
|
|
|
- }
|
|
|
-
|
|
|
- // Interface ISendManager
|
|
|
-
|
|
|
- virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
|
|
|
+ virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override
|
|
|
{
|
|
|
- if (destNodeIndex >= numNodes)
|
|
|
- throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
|
|
|
- return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
|
|
|
+ const IpAddress &dest = destNode.getNodeAddress();
|
|
|
+ return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue);
|
|
|
}
|
|
|
|
|
|
- virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
|
|
|
+ virtual bool dataQueued(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode) override
|
|
|
{
|
|
|
- UdpPacketHeader pkHdr;
|
|
|
+ const IpAddress &dest = destNode.getNodeAddress();
|
|
|
+ UdpPacketHeader pkHdr;
|
|
|
pkHdr.ruid = ruid;
|
|
|
pkHdr.msgId = msgId;
|
|
|
- return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
|
|
|
+ return receiversTable[dest].dataQueued(pkHdr);
|
|
|
}
|
|
|
|
|
|
- virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
|
|
|
+ virtual bool abortData(ruid_t ruid, unsigned msgId, const ServerIdentifier &destNode)
|
|
|
{
|
|
|
+ const IpAddress &dest = destNode.getNodeAddress();
|
|
|
UdpPacketHeader pkHdr;
|
|
|
pkHdr.ruid = ruid;
|
|
|
pkHdr.msgId = msgId;
|
|
|
- return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
|
|
|
+ return receiversTable[dest].removeData((void*) &pkHdr, &UdpReceiverEntry::comparePacket);
|
|
|
}
|
|
|
|
|
|
virtual bool allDone()
|
|
|
{
|
|
|
- for (unsigned i = 0; i < numNodes; i++)
|
|
|
+ // Used for some timing tests only
|
|
|
+ for (auto&& dest: receiversTable)
|
|
|
{
|
|
|
- if (receiversTable[i].dataQueued())
|
|
|
+ if (dest.packetsQueued.load(std::memory_order_relaxed))
|
|
|
return false;
|
|
|
}
|
|
|
return true;
|
|
@@ -834,15 +760,16 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
-ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
|
|
|
+ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter)
|
|
|
{
|
|
|
- return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
|
|
|
+ assertex(!myNode.getNodeAddress().isNull());
|
|
|
+ return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getNodeAddress(), rateLimiter);
|
|
|
}
|
|
|
|
|
|
class CMessagePacker : implements IMessagePacker, public CInterface
|
|
|
{
|
|
|
ISendManager &parent;
|
|
|
- unsigned destNodeIndex;
|
|
|
+ IUdpReceiverEntry &receiver;
|
|
|
UdpPacketHeader package_header;
|
|
|
DataBuffer *part_buffer;
|
|
|
unsigned data_buffer_size;
|
|
@@ -858,20 +785,18 @@ class CMessagePacker : implements IMessagePacker, public CInterface
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
- CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
- : parent(_parent)
|
|
|
+ CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
+ : parent(_parent), receiver(_receiver)
|
|
|
{
|
|
|
queue_number = _queue;
|
|
|
- destNodeIndex = _destNode;
|
|
|
|
|
|
package_header.length = 0; // filled in with proper value later
|
|
|
package_header.metalength = 0;
|
|
|
package_header.ruid = ruid;
|
|
|
package_header.msgId = msgId;
|
|
|
package_header.pktSeq = 0;
|
|
|
- package_header.nodeIndex = _sourceNode;
|
|
|
+ package_header.node.setIp(_sourceNode);
|
|
|
package_header.msgSeq = _msgSeq;
|
|
|
- package_header.udpSequence = 0; // these are allocated when transmitted
|
|
|
|
|
|
packed_request = false;
|
|
|
part_buffer = bufferManager->allocate();
|
|
@@ -884,10 +809,6 @@ public:
|
|
|
mem_buffer_size = 0;
|
|
|
last_message_done = false;
|
|
|
totalSize = 0;
|
|
|
-
|
|
|
- if (udpTraceLevel >= 40)
|
|
|
- DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
~CMessagePacker()
|
|
@@ -895,15 +816,9 @@ public:
|
|
|
if (part_buffer)
|
|
|
part_buffer->Release();
|
|
|
if (mem_buffer) free (mem_buffer);
|
|
|
-
|
|
|
- if (udpTraceLevel >= 40)
|
|
|
- {
|
|
|
- DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
|
|
|
- package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- virtual void *getBuffer(unsigned len, bool variable)
|
|
|
+ virtual void *getBuffer(unsigned len, bool variable) override
|
|
|
{
|
|
|
if (variable)
|
|
|
len += sizeof(RecordLengthType);
|
|
@@ -938,7 +853,7 @@ public:
|
|
|
return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
|
|
|
}
|
|
|
|
|
|
- virtual void putBuffer(const void *buf, unsigned len, bool variable)
|
|
|
+ virtual void putBuffer(const void *buf, unsigned len, bool variable) override
|
|
|
{
|
|
|
if (variable)
|
|
|
{
|
|
@@ -976,11 +891,19 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual void sendMetaInfo(const void *buf, unsigned len) {
|
|
|
+ virtual void sendMetaInfo(const void *buf, unsigned len) override {
|
|
|
metaInfo.append(len, buf);
|
|
|
}
|
|
|
|
|
|
- virtual void flush(bool last_msg = false)
|
|
|
+ virtual void flush() override { flush(true); }
|
|
|
+
|
|
|
+ virtual unsigned size() const override
|
|
|
+ {
|
|
|
+ return totalSize;
|
|
|
+ }
|
|
|
+private:
|
|
|
+
|
|
|
+ void flush(bool last_msg)
|
|
|
{
|
|
|
if (!last_message_done && last_msg)
|
|
|
{
|
|
@@ -1022,32 +945,15 @@ public:
|
|
|
package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
|
|
|
package_header.metalength = metalength;
|
|
|
memcpy(dataBuff->data, &package_header, sizeof(package_header));
|
|
|
- parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
|
|
|
-
|
|
|
- if (udpTraceLevel >= 50)
|
|
|
- {
|
|
|
- if (package_header.length==991)
|
|
|
- DBGLOG("NEarly");
|
|
|
- DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
|
|
|
- package_header.ruid, package_header.msgId, package_header.msgSeq,
|
|
|
- package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
|
|
|
- }
|
|
|
+ parent.writeOwn(receiver, dataBuff, package_header.length, queue_number);
|
|
|
package_header.pktSeq++;
|
|
|
}
|
|
|
|
|
|
- virtual bool dataQueued()
|
|
|
- {
|
|
|
- return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
|
|
|
- }
|
|
|
|
|
|
- virtual unsigned size() const
|
|
|
- {
|
|
|
- return totalSize;
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
|
|
|
-IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
+extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, IUdpReceiverEntry &_receiver, const IpAddress & _sourceNode, unsigned _msgSeq, unsigned _queue)
|
|
|
{
|
|
|
- return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
|
|
|
+ return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _receiver, _sourceNode, _msgSeq, _queue);
|
|
|
}
|