12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "udplib.hpp"
- #include "udpsha.hpp"
- #include "udptrs.hpp"
- #include "jsocket.hpp"
- #include "jlog.hpp"
- #include "roxie.hpp"
- #ifdef _WIN32
- #include <winsock.h>
- #else
- #include <sys/socket.h>
- #include <sys/time.h>
- #include <sys/resource.h>
- #endif
- #include <math.h>
- #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
- #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
- #endif
// Tunable settings read by the UDP sender code in this file. The values below are the built-in defaults
// (presumably overridden from configuration elsewhere - confirm against callers).
// udpOutQsPriority: 0 selects plain round-robin across a receiver's output queues; a non-zero value switches
// popQueuedData() to its priority path and is used as the base when computing per-queue burst sizes.
unsigned udpOutQsPriority = 0;
unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
// When true the send_data thread multicasts a busy sniff message before, and an idle one after, each send.
bool udpSnifferEnabled = true;
- #ifdef _DEBUG
- //#define _SIMULATE_LOST_PACKETS
- #endif
- using roxiemem::DataBuffer;
- // MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
- class UdpReceiverEntry
- {
- queue_t *output_queue;
- bool initialized;
- public:
- ISocket *send_flow_socket;
- ISocket *data_socket;
- unsigned numQueues;
- int current_q;
- int currentQNumPkts; // Current Queue Number of Consecutive Processed Packets.
- int *maxPktsPerQ; // to minimise power function re-calc for evey packet
- // MORE - consider where we need critsecs in here!
- void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
- {
- UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
- try
- {
- send_flow_socket->write(&msg, msg.length);
- }
- catch(IException *e)
- {
- StringBuffer s;
- DBGLOG("UdpSender: sendRequest write failed - %s", e->errorMessage(s).str());
- e->Release();
- }
- catch (...)
- {
- DBGLOG("UdpSender: sendRequest write failed - unknown error");
- }
- }
- unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
- {
- moreRequested = false;
- maxPackets = permit.max_data;
- PointerArray toSend;
- unsigned totalSent = 0;
- while (toSend.length() < maxPackets && dataQueued())
- {
- DataBuffer *buffer = popQueuedData();
- if (buffer) // Aborted slave queries leave NULL records on queue
- {
- UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
- toSend.append(buffer);
- totalSent += header->length;
- #ifdef __linux__
- if (isLocal && (totalSent> 100000))
- break;
- #endif
- }
- }
- maxPackets = toSend.length();
- for (unsigned idx = 0; idx < maxPackets; idx++)
- {
- DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
- UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
- unsigned length = header->length;
- if (bucket)
- {
- MTIME_SECTION(queryActiveTimer(), "bucket_wait");
- bucket->wait((length / 1024)+1);
- }
- if (udpSendCompletedInData && idx == maxPackets-1)
- header->pktSeq |= UDP_PACKET_ENDBURST;
- try
- {
- data_socket->write(buffer->data, length);
- }
- catch(IException *e)
- {
- StringBuffer s;
- DBGLOG("UdpSender: write exception - write(%p, %u) - %s", buffer->data, length, e->errorMessage(s).str());
- e->Release();
- }
- catch(...)
- {
- DBGLOG("UdpSender: write exception - unknown exception");
- }
- ::Release(buffer);
- }
- return totalSent;
- }
- bool dataQueued()
- {
- for (unsigned i = 0; i < numQueues; i++)
- {
- if (!output_queue[i].empty())
- return true;
- }
- return false;
- }
- bool dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
- {
- for (unsigned i = 0; i < numQueues; i++)
- {
- if (output_queue[i].dataQueued(key, pkCmpFn))
- return true;
- }
- return false;
- }
- bool removeData(void *key, PKT_CMP_FUN pkCmpFn)
- {
- bool anyRemoved = false;
- for (unsigned i = 0; i < numQueues; i++)
- {
- if (output_queue[i].removeData(key, pkCmpFn))
- anyRemoved = true;
- }
- return anyRemoved;
- }
- inline void pushData(unsigned queue, DataBuffer *buffer)
- {
- output_queue[queue].pushOwn(buffer);
- }
- DataBuffer *popQueuedData()
- {
- DataBuffer *buffer;
- while (1)
- {
- for (unsigned i = 0; i < numQueues; i++)
- {
- if (udpOutQsPriority)
- {
- if (output_queue[current_q].empty())
- {
- if (udpTraceLevel >= 5)
- DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
- currentQNumPkts = 0;
- current_q = (current_q + 1) % numQueues;
- }
- else
- {
- buffer = output_queue[current_q].pop();
- currentQNumPkts++;
- if (udpTraceLevel >= 5)
- DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
- if (currentQNumPkts >= maxPktsPerQ[current_q])
- {
- currentQNumPkts = 0;
- current_q = (current_q + 1) % numQueues;
- }
- return buffer;
- }
- }
- else
- {
- current_q = (current_q + 1) % numQueues;
- if (!output_queue[current_q].empty())
- {
- return output_queue[current_q].pop();
- }
- }
- }
- MilliSleep(10);
- DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
- }
- }
- UdpReceiverEntry()
- {
- send_flow_socket = data_socket = NULL;
- numQueues = 0;
- current_q = 0;
- initialized = false;
- output_queue = 0;
- currentQNumPkts = 0;
- maxPktsPerQ = 0;
- }
- void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
- {
- assert(!initialized);
- numQueues = _numQueues;
- const IpAddress &ip = getNodeAddress(destNodeIndex);
- if (!ip.isNull())
- {
- try
- {
- SocketEndpoint sendFlowEp(sendFlowPort, ip);
- SocketEndpoint dataEp(dataPort, ip);
- send_flow_socket = ISocket::udp_connect(sendFlowEp);
- data_socket = ISocket::udp_connect(dataEp);
- if (isLocal)
- {
- data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
- if (udpTraceLevel > 0)
- DBGLOG("UdpSender: sendbuffer set for local socket (size=%d)", udpLocalWriteSocketSize);
- }
- }
- catch(IException *e)
- {
- StringBuffer error, ipstr;
- DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), e->errorMessage(error).str());
- throw;
- }
- catch(...)
- {
- StringBuffer ipstr;
- DBGLOG("UdpSender: udp_connect failed %s %s", ip.getIpText(ipstr).str(), "Unknown error");
- throw;
- }
- output_queue = new queue_t[numQueues];
- maxPktsPerQ = new int[numQueues];
- for (unsigned j = 0; j < numQueues; j++)
- {
- output_queue[j].set_queue_size(queueSize);
- maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
- }
- initialized = true;
- if (udpTraceLevel > 0)
- {
- StringBuffer ipStr;
- DBGLOG("UdpSender: added entry for ip=%s to receivers table at index=%d - send_flow_port=%d", ip.getIpText(ipStr).str(), destNodeIndex, sendFlowPort);
- }
- }
- }
- ~UdpReceiverEntry()
- {
- if (send_flow_socket) send_flow_socket->Release();
- if (data_socket) data_socket->Release();
- if (output_queue) delete [] output_queue;
- if (maxPktsPerQ) delete [] maxPktsPerQ;
- }
- };
- class CSendManager : implements ISendManager, public CInterface
- {
- friend class send_send_flow;
- class StartedThread : public Thread
- {
- private:
- Semaphore started;
- virtual int run()
- {
- started.signal();
- return doRun();
- }
- protected:
- bool running;
- public:
- StartedThread(const char *name) : Thread(name)
- {
- running = false;
- }
- ~StartedThread()
- {
- running = false;
- join();
- }
- virtual void start()
- {
- running = true;
- Thread::start();
- started.wait();
- }
- virtual int doRun() = 0;
- };
- class send_send_flow : public StartedThread
- {
- /*
- I don't like this code much at all
- Looping round all every time status of any changes seems like a bad thing especially as scale goes up
- Even though these look like a bitmap they are not used as such presently
- - as a result, if data_added() is called while state is completed, we lose the request I think
- - probably get away with it because of the dataqueued check
- doRun() uses state bits without protection
- A count of number pending for each might be better than just a flag
- Circular buffers to give a list of which ones are in a given state would speed up the processing in the thread?
- - normally MANY in pending (and order is interesting)
- - normally few in any other state (only 1 if thread keeping up), order not really very interesting
- - Want to keep expense on caller threads low (at the moment just set flag and sometimes signal)
- - in particular don't lock while processing the chain
- - Never need to be in >1 chain
- msTick() probably better than time() for detecting timeouts
- */
- enum bits { new_request = 1, pending_request = 2, sending_data = 4, completed = 8, completed_more = 16 };
- unsigned target_count;
- char *state;
- unsigned char *timeouts; // Number of consecutive timeouts
- unsigned *request_time;
- CriticalSection cr;
- Semaphore sem;
- CSendManager &parent;
- virtual int doRun()
- {
- // MORE - this is reading the state values unprotected
- // Not sure that this represents any issue in practice...
- if (udpTraceLevel > 0)
- DBGLOG("UdpSender: send_send_flow started - node=%u", parent.myNodeIndex);
- while (running)
- {
- bool idle = false;
- if (sem.wait(1000))
- {
- if (udpTraceLevel > 4)
- DBGLOG("UdpSender: send_send_flow::doRun signal received");
- }
- else
- idle = true;
- if (!running) return 0;
- unsigned now = msTick();
- // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
- // In a typical heavy load scenario most will be pending
- // Really two separate FIFOs - pending and active. Except that stuff pulled off pending in arbitrary order
- // Separate lists for each state (don't need one for sending) ?
- for (unsigned i = 0; i < target_count; i++)
- {
- switch (state[i]) // MORE - should really protect it?
- {
- case completed:
- done(i, false);
- break;
- case completed_more:
- done(i, true);
- break;
- case pending_request:
- if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
- break;
- timeouts[i]++;
- EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
- timeouts[i], udpMaxRetryTimedoutReqs,
- i, (int) (now - request_time[i]), udpRequestToSendTimeout);
- // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
- if (udpMaxRetryTimedoutReqs && (timeouts[i] >= udpMaxRetryTimedoutReqs))
- {
- abort(i);
- break;
- }
- // fall into...
- case new_request:
- sendRequest(i);
- break;
- default:
- if (idle && parent.dataQueued(i))
- {
- EXCLOG(MCoperatorError, "State is idle but data is queued - should not happen (index = %u). Attempting recovery.", i);
- data_added(i);
- }
- }
- }
- }
- return 0;
- }
-
- void done(unsigned index, bool moreRequested)
- {
- bool dataRemaining;
- {
- CriticalBlock b(cr);
- dataRemaining = parent.dataQueued(index);
- if (dataRemaining)
- {
- state[index] = pending_request;
- request_time[index] = msTick();
- }
- else
- {
- state[index] = 0;
- timeouts[index] = 0;
- }
- }
- if (udpTraceLevel > 3)
- DBGLOG("UdpSender: sending send_completed msg to node=%u, dataRemaining=%d", index, dataRemaining);
- if (udpSendCompletedInData)
- {
- if (dataRemaining)
- {
- // MORE - we indicate the more to send via a bit already - don't need this unless we never go idle
- // though there is a possible race to consider
- if (!moreRequested)
- parent.sendRequest(index, flow_t::request_to_send);
- }
- }
- else
- parent.sendRequest(index, dataRemaining ? flow_t::request_to_send_more : flow_t::send_completed);
- }
- void sendRequest(unsigned index)
- {
- if (udpTraceLevel > 3)
- DBGLOG("UdpSender: sending request_to_send msg to node=%u", index);
- CriticalBlock b(cr);
- parent.sendRequest(index, flow_t::request_to_send);
- state[index] = pending_request;
- request_time[index] = msTick();
- }
- void abort(unsigned index)
- {
- if (udpTraceLevel > 3)
- DBGLOG("UdpSender: abort sending queued data to node=%u", index);
-
- CriticalBlock b(cr);
- state[index] = 0;
- timeouts[index] = 0;
- parent.abortData(index);
- }
- public:
- send_send_flow(CSendManager &_parent, unsigned numNodes)
- : StartedThread("UdpLib::send_send_flow"), parent(_parent)
- {
- target_count = numNodes;
- state = new char [target_count];
- memset(state, 0, target_count);
- timeouts = new unsigned char [target_count];
- memset(timeouts, 0, target_count);
- request_time = new unsigned [target_count];
- memset(request_time, 0, sizeof(unsigned) * target_count);
- start();
- }
- ~send_send_flow()
- {
- running = false;
- sem.signal();
- join();
- delete [] state;
- delete [] timeouts;
- delete [] request_time;
- }
- void clear_to_send_received(unsigned index)
- {
- CriticalBlock b(cr);
- state[index] = sending_data;
- }
- void send_done(unsigned index, bool moreRequested)
- {
- CriticalBlock b(cr);
- state[index] = moreRequested ? completed_more : completed;
- sem.signal();
- }
- void data_added(unsigned index)
- {
- CriticalBlock b(cr);
- // MORE - this looks wrong. If I add data while sending, may get delayed until next time I have data to send?? Why declare as bitmap if not going to use it?
- // Because done() checks to see if any data pending and re-calls data_added, we get away with it
- // Using bits sounds more sensible though?
- // Actually be careful, since a send may not send ALL the data - you'd still need to call data_added if that happened. Maybe as it is is ok.
- if (!state[index]) // MORE - should just test the bit?
- {
- state[index] = new_request;
- if (udpTraceLevel > 3)
- DBGLOG("UdpSender: state set to new_request for node=%u", index);
- sem.signal();
- }
- }
- };
- class send_receive_flow : public StartedThread
- {
- CSendManager &parent;
- int receive_port;
- Owned<ISocket> flow_socket;
-
- public:
- send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
- {
- receive_port = r_port;
- if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
- throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
- flow_socket.setown(ISocket::udp_create(receive_port));
- flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
- size32_t actualSize = flow_socket->get_receive_buffer_size();
- DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
- start();
- }
-
- ~send_receive_flow()
- {
- running = false;
- if (flow_socket)
- flow_socket->close();
- join();
- }
-
- virtual int doRun()
- {
- if (udpTraceLevel > 0)
- DBGLOG("UdpSender: send_receive_flow started");
- #ifdef __linux__
- setLinuxThreadPriority(2);
- #endif
- while(running)
- {
- UdpPermitToSendMsg f;
- while (running)
- {
- try
- {
- unsigned int res ;
- flow_socket->read(&f, 1, sizeof(f), res, 5);
- assertex(res == f.length);
- #ifdef CRC_MESSAGES
- assertex(f.hdr.crc == f.calcCRC());
- #endif
- switch (f.cmd)
- {
- case flow_t::ok_to_send:
- if (udpTraceLevel > 1)
- DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
- parent.data->ok_to_send(f);
- break;
- default:
- DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
- }
- }
- catch (IException *e)
- {
- if (running && e->errorCode() != JSOCKERR_timeout_expired)
- {
- StringBuffer s;
- DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
- }
- e->Release();
- }
- catch (...)
- {
- if (running)
- DBGLOG("UdpSender: send_receive_flow::unknown exception");
- MilliSleep(0);
- }
- }
- }
- return 0;
- }
- };
- class send_data : public StartedThread
- {
- CSendManager &parent;
- ISocket *sniffer_socket;
- SocketEndpoint ep;
- simple_queue<UdpPermitToSendMsg> send_queue;
- Linked<TokenBucket> bucket;
- void send_sniff(bool busy)
- {
- unsigned short castCmd = static_cast<unsigned short>(busy ? flow_t::busy : flow_t::idle);
- sniff_msg msg = {sizeof(sniff_msg), castCmd, static_cast<unsigned short>(parent.myNodeIndex)};
- try
- {
- if (!sniffer_socket)
- {
- sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
- if (udpTraceLevel > 1)
- {
- StringBuffer url;
- DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
- }
- }
- sniffer_socket->write(&msg, sizeof(msg));
- if (udpTraceLevel > 1)
- DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
- }
- catch(IException *e)
- {
- StringBuffer s;
- StringBuffer url;
- DBGLOG("UdpSender: multicast_connect or write failed ep=%s - %s", ep.getUrlStr(url).str(), e->errorMessage(s).str());
- e->Release();
- }
- catch(...)
- {
- StringBuffer url;
- DBGLOG("UdpSender: multicast_connect or write unknown exception - ep=%s", ep.getUrlStr(url).str());
- if (sniffer_socket)
- {
- sniffer_socket->Release();
- sniffer_socket = NULL;
- }
- }
- }
- public:
- send_data(CSendManager &_parent, int s_port, const IpAddress &snif_ip, TokenBucket *_bucket)
- : StartedThread("UdpLib::send_data"), parent(_parent), bucket(_bucket), ep(s_port, snif_ip), send_queue(100) // MORE - send q size should be configurable and/or related to size of cluster?
- {
- sniffer_socket = NULL;
- if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0)
- throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize);
- start();
- }
-
- ~send_data()
- {
- running = false;
- UdpPermitToSendMsg dummy;
- send_queue.push(dummy);
- join();
- if (sniffer_socket)
- sniffer_socket->Release();
- }
- bool ok_to_send(const UdpPermitToSendMsg &msg)
- {
- if (send_queue.push(msg, 15))
- return true;
- else
- {
- DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
- return false;
- }
- }
- virtual int doRun()
- {
- if (udpTraceLevel > 0)
- DBGLOG("UdpSender: send_data started");
- #ifdef __linux__
- setLinuxThreadPriority(1); // MORE - windows?
- #endif
- UdpPermitToSendMsg permit;
- while (running)
- {
- send_queue.pop(permit);
- if (!running)
- return 0;
- if (udpSnifferEnabled)
- send_sniff(true);
- parent.send_flow->clear_to_send_received(permit.destNodeIndex);
- UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
- bool moreRequested;
- unsigned maxPackets;
- unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
- if (udpSendCompletedInData && !maxPackets)
- parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
- parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
- if (udpSnifferEnabled)
- send_sniff(false);
-
- if (udpTraceLevel > 1)
- DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
-
- }
- if (udpTraceLevel > 0)
- DBGLOG("UdpSender: send_data stopped");
- return 0;
- }
- };
- friend class send_send_flow;
- friend class send_receive_flow;
- friend class send_data;
- unsigned numNodes;
- int receive_flow_port;
- int send_flow_port;
- int data_port;
- unsigned myNodeIndex;
- unsigned numQueues;
- UdpReceiverEntry *receiversTable;
- send_send_flow *send_flow;
- send_receive_flow *receive_flow;
- send_data *data;
- Linked<TokenBucket> bucket;
-
- SpinLock msgSeqLock;
- unsigned msgSeq;
- static bool comparePacket(void *pkData, void *key)
- {
- UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
- UdpPacketHeader *keyHdr = (UdpPacketHeader*) key;
- return ( (dataHdr->ruid == keyHdr->ruid) && (dataHdr->msgId == keyHdr->msgId) );
- }
- inline unsigned getNextMessageSequence()
- {
- SpinBlock b(msgSeqLock);
- unsigned res = ++msgSeq;
- if (!res)
- res = ++msgSeq;
- return res;
- }
-
- public:
- IMPLEMENT_IINTERFACE;
- CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
- : bucket(_bucket)
- {
- #ifndef _WIN32
- setpriority(PRIO_PROCESS, 0, -3);
- #endif
- numNodes = getNumNodes();
- receive_flow_port = client_flow_port;
- send_flow_port = server_flow_port;
- data_port = d_port;
- myNodeIndex = _myNodeIndex;
- numQueues = _numQueues;
- receiversTable = new UdpReceiverEntry[numNodes];
- for (unsigned i = 0; i < numNodes; i++)
- receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
- data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
- send_flow = new send_send_flow(*this, numNodes);
- receive_flow = new send_receive_flow(*this, client_flow_port);
- msgSeq = 0;
- }
- ~CSendManager()
- {
- delete []receiversTable;
- delete send_flow;
- delete receive_flow;
- delete data;
- }
- void writeOwn(unsigned destNodeIndex, DataBuffer *buffer, unsigned len, unsigned queue)
- {
- // NOTE: takes ownership of the DataBuffer
- assert(queue < numQueues);
- assert(destNodeIndex < numNodes);
- receiversTable[destNodeIndex].pushData(queue, buffer);
- send_flow->data_added(destNodeIndex);
- }
- inline void sendRequest(unsigned destIndex, flow_t::flowmsg_t cmd)
- {
- receiversTable[destIndex].sendRequest(myNodeIndex, cmd);
- }
- bool dataQueued(unsigned destIndex)
- {
- return receiversTable[destIndex].dataQueued();
- }
- bool abortData(unsigned destIndex)
- {
- return receiversTable[destIndex].removeData(NULL, NULL);
- }
- // Interface ISendManager
- virtual IMessagePacker *createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, unsigned destNodeIndex, int queue)
- {
- if (destNodeIndex >= numNodes)
- throw MakeStringException(ROXIE_UDP_ERROR, "createMessagePacker: invalid destination node index %i", destNodeIndex);
- return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, destNodeIndex, myNodeIndex, getNextMessageSequence(), queue);
- }
- virtual bool dataQueued(ruid_t ruid, unsigned msgId, unsigned destIndex)
- {
- UdpPacketHeader pkHdr;
- pkHdr.ruid = ruid;
- pkHdr.msgId = msgId;
- return receiversTable[destIndex].dataQueued((void*) &pkHdr, &comparePacket);
- }
- virtual bool abortData(ruid_t ruid, unsigned msgId, unsigned destIndex)
- {
- UdpPacketHeader pkHdr;
- pkHdr.ruid = ruid;
- pkHdr.msgId = msgId;
- return receiversTable[destIndex].removeData((void*) &pkHdr, &comparePacket);
- }
- virtual bool allDone()
- {
- for (unsigned i = 0; i < numNodes; i++)
- {
- if (receiversTable[i].dataQueued())
- return false;
- }
- return true;
- }
- };
- ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
- {
- return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
- }
- class CMessagePacker : implements IMessagePacker, public CInterface
- {
- ISendManager &parent;
- unsigned destNodeIndex;
- UdpPacketHeader package_header;
- DataBuffer *part_buffer;
- unsigned data_buffer_size;
- unsigned data_used;
- void *mem_buffer;
- unsigned mem_buffer_size;
- unsigned totalSize;
- bool packed_request;
- MemoryBuffer metaInfo;
- bool last_message_done;
- int queue_number;
- public:
- IMPLEMENT_IINTERFACE;
- CMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
- : parent(_parent)
- {
- queue_number = _queue;
- destNodeIndex = _destNode;
- package_header.length = 0; // filled in with proper value later
- package_header.metalength = 0;
- package_header.ruid = ruid;
- package_header.msgId = msgId;
- package_header.pktSeq = 0;
- package_header.nodeIndex = _sourceNode;
- package_header.msgSeq = _msgSeq;
- packed_request = false;
- part_buffer = bufferManager->allocate();
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
- assertex(data_buffer_size >= headerSize + sizeof(unsigned short));
- *(unsigned short *) (&part_buffer->data[sizeof(UdpPacketHeader)]) = headerSize;
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+sizeof(unsigned short)], messageHeader, headerSize);
- data_used = headerSize + sizeof(unsigned short);
- mem_buffer = 0;
- mem_buffer_size = 0;
- last_message_done = false;
- totalSize = 0;
- if (udpTraceLevel >= 40)
- DBGLOG("UdpSender: CMessagePacker::CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u node=%u queue=%d", ruid, msgId, _msgSeq, destNodeIndex, _queue);
- }
- ~CMessagePacker()
- {
- if (part_buffer)
- part_buffer->Release();
- if (mem_buffer) free (mem_buffer);
- if (udpTraceLevel >= 40)
- {
- DBGLOG("UdpSender: CMessagePacker::~CMessagePacker - ruid=" RUIDF " id=0x%.8x mseq=%u pktSeq=%x node=%u",
- package_header.ruid, package_header.msgId, package_header.msgSeq, package_header.pktSeq, destNodeIndex);
- }
- }
- virtual void *getBuffer(unsigned len, bool variable)
- {
- if (variable)
- len += sizeof(RecordLengthType);
- if (DATA_PAYLOAD - sizeof(UdpPacketHeader) < len)
- {
- // Won't fit in one, so allocate temp location
- if (mem_buffer_size < len)
- {
- free(mem_buffer);
- mem_buffer = checked_malloc(len, ROXIE_MEMORY_ERROR);
- mem_buffer_size = len;
- }
- packed_request = false;
- if (variable)
- return ((char *) mem_buffer) + sizeof(RecordLengthType);
- else
- return mem_buffer;
- }
- if (part_buffer && ((data_buffer_size - data_used) < len))
- flush(false); // Note that we never span records that are small enough to fit - this can result in significant wastage if record just over DATA_PAYLOAD/2
- if (!part_buffer)
- {
- part_buffer = bufferManager->allocate();
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
- }
- packed_request = true;
- if (variable)
- return &part_buffer->data[data_used + sizeof(UdpPacketHeader) + sizeof(RecordLengthType)];
- else
- return &part_buffer->data[data_used + sizeof(UdpPacketHeader)];
- }
- virtual void putBuffer(const void *buf, unsigned len, bool variable)
- {
- if (variable)
- {
- assertex(len < MAX_RECORD_LENGTH);
- buf = ((char *) buf) - sizeof(RecordLengthType);
- *(RecordLengthType *) buf = len;
- len += sizeof(RecordLengthType);
- }
- totalSize += len;
- if (packed_request)
- {
- assert(len <= (data_buffer_size - data_used));
- data_used += len;
- }
- else
- {
- while (len)
- {
- if (!part_buffer)
- {
- part_buffer = bufferManager->allocate();
- data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
- data_used = 0;
- }
- unsigned chunkLen = data_buffer_size - data_used;
- if (chunkLen > len)
- chunkLen = len;
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], buf, chunkLen);
- data_used += chunkLen;
- len -= chunkLen;
- buf = &(((char*)buf)[chunkLen]);
- if (len)
- flush(false);
- }
- }
- }
- virtual void sendMetaInfo(const void *buf, unsigned len) {
- metaInfo.append(len, buf);
- }
- virtual void flush(bool last_msg = false)
- {
- if (!last_message_done && last_msg)
- {
- last_message_done = true;
- if (!part_buffer)
- part_buffer = bufferManager->allocate();
- const char *metaData = metaInfo.toByteArray();
- unsigned metaLength = metaInfo.length();
- unsigned maxMetaLength = DATA_PAYLOAD - (sizeof(UdpPacketHeader) + data_used);
- while (metaLength > maxMetaLength)
- {
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, maxMetaLength);
- put_package(part_buffer, data_used, maxMetaLength);
- metaLength -= maxMetaLength;
- metaData += maxMetaLength;
- data_used = 0;
- maxMetaLength = DATA_PAYLOAD - sizeof(UdpPacketHeader);
- part_buffer = bufferManager->allocate();
- }
- memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
- package_header.pktSeq |= UDP_PACKET_COMPLETE;
- put_package(part_buffer, data_used, metaLength);
- }
- else if (part_buffer)
- {
- // Just flush current - used when no room for current row
- if (data_used)
- put_package(part_buffer, data_used, 0); // buffer released in put_package
- else
- part_buffer->Release(); // If NO data in buffer, release buffer back to pool
- }
- part_buffer = 0;
- data_buffer_size = 0;
- data_used = 0;
- }
- void put_package(DataBuffer *dataBuff, unsigned datalength, unsigned metalength)
- {
- package_header.length = datalength + metalength + sizeof(UdpPacketHeader);
- package_header.metalength = metalength;
- memcpy(dataBuff->data, &package_header, sizeof(package_header));
- parent.writeOwn(destNodeIndex, dataBuff, package_header.length, queue_number);
- if (udpTraceLevel >= 50)
- {
- if (package_header.length==991)
- DBGLOG("NEarly");
- DBGLOG("UdpSender: CMessagePacker::put_package Qed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%u node=%u queue=%d",
- package_header.ruid, package_header.msgId, package_header.msgSeq,
- package_header.pktSeq, package_header.length, destNodeIndex, queue_number);
- }
- package_header.pktSeq++;
- }
- virtual bool dataQueued()
- {
- return(parent.dataQueued(package_header.ruid, package_header.msgId, destNodeIndex));
- }
- virtual unsigned size() const
- {
- return totalSize;
- }
- };
- IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue)
- {
- return new CMessagePacker(ruid, msgId, messageHeader, headerSize, _parent, _destNode, _sourceNode, _msgSeq, _queue);
- }
|