123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "udplib.hpp"
- #include "udpsha.hpp"
- #include "jsocket.hpp"
- #include "jlog.hpp"
- #include "roxie.hpp"
- #include "roxiemem.hpp"
- #ifdef _WIN32
- #include <winsock2.h>
- #else
- #include <sys/socket.h>
- #endif
- #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
- #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
- #endif
- using roxiemem::DataBuffer;
- using roxiemem::IDataBufferManager;
- IDataBufferManager *bufferManager;
- unsigned udpTraceLevel = 0;
- unsigned udpTraceCategories = (unsigned) -1;
- bool enableSocketMaxSetting = false;
- unsigned udpFlowSocketsSize = 131072;
- unsigned udpLocalWriteSocketSize = 1024000;
- unsigned multicastTTL = 1;
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- bufferManager = roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE);
- return true;
- }
- MODULE_EXIT()
- {
- bufferManager->Release();
- }
- // Maintaining a table so each node in the system has a unique index
- static IpAddressArray nodeTable;
- extern UDPLIB_API const IpAddress &getNodeAddress(unsigned index)
- {
- return nodeTable.item(index);
- }
- extern UDPLIB_API unsigned addRoxieNode(const char *ipString)
- {
- IpAddress ip(ipString);
- if (ip.isNull())
- throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", ipString);
- ForEachItemIn(idx, nodeTable)
- {
- if (ip.ipequals(nodeTable.item(idx)))
- return idx;
- }
- nodeTable.append(ip);
- return nodeTable.ordinality()-1;
- }
- extern UDPLIB_API unsigned getNumNodes()
- {
- assertex(nodeTable.ordinality());
- return nodeTable.ordinality();
- }
- //---------------------------------------------------------------------------------------------
- void queue_t::set_queue_size(unsigned int queue_s)
- {
- queue_size = queue_s;
- element_count = queue_size;
- elements = new queue_element[queue_size];
- free_space.signal(queue_size);
- active_buffers = 0;
- first = 0;
- last = 0;
- }
- queue_t::queue_t(unsigned int queue_s)
- {
- set_queue_size(queue_s);
- signal_free_sl = 0;
- }
- queue_t::queue_t()
- {
- signal_free_sl = 0;
- }
- queue_t::~queue_t()
- {
- delete [] elements;
- }
- bool queue_t::empty()
- {
- c_region.enter();
- bool res = (active_buffers == 0);
- c_region.leave();
- return res;
- }
- int queue_t::free_slots()
- {
- int res=0;
- while (!res)
- {
- c_region.enter();
- res = queue_size - active_buffers;
- if (!res)
- signal_free_sl++;
- c_region.leave();
- if (!res)
- {
- while (!free_sl.wait(3000))
- {
- if (udpTraceLevel >= 1)
- DBGLOG("queue_t::free_slots blocked for 3 seconds waiting for free_sl semaphore");
- }
- }
- }
- return res;
- }
- void queue_t::interrupt()
- {
- data_avail.interrupt();
- }
- void queue_t::pushOwn(DataBuffer *buf)
- {
- while (!free_space.wait(3000))
- {
- if (udpTraceLevel >= 1)
- DBGLOG("queue_t::pushOwn blocked for 3 seconds waiting for free_space semaphore, activeBuffers == %d", active_buffers);
- }
- c_region.enter();
- int next = (last + 1) % element_count;
- elements[last].data = buf;
- last = next;
- active_buffers++;
- c_region.leave();
- data_avail.signal();
- }
- DataBuffer *queue_t::pop()
- {
- data_avail.wait();
- DataBuffer *ret = NULL;
- bool must_signal;
- {
- CriticalBlock b(c_region);
- if (!active_buffers)
- return NULL;
- ret = elements[first].data;
- first = (first + 1) % element_count;
- active_buffers--;
- must_signal = signal_free_sl>0;
- if (must_signal)
- signal_free_sl--;
- }
- free_space.signal();
- if (must_signal)
- free_sl.signal();
- return ret;
- }
- bool queue_t::removeData(void *key, PKT_CMP_FUN pkCmpFn)
- {
- bool ret = false;
- CriticalBlock b(c_region);
- if (active_buffers)
- {
- unsigned ix = first;
- loop
- {
- if (elements[ix].data &&
- ((key == NULL) || (pkCmpFn == NULL) || pkCmpFn((void*) elements[ix].data, key)))
- {
- ::Release(elements[ix].data);
- elements[ix].data = NULL; // safer than trying to remove it and close up queue - race conditions with code elsewhere
- ret = true;
- }
- ix++;
- if (ix==element_count)
- ix = 0;
- if (ix == last)
- break;
- }
- }
- return ret;
- }
- bool queue_t::dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
- {
- bool ret = false;
- CriticalBlock b(c_region);
- if (active_buffers)
- {
- unsigned ix = first;
- loop
- {
- if (elements[ix].data && pkCmpFn((void*) elements[ix].data, key))
- {
- ret = true;
- break;
- }
- ix++;
- if (ix==element_count)
- ix = 0;
- if (ix==last)
- break;
- }
- }
- return ret;
- }
- #ifndef _WIN32
- #define HOSTENT hostent
- #include <netdb.h>
- #endif
- int check_set(const char *path, int value, bool modify)
- {
- #ifdef __linux__
- FILE *f = fopen(path,"r");
- char res[32];
- char *r = 0;
- int si = 0;
- if (f) {
- r = fgets(res, sizeof(res), f);
- fclose(f);
- }
- if (r) {
- si = atoi(r);
- }
- if (si<value) {
- if (modify) {
- f = fopen(path,"w");
- if (f) {
- sprintf(res, "%i", value);
- fputs(res,f);
- fclose(f);
- DBGLOG("%s changed from %i to %i", path, si, value);
- return 1;
- }
- else {
- DBGLOG("%s not set", path);
- return -1;
- }
- }
- else {
- DBGLOG("%s value %i is less than %i", path, si, value);
- return -1;
- }
- }
- #endif
- return 0;
- }
- int check_set_max_socket_read_buffer(int size) {
- return check_set("/proc/sys/net/core/rmem_max", size, true);
- }
- int check_set_max_socket_write_buffer(int size) {
- return check_set("/proc/sys/net/core/wmem_max", size, true);
- }
- int check_max_socket_read_buffer(int size) {
- return check_set("/proc/sys/net/core/rmem_max", size, false);
- }
- int check_max_socket_write_buffer(int size) {
- return check_set("/proc/sys/net/core/wmem_max", size, false);
- }
- #ifdef __linux__
- void setLinuxThreadPriority(int level)
- {
- pthread_t self = pthread_self();
- int policy;
- sched_param param;
- int rc;
- if (( rc = pthread_getschedparam(self, &policy, ¶m)) != 0)
- DBGLOG("pthread_getschedparam error: %d", rc);
- if (level < 0)
- UNIMPLEMENTED;
- else if (!level)
- {
- param.sched_priority = 0;
- policy = SCHED_OTHER;
- }
- else
- {
- policy = SCHED_RR;
- param.sched_priority = level;
- }
- if(( rc = pthread_setschedparam(self, policy, ¶m)) != 0)
- DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "i PID=%i", rc, policy, param.sched_priority, (unsigned __int64) self, getpid());
- else
- DBGLOG("priority set id=%" I64F "i policy=%i pri=%i PID=%i", (unsigned __int64) self, policy, param.sched_priority, getpid());
- }
- #endif
- extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
- {
- if (bufferManager)
- bufferManager->poolStats(memStats);
- }
- /*
- Crazy thoughts on network-wide flow control
- Avoid sending data that clashes with other outbound or inbound data
- is outbound really an issue?
- if only inbound, should be easier
- can have each inbound node police its own, for a start
- udplib already tries to do this
- when sending permission to send, best to pick someone that is not sending to anyone else
- udplib already tries to do this
- but it can still lead to idleness - id node 1 sending to node 2, and node2 to node 1, node3 can't find anyone idle.
- If you do need global:
- Every bit of data getting sent (perhaps over a certain size threshold?) gets permission from central traffic cop
- Outbound packet says source node, target node size
- Reply says source,target,size
- Cop allows immediately if nothing inflight between those pairs
- Cop assumes completion
- Cop redundancy
- - a backup cop is listening in?
- - use multicast for requests and replies?
- - no reply implies what?
- - backup cop just needs heartbeat from active cop
- - permission expires
- - multiple cops for blocks of targets?
- - but I want global view of who is sending
- */
|