|
@@ -38,7 +38,6 @@ unsigned udpOutQsPriority = 0;
|
|
|
unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
|
|
|
unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
|
|
|
unsigned udpRequestToSendAckTimeout = 10; // value in milliseconds
|
|
|
-bool udpSnifferEnabled = false;
|
|
|
|
|
|
#ifdef _DEBUG
|
|
|
//#define TEST_DROPPED_PACKETS
|
|
@@ -840,53 +839,13 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
class send_data : public StartedThread
|
|
|
{
|
|
|
CSendManager &parent;
|
|
|
- ISocket *sniffer_socket;
|
|
|
- SocketEndpoint ep;
|
|
|
simple_queue<UdpPermitToSendMsg> send_queue;
|
|
|
Linked<TokenBucket> bucket;
|
|
|
|
|
|
- void send_sniff(sniffType::sniffCmd busy)
|
|
|
- {
|
|
|
- sniff_msg msg = { busy, parent.myIP};
|
|
|
- try
|
|
|
- {
|
|
|
- if (!sniffer_socket)
|
|
|
- {
|
|
|
- sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
|
|
|
- if (udpTraceLevel > 2)
|
|
|
- {
|
|
|
- StringBuffer url;
|
|
|
- DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
|
|
|
- }
|
|
|
- }
|
|
|
- sniffer_socket->write(&msg, sizeof(msg));
|
|
|
- if (udpTraceLevel > 2)
|
|
|
- DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
|
|
|
- }
|
|
|
- catch(IException *e)
|
|
|
- {
|
|
|
- StringBuffer s;
|
|
|
- StringBuffer url;
|
|
|
- DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
|
|
|
- e->Release();
|
|
|
- }
|
|
|
- catch(...)
|
|
|
- {
|
|
|
- StringBuffer url;
|
|
|
- DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
|
|
|
- if (sniffer_socket)
|
|
|
- {
|
|
|
- sniffer_socket->Release();
|
|
|
- sniffer_socket = NULL;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public:
|
|
|
- send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
|
|
|
- : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
|
|
|
+ send_data(CSendManager &_parent, TokenBucket *_bucket)
|
|
|
+ : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
|
|
|
{
|
|
|
- sniffer_socket = NULL;
|
|
|
if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
|
|
|
throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
|
|
|
start();
|
|
@@ -898,8 +857,6 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
UdpPermitToSendMsg dummy = {};
|
|
|
send_queue.push(dummy);
|
|
|
join();
|
|
|
- if (sniffer_socket)
|
|
|
- sniffer_socket->Release();
|
|
|
}
|
|
|
|
|
|
bool ok_to_send(const UdpPermitToSendMsg &msg)
|
|
@@ -928,12 +885,8 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
if (!running)
|
|
|
return 0;
|
|
|
|
|
|
- if (udpSnifferEnabled)
|
|
|
- send_sniff(sniffType::busy);
|
|
|
UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNode];
|
|
|
unsigned payload = receiverInfo.sendData(permit, bucket);
|
|
|
- if (udpSnifferEnabled)
|
|
|
- send_sniff(sniffType::idle);
|
|
|
|
|
|
if (udpTraceLevel > 2)
|
|
|
{
|
|
@@ -976,7 +929,7 @@ class CSendManager : implements ISendManager, public CInterface
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
- CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool _encrypted)
|
|
|
+ CSendManager(int server_flow_port, int data_port, int client_flow_port, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool _encrypted)
|
|
|
: bucket(_bucket),
|
|
|
myIP(_myIP),
|
|
|
receiversTable([_numQueues, q_size, server_flow_port, data_port, _encrypted](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, _encrypted);}),
|
|
@@ -986,7 +939,7 @@ public:
|
|
|
setpriority(PRIO_PROCESS, 0, -3);
|
|
|
#endif
|
|
|
numQueues = _numQueues;
|
|
|
- data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
|
|
|
+ data = new send_data(*this, bucket);
|
|
|
resend_flow = new send_resend_flow(*this);
|
|
|
receive_flow = new send_receive_flow(*this, client_flow_port);
|
|
|
}
|
|
@@ -1043,10 +996,10 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
-ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit)
|
|
|
+ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit)
|
|
|
{
|
|
|
assertex(!myNode.getIpAddress().isNull());
|
|
|
- return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter, encryptionInTransit);
|
|
|
+ return new CSendManager(server_flow_port, data_port, client_flow_port, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter, encryptionInTransit);
|
|
|
}
|
|
|
|
|
|
class CMessagePacker : implements IMessagePacker, public CInterface
|