jbroadcast.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jsocket.hpp"
  14. #include "jthread.hpp"
  15. #include "jexcept.hpp"
  16. #include "jbuff.hpp"
  17. #include "jlog.hpp"
  18. #include "jfile.hpp"
  19. #include "jbroadcast.ipp"
  20. // valid udp multicast range - 224.0.0.0 through 239.255.255.255
  21. #define DEFAULT_UNICAST_PCENT 10
  22. #define DEFAULT_UNICAST_LIMIT 8
  23. #define NACK_SINGLETON 0x80000000
  24. #define NACK_ENDMARKER 0x40000000
  25. #define PACKET_QUEUE_LIMIT 0 // disabled for now
  26. //#define TRACKLASTACK
  27. //#define COUNTRESENDS
  28. #define UDP_SEND_SIZE (128*1024)
  29. #define UDP_RECV_SIZE (128*1024)
  30. #define DEFAULT_POLL_DELAY 1000
  31. #define MAX_POLL_REPLY_DELAY 1000
  32. static CriticalSection *recvServerCrit, *jobIdCrit;
  33. static CMCastRecvServer *mcastRecvServer = NULL;
  34. static unsigned unicastPcent = DEFAULT_UNICAST_PCENT;
  35. static unsigned unicastLimit = DEFAULT_UNICAST_LIMIT;
  36. static bool useUniCast = true;
  37. static unsigned tracingPeriod = 0; // periodic tracing disabled by default
  38. static unsigned pollDelay = DEFAULT_POLL_DELAY;
  39. static unsigned nextJobId;
  40. MODULE_INIT(INIT_PRIORITY_JBROADCAST)
  41. {
  42. recvServerCrit = new CriticalSection;
  43. jobIdCrit = new CriticalSection;
  44. return true;
  45. }
  46. MODULE_EXIT()
  47. {
  48. delete recvServerCrit;
  49. delete jobIdCrit;
  50. if (mcastRecvServer) mcastRecvServer->Release();
  51. }
  52. static unsigned getNextJobId()
  53. {
  54. CriticalBlock b(*jobIdCrit);
  55. return nextJobId++;
  56. }
  // Ascending comparator for packet ids.
  // Returns <0, 0 or >0 when *m1 is less than, equal to or greater than *m2.
  // Uses comparisons instead of subtraction: the unsigned difference converted
  // to int has the wrong sign once the ids differ by more than INT_MAX.
  static int pktOrderFunc(unsigned *m1, unsigned *m2)
  {
      return (*m1 > *m2) - (*m1 < *m2);
  }
  // Descending comparator for packet ids (the reverse of pktOrderFunc).
  // Returns <0, 0 or >0 when *m1 is greater than, equal to or less than *m2.
  // Uses comparisons instead of subtraction so that large id differences
  // cannot flip the sign of the result.
  static int pktRevOrderFunc(unsigned *m1, unsigned *m2)
  {
      return (*m2 > *m1) - (*m2 < *m1);
  }
  65. static int dataPktOrderFunc(CInterface **dataPacket1, CInterface **dataPacket2)
  66. {
  67. return (*((CDataPacket **)dataPacket2))->header->id-(*((CDataPacket **)dataPacket1))->header->id;
  68. }
  69. class CUniq
  70. {
  71. public:
  72. CUniq()
  73. {
  74. reset();
  75. }
  76. void reset()
  77. {
  78. nextFree = bctag_DYNAMIC;
  79. freeList.kill();
  80. }
  81. inline __int64 getNextFree() { return nextFree; }
  82. inline __int64 getId()
  83. {
  84. return freeList.ordinality() ? freeList.pop() : nextFree++;
  85. }
  86. inline void freeId(__int64 id)
  87. {
  88. assertex(id<nextFree);
  89. freeList.append(id);
  90. }
  91. private:
  92. __int64 nextFree;
  93. Int64Array freeList;
  94. };
  95. static CUniq uniqIds;
  96. bctag_t jlib_decl allocBcTag()
  97. {
  98. return (bctag_t) uniqIds.getId();
  99. }
  100. void jlib_decl freeBcTag(bctag_t tag)
  101. {
  102. uniqIds.freeId(tag);
  103. }
  104. //
  105. // CMCastRecvServer impl.
  106. //
  107. CMCastRecvServer::CMCastRecvServer(const char *_broadcastRoot, unsigned _groupMember, SocketEndpoint &_mcastEp, unsigned _ackPort) : broadcastRoot(_broadcastRoot), groupMember(_groupMember), mcastEp(_mcastEp), ackPort(_ackPort)
  108. {
  109. stopped = false;
  110. start();
  111. }
  112. void CMCastRecvServer::stop()
  113. {
  114. stopped = true;
  115. sock->close();
  116. join();
  117. ackSock->close();
  118. }
  119. void CMCastRecvServer::registerReceiver(CMCastReceiver &receiver)
  120. {
  121. CriticalBlock b(receiversCrit);
  122. receiver.Link();
  123. receivers.replace(receiver);
  124. }
  125. void CMCastRecvServer::deregisterReceiver(CMCastReceiver &receiver)
  126. {
  127. CriticalBlock b(receiversCrit);
  128. receivers.removeExact(&receiver);
  129. }
  130. CMCastReceiver *CMCastRecvServer::getReceiver(bctag_t tag)
  131. {
  132. CriticalBlock b(receiversCrit);
  133. CMCastReceiver *receiver = receivers.find(tag);
  134. return LINK(receiver);
  135. }
  136. int CMCastRecvServer::run()
  137. {
  138. SocketEndpoint ackEp(broadcastRoot);
  139. ackEp.port = ackPort;
  140. StringBuffer s;
  141. ackEp.getIpText(s);
  142. ackSock.setown(ISocket::udp_connect(ackEp.port, s.str()));
  143. ackSock->set_send_buffer_size(UDP_SEND_SIZE);
  144. StringBuffer ipStr;
  145. mcastEp.getIpText(ipStr);
  146. sock.setown(ISocket::multicast_create(mcastEp.port, ipStr.str()));
  147. sock->set_receive_buffer_size(UDP_RECV_SIZE);
  148. SocketEndpoint ep(ipStr.str());
  149. sock->join_multicast_group(ep);
  150. MemoryBuffer mbAck;
  151. MCAckPacketHeader *ackPacket = (MCAckPacketHeader *)mbAck.reserveTruncate(MC_ACK_PACKET_SIZE);
  152. ackPacket->node = groupMember;
  153. LOG(MCdebugProgress(10), unknownJob, "Running as client %d connected to server %s", groupMember, broadcastRoot.get());
  154. unsigned *nackList = (unsigned *)(((byte *)ackPacket)+sizeof(MCAckPacketHeader));
  155. const unsigned *nackUpper = (unsigned *)((byte *)ackPacket)+MC_ACK_PACKET_SIZE-sizeof(unsigned);
  156. Owned<CDataPacket> dataPacket = new CDataPacket();
  157. CTimeMon logTm(10000), logTmCons(5000), logTmPoll(5000), logTmOld(5000), logTmNoRecv(5000);
  158. loop
  159. {
  160. try
  161. {
  162. unsigned startTime = msTick();
  163. loop
  164. {
  165. try
  166. {
  167. size32_t szRead;
  168. sock->read(dataPacket->header, sizeof(MCPacketHeader), MC_PACKET_SIZE, szRead, 5000);
  169. break;
  170. }
  171. catch (IException *e)
  172. {
  173. if (JSOCKERR_timeout_expired != e->errorCode())
  174. throw;
  175. else e->Release();
  176. LOG(MCdebugProgress(1), unknownJob, "Waiting on packet read socket (waited=%d)", msTick()-startTime);
  177. }
  178. }
  179. if (stopped) break;
  180. if (MCPacket_Stop == dataPacket->header->cmd)
  181. {
  182. stopped = true;
  183. break;
  184. }
  185. ackPacket->tag = dataPacket->header->tag;
  186. ackPacket->jobId = dataPacket->header->jobId;
  187. if (oldJobIds.find(dataPacket->header->jobId))
  188. {
  189. if (MCPacket_Poll == dataPacket->header->cmd)
  190. {
  191. ackPacket->ackDone = true;
  192. MilliSleep(MAX_POLL_REPLY_DELAY/(groupMember+1));
  193. ackSock->write(ackPacket, sizeof(MCAckPacketHeader));
  194. }
  195. if (tracingPeriod && logTmOld.timedout())
  196. {
  197. LOG(MCdebugProgress(1), unknownJob, "Old job polled=%s", MCPacket_Poll == dataPacket->header->cmd?"true":"false");
  198. logTmOld.reset(tracingPeriod);
  199. }
  200. }
  201. else
  202. {
  203. CMCastReceiver *receiver = getReceiver(dataPacket->header->tag);
  204. if (receiver)
  205. {
  206. if (MCPacket_Poll == dataPacket->header->cmd)
  207. {
  208. size32_t sz;
  209. bool res = receiver->buildNack(ackPacket, sz, dataPacket->header->total);
  210. MilliSleep(MAX_POLL_REPLY_DELAY/(groupMember+1));
  211. ackSock->write(ackPacket, sz);
  212. if (tracingPeriod && logTmPoll.timedout())
  213. {
  214. LOG(MCdebugProgress(1), unknownJob, "Send nack back sz=%d, res=%s, done=%s", sz, res?"true":"false", ackPacket->ackDone?"true":"false");
  215. logTmPoll.reset(tracingPeriod);
  216. }
  217. }
  218. else
  219. {
  220. unsigned total = dataPacket->header->total;
  221. bool done;
  222. if (receiver->packetReceived(*dataPacket, done)) // if true, packet consumed
  223. {
  224. unsigned level;
  225. if (tracingPeriod && logTmCons.timedout())
  226. {
  227. level = 1;
  228. logTmCons.reset(5000);
  229. } else level = 110;
  230. LOG(MCdebugProgress(level), unknownJob, "Pkt %d taken by receiver", dataPacket->header->id);
  231. if (done)
  232. {
  233. LOG(MCdebugProgress(10), unknownJob, "Client (tag=%x, jobId=%d) received all %d packets", dataPacket->header->tag, dataPacket->header->jobId, dataPacket->header->total);
  234. oldJobIds.replace(* new CUIntValue(dataPacket->header->jobId));
  235. }
  236. // JCSMORE should use packet pool.
  237. // init new packet
  238. dataPacket.setown(new CDataPacket());
  239. }
  240. else if (tracingPeriod && logTm.timedout())
  241. {
  242. LOG(MCdebugProgress(150), unknownJob, "throwing away packet %d", dataPacket->header->id);
  243. logTm.reset(tracingPeriod);
  244. }
  245. if (!done)
  246. {
  247. size32_t sz;
  248. if (receiver->buildNack(ackPacket, sz, total))
  249. ackSock->write(ackPacket, sz);
  250. }
  251. }
  252. }
  253. else if (tracingPeriod && logTmNoRecv.timedout())
  254. {
  255. LOG(MCdebugProgress(1), unknownJob, "No Receiver tag=%d", dataPacket->header->tag);
  256. logTmNoRecv.reset(tracingPeriod);
  257. }
  258. }
  259. }
  260. catch (IException *e)
  261. {
  262. pexception("Client Exception",e);
  263. break;
  264. }
  265. }
  266. PROGLOG("Receive server stopping, aborting receivers");
  267. {
  268. CriticalBlock b(receiversCrit);
  269. SuperHashIteratorOf<CMCastReceiver> iter(receivers);
  270. ForEach (iter)
  271. iter.query().stop();
  272. }
  273. return 0;
  274. }
  275. CMCastRecvServer *queryMCastRecvServer()
  276. {
  277. return mcastRecvServer;
  278. }
  279. void startMCastRecvServer(const char *broadcastRoot, unsigned groupMember, const char *mcastIp, unsigned mcastPort, unsigned ackPort)
  280. {
  281. SocketEndpoint mcastEp(mcastIp, mcastPort);
  282. startMCastRecvServer(broadcastRoot, groupMember, mcastEp, ackPort);
  283. }
  284. void startMCastRecvServer(const char *broadcastRoot, unsigned groupMember, SocketEndpoint &mcastEp, unsigned ackPort)
  285. {
  286. CriticalBlock b(*recvServerCrit);
  287. assertex(!mcastRecvServer);
  288. mcastRecvServer = new CMCastRecvServer(broadcastRoot, groupMember, mcastEp, ackPort);
  289. }
  290. void stopMCastRecvServer()
  291. {
  292. CriticalBlock b(*recvServerCrit);
  293. if (mcastRecvServer)
  294. {
  295. mcastRecvServer->stop();
  296. mcastRecvServer = NULL;
  297. }
  298. }
  299. //
  300. CMCastReceiver::CMCastReceiver(bctag_t _tag) : tag(_tag), nextPacket(0), eosHit(false), aborted(false)
  301. {
  302. { CriticalBlock b(*recvServerCrit);
  303. if (!mcastRecvServer)
  304. throw MakeStringException(0, "Multicast receive server not running");
  305. }
  306. logTmRecv.reset(tracingPeriod);
  307. logTmCons.reset(tracingPeriod);
  308. mcastRecvServer->registerReceiver(*this);
  309. }
  310. CMCastReceiver::~CMCastReceiver()
  311. {
  312. assertex(mcastRecvServer);
  313. mcastRecvServer->deregisterReceiver(*this);
  314. }
  315. bool CMCastReceiver::packetReceived(CDataPacket &dataPacket, bool &complete)
  316. {
  317. if (dataPacket.header->total == pktsReceived.ordinality())
  318. complete = true;
  319. else if (NotFound == pktsReceived.bSearch(dataPacket.header->id, pktOrderFunc))
  320. {
  321. bool ret = false;
  322. bool isNew;
  323. pktsReceived.bAdd(dataPacket.header->id, pktOrderFunc, isNew);
  324. assertex(isNew);
  325. CriticalBlock b(crit);
  326. bool overflow = PACKET_QUEUE_LIMIT && (dataPackets.ordinality() > PACKET_QUEUE_LIMIT);
  327. if (!overflow || dataPacket.header->id < dataPackets.item(0).header->id)
  328. {
  329. // process packet
  330. unsigned level;
  331. if (tracingPeriod && logTmRecv.timedout())
  332. {
  333. level = 1;
  334. logTmRecv.reset(tracingPeriod);
  335. }
  336. else level = 110;
  337. LOG(MCdebugProgress(level), unknownJob, "\nReceived \n"
  338. "packet id = %d\n"
  339. "packet length = %d\n"
  340. , dataPacket.header->id, dataPacket.header->length);
  341. {
  342. dataPacket.Link();
  343. CInterface *_dataPacket = &dataPacket;
  344. dataPackets.bAdd(_dataPacket, dataPktOrderFunc, isNew);
  345. }
  346. assertex(isNew);
  347. if (overflow)
  348. {
  349. LOG(MCdebugProgress(50), unknownJob, "Overflow, removed traling packet %d", dataPackets.item(0).header->id);
  350. unsigned pos = pktsReceived.bSearch(dataPackets.item(0).header->id, pktOrderFunc);
  351. assertex(NotFound != pos);
  352. dataPackets.remove(0);
  353. pktsReceived.remove(pos);
  354. }
  355. receivedSem.signal();
  356. ret = true;
  357. }
  358. complete = (dataPacket.header->total == pktsReceived.ordinality());
  359. return ret;
  360. }
  361. return false;
  362. }
  363. bool CMCastReceiver::buildNack(MCAckPacketHeader *ackPacket, size32_t &sz, unsigned total)
  364. {
  365. unsigned *nackList = (unsigned *)(((byte *)ackPacket)+sizeof(MCAckPacketHeader));
  366. const unsigned *nackUpper = (unsigned *)((byte *)ackPacket)+MC_ACK_PACKET_SIZE-sizeof(unsigned);
  367. unsigned *nList = nackList;
  368. unsigned nackStart = (unsigned)-1, prev = (unsigned)-1;
  369. unsigned nackEnd;
  370. unsigned p;
  371. for (p=0; p<pktsReceived.ordinality(); p++)
  372. {
  373. unsigned pkt = pktsReceived.item(p);
  374. if (prev != pkt-1)
  375. {
  376. nackStart = prev+1;
  377. if (pkt == nackStart+1)
  378. {
  379. nackEnd = nackStart;
  380. nackStart |= NACK_SINGLETON;
  381. }
  382. else
  383. nackEnd = pkt-1;
  384. *nList++ = nackStart;
  385. if (0 == (nackStart & NACK_SINGLETON))
  386. {
  387. assertex((unsigned)-1 != nackEnd);
  388. *nList++ = nackEnd;
  389. }
  390. }
  391. prev = pkt;
  392. assertex(nList<=nackUpper);
  393. }
  394. nackEnd = prev;
  395. if (total)
  396. {
  397. if (nackEnd == total-1)
  398. ackPacket->ackDone = true;
  399. else
  400. {
  401. ackPacket->ackDone = false;
  402. *nList++ = nackEnd+1;
  403. *nList++ = total-1;
  404. }
  405. }
  406. else
  407. ackPacket->ackDone = false;
  408. if (nList != nackList)
  409. {
  410. *(nList++) = NACK_ENDMARKER;
  411. sz = (size32_t)sizeof(MCAckPacketHeader)+(((byte *)nList)-((byte *)nackList));
  412. return true;
  413. }
  414. else
  415. {
  416. sz = 0;
  417. return false;
  418. }
  419. }
  420. void CMCastReceiver::reset()
  421. {
  422. dataPackets.kill();
  423. pktsReceived.kill();
  424. nextPacket = 0;
  425. eosHit = false;
  426. }
  427. // IBroadcastReceiver impl.
  428. bool CMCastReceiver::eos()
  429. {
  430. return eosHit;
  431. }
  432. void CMCastReceiver::stop()
  433. {
  434. aborted = true;
  435. receivedSem.signal();
  436. }
  437. bool CMCastReceiver::read(MemoryBuffer &mb)
  438. {
  439. loop
  440. {
  441. receivedSem.wait();
  442. if (aborted)
  443. {
  444. aborted = false;
  445. return false;
  446. }
  447. bool hadSome=false;
  448. loop
  449. {
  450. Linked<CDataPacket> dataPacket;
  451. { CriticalBlock b(crit);
  452. if (dataPackets.ordinality() && nextPacket==dataPackets.tos().header->id)
  453. {
  454. dataPacket.set(&dataPackets.tos());
  455. dataPackets.pop();
  456. hadSome = true;
  457. }
  458. else break;
  459. }
  460. if (dataPacket)
  461. {
  462. mb.append(dataPacket->header->length, dataPacket->queryData());
  463. unsigned level;
  464. if (tracingPeriod && logTmCons.timedout())
  465. {
  466. level = 1;
  467. logTmCons.reset(tracingPeriod);
  468. }
  469. else level = 110;
  470. LOG(MCdebugProgress(level), unknownJob, "Pkt %d consumed", dataPacket->header->id);
  471. if (nextPacket == dataPacket->header->total-1)
  472. {
  473. eosHit = true;
  474. return true;
  475. }
  476. nextPacket++;
  477. }
  478. }
  479. if (hadSome) return true;
  480. }
  481. return true;
  482. }
  483. //
  484. //
  485. // CAckProcessor impl.
  486. //
  487. struct TagJobIdTuple
  488. {
  489. TagJobIdTuple(bctag_t _tag, unsigned _jobId) : tag(_tag), jobId(_jobId) { }
  490. bctag_t tag;
  491. unsigned jobId;
  492. bool operator ==(TagJobIdTuple &other) { return other.tag == tag && other.jobId == jobId; }
  493. };
  494. class CAckProcessor : public CInterface
  495. {
  496. CriticalSection crit;
  497. CPktNodeTable pktNodeTable, nodeAckTable;
  498. UnsignedArray nackOrder;
  499. CUIntTable clientsDone;
  500. unsigned nodes;
  501. TagJobIdTuple tagJobId;
  502. #ifdef TRACKLASTACK
  503. unsigned *nodeLastAckTimes;
  504. #endif
  505. public:
  506. CAckProcessor(bctag_t tag, unsigned jobId, unsigned _nodes) : tagJobId( tag, jobId), nodes(_nodes)
  507. {
  508. #ifdef TRACKLASTACK
  509. nodeLastAckTimes = (unsigned *)malloc(nodes * sizeof(unsigned));
  510. memset(nodeLastAckTimes, 0, nodes * sizeof(unsigned));
  511. #endif
  512. }
  513. const void *queryFindParam() const
  514. {
  515. return (const void *) &tagJobId;
  516. }
  517. void clear()
  518. {
  519. pktNodeTable.kill();
  520. nackOrder.kill();
  521. nodeAckTable.kill();
  522. clientsDone.kill();
  523. }
  524. void initNackTable(unsigned packets)
  525. {
  526. pktNodeTable.reinit(packets);
  527. while (packets--)
  528. {
  529. if (packets < 10 && packets%3==0) continue;
  530. CUIntTableItem *nodeMap = new CUIntTableItem(packets);
  531. pktNodeTable.add(* nodeMap);
  532. nackOrder.append(packets);
  533. unsigned node;
  534. for (node=0; node<nodes; node++)
  535. nodeMap->add(* new CUIntValue(node));
  536. }
  537. }
  538. void addNackAll(unsigned pkt)
  539. {
  540. unsigned n = nodes;
  541. while (n--)
  542. addNack(pkt, n);
  543. }
  544. void addNack(unsigned pkt, unsigned node)
  545. {
  546. CUIntTableItem *nodeMap = pktNodeTable.find(pkt);
  547. if (!nodeMap)
  548. {
  549. nodeMap = new CUIntTableItem(pkt);
  550. pktNodeTable.add(* nodeMap);
  551. bool isNew;
  552. nackOrder.bAdd(pkt, pktRevOrderFunc, isNew);
  553. assertex(isNew);
  554. }
  555. if (!nodeMap->find(node))
  556. {
  557. nodeMap->add(* new CUIntValue(node));
  558. LOG(MCdebugProgress(150), unknownJob, "New NACK pkt = %d from node %d", pkt, node);
  559. }
  560. }
  561. void deleteNack(unsigned node, unsigned pkt)
  562. {
  563. CriticalBlock b(crit);
  564. if (pktNodeTable.remove(&pkt))
  565. {
  566. unsigned pos = nackOrder.bSearch(pkt, pktRevOrderFunc);
  567. assertex(pos != NotFound);
  568. nackOrder.remove(pos);
  569. }
  570. }
  571. CUIntTableItem *detachPktNack()
  572. {
  573. // detach first nacked pkt.
  574. CriticalBlock b(crit);
  575. if (!nackOrder.ordinality()) return NULL;
  576. unsigned nackPkt = nackOrder.pop();
  577. CUIntTableItem *map = (CUIntTableItem *)pktNodeTable.find(nackPkt);
  578. assertex(map);
  579. map->Link();
  580. pktNodeTable.removeExact(map);
  581. return map;
  582. }
  583. #ifdef TRACKLASTACK
  584. unsigned queryLastAckMs(unsigned node)
  585. {
  586. return nodeLastAckTimes[node];
  587. }
  588. #endif
  589. unsigned queryAcked(unsigned node)
  590. {
  591. CriticalBlock b(crit);
  592. CUIntTableItem *pktAcks = nodeAckTable.find(node);
  593. if (!pktAcks) return (unsigned)-1;
  594. return pktAcks->count();
  595. }
  596. bool queryClientDone(unsigned node)
  597. {
  598. CriticalBlock b(crit);
  599. if (clientsDone.find(node))
  600. return true;
  601. return false;
  602. }
  603. virtual int handleAck(MCAckPacketHeader *ackPacket)
  604. {
  605. if (ackPacket->ackDone)
  606. {
  607. CriticalBlock b(crit);
  608. clientsDone.replace(* new CUIntValue(ackPacket->node));
  609. LOG(MCdebugProgress(150), unknownJob, "Node %d signalled done", ackPacket->node);
  610. }
  611. else
  612. {
  613. #ifdef TRACKLASTACK
  614. nodeLastAckTimes[ackPacket->node] = msTick();
  615. #endif
  616. const unsigned *nackList = (unsigned *)(((byte *)ackPacket)+sizeof(MCAckPacketHeader));
  617. unsigned prevEnd = 0;
  618. StringBuffer msg;
  619. loop
  620. {
  621. unsigned nackStart = *nackList++, nackEnd;
  622. if (nackStart == NACK_ENDMARKER)
  623. break;
  624. if (nackStart & NACK_SINGLETON)
  625. {
  626. nackStart &= ~NACK_SINGLETON;
  627. nackEnd = nackStart;
  628. }
  629. else
  630. nackEnd = *nackList++;
  631. // fill-in implicit acked from node (prevEnd->nackStart)
  632. CriticalBlock b(crit);
  633. CUIntTableItem *ackedPkts = nodeAckTable.find(ackPacket->node);
  634. if (!ackedPkts)
  635. {
  636. ackedPkts = new CUIntTableItem(ackPacket->node);
  637. nodeAckTable.add(* ackedPkts);
  638. }
  639. unsigned pkt;
  640. for (pkt = prevEnd; pkt<nackStart; pkt++)
  641. {
  642. if (!ackedPkts->find(pkt))
  643. {
  644. ackedPkts->add(* new CUIntValue(pkt));
  645. deleteNack(ackPacket->node, pkt);
  646. }
  647. }
  648. if (prevEnd<nackStart) msg.appendf(", ACK[%d-%d]", prevEnd, nackStart-1);
  649. for (pkt = nackStart; pkt<=nackEnd; pkt++)
  650. addNack(pkt, ackPacket->node);
  651. msg.appendf(", NACK[%d-%d]", nackStart, nackEnd);
  652. prevEnd = nackEnd+1;
  653. }
  654. if (0 ==ackPacket->node)
  655. LOG(MCdebugProgress(55), unknownJob, "ACK/NACK node=%d, %s", ackPacket->node, msg.str());
  656. }
  657. return 0;
  658. }
  659. };
  660. static int CIOrderFunc(CInterface **cI1, CInterface **cI2)
  661. {
  662. return (*((CCountedItem **)cI2))->count-(*((CCountedItem **)cI1))->count;
  663. }
  664. static CriticalSection gBCInUseCrit;
  665. static bool groupBroadCastInUse = false;
  666. typedef ThreadSafeOwningSimpleHashTableOf<CAckProcessor, TagJobIdTuple> CAckProcessorTable;
  667. class CMCastBroadcaster : public CInterface, implements IBroadcast
  668. {
  669. unsigned nodes, ackPort;
  670. SocketEndpointArray eps;
  671. Owned<ISocket> mcastSock;
  672. IArrayOf<ISocket> unicastSocks;
  673. Owned<ISocket> ackSock;
  674. bool stopped;
  675. CAckProcessorTable ackProcessorTable;
  676. class CThreaded : public Thread
  677. {
  678. CMCastBroadcaster &mcB;
  679. public:
  680. CThreaded(CMCastBroadcaster &_mcB) : Thread("CMCastBroadcaster"), mcB(_mcB) { start(); }
  681. virtual int run() { mcB.main(); return 1; }
  682. } *threaded;
  683. public:
  684. IMPLEMENT_IINTERFACE;
  685. CMCastBroadcaster(SocketEndpointArray &_eps, SocketEndpoint &mcastEp, unsigned _ackPort)
  686. {
  687. {
  688. CriticalBlock b(gBCInUseCrit);
  689. assertex(!groupBroadCastInUse);
  690. groupBroadCastInUse = true;
  691. }
  692. srand((unsigned) time(NULL));
  693. nextJobId = rand();
  694. ackPort = _ackPort;
  695. CloneArray(eps, _eps);
  696. nodes = eps.ordinality();
  697. StringBuffer ipStr;
  698. mcastEp.getIpText(ipStr);
  699. mcastSock.setown(ISocket::multicast_connect(mcastEp.port,ipStr.str(),5));
  700. mcastSock->set_send_buffer_size(UDP_SEND_SIZE);
  701. ArrayIteratorOf<SocketEndpointArray, SocketEndpoint> iter(eps);
  702. ForEach (iter)
  703. {
  704. StringBuffer ipStr;
  705. iter.query().getIpText(ipStr);
  706. unicastSocks.append(*ISocket::udp_connect(mcastEp.port, ipStr.str()));
  707. }
  708. stopped = false;
  709. threaded = new CThreaded(*this);
  710. }
  711. ~CMCastBroadcaster()
  712. {
  713. CriticalBlock b(gBCInUseCrit);
  714. groupBroadCastInUse = false;
  715. if (threaded) { stop(); threaded->join(); threaded->Release(); }
  716. }
  717. void main()
  718. {
  719. ackSock.setown(ISocket::udp_create(ackPort));
  720. ackSock->set_receive_buffer_size(UDP_RECV_SIZE);
  721. try
  722. {
  723. MemoryBuffer mb;
  724. MCAckPacketHeader *ackPacket = (MCAckPacketHeader *)mb.reserveTruncate(MC_ACK_PACKET_SIZE);
  725. loop
  726. {
  727. size32_t szRead;
  728. ackSock->read(ackPacket, sizeof(MCAckPacketHeader), MC_ACK_PACKET_SIZE, szRead, WAIT_FOREVER);
  729. TagJobIdTuple tagJobId(ackPacket->tag, ackPacket->jobId);
  730. CAckProcessor *ackProcessor = ackProcessorTable.find(tagJobId);
  731. if (ackProcessor)
  732. ackProcessor->handleAck(ackPacket);
  733. }
  734. }
  735. catch (IException *e)
  736. {
  737. if (JSOCKERR_graceful_close != e->errorCode())
  738. LOG(MCwarning, unknownJob, e, "Ack handler");
  739. }
  740. }
  741. virtual bool send(bctag_t tag, unsigned size, const void *data)
  742. {
  743. unsigned packets = 0;
  744. unsigned remaining = size;
  745. unsigned offset = 0;
  746. #ifdef _DEBUG
  747. MTimeSection * mt = new MTimeSection(queryActiveTimer(), "ServerBroadcast", "SERVER BROADCAST"); // MORE is ServerBroadcast a scope (where) or a name (what)?
  748. #endif
  749. unsigned maxDataSz = MC_PACKET_SIZE-sizeof(MCPacketHeader);
  750. MemoryBuffer headerMb;
  751. MCPacketHeader *header = (MCPacketHeader *)headerMb.reserveTruncate(MC_PACKET_SIZE);
  752. byte *pktData = ((byte*)header) + sizeof(MCPacketHeader);
  753. header->total = size / maxDataSz;
  754. if (size % maxDataSz) header->total++;
  755. header->cmd = MCPacket_None;
  756. header->tag = tag;
  757. unsigned jobId = getNextJobId();
  758. header->jobId = jobId;
  759. CAckProcessor *ackProcessor = new CAckProcessor(tag, jobId, nodes);
  760. struct ScopedTableElem
  761. {
  762. ScopedTableElem(CAckProcessorTable &_table, CAckProcessor &_processor) : table(_table), processor(_processor) { table.replace(processor); }
  763. ~ScopedTableElem() { table.removeExact(&processor); }
  764. CAckProcessorTable &table;
  765. CAckProcessor &processor;
  766. } scopedTableElem(ackProcessorTable, *ackProcessor);
  767. LOG(MCdebugProgress(30), unknownJob, "Broadcasting");
  768. byte *dataPtr = (byte *) data;
  769. packets = (remaining+maxDataSz-1)/maxDataSz;
  770. ackProcessor->initNackTable(packets);
  771. LOG(MCdebugProgress(20), unknownJob, "Broadcasting %d packets", packets);
  772. unsigned lastPoll = msTick();
  773. CCountTable resendPktTable, resendNodeTable;
  774. CTimeMon logTm(5000);
  775. while (!stopped)
  776. {
  777. LOG(MCdebugProgress(30), unknownJob, "Resending cycle");
  778. while (!stopped)
  779. {
  780. Owned<CUIntTableItem> nodeMap = ackProcessor->detachPktNack();
  781. if (!nodeMap) break;
  782. unsigned pkt = nodeMap->queryPacket();
  783. assertex(pkt<packets);
  784. header->id = pkt;
  785. header->length = (pkt+1==packets) ? size-(maxDataSz*pkt) : maxDataSz;
  786. header->offset = maxDataSz*pkt;
  787. header->total = packets;
  788. header->cmd = MCPacket_None;
  789. // refill pktData
  790. memcpy(pktData, ((byte*)data)+header->offset, header->length);
  791. bool unicast = useUniCast && (nodeMap->count() <= unicastLimit || (nodeMap->count() * 100 / nodes < unicastPcent));
  792. unsigned level;
  793. if (logTm.timedout())
  794. {
  795. level = 1;
  796. logTm.reset(5000);
  797. }
  798. else level = 100;
  799. unsigned nodeCount = nodeMap->count();
  800. #ifdef COUNTRESENDS
  801. unsigned resendCount = resendPktTable.incItem(pkt);
  802. LOG(MCdebugProgress(level), unknownJob, "Resending packet %s (%d nodes request) %d (resent %d times)", unicast?"UNICAST":"MULTICAST", nodeMap->count(), pkt, resendCount);
  803. #endif
  804. if (unicast)
  805. {
  806. #ifdef COUNTRESENDS
  807. StringBuffer nodeList;
  808. #endif
  809. SuperHashIteratorOf<CUIntValue> iter(*nodeMap);
  810. if (iter.first())
  811. {
  812. loop
  813. {
  814. unsigned node = iter.query().queryValue();
  815. unicastSocks.item(node).write(header, sizeof(MCPacketHeader)+header->length);
  816. #ifdef COUNTRESENDS
  817. resendNodeTable.incItem(node);
  818. nodeList.append(node);
  819. #endif
  820. if (!iter.next()) break;
  821. #ifdef COUNTRESENDS
  822. nodeList.append(", ");
  823. #endif
  824. }
  825. #ifdef COUNTRESENDS
  826. LOG(MCdebugProgress(level), unknownJob, "Unicasted to nodes: %s", nodeList.str());
  827. #endif
  828. }
  829. }
  830. else
  831. {
  832. #ifdef COUNTRESENDS
  833. SuperHashIteratorOf<CUIntValue> iter(*nodeMap);
  834. ForEach (iter)
  835. {
  836. unsigned node = iter.query().queryValue();
  837. resendNodeTable.incItem(-node);
  838. }
  839. #endif
  840. mcastSock->write(header, sizeof(MCPacketHeader)+header->length);
  841. }
  842. }
  843. if (stopped) break;
  844. // run out of nacks - but not all acked - poll for more nacks.
  845. StringBuffer ackStr("Ack counts (total=");
  846. ackStr.append(packets).append("; ");
  847. unsigned node;
  848. LOG(MCdebugProgress(30), unknownJob, "Polling");
  849. UnsignedArray needPolling;
  850. for (node=0; node<nodes; node++)
  851. {
  852. unsigned ackedPkts = ackProcessor->queryAcked(node);
  853. if (packets != ackedPkts && !ackProcessor->queryClientDone(node))
  854. {
  855. needPolling.append(node);
  856. ackStr.append("n(").append(node).append("):").append((unsigned)-1==ackedPkts?0:ackedPkts).append(' ');
  857. }
  858. }
  859. if (needPolling.ordinality())
  860. {
  861. unsigned diff = msTick()-lastPoll;
  862. if (diff < pollDelay)
  863. {
  864. DBGLOG("Sleeping %d ms", pollDelay-diff);
  865. MilliSleep(pollDelay-diff);
  866. }
  867. header->cmd = MCPacket_Poll;
  868. header->total = packets;
  869. bool unicastPoll = useUniCast && (needPolling.ordinality() <= unicastLimit || (needPolling.ordinality() * 100 / nodes < unicastPcent));
  870. StringBuffer pollStr("Polled ");
  871. if (unicastPoll)
  872. {
  873. pollStr.append("node list : ");
  874. ForEachItemIn(n, needPolling)
  875. {
  876. unsigned node = needPolling.item(n);
  877. pollStr.append(node).append("(");
  878. eps.item(node).getUrlStr(pollStr);
  879. pollStr.append(") ");
  880. #ifdef TRACKLASTACK
  881. unsigned lastMs = ackProcessor->queryLastAckMs(node);
  882. if (lastMs)
  883. pollStr.append('[').append(msTick()-lastMs).append("] ");
  884. #endif
  885. unicastSocks.item(node).write(header, sizeof(MCPacketHeader));
  886. }
  887. }
  888. else // mcast
  889. {
  890. pollStr.append(" ALL (mcast) nodes");
  891. mcastSock->write(header, sizeof(MCPacketHeader));
  892. }
  893. LOG(MCdebugProgress(30), unknownJob, "%s", pollStr.str());
  894. LOG(MCdebugProgress(40), unknownJob, "%s", ackStr.str());
  895. lastPoll = msTick();
  896. }
  897. else
  898. {
  899. LOG(MCdebugProgress(10), unknownJob, "Done");
  900. bool allDone = true;
  901. LOG(MCdebugProgress(30), unknownJob, "Checking Done");
  902. for (node=0; node<nodes; node++)
  903. {
  904. if (!ackProcessor->queryClientDone(node))
  905. {
  906. allDone = false;
  907. break;
  908. }
  909. }
  910. if (allDone)
  911. break; //finished!
  912. else
  913. {
  914. header->cmd = MCPacket_Poll;
  915. header->total = packets;
  916. mcastSock->write(header, sizeof(MCPacketHeader));
  917. }
  918. }
  919. }
  920. #ifdef _DEBUG
  921. delete mt;
  922. #endif
  923. #ifdef COUNTRESENDS
  924. // timings/stats
  925. LOG(MCdebugProgress(110), unknownJob, "Packet resend history");
  926. SuperHashIteratorOf<CCountedItem> pktIter(resendPktTable);
  927. CopyCIArrayOf<CCountedItem> list;
  928. ForEach (pktIter)
  929. {
  930. CCountedItem &rv = pktIter.query();
  931. list.append(rv);
  932. }
  933. list.sort(CIOrderFunc);
  934. ForEachItemIn(ci1, list)
  935. {
  936. CCountedItem &rv = list.item(ci1);
  937. LOG(MCdebugProgress(110), unknownJob, "Resent packet %d, %d times", rv.queryValue(), rv.count);
  938. }
  939. list.kill();
  940. LOG(MCdebugProgress(110), unknownJob, "Node resend history");
  941. SuperHashIteratorOf<CCountedItem> nodeIter(resendNodeTable);
  942. ForEach (nodeIter)
  943. {
  944. CCountedItem &rv = nodeIter.query();
  945. list.append(rv);
  946. }
  947. list.sort(CIOrderFunc);
  948. ForEachItemIn(ci2, list)
  949. {
  950. CCountedItem &rv = list.item(ci2);
  951. int node = rv.queryValue();
  952. if (node < 0)
  953. LOG(MCdebugProgress(110), unknownJob, "Multicast to all nodes due (in part) to node %d, %d times", -node, rv.count);
  954. else
  955. LOG(MCdebugProgress(110), unknownJob, "Node %d was resent to %d times", node, rv.count);
  956. }
  957. #endif
  958. LOG(MCdebugProgress(10), unknownJob, "All %d packets from all %d nodes acknowledged", packets, nodes);
  959. #ifdef _DEBUG
  960. StringBuffer str;
  961. if (queryActiveTimer())
  962. queryActiveTimer()->getTimings(str);
  963. LOG(MCdebugProgress(10), unknownJob, "%s", str.str());
  964. #endif
  965. return !stopped;
  966. }
  967. virtual void stop()
  968. {
  969. if (!stopped)
  970. {
  971. stopped = true;
  972. if (ackSock) ackSock->close();
  973. }
  974. }
  975. virtual void stopClients()
  976. {
  977. // TBD
  978. unsigned attempts = 5;
  979. while (attempts--)
  980. {
  981. PROGLOG("Sending stop via mcast");
  982. MCPacketHeader header;
  983. header.cmd = MCPacket_Stop;
  984. mcastSock->write(&header, sizeof(MCPacketHeader));
  985. MilliSleep(1000);
  986. }
  987. }
  988. };
  989. IBroadcast *createGroupBroadcast(SocketEndpointArray &eps, const char *mcastIp, unsigned mcastPort, unsigned ackPort)
  990. {
  991. SocketEndpoint mcastEp(mcastIp, mcastPort);
  992. return createGroupBroadcast(eps, mcastEp, ackPort);
  993. }
  994. IBroadcast *createGroupBroadcast(SocketEndpointArray &eps, SocketEndpoint &mcastEp, unsigned ackPort)
  995. {
  996. return new CMCastBroadcaster(eps, mcastEp, ackPort);
  997. }
  998. IBroadcastReceiver *createGroupBroadcastReceiver(bctag_t tag)
  999. {
  1000. return new CMCastReceiver(tag);
  1001. }
  1002. void setBroadcastOpt(bcopt_t opt, unsigned value)
  1003. {
  1004. switch (opt)
  1005. {
  1006. case bcopt_pollDelay:
  1007. pollDelay = value;
  1008. break;
  1009. case bcopt_useUniCast:
  1010. useUniCast = 0!=value;
  1011. break;
  1012. case bcopt_unicastLimit:
  1013. unicastLimit = value;
  1014. break;
  1015. case bcopt_unicastPcent:
  1016. unicastPcent = value;
  1017. break;
  1018. default:
  1019. assertex(!"Unknown broadcast option");
  1020. }
  1021. }