dasubs.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define da_decl DECL_EXPORT
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jmisc.hpp"
  17. #include "jsuperhash.hpp"
  18. #include "daclient.hpp"
  19. // TBD local Coven subscriptions
  20. //#define SUPRESS_REMOVE_ABORTED
  21. #define TRACE_QWAITING
  22. #include "dacoven.hpp"
  23. #include "mpbuff.hpp"
  24. #include "mpcomm.hpp"
  25. #include "mputil.hpp"
  26. #include "daserver.hpp"
  27. #include "dasubs.ipp"
  28. #ifdef _MSC_VER
  29. #pragma warning (disable : 4355)
  30. #endif
  // Request kinds carried (serialized as an int) at the start of every message on
  // MPTAG_DALI_SUBSCRIPTION_REQUEST. PRIMARY kinds are sent by clients; the coven
  // server that receives one forwards it to the other coven servers as the matching
  // SECONDARY kind. The numeric values are part of the wire format.
  enum MSubscriptionRequestKind {
      MSR_REMOVE_SUBSCRIPTION_PRIMARY   = 0,
      MSR_ADD_SUBSCRIPTION_PRIMARY      = 1,
      MSR_REMOVE_SUBSCRIPTION_SECONDARY = 2,
      MSR_ADD_SUBSCRIPTION_SECONDARY    = 3
  };
  37. class CSubscriptionStub: implements ISubscription, public CInterface
  38. { // Server (Coven) side
  39. unsigned tag;
  40. MemoryAttr data;
  41. SubscriptionId sid;
  42. INode *dst;
  43. bool hasaborted;
  44. public:
  45. IMPLEMENT_IINTERFACE;
  46. CSubscriptionStub(unsigned _tag,SubscriptionId _sid,size32_t _datalen, const byte *_data,INode *_dst)
  47. : data(_datalen,_data)
  48. {
  49. tag = _tag;
  50. sid = _sid;
  51. dst = LINK(_dst);
  52. hasaborted = false;
  53. }
  54. virtual ~CSubscriptionStub()
  55. {
  56. unlink();
  57. dst->Release();
  58. }
  59. const MemoryAttr &queryData()
  60. {
  61. return data;
  62. }
  63. void notify(MemoryBuffer &returndata) // if returns false should unsubscribe
  64. {
  65. if (hasaborted) {
  66. throw MakeStringException(-1,"Subscription notification aborted");
  67. return;
  68. }
  69. size32_t dlen = returndata.length();
  70. CMessageBuffer mb;
  71. mb.append(tag).append(sid).append(dlen).append(returndata);
  72. try {
  73. if (!queryWorldCommunicator().send(mb,dst,MPTAG_DALI_SUBSCRIPTION_FULFILL,1000*60*3)) {
  74. // Must reply in 3 Minutes
  75. // Kludge to avoid locking SDS on blocked client
  76. hasaborted = true;
  77. StringBuffer tmp;
  78. throw MakeStringException(-1,"Subscription notification to %s timed out",dst->endpoint().getUrlStr(tmp).str());
  79. return;
  80. }
  81. }
  82. catch (IMP_Exception *e) {
  83. PrintExceptionLog(e,"Dali CSubscriptionStub");
  84. hasaborted = true;
  85. throw;
  86. }
  87. }
  88. void abort()
  89. {
  90. hasaborted = true;
  91. }
  92. bool aborted()
  93. {
  94. return hasaborted;
  95. }
  96. void unlink();
  97. INode &queryNode() { return *dst; }
  98. unsigned queryTag() { return tag; }
  99. SubscriptionId querySubscriptionId() { return sid; }
  100. StringBuffer &getDetails(StringBuffer &buf)
  101. {
  102. StringBuffer ep;
  103. return buf.appendf("%16" I64F "X: %s %s",sid,dst->endpoint().getUrlStr(ep).str(),hasaborted?"aborted":"");
  104. }
  105. };
  106. static class CDaliPublisher
  107. {
  108. public:
  109. virtual ISubscriptionManager *queryManager(unsigned tag) = 0;
  110. virtual void stop() = 0;
  111. virtual ~CDaliPublisher() {}
  112. } *DaliPublisher;
  113. class CDaliPublisherServer: public IDaliServer, public Thread, implements CDaliPublisher, implements IConnectionMonitor
  114. {
  115. ICopyArrayOf<CSubscriptionStub> stubs;
  116. IArrayOf<ISubscriptionManager> managers;
  117. UnsignedArray tags;
  118. CheckedCriticalSection tagsect;
  119. CheckedCriticalSection stubsect;
  120. bool stopped;
  121. ReadWriteLock processlock;
  122. public:
  123. IMPLEMENT_IINTERFACE;
  124. CDaliPublisherServer()
  125. : Thread("CDaliPublisherServer")
  126. {
  127. stopped = true;
  128. }
  129. ~CDaliPublisherServer()
  130. {
  131. stopped = true;
  132. managers.kill();
  133. }
  134. void start()
  135. {
  136. Thread::start();
  137. }
  138. void ready()
  139. {
  140. addMPConnectionMonitor(this);
  141. }
  142. void suspend()
  143. {
  144. PROGLOG("Suspending subscriptions");
  145. removeMPConnectionMonitor(this);
  146. processlock.lockWrite();
  147. PROGLOG("Suspended subscriptions");
  148. }
  149. void stop()
  150. {
  151. if (!stopped) {
  152. stopped = true;
  153. queryCoven().cancel(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST);
  154. }
  155. processlock.unlockWrite();
  156. join();
  157. }
  158. int run()
  159. {
  160. ICoven &coven=queryCoven();
  161. CMessageHandler<CDaliPublisherServer> handler("CDaliPublisherServer",this,&CDaliPublisherServer::processMessage,NULL, 100);
  162. CMessageBuffer mb;
  163. stopped = false;
  164. while (!stopped) {
  165. try {
  166. mb.clear();
  167. #ifdef TRACE_QWAITING
  168. unsigned waiting = coven.probe(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST,NULL);
  169. if ((waiting!=0)&&(waiting%10==0))
  170. DBGLOG("QPROBE: MPTAG_DALI_SUBSCRIPTION_REQUEST has %d waiting",waiting);
  171. #endif
  172. if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SUBSCRIPTION_REQUEST,NULL))
  173. handler.handleMessage(mb);
  174. else
  175. stopped = true;
  176. }
  177. catch (IException *e)
  178. {
  179. EXCLOG(e, "CDaliPublisherServer");
  180. e->Release();
  181. }
  182. }
  183. return 0;
  184. }
  185. void processMessage(CMessageBuffer &mb)
  186. {
  187. ReadLockBlock block(processlock);
  188. if (stopped)
  189. return;
  190. ICoven &coven=queryCoven();
  191. int fn;
  192. mb.read(fn);
  193. SubscriptionId sid;
  194. unsigned subtag;
  195. ISubscriptionManager *manager;
  196. switch (fn) {
  197. case MSR_ADD_SUBSCRIPTION_PRIMARY:
  198. case MSR_ADD_SUBSCRIPTION_SECONDARY:
  199. {
  200. Owned<IException> exception;
  201. Owned<CSubscriptionStub> sub;
  202. try
  203. {
  204. SubscriptionId sid;
  205. mb.read(subtag).read(sid);
  206. Owned<INode> subscriber = deserializeINode(mb);
  207. size32_t dsize;
  208. mb.read(dsize);
  209. sub.setown(new CSubscriptionStub(subtag,sid,dsize,mb.readDirect(dsize),subscriber));
  210. mb.clear();
  211. {
  212. CHECKEDCRITICALBLOCK(stubsect,60000);
  213. removeAborted();
  214. }
  215. manager = queryManager(subtag);
  216. if (manager) {
  217. if (fn==MSR_ADD_SUBSCRIPTION_PRIMARY) {
  218. rank_t n = coven.queryGroup().ordinality();
  219. rank_t mr = coven.queryGroup().rank();
  220. for (rank_t r = 0;r<n;r++) {
  221. if (r!=mr) {
  222. int fn = MSR_ADD_SUBSCRIPTION_SECONDARY;
  223. mb.clear().append(fn).append(subtag).append(sid);
  224. subscriber->serialize(mb);
  225. mb.append(dsize).append(dsize,sub->queryData().get());
  226. coven.sendRecv(mb,r,MPTAG_DALI_SUBSCRIPTION_REQUEST);
  227. // should check for server failure here
  228. }
  229. }
  230. }
  231. manager->add(sub.getLink(),sid);
  232. }
  233. }
  234. catch (IException *e) {
  235. exception.setown(e);
  236. sub.clear();
  237. }
  238. unsigned retry=0;
  239. if (exception)
  240. serializeException(exception, mb);
  241. while (!coven.reply(mb,60000)) {
  242. StringBuffer eps;
  243. DBGLOG("MSR_ADD_SUBSCRIPTION_PRIMARY reply timed out to %s try %d",mb.getSender().getUrlStr(eps).str(),retry+1);
  244. if (retry++==3)
  245. return;
  246. }
  247. if (sub)
  248. {
  249. CHECKEDCRITICALBLOCK(stubsect,60000);
  250. stubs.append(*sub);
  251. }
  252. }
  253. break;
  254. case MSR_REMOVE_SUBSCRIPTION_PRIMARY:
  255. case MSR_REMOVE_SUBSCRIPTION_SECONDARY:
  256. {
  257. unsigned tstart = msTick();
  258. {
  259. CHECKEDCRITICALBLOCK(stubsect,60000);
  260. removeAborted();
  261. mb.read(subtag);
  262. mb.read(sid);
  263. manager = queryManager(subtag);
  264. if (manager) {
  265. if (fn==MSR_REMOVE_SUBSCRIPTION_PRIMARY) {
  266. rank_t n = coven.queryGroup().ordinality();
  267. rank_t mr = coven.queryGroup().rank();
  268. for (rank_t r = 0;r<n;r++) {
  269. if (r!=mr) {
  270. mb.clear().append(MSR_REMOVE_SUBSCRIPTION_SECONDARY).append(subtag).append(sid);
  271. coven.sendRecv(mb,r,MPTAG_DALI_SUBSCRIPTION_REQUEST);
  272. // should check for server failure here
  273. }
  274. }
  275. }
  276. manager->remove(sid);
  277. }
  278. mb.clear();
  279. }
  280. coven.reply(mb);
  281. unsigned telapsed=msTick()-tstart;
  282. if (telapsed>1000)
  283. LOG(MCerror, unknownJob, "MSR_REMOVE_SUBSCRIPTION_PRIMARY.1 took %dms",telapsed);
  284. }
  285. break;
  286. }
  287. }
  288. void nodeDown(rank_t rank)
  289. {
  290. assertex(!"TBD");
  291. }
  292. ISubscriptionManager *queryManager(unsigned tag)
  293. {
  294. CHECKEDCRITICALBLOCK(tagsect,60000);
  295. unsigned i = tags.find(tag);
  296. if (i==NotFound)
  297. return NULL;
  298. return &managers.item(i);
  299. }
  300. void registerSubscriptionManager(unsigned tag, ISubscriptionManager *manager)
  301. {
  302. CHECKEDCRITICALBLOCK(tagsect,60000);
  303. tags.append(tag);
  304. manager->Link();
  305. managers.append(*manager);
  306. }
  307. void unlink(CSubscriptionStub *stub)
  308. {
  309. unsigned tstart = msTick();
  310. {
  311. CHECKEDCRITICALBLOCK(stubsect,60000);
  312. stubs.zap(*stub);
  313. }
  314. unsigned telapsed=msTick()-tstart;
  315. if (telapsed>1000)
  316. LOG(MCerror, unknownJob, "CDaliPublisherServer::unlink took %dms",telapsed);
  317. }
  318. void onClose(SocketEndpoint &ep)
  319. {
  320. // mark stub closed
  321. unsigned tstart = msTick();
  322. {
  323. CHECKEDCRITICALBLOCK(stubsect,60000);
  324. ForEachItemIn(i, stubs)
  325. {
  326. CSubscriptionStub &stub = stubs.item(i);
  327. if (stub.queryNode().endpoint().equals(ep)) {
  328. stub.abort();
  329. }
  330. }
  331. unsigned telapsed=msTick()-tstart;
  332. if (telapsed>1000)
  333. LOG(MCerror, unknownJob, "CDaliPublisherServer::onClose took %dms",telapsed);
  334. }
  335. }
  336. void removeAborted()
  337. {
  338. #ifdef SUPRESS_REMOVE_ABORTED
  339. return;
  340. #endif
  341. // called from critical section
  342. CIArrayOf<CSubscriptionStub> toremove;
  343. ForEachItemIn(i, stubs)
  344. {
  345. CSubscriptionStub &stub = stubs.item(i);
  346. if (stub.aborted()) {
  347. stub.Link();
  348. toremove.append(stub);
  349. }
  350. }
  351. if (toremove.ordinality()) {
  352. CHECKEDCRITICALUNBLOCK(stubsect,60000);
  353. ForEachItemIn(i2, toremove) {
  354. CSubscriptionStub &stub = toremove.item(i2);
  355. queryManager(stub.queryTag())->remove(stub.querySubscriptionId());
  356. }
  357. }
  358. }
  359. StringBuffer &getSubscriptionList(StringBuffer &buf)
  360. {
  361. unsigned tstart = msTick();
  362. {
  363. CHECKEDCRITICALBLOCK(stubsect,60000);
  364. ForEachItemIn(i, stubs)
  365. {
  366. CSubscriptionStub &stub = stubs.item(i);
  367. stub.getDetails(buf).append('\n');
  368. }
  369. }
  370. unsigned telapsed=msTick()-tstart;
  371. if (telapsed>1000)
  372. LOG(MCerror, unknownJob, "CDaliPublisherServer::getSubscriptionList took %dms",telapsed);
  373. return buf;
  374. }
  375. } *daliPublisherServer = NULL;
  376. StringBuffer &getSubscriptionList(StringBuffer &buf)
  377. {
  378. if (daliPublisherServer)
  379. daliPublisherServer->getSubscriptionList(buf);
  380. return buf;
  381. }
  382. void CSubscriptionStub::unlink()
  383. {
  384. if (daliPublisherServer)
  385. daliPublisherServer->unlink(this);
  386. }
  387. class CDaliSubscriptionManagerStub: implements ISubscriptionManager, public CInterface
  388. {
  389. // Client side
  390. unsigned tag;
  391. IArrayOf<ISubscription> subscriptions;
  392. Int64Array ids;
  393. CriticalSection subscriptionsect;
  394. public:
  395. IMPLEMENT_IINTERFACE;
  396. CDaliSubscriptionManagerStub(unsigned _tag)
  397. {
  398. tag = _tag;
  399. }
  400. ~CDaliSubscriptionManagerStub()
  401. {
  402. subscriptions.kill();
  403. }
  404. void add(ISubscription *subs,SubscriptionId id)
  405. {
  406. {
  407. CriticalBlock block(subscriptionsect);
  408. ids.append(id);
  409. subscriptions.append(*subs);
  410. }
  411. int fn = MSR_ADD_SUBSCRIPTION_PRIMARY;
  412. CMessageBuffer mb;
  413. mb.append(fn).append(tag).append(id);
  414. queryMyNode()->serialize(mb);
  415. const MemoryAttr &data = subs->queryData();
  416. size32_t dlen = (size32_t)data.length();
  417. mb.append(dlen);
  418. mb.append(dlen,data.get());
  419. try
  420. {
  421. queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SUBSCRIPTION_REQUEST);
  422. if (mb.length())
  423. throw deserializeException(mb);
  424. }
  425. catch (IException *e)
  426. {
  427. PrintExceptionLog(e,"Dali CDaliSubscriptionManagerStub::add");
  428. {
  429. CriticalBlock block(subscriptionsect);
  430. unsigned idx = ids.find(id);
  431. if (NotFound != idx)
  432. {
  433. ids.remove(idx);
  434. subscriptions.remove(idx);
  435. }
  436. }
  437. throw;
  438. }
  439. }
  440. void remove(SubscriptionId id)
  441. {
  442. CriticalBlock block(subscriptionsect);
  443. unsigned idx = ids.find(id);
  444. if (idx == NotFound)
  445. return;
  446. int fn = MSR_REMOVE_SUBSCRIPTION_PRIMARY;
  447. CMessageBuffer mb;
  448. mb.append(fn);
  449. mb.append(tag);
  450. mb.append(id);
  451. try {
  452. queryCoven().sendRecv(mb,RANK_RANDOM,MPTAG_DALI_SUBSCRIPTION_REQUEST);
  453. }
  454. catch (IDaliClient_Exception *e) {
  455. PrintExceptionLog(e,"Dali CDaliSubscriptionManagerStub::remove");
  456. e->Release();
  457. }
  458. subscriptions.remove(idx);
  459. ids.remove(idx);
  460. }
  461. void notify(SubscriptionId id,MemoryBuffer &mb)
  462. {
  463. Linked<ISubscription> item;
  464. {
  465. CriticalBlock block(subscriptionsect);
  466. unsigned i = ids.find(id);
  467. if (i == NotFound)
  468. return;
  469. item.set(&subscriptions.item(i));
  470. }
  471. item->notify(mb);
  472. }
  473. void abort()
  474. {
  475. PrintLog("CDaliSubscriptionManagerStub aborting");
  476. CriticalBlock block(subscriptionsect);
  477. ForEachItemIn(i,subscriptions) {
  478. subscriptions.item(i).abort();
  479. }
  480. subscriptions.kill();
  481. ids.kill();
  482. PrintLog("CDaliSubscriptionManagerStub aborted");
  483. }
  484. };
  485. class CDaliPublisherClient: public Thread, public CDaliPublisher
  486. {
  487. CIArrayOf<CDaliSubscriptionManagerStub> managers;
  488. UnsignedArray tags;
  489. CheckedCriticalSection tagsect;
  490. bool stopped;
  491. public:
  492. CDaliPublisherClient()
  493. : Thread("CDaliPublisherClient")
  494. {
  495. stopped = true;
  496. start();
  497. }
  498. ~CDaliPublisherClient()
  499. {
  500. managers.kill();
  501. }
  502. ISubscriptionManager *queryManager(unsigned tag)
  503. {
  504. CHECKEDCRITICALBLOCK(tagsect,60000);
  505. unsigned i = tags.find(tag);
  506. if (i!=NotFound)
  507. return &managers.item(i);
  508. CDaliSubscriptionManagerStub *stub = new CDaliSubscriptionManagerStub(tag);
  509. tags.append(tag);
  510. managers.append(*stub);
  511. return stub;
  512. }
  513. int run()
  514. {
  515. ICoven &coven=queryCoven();
  516. CMessageHandler<CDaliPublisherClient> handler("CDaliPublisherClientMessages",this,&CDaliPublisherClient::processMessage);
  517. stopped = false;
  518. CMessageBuffer mb;
  519. stopped = false;
  520. while (!stopped) {
  521. mb.clear();
  522. try {
  523. #ifdef TRACE_QWAITING
  524. unsigned waiting = coven.probe(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL,NULL);
  525. if ((waiting!=0)&&(waiting%10==0))
  526. DBGLOG("QPROBE: MPTAG_DALI_SUBSCRIPTION_REQUEST has %d waiting",waiting);
  527. #endif
  528. if (coven.recv(mb,RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL,NULL))
  529. handler.handleMessage(mb);
  530. else
  531. stopped = true;
  532. }
  533. catch (IException *e) {
  534. EXCLOG(e,"CDaliPublisherClient::run");
  535. e->Release();
  536. stopped = true;
  537. }
  538. }
  539. return 0;
  540. }
  541. void processMessage(CMessageBuffer &mb)
  542. {
  543. //ICoven &coven=queryCoven();
  544. //ICommunicator &comm=coven.queryComm();
  545. unsigned tag;
  546. mb.read(tag);
  547. SubscriptionId id;
  548. mb.read(id);
  549. unsigned i = tags.find(tag);
  550. if (i!=NotFound) {
  551. MemoryBuffer qb;
  552. size32_t dlen;
  553. mb.read(dlen);
  554. qb.append(dlen,mb.readDirect(dlen)); // this is bit inefficient - perhaps could be improved
  555. managers.item(i).notify(id,qb);
  556. }
  557. }
  558. void ready()
  559. {
  560. }
  561. void stop()
  562. {
  563. if (!stopped) {
  564. stopped = true;
  565. queryCoven().cancel(RANK_ALL,MPTAG_DALI_SUBSCRIPTION_FULFILL);
  566. }
  567. join();
  568. }
  569. };
  570. IDaliServer *createDaliPublisherServer()
  571. {
  572. assertex(!daliPublisherServer); // initialization problem
  573. daliPublisherServer = new CDaliPublisherServer();
  574. DaliPublisher = daliPublisherServer;
  575. return daliPublisherServer;
  576. }
  577. static CriticalSection subscriptionCrit;
  578. ISubscriptionManager *querySubscriptionManager(unsigned tag)
  579. {
  580. CriticalBlock block(subscriptionCrit);
  581. if (!DaliPublisher) {
  582. ICoven &coven=queryCoven();
  583. assertex(!coven.inCoven()); // Check not Coven server (if occurs - not initialized correctly;
  584. DaliPublisher = new CDaliPublisherClient();
  585. }
  586. return DaliPublisher->queryManager(tag);
  587. }
  588. void closeSubscriptionManager()
  589. {
  590. CriticalBlock block(subscriptionCrit);
  591. if (DaliPublisher) {
  592. try {
  593. DaliPublisher->stop();
  594. }
  595. catch (IMP_Exception *e)
  596. {
  597. if (e->errorCode()!=MPERR_link_closed)
  598. throw;
  599. e->Release();
  600. }
  601. catch (IDaliClient_Exception *e) {
  602. if (e->errorCode()!=DCERR_server_closed)
  603. throw;
  604. e->Release();
  605. }
  606. delete DaliPublisher;
  607. DaliPublisher = NULL;
  608. }
  609. }
  610. void registerSubscriptionManager(unsigned tag, ISubscriptionManager *manager)
  611. {
  612. assertex(daliPublisherServer); // initialization order check
  613. daliPublisherServer->registerSubscriptionManager(tag,manager);
  614. }