|
@@ -54,6 +54,9 @@ static unsigned lastFlowPermitsSent = 0;
|
|
|
static unsigned lastFlowRequestsReceived = 0;
|
|
|
static unsigned lastDataPacketsReceived = 0;
|
|
|
|
|
|
+// The code that redirects flow messages from data socket to flow socket relies on the assumption tested here
|
|
|
+static_assert(sizeof(UdpRequestToSendMsg) < sizeof(UdpPacketHeader), "Expected UDP rts size to be less than packet header");
|
|
|
+
|
|
|
class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
{
|
|
|
/*
|
|
@@ -573,7 +576,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
class receive_data : public Thread
|
|
|
{
|
|
|
CReceiveManager &parent;
|
|
|
- ISocket *receive_socket;
|
|
|
+ ISocket *receive_socket = nullptr;
|
|
|
+ ISocket *selfFlowSocket = nullptr;
|
|
|
std::atomic<bool> running = { false };
|
|
|
Semaphore started;
|
|
|
|
|
@@ -585,6 +589,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
if (check_max_socket_read_buffer(ip_buffer) < 0)
|
|
|
throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
|
|
|
receive_socket = ISocket::udp_create(parent.data_port);
|
|
|
+ selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
|
|
|
receive_socket->set_receive_buffer_size(ip_buffer);
|
|
|
size32_t actualSize = receive_socket->get_receive_buffer_size();
|
|
|
DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
|
|
@@ -603,8 +608,11 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
running = false;
|
|
|
if (receive_socket)
|
|
|
receive_socket->close();
|
|
|
+ if (selfFlowSocket)
|
|
|
+ selfFlowSocket->close();
|
|
|
join();
|
|
|
::Release(receive_socket);
|
|
|
+ ::Release(selfFlowSocket);
|
|
|
}
|
|
|
|
|
|
virtual int run()
|
|
@@ -625,7 +633,13 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
{
|
|
|
unsigned int res;
|
|
|
b = bufferManager->allocate();
|
|
|
- receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
|
|
|
+ if (res!=sizeof(UdpRequestToSendMsg))
|
|
|
+ break;
|
|
|
+ selfFlowSocket->write(b->data, res);
|
|
|
+ }
|
|
|
dataPacketsReceived++;
|
|
|
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
|
|
|
assert(hdr.length == res && hdr.length > sizeof(hdr));
|
|
@@ -637,8 +651,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
StringBuffer s;
|
|
|
DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
|
|
|
}
|
|
|
- parent.noteDuplicate(b);
|
|
|
- ::Release(b);
|
|
|
+ hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -647,8 +660,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
StringBuffer s;
|
|
|
DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
|
|
|
}
|
|
|
- parent.input_queue->pushOwn(b);
|
|
|
}
|
|
|
+ parent.input_queue->pushOwn(b);
|
|
|
b = NULL;
|
|
|
}
|
|
|
catch (IException *e)
|
|
@@ -790,29 +803,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
|
|
|
collatePacket(dataBuff);
|
|
|
}
|
|
|
}
|
|
|
- void noteDuplicate(DataBuffer *dataBuff)
|
|
|
- {
|
|
|
- const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
|
|
|
- Linked <CMessageCollator> msgColl;
|
|
|
- SpinBlock b(collatorsLock);
|
|
|
- try
|
|
|
- {
|
|
|
- msgColl.set(collators[pktHdr->ruid]);
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
- {
|
|
|
- EXCLOG(E);
|
|
|
- E->Release();
|
|
|
- }
|
|
|
- catch (...)
|
|
|
- {
|
|
|
- IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
|
|
|
- EXCLOG(E);
|
|
|
- E->Release();
|
|
|
- }
|
|
|
- if (msgColl)
|
|
|
- msgColl->noteDuplicate((pktHdr->pktSeq & UDP_PACKET_RESENT) != 0);
|
|
|
- }
|
|
|
|
|
|
void collatePacket(DataBuffer *dataBuff)
|
|
|
{
|