udpmsgpk.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #ifdef _WIN32
  14. #pragma warning( disable : 4786)
  15. #pragma warning( disable : 4018)
  16. #endif
  17. #include "platform.h"
  18. #undef new
  19. #include <string>
  20. #include <map>
  21. #include <queue>
  22. #include "jthread.hpp"
  23. #include "jlog.hpp"
  24. #include "jisem.hpp"
  25. #include "udplib.hpp"
  26. #include "udptrr.hpp"
  27. #include "udptrs.hpp"
  28. #include "roxiemem.hpp"
  29. using roxiemem::DataBuffer;
  30. using roxiemem::IRowManager;
  31. #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  32. #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
  33. #endif
  34. atomic_t unwantedDiscarded;
  35. // PackageSequencer ====================================================================================
  36. //
  37. typedef DataBuffer * data_buffer_ptr;
  38. int g_sequence_compare(const void *arg1, const void *arg2 )
  39. {
  40. DataBuffer *dataBuff1 = *(DataBuffer **) arg1;
  41. DataBuffer *dataBuff2 = *(DataBuffer **) arg2;
  42. UdpPacketHeader *pktHdr1 = (UdpPacketHeader*) dataBuff1->data;
  43. UdpPacketHeader *pktHdr2 = (UdpPacketHeader*) dataBuff2->data;
  44. if (pktHdr1->pktSeq < pktHdr2->pktSeq) return -1;
  45. if (pktHdr1->pktSeq > pktHdr2->pktSeq) return 1;
  46. return 0;
  47. }
  48. class PackageSequencer : public CInterface, implements IInterface
  49. {
  50. DataBuffer *firstPacket;
  51. DataBuffer *lastContiguousPacket;
  52. unsigned metaSize;
  53. unsigned headerSize;
  54. const void *header;
  55. MemoryBuffer metadata;
  56. InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
  57. public:
  58. IMPLEMENT_IINTERFACE;
  59. PackageSequencer()
  60. {
  61. if (checkTraceLevel(TRACE_MSGPACK, 3))
  62. DBGLOG("UdpCollator: PackageSequencer::PackageSequencer this=%p", this);
  63. metaSize = 0;
  64. headerSize = 0;
  65. header = NULL;
  66. firstPacket = NULL;
  67. lastContiguousPacket = NULL;
  68. }
  69. ~PackageSequencer()
  70. {
  71. if (checkTraceLevel(TRACE_MSGPACK, 3))
  72. DBGLOG("UdpCollator: PackageSequencer::~PackageSequencer this=%p", this);
  73. DataBuffer *finger = firstPacket;
  74. while (finger)
  75. {
  76. DataBuffer *goer = finger;
  77. finger = finger->msgNext;
  78. goer->Release();
  79. }
  80. }
  81. DataBuffer *next(DataBuffer *after)
  82. {
  83. dataAvailable.wait(); // MORE - when do I interrupt? Should I time out? Will potentially block indefinitely if sender restarts (leading to an abandoned packet) or stalls.
  84. DataBuffer *ret;
  85. if (after)
  86. ret = after->msgNext;
  87. else
  88. ret = firstPacket;
  89. if (checkTraceLevel(TRACE_MSGPACK, 5))
  90. {
  91. if (ret)
  92. {
  93. UdpPacketHeader *pktHdr = (UdpPacketHeader*) ret->data;
  94. DBGLOG("UdpCollator: PackageSequencer::next returns ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u dataBuffer=%p this=%p",
  95. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex, ret, this);
  96. }
  97. else
  98. DBGLOG("UdpCollator: PackageSequencer::next returns NULL this=%p", this);
  99. }
  100. return ret;
  101. }
  102. bool insert(DataBuffer *dataBuff) // returns true if message is complete.
  103. {
  104. bool res = false;
  105. assert(dataBuff->msgNext == NULL);
  106. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  107. if (checkTraceLevel(TRACE_MSGPACK, 5))
  108. {
  109. DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u dataBuffer=%p this=%p",
  110. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex, dataBuff, this);
  111. }
  112. DataBuffer *finger;
  113. DataBuffer *prev;
  114. if (lastContiguousPacket)
  115. {
  116. UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
  117. if (pktHdr->pktSeq <= oldHdr->pktSeq)
  118. {
  119. // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
  120. if (checkTraceLevel(TRACE_MSGPACK, 5))
  121. DBGLOG("UdpCollator: Discarding duplicate incoming packet");
  122. dataBuff->Release();
  123. return false;
  124. }
  125. finger = lastContiguousPacket->msgNext;
  126. prev = lastContiguousPacket;
  127. }
  128. else
  129. {
  130. finger = firstPacket;
  131. prev = NULL;
  132. }
  133. while (finger)
  134. {
  135. UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
  136. if (pktHdr->pktSeq == oldHdr->pktSeq)
  137. {
  138. // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
  139. if (checkTraceLevel(TRACE_MSGPACK, 5))
  140. DBGLOG("UdpCollator: Discarding duplicate incoming packet");
  141. dataBuff->Release();
  142. return false;
  143. }
  144. else if (pktHdr->pktSeq < oldHdr->pktSeq)
  145. {
  146. break;
  147. }
  148. else
  149. {
  150. prev = finger;
  151. finger = finger->msgNext;
  152. }
  153. }
  154. if (prev)
  155. {
  156. assert(prev->msgNext == finger);
  157. prev->msgNext = dataBuff;
  158. }
  159. else
  160. firstPacket = dataBuff;
  161. dataBuff->msgNext = finger;
  162. if (prev == lastContiguousPacket)
  163. {
  164. unsigned prevseq;
  165. if (lastContiguousPacket)
  166. {
  167. prevseq = ((UdpPacketHeader*) lastContiguousPacket->data)->pktSeq & UDP_PACKET_SEQUENCE_MASK;
  168. finger = lastContiguousPacket->msgNext;
  169. }
  170. else
  171. {
  172. prevseq = (unsigned) -1;
  173. finger = firstPacket;
  174. }
  175. while (finger)
  176. {
  177. UdpPacketHeader *fingerHdr = (UdpPacketHeader*) finger->data;
  178. unsigned pktseq = fingerHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK;
  179. if (pktseq == prevseq+1)
  180. {
  181. unsigned packetDataSize = fingerHdr->length - fingerHdr->metalength - sizeof(UdpPacketHeader);
  182. assert(packetDataSize < roxiemem::DATA_ALIGNMENT_SIZE);
  183. if (pktseq == 0)
  184. {
  185. // MORE - Is this safe - header lifetime is somewhat unpredictable without a copy of it...
  186. // Client header is at the start of packet 0
  187. headerSize = *(unsigned short *)(finger->data + sizeof(UdpPacketHeader));
  188. header = finger->data + sizeof(UdpPacketHeader) + sizeof(unsigned short);
  189. packetDataSize -= headerSize + sizeof(unsigned short);
  190. }
  191. if (fingerHdr->metalength)
  192. {
  193. // MORE - may be worth taking the effort to avoid copy of metadata unless it's split up
  194. metaSize += fingerHdr->metalength;
  195. metadata.append(fingerHdr->metalength, finger->data + fingerHdr->length - fingerHdr->metalength);
  196. }
  197. lastContiguousPacket = finger;
  198. dataAvailable.signal();
  199. if (fingerHdr->pktSeq & UDP_PACKET_COMPLETE)
  200. {
  201. res = true;
  202. dataAvailable.signal(); // allowing us to read the NULL that signifies end of message. May prefer to use the flag to stop?
  203. }
  204. }
  205. else
  206. break;
  207. finger = finger->msgNext;
  208. prevseq = pktseq;
  209. }
  210. }
  211. return res;
  212. }
  213. inline const void *getMetaData(unsigned &length)
  214. {
  215. length = metadata.length();
  216. return metadata.toByteArray();
  217. }
  218. inline const void *getMessageHeader()
  219. {
  220. return header;
  221. }
  222. inline unsigned getHeaderSize()
  223. {
  224. return headerSize;
  225. }
  226. };
  227. typedef std::queue<PackageSequencer*> seq_map_que;
  228. typedef std::queue<void*> ptr_que;
  229. typedef unsigned __int64 PUID;
  230. typedef MapXToMyClass<PUID, PUID, PackageSequencer> msg_map;
  231. // MessageResult ====================================================================================
  232. //
  233. class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface
  234. {
  235. PackageSequencer *pkSequencer;
  236. DataBuffer *dataBuff;
  237. unsigned current_pos;
  238. Linked<IRowManager> rowMgr;
  239. public:
  240. IMPLEMENT_IINTERFACE;
  241. CMessageUnpackCursor(PackageSequencer *_pkSqncr, IRowManager *_rowMgr) : rowMgr(_rowMgr)
  242. {
  243. if (checkTraceLevel(TRACE_MSGPACK, 3))
  244. DBGLOG("UdpCollator: CMessageUnpackCursor::CMessageUnpackCursor this=%p", this);
  245. pkSequencer = _pkSqncr;
  246. dataBuff = pkSequencer->next(NULL);
  247. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  248. current_pos = sizeof(UdpPacketHeader) + pkSequencer->getHeaderSize() + sizeof(unsigned short); // YUK - code neads cleaning!
  249. unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
  250. while (current_pos >= packetDataLimit)
  251. {
  252. current_pos = sizeof(UdpPacketHeader);
  253. dataBuff = pkSequencer->next(dataBuff); // NULL, we expect, unless packing was really weird.
  254. if (dataBuff)
  255. {
  256. pktHdr = (UdpPacketHeader*) dataBuff->data;
  257. packetDataLimit = pktHdr->length - pktHdr->metalength;
  258. }
  259. else
  260. break;
  261. }
  262. }
  263. ~CMessageUnpackCursor()
  264. {
  265. if (checkTraceLevel(TRACE_MSGPACK, 3))
  266. DBGLOG("UdpCollator: CMessageUnpackCursor::~CMessageUnpackCursor this=%p", this);
  267. pkSequencer->Release();
  268. }
  269. virtual bool atEOF() const
  270. {
  271. return dataBuff == NULL;
  272. }
  273. virtual bool isSerialized() const
  274. {
  275. return true;
  276. }
  277. virtual const void *getNext(int length)
  278. {
  279. // YUK horrid code! Though packer is even more horrid
  280. void *res = 0;
  281. if (dataBuff)
  282. {
  283. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  284. if (checkTraceLevel(TRACE_MSGPACK, 4))
  285. {
  286. StringBuffer s;
  287. DBGLOG("UdpCollator: CMessageUnpackCursor::getNext(%u) pos=%u pktLength=%u metaLen=%u ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u dataBuff=%p this=%p",
  288. length, current_pos, pktHdr->length, pktHdr->metalength,
  289. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq,
  290. pktHdr->nodeIndex, dataBuff, this);
  291. }
  292. unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
  293. if ((packetDataLimit - current_pos) >= length)
  294. {
  295. // Simple case - no need to copy
  296. res = &dataBuff->data[current_pos];
  297. current_pos += length;
  298. while (current_pos >= packetDataLimit)
  299. {
  300. dataBuff = pkSequencer->next(dataBuff);
  301. current_pos = sizeof(UdpPacketHeader);
  302. if (dataBuff)
  303. {
  304. pktHdr = (UdpPacketHeader*) dataBuff->data;
  305. packetDataLimit = pktHdr->length - pktHdr->metalength;
  306. }
  307. else
  308. break;
  309. }
  310. LinkRoxieRow(res);
  311. return res;
  312. }
  313. char *currResLoc = (char*)rowMgr->allocate(length, 0);
  314. res = currResLoc;
  315. while (length && dataBuff)
  316. {
  317. // Spans more than one block - allocate and copy
  318. assert(dataBuff);
  319. unsigned cpyLen = packetDataLimit - current_pos;
  320. if (cpyLen > length) cpyLen = length;
  321. memcpy(currResLoc, &dataBuff->data[current_pos], cpyLen);
  322. length -= cpyLen;
  323. currResLoc += cpyLen;
  324. current_pos += cpyLen;
  325. while (current_pos >= packetDataLimit)
  326. {
  327. dataBuff = pkSequencer->next(dataBuff);
  328. if (dataBuff)
  329. {
  330. current_pos = sizeof(UdpPacketHeader);
  331. pktHdr = (UdpPacketHeader*) dataBuff->data;
  332. packetDataLimit = pktHdr->length - pktHdr->metalength;
  333. }
  334. else
  335. {
  336. current_pos = 0;
  337. pktHdr = NULL;
  338. packetDataLimit = 0;
  339. break;
  340. }
  341. }
  342. }
  343. assertex(!length); // fail if not enough data available
  344. }
  345. else
  346. res = NULL;
  347. return res;
  348. }
  349. };
  350. class CMessageResult : public IMessageResult, CInterface {
  351. PackageSequencer *pkSequencer;
  352. mutable MemoryBuffer metaInfo;
  353. mutable CriticalSection metaCrit;
  354. public:
  355. IMPLEMENT_IINTERFACE;
  356. CMessageResult(PackageSequencer *_pkSqncr)
  357. {
  358. if (checkTraceLevel(TRACE_MSGPACK, 3))
  359. DBGLOG("UdpCollator: CMessageResult::CMessageResult pkSqncr=%p, this=%p", _pkSqncr, this);
  360. pkSequencer = _pkSqncr;
  361. }
  362. ~CMessageResult()
  363. {
  364. if (checkTraceLevel(TRACE_MSGPACK, 3))
  365. DBGLOG("UdpCollator: CMessageResult::~CMessageResult this=%p", this);
  366. pkSequencer->Release();
  367. }
  368. virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
  369. {
  370. return new CMessageUnpackCursor(LINK(pkSequencer), rowMgr);
  371. }
  372. virtual const void *getMessageHeader(unsigned &length) const
  373. {
  374. length = pkSequencer->getHeaderSize();
  375. return pkSequencer->getMessageHeader();
  376. }
  377. virtual const void *getMessageMetadata(unsigned &length) const
  378. {
  379. return pkSequencer->getMetaData(length);
  380. }
  381. virtual void discard() const
  382. {
  383. if (checkTraceLevel(TRACE_MSGPACK, 2))
  384. DBGLOG("UdpCollator: CMessageResult - Roxie server discarded a packet");
  385. atomic_inc(&unwantedDiscarded);
  386. }
  387. };
  388. // MessageCollator ====================================================================================
  389. //
  390. PUID GETPUID(DataBuffer *dataBuff)
  391. {
  392. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  393. return (((PUID) pktHdr->nodeIndex) << 32) | (PUID) pktHdr->msgSeq;
  394. }
  395. class CMessageCollator : implements IMessageCollator, public CInterface
  396. {
  397. seq_map_que queue;
  398. msg_map mapping;
  399. bool activity;
  400. bool memLimitExceeded;
  401. CriticalSection queueCrit;
  402. CriticalSection mapCrit;
  403. InterruptableSemaphore sem;
  404. Linked<IRowManager> rowMgr;
  405. ruid_t ruid;
  406. unsigned totalBytesReceived;
  407. public:
  408. IMPLEMENT_IINTERFACE;
  409. CMessageCollator(IRowManager *_rowMgr, unsigned _ruid) : rowMgr(_rowMgr), ruid(_ruid)
  410. {
  411. if (checkTraceLevel(TRACE_MSGPACK, 3))
  412. DBGLOG("UdpCollator: CMessageCollator::CMessageCollator rowMgr=%p this=%p ruid=" RUIDF "", _rowMgr, this, ruid);
  413. memLimitExceeded = false;
  414. activity = false; // w/o it there is a race condition
  415. totalBytesReceived = 0;
  416. }
  417. virtual ~CMessageCollator()
  418. {
  419. if (checkTraceLevel(TRACE_MSGPACK, 3))
  420. DBGLOG("UdpCollator: CMessageCollator::~CMessageCollator ruid=" RUIDF ", this=%p", ruid, this);
  421. while (!queue.empty())
  422. {
  423. PackageSequencer *pkSqncr = queue.front();
  424. queue.pop();
  425. pkSqncr->Release();
  426. }
  427. }
  428. virtual ruid_t queryRUID() const
  429. {
  430. return ruid;
  431. }
  432. virtual unsigned queryBytesReceived() const
  433. {
  434. return totalBytesReceived; // Arguably should lock, but can't be bothered. Never going to cause an issue in practice.
  435. }
  436. virtual bool add_package(DataBuffer *dataBuff)
  437. {
  438. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  439. if (checkTraceLevel(TRACE_MSGPACK, 4))
  440. {
  441. DBGLOG("UdpCollator: CMessageCollator::add_package memLimitEx=%d ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u udpSequence=%u rowMgr=%p this=%p",
  442. memLimitExceeded, pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex, pktHdr->udpSequence, (void*)rowMgr, this);
  443. }
  444. if (memLimitExceeded || roxiemem::memPoolExhausted())
  445. {
  446. DBGLOG("UdpCollator: mem limit exceeded");
  447. return false;
  448. }
  449. if (!dataBuff->attachToRowMgr(rowMgr))
  450. {
  451. memLimitExceeded = true;
  452. DBGLOG("UdpCollator: mem limit exceeded");
  453. return(false);
  454. }
  455. activity = true;
  456. totalBytesReceived += pktHdr->length;
  457. PUID puid = GETPUID(dataBuff);
  458. // MORE - I think we leak a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
  459. CriticalBlock b(mapCrit);
  460. PackageSequencer *pkSqncr = mapping.getValue(puid);
  461. bool isComplete = false;
  462. if (!pkSqncr)
  463. {
  464. pkSqncr = new PackageSequencer;
  465. mapping.setValue(puid, pkSqncr);
  466. pkSqncr->Release();
  467. }
  468. isComplete = pkSqncr->insert(dataBuff);
  469. if (isComplete)
  470. {
  471. queueCrit.enter();
  472. pkSqncr->Link();
  473. queue.push(pkSqncr);
  474. sem.signal();
  475. queueCrit.leave();
  476. mapping.remove(puid);
  477. }
  478. return(true);
  479. }
  480. virtual IMessageResult *getNextResult(unsigned time_out, bool &anyActivity)
  481. {
  482. if (checkTraceLevel(TRACE_MSGPACK, 3))
  483. DBGLOG("UdpCollator: CMessageCollator::getNextResult() timeout=%.8X ruid=%u rowMgr=%p this=%p", time_out, ruid, (void*) rowMgr, this);
  484. if (memLimitExceeded)
  485. {
  486. DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory limit exceeded exception - rowMgr=%p this=%p", (void*) rowMgr, this);
  487. throw MakeStringException(0, "memory limit exceeded");
  488. }
  489. else if (roxiemem::memPoolExhausted())
  490. {
  491. DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory pool exhausted exception - rowMgr=%p this=%p", (void*)rowMgr, this);
  492. throw MakeStringException(0, "memory pool exhausted");
  493. }
  494. if (sem.wait(time_out))
  495. {
  496. queueCrit.enter();
  497. PackageSequencer *pkSqncr = queue.front();
  498. queue.pop();
  499. queueCrit.leave();
  500. anyActivity = true;
  501. activity = false;
  502. return new CMessageResult(pkSqncr);
  503. }
  504. anyActivity = activity;
  505. activity = false;
  506. if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 1)) // suppress the tracing for pings where we expect the timeout...
  507. {
  508. DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout, %d partial results", mapping.count());
  509. }
  510. return 0;
  511. }
  512. virtual void interrupt(IException *E) {
  513. sem.interrupt(E);
  514. }
  515. };
  516. // ====================================================================================
  517. //
  518. extern IMessageCollator *createCMessageCollator(IRowManager *rowManager, ruid_t ruid)
  519. {
  520. return new CMessageCollator(rowManager, ruid);
  521. }