/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "udplib.hpp"
#include "udpsha.hpp"
#include "udptrs.hpp"
#include "jsocket.hpp"
#include "jlog.hpp"
#ifdef _WIN32
#include <winsock.h>
#else
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#endif
#include <math.h>
#if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
#define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
#endif
unsigned udpOutQsPriority = 0;
unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
unsigned udpRequestToSendTimeout = 5; // value in sec
bool udpSnifferEnabled = true;
#ifdef _DEBUG
//#define _SIMULATE_LOST_PACKETS
#endif
using roxiemem::DataBuffer;
// MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
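// UdpReceiverEntry holds the per-destination sending state: one output queue per
// priority level, the flow-control and data sockets connected to that receiver,
// and (optionally) a circular buffer of recently-sent packets retained for
// resending until the receiver acknowledges them via a permit message.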
class UdpReceiverEntry
{
queue_t *output_queue;
bool initialized;
// Circular buffer of packets available for resending
unsigned retryDataCount;
unsigned retryDataIdx;
unsigned maxRetryData;
DataBuffer **retryData;
public:
ISocket *send_flow_socket;
ISocket *data_socket;
unsigned numQueues;
int current_q;
int currentQNumPkts; // number of consecutive packets processed from the current queue
int *maxPktsPerQ; // to minimise power function re-calc for every packet
SpinLock lock;
unsigned maxUdpSequence;
unsigned nextUdpSequence()
{
SpinBlock b(lock); // MORE - is this needed?
unsigned ret = ++maxUdpSequence;
if (ret & UDP_SEQUENCE_BITS) // overflowed
ret = 1;
return ret;
}
// MORE - consider where we need critsecs in here!
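// Send a flow-control message (request_to_send, send_completed, etc.) to this
// receiver. The message advertises the sequence-number window currently in
// flight (oldest unacknowledged to newest allocated) so the receiver can tell
// how much it has yet to see.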
void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
{
unsigned minUdpSequence;
{
SpinBlock b(lock); // scope the lock to this snapshot; don't hold it across the socket write below
if (retryDataCount)
minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
else
minUdpSequence = maxUdpSequence;
}
UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), cmd, myNodeIndex, 0, minUdpSequence, maxUdpSequence};
try
{
send_flow_socket->write(&msg, msg.length);
}
catch(IException *e)
{
StringBuffer s;
DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).toCharArray());
e->Release();
}
catch (...)
{
DBGLOG("UdpSender: sendRequest write failed - unknown error");
}
}
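// Process the acknowledgement information in a permit: any saved packet whose
// sequence number is at or below permit.lastSequenceSeen and is not listed in
// permit.missingSequences has been received, so it can be released; packets the
// receiver reports missing are appended to 'retries' for retransmission.
// Returns the total byte count of the packets queued for resend.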
unsigned cleanRetryData(const UdpPermitToSendMsg &permit, PointerArray &retries)
{
// Any saved packets < lastReceived that are not listed as missing can be deleted
SpinBlock b(lock);
unsigned totalData = 0;
if (checkTraceLevel(TRACE_RETRY_DATA, 3))
{
unsigned minUdpSequence;
if (retryDataCount)
minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
else
minUdpSequence = maxUdpSequence;
StringBuffer permitStr;
permit.toString(permitStr);
DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence);
}
unsigned lastReceived = permit.lastSequenceSeen;
unsigned missingIndex = 0;
unsigned missingCount = permit.missingCount;
unsigned i = 0;
if (maxRetryData)
{
while (i < retryDataCount && retries.length() < permit.max_data)
{
unsigned idx = (retryDataIdx + i) % maxRetryData;
DataBuffer *buffer = retryData[idx];
if (buffer)
{
UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
unsigned thisSequence = header->udpSequence;
if (thisSequence > lastReceived)
break;
if (!missingCount || thisSequence < permit.missingSequences[missingIndex])
{
::Release(buffer);
retryData[idx] = NULL;
if (i)
i++; // MORE - leaves holes - is this smart? Alternatively could close up... Should be rare anyway
else
{
retryDataIdx = (retryDataIdx + 1) % maxRetryData;
retryDataCount--;
}
}
else if (thisSequence == permit.missingSequences[missingIndex])
{
totalData += header->length;
retries.append(buffer);
i++;
missingIndex++;
missingCount--;
}
else
{
missingIndex++;
missingCount--;
}
}
else
{
if (i)
i++;
else
{
// Removing leading nulls
retryDataCount--;
retryDataIdx = (retryDataIdx + 1) % maxRetryData;
}
}
}
}
if (checkTraceLevel(TRACE_RETRY_DATA, 3))
DBGLOG("UdpSender: cleanRetryData found %u to resend total size %u, total %u still available", retries.length(), totalData, retryDataCount);
return totalData;
}
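// Send up to permit.max_data packets to the receiver named in the permit:
// first any retransmissions identified by cleanRetryData, then fresh packets
// popped from the output queues. Newly-sent packets are stamped with the next
// UDP sequence number and, if a resend buffer is configured, retained in it;
// when completion is signalled in-band, the last packet of the burst is flagged
// UDP_SEQUENCE_COMPLETE so the receiver knows the permitted burst has ended.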
unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
{
moreRequested = false;
maxPackets = permit.max_data;
PointerArray toSend;
unsigned totalSent = cleanRetryData(permit, toSend);
while (toSend.length() < maxPackets && dataQueued())
{
DataBuffer *buffer = popQueuedData();
if (buffer) // Aborted slave queries leave NULL records on queue
{
UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
toSend.append(buffer);
totalSent += header->length;
#ifdef __linux__
if (isLocal && (totalSent > 100000))
break;
#endif
}
}
maxPackets = toSend.length();
for (unsigned idx = 0; idx < maxPackets; idx++)
{
DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
bool isRetry = (header->udpSequence != 0);
if (isRetry)
{
if (checkTraceLevel(TRACE_RETRY_DATA, 1))
DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.destNodeIndex, header->udpSequence);
atomic_inc(&packetsRetried);
}
else
header->udpSequence = nextUdpSequence();
unsigned length = header->length;
if (bucket)
{
MTIME_SECTION(timer, "bucket_wait");
bucket->wait((length / 1024)+1);
}
try
{
if (udpSendCompletedInData)
{
if (idx == maxPackets-1)
{
// MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
if (false && dataQueued()) // Causes some problems because no flow control info gets through at all
{
moreRequested = true;
header->udpSequence |= (UDP_SEQUENCE_COMPLETE|UDP_SEQUENCE_MORE);
}
else
header->udpSequence |= UDP_SEQUENCE_COMPLETE;
}
}
#ifdef _SIMULATE_LOST_PACKETS
if (isRetry || (header->udpSequence % 100) != 0)
#endif
data_socket->write(buffer->data, length);
header->udpSequence &= ~UDP_SEQUENCE_BITS;
}
catch(IException *e)
{
StringBuffer s;
DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
e->Release();
}
catch(...)
{
DBGLOG("UdpSender: write exception - unknown exception");
}
if (!isRetry && maxRetryData)
{
unsigned slot = (retryDataIdx + retryDataCount) % maxRetryData;
if (retryDataCount < maxRetryData)
retryDataCount++;
else
{
if (udpTraceLevel > 0)
DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.destNodeIndex, header->udpSequence);
::Release(retryData[slot]);
}
retryData[slot] = buffer;
}
else
{
::Release(buffer);
}
}
return totalSent;
}
bool dataQueued()
{
for (unsigned i = 0; i < numQueues; i++)
{
if (!output_queue[i].empty())
return true;
}
return false;
}
bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
{
for (unsigned i = 0; i < numQueues; i++)
{
if (output_queue[i].dataQueued(key, pkCmpFn))
return true;
}
return false;
}
bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
{
bool anyRemoved = false;
for (unsigned i = 0; i < numQueues; i++)
{
if (output_queue[i].removeData(key, pkCmpFn))
anyRemoved = true;
}
return anyRemoved;
}
inline void pushData(unsigned queue, DataBuffer *buffer)
{
output_queue[queue].pushOwn(buffer);
}
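// Pop the next packet to send, servicing the queues round-robin. When
// udpOutQsPriority is set, queue j may send up to udpOutQsPriority^(numQueues-j-1)
// consecutive packets before yielding (see maxPktsPerQ, computed in init()), so
// with udpOutQsPriority=4 and 3 queues the weights are 16:4:1 from the highest
// priority queue to the lowest.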
DataBuffer *popQueuedData()
{
DataBuffer *buffer;
while (1)
{
for (unsigned i = 0; i < numQueues; i++)
{
if (udpOutQsPriority)
{
if (output_queue[current_q].empty())
{
if (udpTraceLevel >= 5)
DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
currentQNumPkts = 0;
current_q = (current_q + 1) % numQueues;
}
else
{
buffer = output_queue[current_q].pop();
currentQNumPkts++;
if (udpTraceLevel >= 5)
DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
if (currentQNumPkts >= maxPktsPerQ[current_q])
{
currentQNumPkts = 0;
current_q = (current_q + 1) % numQueues;
}
return buffer;
}
}
else
{
current_q = (current_q + 1) % numQueues;
if (!output_queue[current_q].empty())
{
return output_queue[current_q].pop();
}
}
}
MilliSleep(10);
DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
}
}
UdpReceiverEntry()
{
send_flow_socket = data_socket = NULL;
numQueues = 0;
current_q = 0;
initialized = false;
output_queue = 0;
currentQNumPkts = 0;
maxPktsPerQ = 0;
maxUdpSequence = 0;
retryDataCount = 0;
retryDataIdx = 0;
retryData = NULL;
}
void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
{
assert(!initialized);
maxRetryData = _maxRetryData;
numQueues = _numQueues;
const IpAddress &ip = getNodeAddress(destNodeIndex);
if (!ip.isNull())
{
try
{
SocketEndpoint sendFlowEp(sendFlowPort, ip);
SocketEndpoint dataEp(dataPort, ip);
send_flow_socket = ISocket::udp_connect(sendFlowEp);
data_socket = ISocket::udp_connect(dataEp);
if (isLocal)
{
data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
if (udpTraceLevel > 0)
DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
}
}
catch(IException *e)
{
StringBuffer error, ipstr;
DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
throw;
}
catch(...)
{
StringBuffer ipstr;
DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
throw;
}
output_queue = new queue_t[numQueues];
maxPktsPerQ = new int[numQueues];
for (unsigned j = 0; j < numQueues; j++)
{
output_queue[j].set_queue_size(queueSize);
maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
}
if (maxRetryData)
{
retryData = new DataBuffer *[maxRetryData];
memset(retryData, 0, maxRetryData * sizeof(DataBuffer *));
}
initialized = true;
if (udpTraceLevel > 0)
{
StringBuffer ipStr;
DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
}
}
}
~UdpReceiverEntry()
{
if (retryData) delete[] retryData; // MORE - should I release the values pointed to as well - not a big deal as quitting anyway...
if (send_flow_socket) send_flow_socket->Release();
if (data_socket) data_socket->Release();
if (output_queue) delete [] output_queue;
if (maxPktsPerQ) delete [] maxPktsPerQ;
}
};
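// CSendManager drives the sender side of the Roxie UDP flow-control protocol.
// Data is queued per destination in a UdpReceiverEntry; a request_to_send is
// then issued (send_send_flow), the receiver answers with an ok_to_send permit
// (send_receive_flow), and the permitted number of packets is transmitted
// (send_data), with send_completed/request_to_send_more reporting the outcome.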
class CSendManager : public CInterface, implements ISendManager
{
friend class send_send_flow;
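// CMessagePacker accumulates a logical message into DATA_PAYLOAD-sized
// DataBuffers, each prefixed with a UdpPacketHeader. The first buffer also
// carries the caller's message header (preceded by its 16-bit length); rows
// added via getBuffer/putBuffer are packed without spanning packets where they
// fit; flush(true) appends any accumulated metadata and sets the top bit of
// pktSeq to mark the final packet of the message.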
class CMessagePacker : public CInterface, implements IMessagePacker
{
CSendManager &parent;
unsigned destNodeIndex;
UdpPacketHeader package_header;
DataBuffer *part_buffer;
unsigned data_buffer_size;
unsigned data_used;
void *mem_buffer;
unsigned mem_buffer_size;
unsigned totalSize;
bool packed_request;
unsigned requested_size;
MemoryBuffer metaInfo;
bool last_message_done;
bool aborted;
int queue_number;
public:
IMPLEMENT_IINTERFACE;
CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, CSendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
: parent(_parent)
{
queue_number = _queue;
destNodeIndex = _destNode;
package_header.length = 0; // filled in with proper value later
package_header.metalength = 0;
package_header.ruid = ruid;
package_header.msgId = msgId;
package_header.pktSeq = 0;
package_header.nodeIndex = _sourceNode;
package_header.msgSeq = _msgSeq;
package_header.udpSequence = 0; // these are allocated when transmitted
aborted = false;
packed_request = false;
part_buffer = bufferManager->allocate();
data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
*(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
data_used = headerSize + sizeof(unsigned short);
mem_buffer = 0;
mem_buffer_size = 0;
last_message_done = false;
totalSize = 0;
if (udpTraceLevel >= 40)
DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid="RUIDF" id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
}
~CMessagePacker()
{
if (part_buffer)
part_buffer->Release();
if (mem_buffer) free (mem_buffer);
if (udpTraceLevel >= 40)
{
DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid="RUIDF" id=0x%.8x mseq=%u pktSeq=%x node=%u",
package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
}
}
virtual void abort()
{
aborted = true;
}
virtual void *getBuffer(unsigned len, bool variable)
{
if (variable)
len += sizeof(RecordLengthType);
if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
{
// Won't fit in one, so allocate temp location
if (mem_buffer_size < len)
{
free(mem_buffer);
mem_buffer = malloc(len);
mem_buffer_size = len;
}
packed_request = false;
if (variable)
return ((char *) mem_buffer) + sizeof(RecordLengthType);
else
return mem_buffer;
}
if (part_buffer && ((data_buffer_size - data_used) < len))
flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
if (!part_buffer)
{
part_buffer = bufferManager->allocate();
data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
}
packed_request = true;
if (variable)
return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
else
return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
}
virtual void putBuffer(const void *buf, unsigned len, bool variable)
{
if (variable)
{
assertex(len < MAX_RECORD_LENGTH);
buf = ((char *) buf) - sizeof(RecordLengthType);
*(RecordLengthType *) buf = len;
len += sizeof(RecordLengthType);
}
totalSize += len;
if (packed_request)
{
assert(len <= (data_buffer_size - data_used));
data_used += len;
}
else
{
while (len)
{
if (!part_buffer)
{
part_buffer = bufferManager->allocate();
data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
data_used = 0;
}
unsigned chunkLen = data_buffer_size - data_used;
if (chunkLen > len)
chunkLen = len;
memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
data_used += chunkLen;
len -= chunkLen;
buf = &(((char*)buf)[chunkLen]);
if (len)
flush(false);
}
}
}
virtual void sendMetaInfo(const void *buf, unsigned len)
{
metaInfo.append(len, buf);
}
virtual void flush(bool last_msg = false)
{
assert(!aborted);
if (!last_message_done && last_msg)
{
last_message_done = true;
if (!part_buffer)
part_buffer = bufferManager->allocate();
const char *metaData = metaInfo.toByteArray();
unsigned metaLength = metaInfo.length();
unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
while (metaLength > maxMetaLength)
{
memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
put_package(part_buffer, data_used, maxMetaLength);
metaLength -= maxMetaLength;
metaData += maxMetaLength;
data_used = 0;
maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
part_buffer = bufferManager->allocate();
}
memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
package_header.pktSeq |= 0x80000000;
put_package(part_buffer, data_used, metaLength);
}
else if (part_buffer)
{
// Just flush current - used when no room for current row
if (data_used)
put_package(part_buffer, data_used, 0); // buffer released in put_package
else
part_buffer->Release(); // If NO data in buffer, release buffer back to pool
}
part_buffer = 0;
data_buffer_size = 0;
data_used = 0;
}
void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
{
package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
package_header.metalength = metalength;
memcpy(dataBuff->data, &package_header, sizeof(package_header));
parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
if (udpTraceLevel >= 50)
{
if (package_header.length==991)
DBGLOG("NEarly");
DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid="RUIDF" id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
package_header.ruid, package_header.msgId, package_header.msgSeq,
package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
}
package_header.pktSeq++;
}
virtual bool dataQueued()
{
return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
}
virtual unsigned size() const
{
return totalSize;
}
};
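// Thread wrapper whose start() does not return until the new thread has
// actually begun executing, so callers can rely on 'running' being observed.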
class StartedThread : public Thread
{
private:
Semaphore started;
virtual int run()
{
started.signal();
return doRun();
}
protected:
bool running;
public:
StartedThread(const char *name) : Thread(name)
{
running = false;
}
~StartedThread()
{
running = false;
join();
}
virtual void start()
{
running = true;
Thread::start();
started.wait();
}
virtual int doRun() = 0;
};
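// send_send_flow tracks a small per-destination state machine
// (new_request -> pending_request -> sending_data -> completed/completed_more),
// issuing request_to_send messages, retrying them on timeout, and reporting
// completion when a permitted burst has been sent.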
class send_send_flow : public StartedThread
{
/*
I don't like this code much at all.
Looping round every target each time the status of any one changes seems like a bad thing, especially as scale goes up.
Even though these look like a bitmap they are not presently used as one
- as a result, if data_added() is called while state is completed, I think we lose the request
- we probably get away with it because of the dataQueued check
doRun() uses the state bits without protection.
A count of the number pending for each might be better than just a flag.
Circular buffers giving a list of which ones are in a given state would speed up the processing in the thread?
- normally MANY in pending (and order is interesting)
- normally few in any other state (only 1 if the thread is keeping up), order not really very interesting
- want to keep the expense on caller threads low (at the moment we just set a flag and sometimes signal)
- in particular, don't lock while processing the chain
- never need to be in more than one chain
msTick() is probably better than time() for detecting timeouts.
*/
enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
unsigned target_count;
char *state;
unsigned char *timeouts; // Number of consecutive timeouts
time_t *request_time;
CriticalSection cr;
Semaphore sem;
CSendManager &parent;
virtual int doRun()
{
// MORE - this is reading the state values unprotected
// Not sure that this represents any issue in practice...
if (udpTraceLevel > 0)
DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
while (running)
{
bool idle = false;
if (sem.wait(1000))
{
if (udpTraceLevel > 4)
DBGLOG("UdpSender: send_send_flow::doRun signal received");
}
else
idle = true;
if (!running) return 0;
time_t now;
time(&now);
// I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
// In a typical heavy load scenario most will be pending
// Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
// Separate lists for each state (don't need one for sending) ?
for (unsigned i = 0; i < target_count; i++)
{
switch (state[i]) // MORE - should really protect it?
{
case completed:
done(i, false);
break;
case completed_more:
done(i, true);
break;
case pending_request:
if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
break;
timeouts[i]++;
EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i sec max=%i sec",
timeouts[i], udpMaxRetryTimedoutReqs,
i, (int) (now - request_time[i]), udpRequestToSendTimeout);
// 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
{
abort(i);
break;
}
// fall into...
case new_request:
sendRequest(i);
break;
default:
if (idle && parent.dataQueued(i))
{
EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
data_added(i);
}
}
}
}
return 0;
}
void done(unsigned index, bool moreRequested)
{
bool dataRemaining;
{
CriticalBlock b(cr);
dataRemaining = parent.dataQueued(index);
if (dataRemaining)
{
state[index] = pending_request;
time(&request_time[index]);
}
else
{
state[index] = 0;
timeouts[index] = 0;
}
}
if (udpTraceLevel > 3)
DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
if (udpSendCompletedInData)
{
if (dataRemaining)
{
// MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
// though there is a possible race to consider
if (!moreRequested)
parent.sendRequest(index, flow_t::request_to_send);
}
}
else
parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
}
void sendRequest(unsigned index)
{
if (udpTraceLevel > 3)
DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
CriticalBlock b(cr);
parent.sendRequest(index, flow_t::request_to_send);
state[index] = pending_request;
time(&request_time[index]);
}
void abort(unsigned index)
{
if (udpTraceLevel > 3)
DBGLOG("UdpSender: abort sending queued data to node=%u", index);
CriticalBlock b(cr);
state[index] = 0;
timeouts[index] = 0;
parent.abortData(index);
}
public:
send_send_flow(CSendManager &_parent, unsigned numNodes)
: StartedThread("UdpLib::send_send_flow"), parent(_parent)
{
target_count = numNodes;
state = new char [target_count];
memset(state, 0, target_count);
timeouts = new unsigned char [target_count];
memset(timeouts, 0, target_count);
request_time = new time_t [target_count];
memset(request_time, 0, sizeof(time_t) * target_count);
start();
}
~send_send_flow()
{
running = false;
sem.signal();
join();
delete [] state;
delete [] timeouts;
delete [] request_time;
}
void clear_to_send_received(unsigned index)
{
CriticalBlock b(cr);
state[index] = sending_data;
}
void send_done(unsigned index, bool moreRequested)
{
CriticalBlock b(cr);
state[index] = moreRequested ? completed_more : completed;
sem.signal();
}
void data_added(unsigned index)
{
CriticalBlock b(cr);
// MORE - this looks wrong. If I add data while sending, it may get delayed until the next time I have data to send?? Why declare it as a bitmap if we're not going to use it as one?
// Because done() checks whether any data is pending and re-calls data_added, we get away with it
// Using the bits sounds more sensible though?
// Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe it's ok as it is.
if (!state[index]) // MORE - should just test the bit?
{
state[index] = new_request;
if (udpTraceLevel > 3)
DBGLOG("UdpSender: state set to new_request for node=%u", index);
sem.signal();
}
}
};
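// send_receive_flow listens on the client flow port for ok_to_send permits from
// receivers and hands each one to the send_data thread's queue.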
class send_receive_flow : public StartedThread
{
CSendManager &parent;
int receive_port;
Owned<ISocket> flow_socket;
public:
send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
{
receive_port = r_port;
if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
{
if (!enableSocketMaxSetting)
throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
check_set_max_socket_read_buffer(udpFlowSocketsSize);
}
flow_socket.setown(ISocket::udp_create(receive_port));
flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
size32_t actualSize = flow_socket->get_receive_buffer_size();
DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
start();
}
~send_receive_flow()
{
running = false;
if (flow_socket)
flow_socket->close();
join();
}
virtual int doRun()
{
if (udpTraceLevel > 0)
DBGLOG("UdpSender: send_receive_flow started");
#ifdef __linux__
setLinuxThreadPriority(2);
#endif
while(running)
{
UdpPermitToSendMsg f;
while (running)
{
try
{
unsigned int res;
flow_socket->read(&f, 1, sizeof(f), res, 5);
assertex(res == f.length);
#ifdef CRC_MESSAGES
assertex(f.crc == f.calcCRC());
#endif
switch (f.cmd)
{
case flow_t::ok_to_send:
if (udpTraceLevel > 1)
DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
parent.data->ok_to_send(f);
break;
default:
DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
}
}
catch (IException *e)
{
if (running && e->errorCode() != JSOCKERR_timeout_expired)
{
StringBuffer s;
DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).toCharArray());
}
e->Release();
}
catch (...)
{
if (running)
DBGLOG("UdpSender: send_receive_flow::unknown exception");
MilliSleep(0);
}
}
}
return 0;
}
};
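// send_data performs the actual transmission: it pops queued permits, sends the
// permitted burst via the matching UdpReceiverEntry, and notifies send_send_flow
// when done. When the sniffer is enabled it also multicasts busy/idle messages
// around each burst so other senders can observe load.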
class send_data : public StartedThread
{
CSendManager &parent;
ISocket *sniffer_socket;
SocketEndpoint ep;
simple_queue<UdpPermitToSendMsg> send_queue;
Linked<TokenBucket> bucket;
void send_sniff(bool busy)
{
sniff_msg msg = {sizeof(sniff_msg), busy ? flow_t::busy : flow_t::idle, parent.myNodeIndex};
try
{
if (!sniffer_socket)
{
sniffer_socket = ISocket::multicast_connect(ep, 3);
if (udpTraceLevel > 1)
{
StringBuffer url;
DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
}
}
sniffer_socket->write(&msg, sizeof(msg));
if (udpTraceLevel > 1)
DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
}
catch(IException *e)
{
StringBuffer s;
StringBuffer url;
DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).toCharArray());
e->Release();
}
catch(...)
{
StringBuffer url;
DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
if (sniffer_socket)
{
sniffer_socket->Release();
sniffer_socket = NULL;
}
}
}
public:
send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
: StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
{
sniffer_socket = NULL;
if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
{
if (!enableSocketMaxSetting)
throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
check_set_max_socket_write_buffer(udpLocalWriteSocketSize);
}
start();
}
~send_data()
{
running = false;
UdpPermitToSendMsg dummy;
send_queue.push(dummy);
join();
if (sniffer_socket)
sniffer_socket->Release();
}
bool ok_to_send(const UdpPermitToSendMsg &msg)
{
if (send_queue.push(msg, 15))
return true;
else
{
DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
return false;
}
}
virtual int doRun()
{
if (udpTraceLevel > 0)
DBGLOG("UdpSender: send_data started");
#ifdef __linux__
setLinuxThreadPriority(1); // MORE - windows?
#endif
UdpPermitToSendMsg permit;
while (running)
{
send_queue.pop(permit);
if (!running)
return 0;
if (udpSnifferEnabled)
send_sniff(true);
parent.send_flow->clear_to_send_received(permit.destNodeIndex);
UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
bool moreRequested;
unsigned maxPackets;
unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
if (udpSendCompletedInData && !maxPackets)
parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
if (udpSnifferEnabled)
send_sniff(false);
if (udpTraceLevel > 1)
DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
}
if (udpTraceLevel > 0)
DBGLOG("UdpSender: send_data stopped");
return 0;
}
};
friend class send_send_flow;
friend class send_receive_flow;
friend class send_data;
unsigned numNodes;
int receive_flow_port;
int send_flow_port;
int data_port;
unsigned myNodeIndex;
unsigned numQueues;
UdpReceiverEntry *receiversTable;
send_send_flow *send_flow;
send_receive_flow *receive_flow;
send_data *data;
Linked<TokenBucket> bucket;
SpinLock msgSeqLock;
unsigned msgSeq;
static bool comparePacket(void *pkData, void *key)
{
UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
}
inline unsigned getNextMessageSequence()
{
SpinBlock b(msgSeqLock);
unsigned res = ++msgSeq;
if (!res)
res = ++msgSeq;
return res;
}
public:
IMPLEMENT_IINTERFACE;
CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned maxRetryData, unsigned _myNodeIndex, TokenBucket *_bucket)
: bucket(_bucket)
{
#ifndef _WIN32
setpriority(PRIO_PROCESS, 0, -3);
#endif
numNodes = getNumNodes();
receive_flow_port = client_flow_port;
send_flow_port = server_flow_port;
data_port = d_port;
myNodeIndex = _myNodeIndex;
numQueues = _numQueues;
receiversTable = new UdpReceiverEntry[numNodes];
if (maxRetryData > MAX_RESEND_TABLE_SIZE)
maxRetryData = MAX_RESEND_TABLE_SIZE;
for (unsigned i = 0; i < numNodes; i++)
receiversTable[i].init(i, numQueues, q_size, maxRetryData, send_flow_port, data_port, i==myNodeIndex);
data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
send_flow = new send_send_flow(*this, numNodes);
receive_flow = new send_receive_flow(*this, client_flow_port);
msgSeq = 0;
}
~CSendManager()
{
delete []receiversTable;
delete send_flow;
delete receive_flow;
delete data;
}
void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
{
// NOTE: takes ownership of the DataBuffer
assert(queue < numQueues);
assert(destNodeIndex < numNodes);
receiversTable[destNodeIndex].pushData(queue, buffer);
send_flow->data_added(destNodeIndex);
}
inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
{
receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
}
bool dataQueued(unsigned destIndex)
{
return receiversTable[destIndex].dataQueued();
}
bool abortData(unsigned destIndex)
{
return receiversTable[destIndex].removeData(NULL, NULL);
}
// Interface ISendManager
virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
{
if (destNodeIndex >= numNodes)
throw MakeStringException(ROXIE_UDP_ERROR, "createMesagePacker: invalid destination node index %i", destNodeIndex);
return new CMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
}
virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
{
UdpPacketHeader pkHdr;
pkHdr.ruid = ruid;
pkHdr.msgId = msgId;
return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
}
virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
{
UdpPacketHeader pkHdr;
pkHdr.ruid = ruid;
pkHdr.msgId = msgId;
return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
}
virtual bool allDone()
{
for (unsigned i = 0; i < numNodes; i++)
{
if (receiversTable[i].dataQueued())
return false;
}
return true;
}
};
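// A minimal usage sketch (port numbers, queue sizing, and the snifferIp/myIndex
// names are illustrative only):
//
//   Owned<ISendManager> sender = createSendManager(7000, 7002, 7001, 7003,
//                                                  snifferIp, 100, 3, 0, NULL, myIndex);
//   Owned<IMessagePacker> packer = sender->createMessagePacker(ruid, msgId,
//                                                  &header, sizeof(header), destIdx, 0);
//   void *row = packer->getBuffer(rowSize, false);
//   memcpy(row, rowData, rowSize);
//   packer->putBuffer(row, rowSize, false);
//   packer->flush(true);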
ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, unsigned maxRetryData, TokenBucket *rateLimiter, unsigned myNodeIndex)
{
return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, maxRetryData, myNodeIndex, rateLimiter);
}