mpbase.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define mp_decl DECL_EXPORT
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jlog.hpp"
  17. #include "mpbase.hpp"
  18. static INode *MyNode=NULL;
  19. static INode *NullNode=NULL;
  20. class MPNode: implements INode, public CInterface
  21. {
  22. protected: friend class MPNodeCache;
  23. SocketEndpoint ep;
  24. public:
  25. IMPLEMENT_IINTERFACE;
  26. MPNode(const SocketEndpoint &_ep)
  27. : ep(_ep)
  28. {
  29. #ifdef _DEBUG
  30. // assertex(!_ep.LoopBack()); // localhost not supported for nodes
  31. #endif
  32. }
  33. bool equals(const INode *n) const { return endpoint().equals(n->endpoint()); }
  34. void serialize(MemoryBuffer &tgt)
  35. {
  36. ep.serialize(tgt);
  37. }
  38. static MPNode *deserialize(MemoryBuffer &src);
  39. unsigned getHash() const { return ep.hash(0); }
  40. virtual bool isHost() const
  41. {
  42. return ep.isHost();
  43. }
  44. virtual bool isLocalTo(INode *to) const
  45. {
  46. return ep.ipequals(to->endpoint());
  47. }
  48. const SocketEndpoint &endpoint() const { return ep; }
  49. };
  50. class MPNodeCache: public SuperHashTableOf<MPNode,SocketEndpoint>
  51. {
  52. CriticalSection sect;
  53. public:
  54. ~MPNodeCache()
  55. {
  56. _releaseAll();
  57. }
  58. void onAdd(void *)
  59. {
  60. // not used
  61. }
  62. void onRemove(void *e)
  63. {
  64. MPNode &elem=*(MPNode *)e;
  65. elem.Release();
  66. }
  67. unsigned getHashFromElement(const void *e) const
  68. {
  69. const MPNode &elem=*(const MPNode *)e;
  70. return elem.ep.hash(0);
  71. }
  72. unsigned getHashFromFindParam(const void *fp) const
  73. {
  74. return ((const SocketEndpoint *)fp)->hash(0);
  75. }
  76. const void * getFindParam(const void *p) const
  77. {
  78. const MPNode &elem=*(const MPNode *)p;
  79. return &elem.ep;
  80. }
  81. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  82. {
  83. return ((MPNode *)et)->ep.equals(*(SocketEndpoint *)fp);
  84. }
  85. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(MPNode,SocketEndpoint);
  86. MPNode *lookup(const SocketEndpoint &ep)
  87. {
  88. CriticalBlock block(sect);
  89. MPNode *item=SuperHashTableOf<MPNode,SocketEndpoint>::find(&ep);
  90. if (!item) {
  91. item = new MPNode(ep);
  92. add(*item);
  93. }
  94. return LINK(item);
  95. }
  96. } *NodeCache = NULL;
  97. MPNode *MPNode::deserialize(MemoryBuffer &src)
  98. {
  99. SocketEndpoint ep;
  100. ep.deserialize(src);
  101. if (NodeCache)
  102. return NodeCache->lookup(ep);
  103. return new MPNode(ep);
  104. }
  105. class CGroup: implements IGroup, public CInterface
  106. {
  107. protected: friend class CNodeIterator;
  108. rank_t count;
  109. mutable rank_t myrank;
  110. INode **nodes;
  111. public:
  112. IMPLEMENT_IINTERFACE;
  113. CGroup(rank_t _count,INode **_nodes=NULL)
  114. {
  115. count = _count;
  116. myrank = RANK_NULL;
  117. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  118. if (_nodes) {
  119. for (rank_t i=0; i<count; i++)
  120. nodes[i] = LINK(_nodes[i]);
  121. }
  122. }
  123. CGroup(rank_t _count,const SocketEndpoint *ep)
  124. {
  125. count = _count;
  126. myrank = RANK_NULL;
  127. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  128. for (rank_t i=0; i<count; i++) {
  129. if (NodeCache)
  130. nodes[i] = NodeCache->lookup(*ep);
  131. else
  132. nodes[i] = new MPNode(*ep);
  133. ep++;
  134. }
  135. }
  136. CGroup(SocketEndpointArray &epa)
  137. {
  138. count = epa.ordinality();
  139. myrank = RANK_NULL;
  140. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  141. for (rank_t i=0; i<count; i++) {
  142. if (NodeCache)
  143. nodes[i] = NodeCache->lookup(epa.item(i));
  144. else
  145. nodes[i] = new MPNode(epa.item(i));
  146. }
  147. }
  148. ~CGroup()
  149. {
  150. for (rank_t i=0; i<count; i++)
  151. nodes[i]->Release();
  152. free(nodes);
  153. }
  154. rank_t ordinality() const { return count; }
  155. rank_t rank(const SocketEndpoint &ep) const
  156. {
  157. rank_t i=count;
  158. while (i) {
  159. i--;
  160. // a group is a list of IpAddresses or a list of full endpoints
  161. SocketEndpoint nep=nodes[i]->endpoint();
  162. if (nep.port) {
  163. if (ep.equals(nep))
  164. return i;
  165. }
  166. else if (ep.ipequals(nep)) // ip list so just check IP
  167. return i;
  168. }
  169. return RANK_NULL;
  170. }
  171. rank_t rank(INode *node) const { return node?rank(node->endpoint()):RANK_NULL; }
  172. rank_t rank() const { if (myrank==RANK_NULL) myrank = rank(MyNode); return myrank; }
  173. GroupRelation compare(const IGroup *grp) const
  174. {
  175. if (grp) {
  176. rank_t r1=ordinality();
  177. rank_t r2=grp->ordinality();
  178. rank_t r;
  179. if (r1==0) {
  180. if (r2==0)
  181. return GRidentical;
  182. return GRdisjoint;
  183. }
  184. if (r2==0)
  185. return GRdisjoint;
  186. bool somematch=false;
  187. if (r1==r2) { // check for identical
  188. r=r1;
  189. for (;;) {
  190. r--;
  191. if (!nodes[r]->equals(&grp->queryNode(r)))
  192. break;
  193. somematch = true;
  194. if (r==0)
  195. return GRidentical;
  196. }
  197. }
  198. else if (r2>r1) {
  199. for (r=0;r<r1;r++)
  200. if (!nodes[r]->equals(&grp->queryNode(r)))
  201. break;
  202. if (r==r1)
  203. return GRbasesubset;
  204. }
  205. else {
  206. for (r=0;r<r1;r++)
  207. if (!nodes[r]->equals(&grp->queryNode(r%r2)))
  208. break;
  209. if (r==r1)
  210. return GRwrappedsuperset;
  211. }
  212. // the following could be improved
  213. bool *matched2=(bool *)calloc(r2,sizeof(bool));
  214. bool anymatched = false;
  215. bool allmatched1 = true;
  216. do {
  217. r1--;
  218. r=r2;
  219. for (;;) {
  220. r--;
  221. if (!matched2[r]) {
  222. if (nodes[r1]->equals(&grp->queryNode(r))) {
  223. matched2[r] = true;
  224. anymatched = true;
  225. break;
  226. }
  227. }
  228. if (r==0) {
  229. allmatched1 = false;
  230. break;
  231. }
  232. }
  233. } while (r1);
  234. bool allmatched2 = true;
  235. do {
  236. r2--;
  237. if (!matched2[r2])
  238. allmatched2 = false;
  239. } while (r2);
  240. free(matched2);
  241. if (allmatched1) {
  242. if (allmatched2)
  243. return GRdifferentorder;
  244. return GRsubset;
  245. }
  246. if (allmatched2)
  247. return GRsuperset;
  248. if (anymatched)
  249. return GRintersection;
  250. }
  251. return GRdisjoint;
  252. }
  253. bool equals(const IGroup *grp) const
  254. {
  255. if (!grp)
  256. return false;
  257. rank_t r1=ordinality();
  258. if (r1!=grp->ordinality())
  259. return false;
  260. for (rank_t r=0;r<r1;r++)
  261. if (!nodes[r]->equals(&grp->queryNode(r)))
  262. return false;
  263. return true;
  264. }
  265. void translate(const IGroup *othergroup, rank_t nranks, const rank_t *otherranks, rank_t *resranks ) const
  266. {
  267. while (nranks) {
  268. *resranks = rank(&othergroup->queryNode(*otherranks));
  269. nranks--;
  270. resranks++;
  271. otherranks++;
  272. }
  273. }
  274. IGroup *subset(rank_t start,rank_t num) const
  275. {
  276. CGroup *newgrp = new CGroup(num);
  277. while (num) {
  278. num--;
  279. newgrp->nodes[num] = LINK(nodes[start+num]);
  280. }
  281. return newgrp;
  282. }
  283. virtual IGroup *subset(const rank_t *order,rank_t num) const
  284. {
  285. CGroup *newgrp = new CGroup(num);
  286. for( rank_t i=0; i<num; i++ ) {
  287. newgrp->nodes[i] = LINK(nodes[*order]);
  288. order++;
  289. }
  290. return newgrp;
  291. }
  292. virtual IGroup *combine(const IGroup *grp) const
  293. {
  294. rank_t g2ord = grp->ordinality();
  295. rank_t j = 0;
  296. INode **tmp = (INode **)malloc(g2ord*sizeof(INode *));
  297. rank_t i;
  298. for (i=0; i<g2ord; i++) {
  299. if (rank(&grp->queryNode(i))==RANK_NULL) {
  300. tmp[j] = grp->getNode(i);
  301. j++;
  302. }
  303. }
  304. CGroup *newgrp = new CGroup(count+j);
  305. for (i=0; i<count; i++)
  306. newgrp->nodes[i] = LINK(nodes[i]);
  307. for (rank_t k=0;k<j; k++)
  308. newgrp->nodes[i++] = tmp[k];
  309. free(tmp);
  310. return newgrp;
  311. }
  312. bool isMember(INode *node) const
  313. {
  314. if (!node)
  315. return false;
  316. return rank(node->endpoint())!=RANK_NULL;
  317. }
  318. bool isMember() const
  319. {
  320. assertex(MyNode);
  321. return rank()!=RANK_NULL;
  322. }
  323. unsigned distance(const IpAddress &ip) const
  324. {
  325. unsigned bestdist = (unsigned)-1;
  326. for (unsigned i=0;i<count;i++) {
  327. unsigned d = ip.ipdistance(nodes[i]->endpoint());
  328. if (d<bestdist)
  329. bestdist = d;
  330. }
  331. return bestdist;
  332. }
  333. unsigned distance(const IGroup *grp) const
  334. {
  335. if (!grp)
  336. return (unsigned)-1;
  337. unsigned c2 = grp->ordinality();
  338. if (c2<count)
  339. return grp->distance(this);
  340. unsigned ret = c2;
  341. unsigned bestdist = (unsigned)-1;
  342. for (unsigned i=0;i<count;i++) {
  343. INode &node1 = *nodes[i];
  344. if (node1.equals(&grp->queryNode(i)))
  345. ret--;
  346. else {
  347. unsigned d = grp->distance(node1.endpoint());
  348. if (d<bestdist)
  349. bestdist = d;
  350. }
  351. }
  352. if (bestdist!=(unsigned)-1)
  353. ret += bestdist;
  354. return ret;
  355. }
  356. IGroup *diff(const IGroup *g) const
  357. {
  358. PointerArray toadd;
  359. ForEachNodeInGroup(i,*this) {
  360. INode *node = &queryNode(i);
  361. if (node&&!g->isMember(node))
  362. toadd.append(node);
  363. }
  364. ForEachNodeInGroup(j,*g) {
  365. INode *node = &g->queryNode(j);
  366. if (node&&!isMember(node))
  367. toadd.append(node);
  368. }
  369. unsigned num = toadd.ordinality();
  370. CGroup *newgrp = new CGroup(num);
  371. while (num) {
  372. num--;
  373. newgrp->nodes[num] = LINK(((INode *)toadd.item(num)));
  374. }
  375. return newgrp;
  376. }
  377. bool overlaps(const IGroup *grp) const
  378. {
  379. if (grp) {
  380. ForEachNodeInGroup(i,*grp) {
  381. if (isMember(&grp->queryNode(i)))
  382. return true;
  383. }
  384. }
  385. return false;
  386. }
  387. IGroup *intersect(const IGroup *g) const
  388. {
  389. PointerArray toadd;
  390. ForEachNodeInGroup(i,*this) {
  391. INode *node = &queryNode(i);
  392. if (node&&g->isMember(node))
  393. toadd.append(node);
  394. }
  395. unsigned num = toadd.ordinality();
  396. CGroup *newgrp = new CGroup(num);
  397. while (num) {
  398. num--;
  399. newgrp->nodes[num] = LINK(((INode *)toadd.item(num)));
  400. }
  401. return newgrp;
  402. }
  403. IGroup *swap(rank_t r1,rank_t r2) const
  404. {
  405. CGroup *newgrp = new CGroup(count);
  406. rank_t i;
  407. for(i=0; i<count; i++ ) {
  408. if ((i==r1)&&(r2<count))
  409. newgrp->nodes[i] = LINK(nodes[r2]);
  410. if ((i==r2)&&(r1<count))
  411. newgrp->nodes[i] = LINK(nodes[r1]);
  412. else
  413. newgrp->nodes[i] = LINK(nodes[i]);
  414. }
  415. return newgrp;
  416. }
  417. virtual IGroup *add(INode *node) const
  418. {
  419. CGroup *newgrp = new CGroup(count+1);
  420. rank_t i;
  421. for(i=0; i<count; i++ ) {
  422. newgrp->nodes[i] = LINK(nodes[i]);
  423. }
  424. newgrp->nodes[i] = LINK(node);
  425. return newgrp;
  426. }
  427. virtual IGroup *add(INode *node,unsigned pos) const
  428. {
  429. CGroup *newgrp = new CGroup(count+1);
  430. unsigned j = 0;
  431. for( rank_t i=0; i<count; i++ ) {
  432. if (j==pos) {
  433. newgrp->nodes[j++] = LINK(node);
  434. }
  435. newgrp->nodes[j++] = LINK(nodes[i]);
  436. }
  437. return newgrp;
  438. }
  439. virtual IGroup *remove(unsigned pos) const
  440. {
  441. assertex(pos<count);
  442. CGroup *newgrp = new CGroup(count-1);
  443. unsigned j=0;
  444. for( rank_t i=0; i<count; i++ ) {
  445. if (i!=pos)
  446. newgrp->nodes[j++] = LINK(nodes[i]);
  447. }
  448. return newgrp;
  449. }
  450. virtual IGroup *rotate(int num) const
  451. {
  452. CGroup *newgrp = new CGroup(count);
  453. bool neg = false;
  454. if (num<0) {
  455. num=-num;
  456. neg = true;
  457. }
  458. unsigned j=(num%count);
  459. for( rank_t i=0; i<count; i++ ) {
  460. if (neg)
  461. newgrp->nodes[i] = LINK(nodes[j]);
  462. else
  463. newgrp->nodes[j] = LINK(nodes[i]);
  464. j++;
  465. if (j==count)
  466. j = 0;
  467. }
  468. return newgrp;
  469. }
  470. INodeIterator *getIterator(rank_t start=0,rank_t num=RANK_NULL) const ;
  471. INode &queryNode(rank_t r) const
  472. {
  473. assertex(r<count);
  474. return *nodes[r];
  475. }
  476. INode *getNode(rank_t r) const
  477. {
  478. assertex(r<count);
  479. return LINK(nodes[r]);
  480. }
  481. StringBuffer &getText(StringBuffer &text) const
  482. {
  483. if (!count)
  484. return text;
  485. if (count==1)
  486. return nodes[0]->endpoint().getUrlStr(text);
  487. // following is rather slow maybe could do with more direct method with pointers TBD
  488. SocketEndpointArray epa;
  489. for(unsigned i=0;i<count;i++) {
  490. SocketEndpoint ep(nodes[i]->endpoint()); // pretty horrible
  491. epa.append(ep);
  492. }
  493. return epa.getText(text);
  494. }
  495. static IGroup *fromText(const char *s,unsigned defport)
  496. {
  497. SocketEndpointArray epa;
  498. epa.fromText(s,defport);
  499. ForEachItemIn(idx, epa)
  500. {
  501. if (!epa.item(idx).isNull())
  502. return createIGroup(epa);
  503. }
  504. throw MakeStringException(0, "Invalid group %s (all nodes null)", s);
  505. }
  506. void serialize(MemoryBuffer &tgt) const
  507. {
  508. tgt.append(count);
  509. for (rank_t i=0; i<count; i++)
  510. nodes[i]->serialize(tgt);
  511. }
  512. static CGroup *deserialize(MemoryBuffer &src)
  513. {
  514. CGroup *ret = new CGroup(0);
  515. src.read(ret->count);
  516. if (ret->count) {
  517. ret->nodes = ret->count?(INode **)malloc(ret->count*sizeof(INode *)):NULL;
  518. for (rank_t i=0; i<ret->count; i++)
  519. ret->nodes[i] = MPNode::deserialize(src);
  520. }
  521. return ret;
  522. }
  523. void getSocketEndpoints(SocketEndpointArray &sea) const
  524. {
  525. for (rank_t i=0; i<count; i++)
  526. sea.append((SocketEndpoint &)nodes[i]->endpoint());
  527. }
  528. };
  529. class CNodeIterator : implements INodeIterator, public CInterface
  530. {
  531. Linked<IGroup> parent;
  532. rank_t start;
  533. rank_t num;
  534. rank_t pos;
  535. public:
  536. IMPLEMENT_IINTERFACE;
  537. CNodeIterator(IGroup *_parent,rank_t _start,rank_t _num)
  538. : parent(_parent)
  539. {
  540. start = _start;
  541. num = _num;
  542. if ((num==RANK_NULL)||(num+start>parent->ordinality()))
  543. num = (start<parent->ordinality())?parent->ordinality()-start:0;
  544. }
  545. bool first()
  546. {
  547. pos = 0;
  548. return (pos<num);
  549. }
  550. bool next()
  551. {
  552. pos++;
  553. return (pos<num);
  554. }
  555. bool isValid()
  556. {
  557. return (pos<num);
  558. }
  559. INode &query()
  560. {
  561. return parent->queryNode(start+pos);
  562. }
  563. INode &get()
  564. {
  565. return *parent->getNode(start+pos);
  566. }
  567. };
  568. INodeIterator *CGroup::getIterator(rank_t start,rank_t num) const
  569. {
  570. return new CNodeIterator((IGroup *)this,start,num);
  571. }
  572. INode *deserializeINode(MemoryBuffer &src)
  573. {
  574. return MPNode::deserialize(src);
  575. }
  576. INode *createINode(const SocketEndpoint &ep)
  577. {
  578. if (NodeCache)
  579. return NodeCache->lookup(ep);
  580. return new MPNode(ep);
  581. }
  582. INode *createINodeIP(const IpAddress &ip,unsigned short port=0)
  583. {
  584. SocketEndpoint ep(port,ip);
  585. return createINode(ep);
  586. }
  587. INode *createINode(const char *name,unsigned short port)
  588. {
  589. SocketEndpoint ep(name,port);
  590. return createINode(ep);
  591. }
  592. IGroup *createIGroup(rank_t num,INode **nodes)
  593. {
  594. return new CGroup(num,nodes);
  595. }
  596. IGroup *createIGroup(rank_t num,const SocketEndpoint *ep)
  597. {
  598. return new CGroup(num,ep);
  599. }
  600. IGroup *createIGroup(SocketEndpointArray &epa)
  601. {
  602. return new CGroup(epa);
  603. }
  604. IGroup *createIGroup(const char *endpointlist,unsigned short defport)
  605. {
  606. const char *s = endpointlist;
  607. bool oldform=false;
  608. if (s)
  609. while (*s) {
  610. if (*s=='|') {
  611. oldform = true;
  612. break;
  613. }
  614. if (*s==',') {
  615. while (isspace(*s))
  616. s++;
  617. if ((*s=='=')||(*s=='*')) {
  618. oldform = true;
  619. break;
  620. }
  621. }
  622. s++;
  623. }
  624. if (oldform) {
  625. SocketListParser list(endpointlist);
  626. SocketEndpointArray eparray;
  627. list.getSockets(eparray,defport);
  628. return createIGroup(eparray);
  629. }
  630. return CGroup::fromText(endpointlist,defport);
  631. }
  632. IGroup *deserializeIGroup(MemoryBuffer &src)
  633. {
  634. return CGroup::deserialize(src);
  635. }
  636. void initMyNode(unsigned short port)
  637. {
  638. setNodeCaching(port != 0);
  639. ::Release(MyNode);
  640. MyNode = NULL;
  641. if (port) {
  642. SocketEndpoint ep(port);
  643. MyNode = new MPNode(ep);
  644. if (ep.isLoopBack()) {
  645. DBGLOG("MP Warning - localhost used for MP host address, NIC adaptor not identified");
  646. }
  647. queryNullNode();
  648. }
  649. else
  650. {
  651. ::Release(NullNode);
  652. NullNode = NULL;
  653. }
  654. }
  655. INode *queryMyNode()
  656. {
  657. return MyNode;
  658. }
  659. INode *queryNullNode()
  660. {
  661. if (!NullNode) {
  662. SocketEndpoint ep;
  663. NullNode = new MPNode(ep);
  664. }
  665. return NullNode;
  666. }
  667. void setNodeCaching(bool on)
  668. {
  669. if (on) {
  670. if (!NodeCache)
  671. NodeCache = new MPNodeCache();
  672. }
  673. else {
  674. MPNodeCache *nc = NodeCache;
  675. NodeCache = NULL;
  676. delete nc;
  677. }
  678. }