udpsha.cpp 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "udplib.hpp"
  14. #include "udpsha.hpp"
  15. #include "jsocket.hpp"
  16. #include "jlog.hpp"
  17. #include "roxie.hpp"
  18. #include "roxiemem.hpp"
  19. #ifdef _WIN32
  20. #include <winsock2.h>
  21. #else
  22. #include <sys/socket.h>
  23. #endif
  24. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  25. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  26. #endif
  // Module-wide state and tunables for the UDP library.
  27. using roxiemem::DataBuffer;
  28. using roxiemem::IDataBufferManager;
  // Shared data buffer manager: created in MODULE_INIT via roxiemem::createDataBufferManager
  // and released in MODULE_EXIT. queryMemoryPoolStats reports its pool statistics.
  29. IDataBufferManager *bufferManager;
  // Trace verbosity. The queue code in this file logs "blocked for 3 seconds" diagnostics when this is >= 1.
  30. unsigned udpTraceLevel = 0;
  // (unsigned) -1 sets every bit; presumably a bitmask of enabled trace categories - TODO confirm against users.
  31. unsigned udpTraceCategories = (unsigned) -1;
  // Not referenced in this file. NOTE(review): presumably gates the check_set_max_socket_* helpers - confirm.
  32. bool enableSocketMaxSetting = false;
  // Not referenced in this file; presumably socket buffer sizes in bytes - TODO confirm.
  33. unsigned udpFlowSocketsSize = 131072;
  34. unsigned udpLocalWriteSocketSize = 1024000;
  // Not referenced in this file; presumably the TTL applied to multicast packets - TODO confirm.
  35. unsigned multicastTTL = 1;
  36. MODULE_INIT(INIT_PRIORITY_STANDARD)
  37. {
  38. bufferManager = roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE);
  39. return true;
  40. }
  41. MODULE_EXIT()
  42. {
  43. bufferManager->Release();
  44. }
  45. // Maintaining a table so each node in the system has a unique index
  46. static IpAddressArray nodeTable;
  47. extern UDPLIB_API const IpAddress &getNodeAddress(unsigned index)
  48. {
  49. return nodeTable.item(index);
  50. }
  51. extern UDPLIB_API unsigned addRoxieNode(const char *ipString)
  52. {
  53. IpAddress ip(ipString);
  54. if (ip.isNull())
  55. throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", ipString);
  56. ForEachItemIn(idx, nodeTable)
  57. {
  58. if (ip.ipequals(nodeTable.item(idx)))
  59. return idx;
  60. }
  61. nodeTable.append(ip);
  62. return nodeTable.ordinality()-1;
  63. }
  64. extern UDPLIB_API unsigned getNumNodes()
  65. {
  66. assertex(nodeTable.ordinality());
  67. return nodeTable.ordinality();
  68. }
  69. //---------------------------------------------------------------------------------------------
  70. void queue_t::set_queue_size(unsigned int queue_s)
  71. {
  72. queue_size = queue_s;
  73. element_count = queue_size;
  74. elements = new queue_element[queue_size];
  75. free_space.signal(queue_size);
  76. active_buffers = 0;
  77. first = 0;
  78. last = 0;
  79. }
  80. queue_t::queue_t(unsigned int queue_s)
  81. {
  82. set_queue_size(queue_s);
  83. signal_free_sl = 0;
  84. }
  85. queue_t::queue_t()
  86. {
  87. signal_free_sl = 0;
  88. }
  89. queue_t::~queue_t()
  90. {
  91. delete [] elements;
  92. }
  93. bool queue_t::empty()
  94. {
  95. c_region.enter();
  96. bool res = (active_buffers == 0);
  97. c_region.leave();
  98. return res;
  99. }
  100. int queue_t::free_slots()
  101. {
  102. int res=0;
  103. while (!res)
  104. {
  105. c_region.enter();
  106. res = queue_size - active_buffers;
  107. if (!res)
  108. signal_free_sl++;
  109. c_region.leave();
  110. if (!res)
  111. {
  112. while (!free_sl.wait(3000))
  113. {
  114. if (udpTraceLevel >= 1)
  115. DBGLOG("queue_t::free_slots blocked for 3 seconds waiting for free_sl semaphore");
  116. }
  117. }
  118. }
  119. return res;
  120. }
  121. void queue_t::interrupt()
  122. {
  123. data_avail.interrupt();
  124. }
  125. void queue_t::pushOwn(DataBuffer *buf)
  126. {
  127. while (!free_space.wait(3000))
  128. {
  129. if (udpTraceLevel >= 1)
  130. DBGLOG("queue_t::pushOwn blocked for 3 seconds waiting for free_space semaphore, activeBuffers == %d", active_buffers);
  131. }
  132. c_region.enter();
  133. int next = (last + 1) % element_count;
  134. elements[last].data = buf;
  135. last = next;
  136. active_buffers++;
  137. c_region.leave();
  138. data_avail.signal();
  139. }
  140. DataBuffer *queue_t::pop()
  141. {
  142. data_avail.wait();
  143. DataBuffer *ret = NULL;
  144. bool must_signal;
  145. {
  146. CriticalBlock b(c_region);
  147. if (!active_buffers)
  148. return NULL;
  149. ret = elements[first].data;
  150. first = (first + 1) % element_count;
  151. active_buffers--;
  152. must_signal = signal_free_sl>0;
  153. if (must_signal)
  154. signal_free_sl--;
  155. }
  156. free_space.signal();
  157. if (must_signal)
  158. free_sl.signal();
  159. return ret;
  160. }
  161. bool queue_t::removeData(void *key, PKT_CMP_FUN pkCmpFn)
  162. {
  163. bool ret = false;
  164. CriticalBlock b(c_region);
  165. if (active_buffers)
  166. {
  167. unsigned ix = first;
  168. loop
  169. {
  170. if (elements[ix].data &&
  171. ((key == NULL) || (pkCmpFn == NULL) || pkCmpFn((void*) elements[ix].data, key)))
  172. {
  173. ::Release(elements[ix].data);
  174. elements[ix].data = NULL; // safer than trying to remove it and close up queue - race conditions with code elsewhere
  175. ret = true;
  176. }
  177. ix++;
  178. if (ix==element_count)
  179. ix = 0;
  180. if (ix == last)
  181. break;
  182. }
  183. }
  184. return ret;
  185. }
  186. bool queue_t::dataQueued(void *key, PKT_CMP_FUN pkCmpFn)
  187. {
  188. bool ret = false;
  189. CriticalBlock b(c_region);
  190. if (active_buffers)
  191. {
  192. unsigned ix = first;
  193. loop
  194. {
  195. if (elements[ix].data && pkCmpFn((void*) elements[ix].data, key))
  196. {
  197. ret = true;
  198. break;
  199. }
  200. ix++;
  201. if (ix==element_count)
  202. ix = 0;
  203. if (ix==last)
  204. break;
  205. }
  206. }
  207. return ret;
  208. }
  209. #ifndef _WIN32
  210. #define HOSTENT hostent
  211. #include <netdb.h>
  212. #endif
  213. int check_set(const char *path, int value, bool modify)
  214. {
  215. #ifdef __linux__
  216. FILE *f = fopen(path,"r");
  217. char res[32];
  218. char *r = 0;
  219. int si = 0;
  220. if (f) {
  221. r = fgets(res, sizeof(res), f);
  222. fclose(f);
  223. }
  224. if (r) {
  225. si = atoi(r);
  226. }
  227. if (si<value) {
  228. if (modify) {
  229. f = fopen(path,"w");
  230. if (f) {
  231. sprintf(res, "%i", value);
  232. fputs(res,f);
  233. fclose(f);
  234. DBGLOG("%s changed from %i to %i", path, si, value);
  235. return 1;
  236. }
  237. else {
  238. DBGLOG("%s not set", path);
  239. return -1;
  240. }
  241. }
  242. else {
  243. DBGLOG("%s value %i is less than %i", path, si, value);
  244. return -1;
  245. }
  246. }
  247. #endif
  248. return 0;
  249. }
  250. int check_set_max_socket_read_buffer(int size) {
  251. return check_set("/proc/sys/net/core/rmem_max", size, true);
  252. }
  253. int check_set_max_socket_write_buffer(int size) {
  254. return check_set("/proc/sys/net/core/wmem_max", size, true);
  255. }
  256. int check_max_socket_read_buffer(int size) {
  257. return check_set("/proc/sys/net/core/rmem_max", size, false);
  258. }
  259. int check_max_socket_write_buffer(int size) {
  260. return check_set("/proc/sys/net/core/wmem_max", size, false);
  261. }
  262. #ifdef __linux__
  263. void setLinuxThreadPriority(int level)
  264. {
  265. pthread_t self = pthread_self();
  266. int policy;
  267. sched_param param;
  268. int rc;
  269. if (( rc = pthread_getschedparam(self, &policy, &param)) != 0)
  270. DBGLOG("pthread_getschedparam error: %d", rc);
  271. if (level < 0)
  272. UNIMPLEMENTED;
  273. else if (!level)
  274. {
  275. param.sched_priority = 0;
  276. policy = SCHED_OTHER;
  277. }
  278. else
  279. {
  280. policy = SCHED_RR;
  281. param.sched_priority = level;
  282. }
  283. if(( rc = pthread_setschedparam(self, policy, &param)) != 0)
  284. DBGLOG("pthread_setschedparam error: %d policy=%i pr=%i id=%" I64F "i PID=%i", rc, policy, param.sched_priority, (unsigned __int64) self, getpid());
  285. else
  286. DBGLOG("priority set id=%" I64F "i policy=%i pri=%i PID=%i", (unsigned __int64) self, policy, param.sched_priority, getpid());
  287. }
  288. #endif
  289. extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
  290. {
  291. if (bufferManager)
  292. bufferManager->poolStats(memStats);
  293. }
  294. /*
  295. Crazy thoughts on network-wide flow control
  296. Avoid sending data that clashes with other outbound or inbound data
  297. is outbound really an issue?
  298. if only inbound, should be easier
  299. can have each inbound node police its own, for a start
  300. udplib already tries to do this
  301. when sending permission to send, best to pick someone that is not sending to anyone else
  302. udplib already tries to do this
  303. but it can still lead to idleness - if node 1 is sending to node 2, and node 2 to node 1, node 3 can't find anyone idle.
  304. If you do need global:
  305. Every bit of data getting sent (perhaps over a certain size threshold?) gets permission from central traffic cop
  306. Outbound packet says source node, target node size
  307. Reply says source,target,size
  308. Cop allows immediately if nothing inflight between those pairs
  309. Cop assumes completion
  310. Cop redundancy
  311. - a backup cop is listening in?
  312. - use multicast for requests and replies?
  313. - no reply implies what?
  314. - backup cop just needs heartbeat from active cop
  315. - permission expires
  316. - multiple cops for blocks of targets?
  317. - but I want global view of who is sending
  318. */