udpmsgpk.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifdef _WIN32
  14. #pragma warning( disable : 4786)
  15. #pragma warning( disable : 4018)
  16. #endif
  17. #include "platform.h"
  18. #include <string>
  19. #include <map>
  20. #include <queue>
  21. #include "jthread.hpp"
  22. #include "jlog.hpp"
  23. #include "jisem.hpp"
  24. #include "udplib.hpp"
  25. #include "udptrr.hpp"
  26. #include "udptrs.hpp"
  27. #include "udpmsgpk.hpp"
  28. #include "roxiemem.hpp"
  29. #include "roxie.hpp"
  30. using roxiemem::DataBuffer;
  31. using roxiemem::IRowManager;
  32. RelaxedAtomic<unsigned> unwantedDiscarded;
  33. // PackageSequencer ====================================================================================
  34. //
  35. typedef DataBuffer * data_buffer_ptr;
  36. int g_sequence_compare(const void *arg1, const void *arg2 )
  37. {
  38. DataBuffer *dataBuff1 = *(DataBuffer **) arg1;
  39. DataBuffer *dataBuff2 = *(DataBuffer **) arg2;
  40. UdpPacketHeader *pktHdr1 = (UdpPacketHeader*) dataBuff1->data;
  41. UdpPacketHeader *pktHdr2 = (UdpPacketHeader*) dataBuff2->data;
  42. if (pktHdr1->pktSeq < pktHdr2->pktSeq) return -1;
  43. if (pktHdr1->pktSeq > pktHdr2->pktSeq) return 1;
  44. return 0;
  45. }
  46. class PackageSequencer : public CInterface, implements IInterface
  47. {
  48. DataBuffer *firstPacket;
  49. DataBuffer *lastContiguousPacket;
  50. DataBuffer *tail = nullptr;
  51. unsigned metaSize;
  52. unsigned headerSize;
  53. const void *header;
  54. #ifdef _DEBUG
  55. unsigned numPackets = 0;
  56. unsigned maxSeqSeen = 0;
  57. unsigned scans = 0;
  58. unsigned overscans = 0;
  59. #endif
  60. MemoryBuffer metadata;
  61. InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
  62. public:
  63. IMPLEMENT_IINTERFACE;
  64. PackageSequencer()
  65. {
  66. if (checkTraceLevel(TRACE_MSGPACK, 3))
  67. DBGLOG("UdpCollator: PackageSequencer::PackageSequencer this=%p", this);
  68. metaSize = 0;
  69. headerSize = 0;
  70. header = NULL;
  71. firstPacket = NULL;
  72. lastContiguousPacket = NULL;
  73. }
  74. ~PackageSequencer()
  75. {
  76. if (checkTraceLevel(TRACE_MSGPACK, 3))
  77. DBGLOG("UdpCollator: PackageSequencer::~PackageSequencer this=%p", this);
  78. DataBuffer *finger = firstPacket;
  79. while (finger)
  80. {
  81. DataBuffer *goer = finger;
  82. finger = finger->msgNext;
  83. goer->Release();
  84. }
  85. }
  86. DataBuffer *next(DataBuffer *after)
  87. {
  88. dataAvailable.wait(); // MORE - when do I interrupt? Should I time out? Will potentially block indefinitely if sender restarts (leading to an abandoned packet) or stalls.
  89. DataBuffer *ret;
  90. if (after)
  91. ret = after->msgNext;
  92. else
  93. ret = firstPacket;
  94. if (checkTraceLevel(TRACE_MSGPACK, 5))
  95. {
  96. if (ret)
  97. {
  98. StringBuffer s;
  99. UdpPacketHeader *pktHdr = (UdpPacketHeader*) ret->data;
  100. DBGLOG("UdpCollator: PackageSequencer::next returns ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuffer=%p this=%p",
  101. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), ret, this);
  102. }
  103. else
  104. DBGLOG("UdpCollator: PackageSequencer::next returns NULL this=%p", this);
  105. }
  106. return ret;
  107. }
  108. bool insert(DataBuffer *dataBuff) // returns true if message is complete.
  109. {
  110. bool res = false;
  111. assert(dataBuff->msgNext == NULL);
  112. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  113. unsigned pktseq = pktHdr->pktSeq;
  114. #ifdef _DEBUG
  115. if ((pktseq & UDP_PACKET_SEQUENCE_MASK) > maxSeqSeen)
  116. maxSeqSeen = pktseq & UDP_PACKET_SEQUENCE_MASK;
  117. #endif
  118. if (checkTraceLevel(TRACE_MSGPACK, 5))
  119. {
  120. StringBuffer s;
  121. DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuffer=%p this=%p",
  122. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
  123. }
  124. // Optimize the (very) common case where I need to add to the end
  125. DataBuffer *finger;
  126. DataBuffer *prev;
  127. if (tail && (pktseq > ((UdpPacketHeader*) tail->data)->pktSeq))
  128. {
  129. assert(tail->msgNext==nullptr);
  130. finger = nullptr;
  131. prev = tail;
  132. tail = dataBuff;
  133. }
  134. else
  135. {
  136. // This is an insertion sort - YUK!
  137. if (lastContiguousPacket)
  138. {
  139. UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
  140. if (pktHdr->pktSeq <= oldHdr->pktSeq)
  141. {
  142. // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
  143. if (checkTraceLevel(TRACE_MSGPACK, 5))
  144. DBGLOG("UdpCollator: Discarding duplicate incoming packet");
  145. dataBuff->Release();
  146. return false;
  147. }
  148. finger = lastContiguousPacket->msgNext;
  149. prev = lastContiguousPacket;
  150. }
  151. else
  152. {
  153. finger = firstPacket;
  154. prev = NULL;
  155. }
  156. while (finger)
  157. {
  158. #ifdef _DEBUG
  159. scans++;
  160. if (scans==1000000)
  161. {
  162. overscans++;
  163. DBGLOG("%u million scans in UdpCollator insert(DataBuffer *dataBuff", overscans);
  164. if (lastContiguousPacket)
  165. {
  166. UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
  167. DBGLOG("lastContiguousPacket is at %u , last packet seen is %u", oldHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK, pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
  168. }
  169. else
  170. DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
  171. scans = 0;
  172. }
  173. #endif
  174. UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
  175. if (pktHdr->pktSeq == oldHdr->pktSeq)
  176. {
  177. // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
  178. if (checkTraceLevel(TRACE_MSGPACK, 5))
  179. DBGLOG("UdpCollator: Discarding duplicate incoming packet");
  180. dataBuff->Release();
  181. return false;
  182. }
  183. else if (pktHdr->pktSeq < oldHdr->pktSeq)
  184. {
  185. break;
  186. }
  187. else
  188. {
  189. prev = finger;
  190. finger = finger->msgNext;
  191. }
  192. }
  193. }
  194. if (prev)
  195. {
  196. assert(prev->msgNext == finger);
  197. prev->msgNext = dataBuff;
  198. }
  199. else
  200. {
  201. firstPacket = dataBuff;
  202. if (!tail)
  203. tail = dataBuff;
  204. }
  205. dataBuff->msgNext = finger;
  206. #ifdef _DEBUG
  207. numPackets++;
  208. #endif
  209. if (prev == lastContiguousPacket)
  210. {
  211. unsigned prevseq;
  212. if (lastContiguousPacket)
  213. {
  214. UdpPacketHeader *lastHdr = (UdpPacketHeader*) lastContiguousPacket->data;
  215. prevseq = lastHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK;
  216. finger = lastContiguousPacket->msgNext;
  217. }
  218. else
  219. {
  220. prevseq = (unsigned) -1;
  221. finger = firstPacket;
  222. }
  223. while (finger)
  224. {
  225. UdpPacketHeader *fingerHdr = (UdpPacketHeader*) finger->data;
  226. unsigned pktseq = fingerHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK;
  227. if (pktseq == prevseq+1)
  228. {
  229. unsigned packetDataSize = fingerHdr->length - fingerHdr->metalength - sizeof(UdpPacketHeader);
  230. assert(packetDataSize < roxiemem::DATA_ALIGNMENT_SIZE);
  231. if (pktseq == 0)
  232. {
  233. // MORE - Is this safe - header lifetime is somewhat unpredictable without a copy of it...
  234. // Client header is at the start of packet 0
  235. headerSize = *(unsigned short *)(finger->data + sizeof(UdpPacketHeader));
  236. header = finger->data + sizeof(UdpPacketHeader) + sizeof(unsigned short);
  237. packetDataSize -= headerSize + sizeof(unsigned short);
  238. }
  239. if (fingerHdr->metalength)
  240. {
  241. // MORE - may be worth taking the effort to avoid copy of metadata unless it's split up
  242. metaSize += fingerHdr->metalength;
  243. metadata.append(fingerHdr->metalength, finger->data + fingerHdr->length - fingerHdr->metalength);
  244. }
  245. lastContiguousPacket = finger;
  246. dataAvailable.signal();
  247. if (fingerHdr->pktSeq & UDP_PACKET_COMPLETE)
  248. {
  249. res = true;
  250. dataAvailable.signal(); // allowing us to read the NULL that signifies end of message. May prefer to use the flag to stop?
  251. }
  252. }
  253. else
  254. break;
  255. finger = finger->msgNext;
  256. prevseq = pktseq;
  257. }
  258. }
  259. return res;
  260. }
  261. inline const void *getMetaData(unsigned &length)
  262. {
  263. length = metadata.length();
  264. return metadata.toByteArray();
  265. }
  266. inline const void *getMessageHeader()
  267. {
  268. return header;
  269. }
  270. inline unsigned getHeaderSize()
  271. {
  272. return headerSize;
  273. }
  274. #ifdef _DEBUG
  275. void dump()
  276. {
  277. DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
  278. }
  279. #endif
  280. };
  281. // MessageResult ====================================================================================
  282. //
  283. class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface
  284. {
  285. PackageSequencer *pkSequencer;
  286. DataBuffer *dataBuff;
  287. unsigned current_pos;
  288. Linked<IRowManager> rowMgr;
  289. public:
  290. IMPLEMENT_IINTERFACE;
  291. CMessageUnpackCursor(PackageSequencer *_pkSqncr, IRowManager *_rowMgr) : rowMgr(_rowMgr)
  292. {
  293. if (checkTraceLevel(TRACE_MSGPACK, 3))
  294. DBGLOG("UdpCollator: CMessageUnpackCursor::CMessageUnpackCursor this=%p", this);
  295. pkSequencer = _pkSqncr;
  296. dataBuff = pkSequencer->next(NULL);
  297. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  298. current_pos = sizeof(UdpPacketHeader) + pkSequencer->getHeaderSize() + sizeof(unsigned short); // YUK - code neads cleaning!
  299. unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
  300. while (current_pos >= packetDataLimit)
  301. {
  302. current_pos = sizeof(UdpPacketHeader);
  303. dataBuff = pkSequencer->next(dataBuff); // NULL, we expect, unless packing was really weird.
  304. if (dataBuff)
  305. {
  306. pktHdr = (UdpPacketHeader*) dataBuff->data;
  307. packetDataLimit = pktHdr->length - pktHdr->metalength;
  308. }
  309. else
  310. break;
  311. }
  312. }
  313. ~CMessageUnpackCursor()
  314. {
  315. if (checkTraceLevel(TRACE_MSGPACK, 3))
  316. DBGLOG("UdpCollator: CMessageUnpackCursor::~CMessageUnpackCursor this=%p", this);
  317. pkSequencer->Release();
  318. }
  319. virtual bool atEOF() const
  320. {
  321. return dataBuff == NULL;
  322. }
  323. virtual bool isSerialized() const
  324. {
  325. return true;
  326. }
  327. virtual const void *getNext(int length)
  328. {
  329. // YUK horrid code! Though packer is even more horrid
  330. void *res = 0;
  331. if (dataBuff)
  332. {
  333. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  334. if (checkTraceLevel(TRACE_MSGPACK, 4))
  335. {
  336. StringBuffer s;
  337. DBGLOG("UdpCollator: CMessageUnpackCursor::getNext(%u) pos=%u pktLength=%u metaLen=%u ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuff=%p this=%p",
  338. length, current_pos, pktHdr->length, pktHdr->metalength,
  339. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq,
  340. pktHdr->node.getTraceText(s).str(), dataBuff, this);
  341. }
  342. unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
  343. if ((packetDataLimit - current_pos) >= (unsigned) length)
  344. {
  345. // Simple case - no need to copy
  346. res = &dataBuff->data[current_pos];
  347. current_pos += length;
  348. while (current_pos >= packetDataLimit)
  349. {
  350. dataBuff = pkSequencer->next(dataBuff);
  351. current_pos = sizeof(UdpPacketHeader);
  352. if (dataBuff)
  353. {
  354. pktHdr = (UdpPacketHeader*) dataBuff->data;
  355. packetDataLimit = pktHdr->length - pktHdr->metalength;
  356. }
  357. else
  358. break;
  359. }
  360. LinkRoxieRow(res);
  361. return res;
  362. }
  363. char *currResLoc = (char*)rowMgr->allocate(length, 0);
  364. res = currResLoc;
  365. while (length && dataBuff)
  366. {
  367. // Spans more than one block - allocate and copy
  368. unsigned cpyLen = packetDataLimit - current_pos;
  369. if (cpyLen > (unsigned) length) cpyLen = length;
  370. memcpy(currResLoc, &dataBuff->data[current_pos], cpyLen);
  371. length -= cpyLen;
  372. currResLoc += cpyLen;
  373. current_pos += cpyLen;
  374. while (current_pos >= packetDataLimit)
  375. {
  376. dataBuff = pkSequencer->next(dataBuff);
  377. if (dataBuff)
  378. {
  379. current_pos = sizeof(UdpPacketHeader);
  380. pktHdr = (UdpPacketHeader*) dataBuff->data;
  381. packetDataLimit = pktHdr->length - pktHdr->metalength;
  382. }
  383. else
  384. {
  385. current_pos = 0;
  386. pktHdr = NULL;
  387. packetDataLimit = 0;
  388. break;
  389. }
  390. }
  391. }
  392. assertex(!length); // fail if not enough data available
  393. }
  394. else
  395. res = NULL;
  396. return res;
  397. }
  398. };
  399. class CMessageResult : public IMessageResult, CInterface {
  400. PackageSequencer *pkSequencer;
  401. mutable MemoryBuffer metaInfo;
  402. mutable CriticalSection metaCrit;
  403. public:
  404. IMPLEMENT_IINTERFACE;
  405. CMessageResult(PackageSequencer *_pkSqncr)
  406. {
  407. if (checkTraceLevel(TRACE_MSGPACK, 3))
  408. DBGLOG("UdpCollator: CMessageResult::CMessageResult pkSqncr=%p, this=%p", _pkSqncr, this);
  409. pkSequencer = _pkSqncr;
  410. }
  411. ~CMessageResult()
  412. {
  413. if (checkTraceLevel(TRACE_MSGPACK, 3))
  414. DBGLOG("UdpCollator: CMessageResult::~CMessageResult this=%p", this);
  415. pkSequencer->Release();
  416. }
  417. virtual IMessageUnpackCursor *getCursor(IRowManager *rowMgr) const
  418. {
  419. return new CMessageUnpackCursor(LINK(pkSequencer), rowMgr);
  420. }
  421. virtual const void *getMessageHeader(unsigned &length) const
  422. {
  423. length = pkSequencer->getHeaderSize();
  424. return pkSequencer->getMessageHeader();
  425. }
  426. virtual const void *getMessageMetadata(unsigned &length) const
  427. {
  428. return pkSequencer->getMetaData(length);
  429. }
  430. virtual void discard() const
  431. {
  432. if (checkTraceLevel(TRACE_MSGPACK, 2))
  433. DBGLOG("UdpCollator: CMessageResult - Roxie server discarded a packet");
  434. unwantedDiscarded++;
  435. }
  436. };
  437. // MessageCollator ====================================================================================
  438. //
  439. PUID GETPUID(DataBuffer *dataBuff)
  440. {
  441. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  442. unsigned ip4;
  443. if (pktHdr->node.getNodeAddress().getNetAddress(sizeof(ip4), &ip4) != sizeof(ip4))
  444. throw makeStringException(ROXIE_INTERNAL_ERROR, "IPv6 not supported in roxie"); // MORE - do we ever care about ipv6?
  445. return (((PUID) ip4) << 32) | (PUID) pktHdr->msgSeq;
  446. }
  447. CMessageCollator::CMessageCollator(IRowManager *_rowMgr, unsigned _ruid) : rowMgr(_rowMgr), ruid(_ruid)
  448. {
  449. if (checkTraceLevel(TRACE_MSGPACK, 3))
  450. DBGLOG("UdpCollator: CMessageCollator::CMessageCollator rowMgr=%p this=%p ruid=" RUIDF "", _rowMgr, this, ruid);
  451. memLimitExceeded = false;
  452. activity = false;
  453. totalBytesReceived = 0;
  454. }
  455. CMessageCollator::~CMessageCollator()
  456. {
  457. if (checkTraceLevel(TRACE_MSGPACK, 3))
  458. DBGLOG("UdpCollator: CMessageCollator::~CMessageCollator ruid=" RUIDF ", this=%p", ruid, this);
  459. while (!queue.empty())
  460. {
  461. PackageSequencer *pkSqncr = queue.front();
  462. queue.pop();
  463. pkSqncr->Release();
  464. }
  465. }
  466. unsigned CMessageCollator::queryBytesReceived() const
  467. {
  468. return totalBytesReceived; // Arguably should lock, but can't be bothered. Never going to cause an issue in practice.
  469. }
  470. bool CMessageCollator::attach_databuffer(DataBuffer *dataBuff)
  471. {
  472. activity = true;
  473. if (memLimitExceeded || roxiemem::memPoolExhausted())
  474. {
  475. DBGLOG("UdpCollator: mem limit exceeded");
  476. return false;
  477. }
  478. if (!dataBuff->attachToRowMgr(rowMgr))
  479. {
  480. memLimitExceeded = true;
  481. DBGLOG("UdpCollator: mem limit exceeded");
  482. return(false);
  483. }
  484. collate(dataBuff);
  485. return true;
  486. }
  487. bool CMessageCollator::attach_data(const void *data, unsigned len)
  488. {
  489. // Simple code can allocate databuffer, copy data in, then call attach_databuffer
  490. // MORE - we can attach as we create may be more sensible (and simplify roxiemem rather if it was the ONLY way)
  491. activity = true;
  492. if (memLimitExceeded || roxiemem::memPoolExhausted())
  493. {
  494. DBGLOG("UdpCollator: mem limit exceeded");
  495. return false;
  496. }
  497. DataBuffer *dataBuff = bufferManager->allocate();
  498. assertex(len <= DATA_PAYLOAD);
  499. memcpy(dataBuff->data, data, len);
  500. if (!dataBuff->attachToRowMgr(rowMgr))
  501. {
  502. memLimitExceeded = true;
  503. DBGLOG("UdpCollator: mem limit exceeded");
  504. dataBuff->Release();
  505. return(false);
  506. }
  507. collate(dataBuff);
  508. return true;
  509. }
  510. void CMessageCollator::collate(DataBuffer *dataBuff)
  511. {
  512. UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
  513. totalBytesReceived += pktHdr->length;
  514. PUID puid = GETPUID(dataBuff);
  515. // MORE - I think we leak a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
  516. PackageSequencer *pkSqncr = mapping.getValue(puid);
  517. if (!pkSqncr)
  518. {
  519. pkSqncr = new PackageSequencer;
  520. mapping.setValue(puid, pkSqncr);
  521. pkSqncr->Release();
  522. }
  523. bool isComplete = pkSqncr->insert(dataBuff);
  524. if (isComplete)
  525. {
  526. pkSqncr->Link();
  527. mapping.remove(puid);
  528. queueCrit.enter();
  529. queue.push(pkSqncr);
  530. sem.signal();
  531. queueCrit.leave();
  532. }
  533. }
  534. IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActivity)
  535. {
  536. if (checkTraceLevel(TRACE_MSGPACK, 3))
  537. DBGLOG("UdpCollator: CMessageCollator::getNextResult() timeout=%.8X ruid=%u rowMgr=%p this=%p", time_out, ruid, (void*) rowMgr, this);
  538. if (memLimitExceeded)
  539. {
  540. DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory limit exceeded exception - rowMgr=%p this=%p", (void*) rowMgr, this);
  541. throw MakeStringException(0, "memory limit exceeded");
  542. }
  543. else if (roxiemem::memPoolExhausted())
  544. {
  545. DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory pool exhausted exception - rowMgr=%p this=%p", (void*)rowMgr, this);
  546. throw MakeStringException(0, "memory pool exhausted");
  547. }
  548. if (sem.wait(time_out))
  549. {
  550. queueCrit.enter();
  551. PackageSequencer *pkSqncr = queue.front();
  552. queue.pop();
  553. queueCrit.leave();
  554. anyActivity = true;
  555. activity = false;
  556. return new CMessageResult(pkSqncr);
  557. }
  558. anyActivity = activity;
  559. activity = false;
  560. if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 1)) // suppress the tracing for pings where we expect the timeout...
  561. {
  562. #ifdef _DEBUG
  563. DBGLOG("GetNextResult timeout: mapping has %d partial results", mapping.ordinality());
  564. HashIterator h(mapping);
  565. ForEach(h)
  566. {
  567. auto *r = mapping.mapToValue(&h.query());
  568. PUID puid = *(PUID *) h.query().getKey();
  569. DBGLOG("puid=%" I64F "x:", puid);
  570. PackageSequencer *pkSqncr = mapping.getValue(puid);
  571. pkSqncr->dump();
  572. }
  573. #endif
  574. DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout");
  575. }
  576. return 0;
  577. }
  578. void CMessageCollator::interrupt(IException *E)
  579. {
  580. sem.interrupt(E);
  581. }
  582. // ====================================================================================
  583. //
  584. extern CMessageCollator *createCMessageCollator(IRowManager *rowManager, ruid_t ruid)
  585. {
  586. return new CMessageCollator(rowManager, ruid);
  587. }