udpaeron.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <map>
  14. #include "jexcept.hpp"
  15. #include "jqueue.tpp"
  16. #include "udplib.hpp"
  17. #include "udpipmap.hpp"
  18. #include "udpmsgpk.hpp"
  19. #include "udpsha.hpp"
  20. #include "udptrs.hpp"
  21. #include "roxie.hpp"
  22. #ifdef _USE_AERON
  23. #include <Aeron.h>
  24. extern "C" {
  25. #include "aeronmd.h"
  26. #include "concurrent/aeron_atomic.h"
  27. #include "aeron_driver_context.h"
  28. #include "util/aeron_properties_util.h"
  29. }
  // Configurable variables // MORE - add relevant code to Roxie
  // The initialisers below are the built-in defaults; setAeronProperties() overwrites
  // each one from the matching "@aeron..." configuration attribute.
  bool useEmbeddedAeronDriver{true};     // run the Aeron media driver on a thread inside this process
  unsigned aeronConnectTimeout{5000};    // limit, compared against msTick() deltas, for a publication to connect
  unsigned aeronPollFragmentsLimit{10};  // fragment limit passed to each Subscription::poll() call
  unsigned aeronIdleSleepMs{1};          // receive thread sleep (ms) when a poll round reads nothing
  unsigned aeronMtuLength{0};            // non-zero: copied to the driver context's mtu_length
  unsigned aeronSocketRcvbuf{0};         // non-zero: copied to the driver context's socket_rcvbuf
  unsigned aeronSocketSndbuf{0};         // non-zero: copied to the driver context's socket_sndbuf
  unsigned aeronInitialWindow{0};        // non-zero: copied to the driver context's initial_window_length
  39. extern UDPLIB_API void setAeronProperties(const IPropertyTree *config)
  40. {
  41. useEmbeddedAeronDriver = config->getPropBool("@aeronUseEmbeddedDriver", true);
  42. aeronConnectTimeout = config->getPropInt("@aeronConnectTimeout", 5000);
  43. aeronPollFragmentsLimit = config->getPropInt("@aeronPollFragmentsLimit", 10);
  44. aeronIdleSleepMs = config->getPropInt("@aeronIdleSleepMs", 1);
  45. aeronMtuLength = config->getPropInt("@aeronMtuLength", 0);
  46. aeronSocketRcvbuf = config->getPropInt("@aeronSocketRcvbuf", 0);
  47. aeronSocketSndbuf = config->getPropInt("@aeronSocketSndbuf", 0);
  48. aeronInitialWindow = config->getPropInt("@aeronInitialWindow", 0);
  49. }
  50. static std::thread aeronDriverThread;
  51. static InterruptableSemaphore driverStarted;
  52. std::atomic<bool> aeronDriverRunning = { false };
  53. extern UDPLIB_API void stopAeronDriver()
  54. {
  55. aeronDriverRunning = false;
  56. if (aeronDriverThread.joinable())
  57. aeronDriverThread.join();
  58. }
  59. void sigint_handler(int signal)
  60. {
  61. stopAeronDriver();
  62. }
  63. void termination_hook(void *state)
  64. {
  65. stopAeronDriver();
  66. }
  67. inline bool is_running()
  68. {
  69. return aeronDriverRunning;
  70. }
  71. int startAeronDriver()
  72. {
  73. aeron_driver_context_t *context = nullptr;
  74. aeron_driver_t *driver = nullptr;
  75. try
  76. {
  77. if (aeron_driver_context_init(&context) < 0)
  78. throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error initializing context (%d) %s", aeron_errcode(), aeron_errmsg());
  79. context->termination_hook_func = termination_hook;
  80. context->dirs_delete_on_start = true;
  81. context->warn_if_dirs_exist = false;
  82. context->term_buffer_sparse_file = false;
  83. if (aeronMtuLength) context->mtu_length = aeronMtuLength;
  84. if (aeronSocketRcvbuf) context->socket_rcvbuf = aeronSocketRcvbuf;
  85. if (aeronSocketSndbuf) context->socket_sndbuf = aeronSocketSndbuf;
  86. if (aeronInitialWindow) context->initial_window_length = aeronInitialWindow;
  87. if (aeron_driver_init(&driver, context) < 0)
  88. throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error initializing driver (%d) %s", aeron_errcode(), aeron_errmsg());
  89. if (aeron_driver_start(driver, true) < 0)
  90. throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error starting driver (%d) %s", aeron_errcode(), aeron_errmsg());
  91. driverStarted.signal();
  92. aeronDriverRunning = true;
  93. while (is_running())
  94. {
  95. aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver));
  96. }
  97. aeron_driver_close(driver);
  98. aeron_driver_context_close(context);
  99. }
  100. catch (IException *E)
  101. {
  102. aeron_driver_close(driver);
  103. aeron_driver_context_close(context);
  104. driverStarted.interrupt(E);
  105. }
  106. catch (...)
  107. {
  108. aeron_driver_close(driver);
  109. aeron_driver_context_close(context);
  110. driverStarted.interrupt(makeStringException(0, "failed to start Aeron (unknown exception)"));
  111. }
  112. return 0;
  113. }
  114. class CRoxieAeronReceiveManager : public CInterfaceOf<IReceiveManager>
  115. {
  116. private:
  117. typedef std::map<ruid_t, CMessageCollator*> uid_map;
  118. uid_map collators;
  119. CriticalSection collatorsLock; // protects access to collators map
  120. std::shared_ptr<aeron::Aeron> aeron;
  121. std::shared_ptr<aeron::Subscription> loSub;
  122. std::shared_ptr<aeron::Subscription> hiSub;
  123. std::shared_ptr<aeron::Subscription> slaSub;
  124. std::thread receiveThread;
  125. std::atomic<bool> running = { true };
  126. const std::chrono::duration<long, std::milli> idleSleepMs;
  127. bool encrypted;
  128. public:
  129. CRoxieAeronReceiveManager(const SocketEndpoint &myEndpoint, bool _encrypted)
  130. : idleSleepMs(aeronIdleSleepMs), encrypted(_encrypted)
  131. {
  132. if (useEmbeddedAeronDriver && !is_running())
  133. {
  134. aeronDriverThread = std::thread([]() { startAeronDriver(); });
  135. driverStarted.wait();
  136. }
  137. aeron::Context context;
  138. if (udpTraceLevel)
  139. {
  140. context.newSubscriptionHandler(
  141. [](const std::string& channel, std::int32_t streamId, std::int64_t correlationId)
  142. {
  143. DBGLOG("AeronReceiver: Subscription: %s %" I64F "d %d", channel.c_str(), (__int64) correlationId, streamId);
  144. });
  145. context.availableImageHandler([](aeron::Image &image)
  146. {
  147. DBGLOG("AeronReceiver: Available image correlationId=%" I64F "d, sessionId=%d at position %" I64F "d from %s", (__int64) image.correlationId(), image.sessionId(), (__int64) image.position(), image.sourceIdentity().c_str());
  148. });
  149. context.unavailableImageHandler([](aeron::Image &image)
  150. {
  151. DBGLOG("AeronReceiver: Unavailable image correlationId=%" I64F "d, sessionId=%d at position %" I64F "d from %s", (__int64) image.correlationId(), image.sessionId(), (__int64) image.position(), image.sourceIdentity().c_str());
  152. });
  153. }
  154. aeron = aeron::Aeron::connect(context);
  155. loSub = addSubscription(myEndpoint, 0);
  156. hiSub = addSubscription(myEndpoint, 1);
  157. slaSub = addSubscription(myEndpoint, 2);
  158. aeron::fragment_handler_t handler = [this](const aeron::AtomicBuffer& buffer, aeron::util::index_t offset, aeron::util::index_t length, const aeron::Header& header)
  159. {
  160. collatePacket(buffer.buffer() + offset, length);
  161. };
  162. receiveThread = std::thread([this, handler]()
  163. {
  164. while (running)
  165. {
  166. int fragmentsRead = slaSub->poll(handler, aeronPollFragmentsLimit);
  167. if (!fragmentsRead)
  168. fragmentsRead = hiSub->poll(handler, aeronPollFragmentsLimit);
  169. if (!fragmentsRead)
  170. fragmentsRead = loSub->poll(handler, aeronPollFragmentsLimit);
  171. if (!fragmentsRead)
  172. std::this_thread::sleep_for(idleSleepMs);
  173. }
  174. });
  175. }
  176. ~CRoxieAeronReceiveManager()
  177. {
  178. running = false;
  179. receiveThread.join();
  180. }
  181. void collatePacket( std::uint8_t *buffer, aeron::util::index_t length)
  182. {
  183. const UdpPacketHeader *pktHdr = (UdpPacketHeader*) buffer;
  184. assert(pktHdr->length == length);
  185. if (udpTraceLevel >= 4)
  186. {
  187. StringBuffer s;
  188. DBGLOG("AeronReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
  189. pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
  190. }
  191. Linked <CMessageCollator> msgColl;
  192. bool isDefault = false; // Don't trace inside the spinBlock!
  193. {
  194. CriticalBlock b(collatorsLock);
  195. try
  196. {
  197. msgColl.set(collators[pktHdr->ruid]);
  198. if (!msgColl)
  199. {
  200. msgColl.set(collators[RUID_DISCARD]);
  201. // We could consider sending an abort to the agent, but it should have already been done by ccdserver code
  202. isDefault = true;
  203. unwantedDiscarded++;
  204. }
  205. }
  206. catch (IException *E)
  207. {
  208. EXCLOG(E);
  209. E->Release();
  210. }
  211. catch (...)
  212. {
  213. IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run");
  214. EXCLOG(E);
  215. E->Release();
  216. }
  217. }
  218. if (udpTraceLevel && isDefault)
  219. {
  220. StringBuffer s;
  221. DBGLOG("AeronReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str());
  222. }
  223. if (msgColl)
  224. msgColl->attach_data(buffer, length);
  225. }
  226. // Note - some of this code could be in a common base class with udpreceivemanager, but hope to kill that at some point
  227. virtual IMessageCollator *createMessageCollator(roxiemem::IRowManager *rowManager, ruid_t ruid) override
  228. {
  229. CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
  230. if (udpTraceLevel >= 2)
  231. DBGLOG("AeronReceiver: createMessageCollator %p %u", msgColl, ruid);
  232. {
  233. CriticalBlock b(collatorsLock);
  234. collators[ruid] = msgColl;
  235. }
  236. msgColl->Link();
  237. return msgColl;
  238. }
  239. virtual void detachCollator(const IMessageCollator *msgColl) override
  240. {
  241. ruid_t ruid = msgColl->queryRUID();
  242. if (udpTraceLevel >= 2)
  243. DBGLOG("AeronReceiver: detach %p %u", msgColl, ruid);
  244. {
  245. CriticalBlock b(collatorsLock);
  246. collators.erase(ruid);
  247. }
  248. msgColl->Release();
  249. }
  250. private:
  251. std::shared_ptr<aeron::Subscription> addSubscription(const SocketEndpoint &myEndpoint, int queue)
  252. {
  253. StringBuffer channel("aeron:udp?endpoint=");
  254. myEndpoint.getUrlStr(channel);
  255. std::int64_t id = aeron->addSubscription(channel.str(), queue);
  256. std::shared_ptr<aeron::Subscription> subscription = aeron->findSubscription(id);
  257. while (!subscription)
  258. {
  259. std::this_thread::yield();
  260. subscription = aeron->findSubscription(id);
  261. }
  262. return subscription;
  263. }
  264. };
  265. class UdpAeronReceiverEntry : public IUdpReceiverEntry
  266. {
  267. private:
  268. std::shared_ptr<aeron::Aeron> aeron;
  269. unsigned numQueues;
  270. std::vector<std::shared_ptr<aeron::Publication>> publications;
  271. const IpAddress dest;
  272. public:
  273. UdpAeronReceiverEntry(const IpAddress &_ip, unsigned _dataPort, std::shared_ptr<aeron::Aeron> _aeron, unsigned _numQueues)
  274. : dest(_ip), aeron(_aeron), numQueues(_numQueues)
  275. {
  276. StringBuffer channel("aeron:udp?endpoint=");
  277. dest.getIpText(channel);
  278. channel.append(':').append(_dataPort);
  279. for (unsigned queue = 0; queue < numQueues; queue++)
  280. {
  281. if (udpTraceLevel)
  282. DBGLOG("AeronSender: Creating publication to channel %s for queue %d", channel.str(), queue);
  283. std::int64_t id = aeron->addPublication(channel.str(), queue);
  284. std::shared_ptr<aeron::Publication> publication = aeron->findPublication(id);
  285. // Wait for the publication to be valid
  286. while (!publication)
  287. {
  288. std::this_thread::yield();
  289. publication = aeron->findPublication(id);
  290. }
  291. if ((unsigned) publication->maxPayloadLength() < DATA_PAYLOAD)
  292. throw makeStringExceptionV(ROXIE_AERON_ERROR, "AeronSender: maximum payload %u too small (%u required)", (unsigned) publication->maxPayloadLength(), (unsigned) DATA_PAYLOAD);
  293. if (udpTraceLevel <= 4)
  294. DBGLOG("AeronSender: Publication maxima: %d %d", publication->maxPayloadLength(), publication->maxMessageLength());
  295. publications.push_back(publication);
  296. // Wait for up to 5 seconds to connect to a subscriber
  297. unsigned start = msTick();
  298. while (!publication->isConnected())
  299. {
  300. Sleep(10);
  301. if (msTick()-start > aeronConnectTimeout)
  302. throw makeStringExceptionV(ROXIE_PUBLICATION_NOT_CONNECTED, "AeronSender: Publication not connected to channel %s after %d seconds ", channel.str(), aeronConnectTimeout);
  303. }
  304. }
  305. }
  306. void write(roxiemem::DataBuffer *buffer, unsigned len, unsigned queue)
  307. {
  308. unsigned backoff = 1;
  309. aeron::concurrent::AtomicBuffer srcBuffer(reinterpret_cast<std::uint8_t *>(&buffer->data), len);
  310. for (;;)
  311. {
  312. const std::int64_t result = publications[queue]->offer(srcBuffer, 0, len);
  313. if (result < 0)
  314. {
  315. if (aeron::BACK_PRESSURED == result || aeron::ADMIN_ACTION == result)
  316. {
  317. // MORE - experiment with best policy. spinning without delay may be appropriate too, depending on cpu availability
  318. // and whether data write thread is high priority
  319. MilliSleep(backoff-1); // MilliSleep(0) just does a threadYield
  320. if (backoff < 256)
  321. backoff = backoff*2;
  322. continue;
  323. }
  324. StringBuffer target;
  325. dest.getIpText(target);
  326. if (aeron::NOT_CONNECTED == result)
  327. throw makeStringExceptionV(ROXIE_PUBLICATION_NOT_CONNECTED, "AeronSender: Offer failed because publisher is not connected to subscriber %s", target.str());
  328. else if (aeron::PUBLICATION_CLOSED == result)
  329. throw makeStringExceptionV(ROXIE_PUBLICATION_CLOSED, "AeronSender: Offer failed because publisher is closed sending to %s", target.str());
  330. else
  331. throw makeStringExceptionV(ROXIE_AERON_ERROR, "AeronSender: Offer failed for unknown reason %" I64F "d sending to %s", (__int64) result, target.str());
  332. }
  333. break;
  334. }
  335. }
  336. };
  337. class CRoxieAeronSendManager : public CInterfaceOf<ISendManager>
  338. {
  339. std::shared_ptr<aeron::Aeron> aeron;
  340. const unsigned dataPort = 0;
  341. const unsigned numQueues = 0;
  342. IpMapOf<UdpAeronReceiverEntry> receiversTable;
  343. const IpAddress myIP;
  344. bool encrypted;
  345. std::atomic<unsigned> msgSeq{0};
  346. inline unsigned getNextMessageSequence()
  347. {
  348. unsigned res;
  349. do
  350. {
  351. res = ++msgSeq;
  352. } while (unlikely(!res));
  353. return res;
  354. }
  355. public:
  356. CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP, bool _encrypted)
  357. : dataPort(_dataPort),
  358. numQueues(_numQueues),
  359. receiversTable([this](const ServerIdentifier ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}),
  360. myIP(_myIP),
  361. encrypted(_encrypted)
  362. {
  363. if (useEmbeddedAeronDriver && !is_running())
  364. {
  365. aeronDriverThread = std::thread([]() { startAeronDriver(); });
  366. driverStarted.wait();
  367. }
  368. aeron::Context context;
  369. if (udpTraceLevel)
  370. context.newPublicationHandler(
  371. [](const std::string& channel, std::int32_t streamId, std::int32_t sessionId, std::int64_t correlationId)
  372. {
  373. DBGLOG("AeronSender: Publication %s, correlation %" I64F "d, streamId %d, sessionId %d", channel.c_str(), (__int64) correlationId, streamId, sessionId);
  374. });
  375. aeron = aeron::Aeron::connect(context);
  376. }
  377. virtual void writeOwn(IUdpReceiverEntry &receiver, roxiemem::DataBuffer *buffer, unsigned len, unsigned queue) override
  378. {
  379. assert(queue < numQueues);
  380. static_cast<UdpAeronReceiverEntry &>(receiver).write(buffer, len, queue);
  381. buffer->Release();
  382. }
  383. virtual IMessagePacker *createMessagePacker(ruid_t id, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override;
  384. virtual bool dataQueued(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) override { return false; }
  385. virtual bool abortData(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) override { return false; }
  386. virtual void abortAll(const ServerIdentifier &destNode) override { }
  387. virtual bool allDone() override { return true; }
  388. };
  389. IMessagePacker *CRoxieAeronSendManager::createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue)
  390. {
  391. const IpAddress dest = destNode.getIpAddress();
  392. return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue, encrypted);
  393. }
  394. extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted)
  395. {
  396. return new CRoxieAeronReceiveManager(ep, encrypted);
  397. }
  398. extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted)
  399. {
  400. return new CRoxieAeronSendManager(dataPort, numQueues, myIP, encrypted);
  401. }
  402. #else
  403. extern UDPLIB_API void setAeronProperties(const IPropertyTree *config)
  404. {
  405. }
  406. extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted)
  407. {
  408. UNIMPLEMENTED;
  409. }
  410. extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted)
  411. {
  412. UNIMPLEMENTED;
  413. }
  414. extern UDPLIB_API void stopAeronDriver()
  415. {
  416. }
  417. #endif