mpbase.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define mp_decl __declspec(dllexport)
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jlog.hpp"
  17. #include "mpbase.hpp"
  18. static INode *MyNode=NULL;
  19. static INode *NullNode=NULL;
  20. class MPNode: public CInterface, implements INode
  21. {
  22. protected: friend class MPNodeCache;
  23. SocketEndpoint ep;
  24. public:
  25. IMPLEMENT_IINTERFACE;
  26. MPNode(const SocketEndpoint &_ep)
  27. : ep(_ep)
  28. {
  29. #ifdef _DEBUG
  30. // assertex(!_ep.LoopBack()); // localhost not supported for nodes
  31. #endif
  32. }
  33. bool equals(const INode *n) const { return endpoint().equals(n->endpoint()); }
  34. void serialize(MemoryBuffer &tgt)
  35. {
  36. ep.serialize(tgt);
  37. }
  38. static MPNode *deserialize(MemoryBuffer &src);
  39. unsigned getHash() const { return ep.hash(0); }
  40. virtual bool isHost() const
  41. {
  42. return ep.isHost();
  43. }
  44. virtual bool isLocalTo(INode *to) const
  45. {
  46. return ep.ipequals(to->endpoint());
  47. }
  48. const SocketEndpoint &endpoint() const { return ep; }
  49. };
  50. class MPNodeCache: public SuperHashTableOf<MPNode,SocketEndpoint>
  51. {
  52. CriticalSection sect;
  53. public:
  54. ~MPNodeCache()
  55. {
  56. releaseAll();
  57. }
  58. void onAdd(void *)
  59. {
  60. // not used
  61. }
  62. void onRemove(void *e)
  63. {
  64. MPNode &elem=*(MPNode *)e;
  65. elem.Release();
  66. }
  67. unsigned getHashFromElement(const void *e) const
  68. {
  69. const MPNode &elem=*(const MPNode *)e;
  70. return elem.ep.hash(0);
  71. }
  72. unsigned getHashFromFindParam(const void *fp) const
  73. {
  74. return ((const SocketEndpoint *)fp)->hash(0);
  75. }
  76. const void * getFindParam(const void *p) const
  77. {
  78. const MPNode &elem=*(const MPNode *)p;
  79. return &elem.ep;
  80. }
  81. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  82. {
  83. return ((MPNode *)et)->ep.equals(*(SocketEndpoint *)fp);
  84. }
  85. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(MPNode,SocketEndpoint);
  86. MPNode *lookup(const SocketEndpoint &ep)
  87. {
  88. CriticalBlock block(sect);
  89. MPNode *item=SuperHashTableOf<MPNode,SocketEndpoint>::find(&ep);
  90. if (!item) {
  91. item = new MPNode(ep);
  92. add(*item);
  93. }
  94. return LINK(item);
  95. }
  96. } *NodeCache = NULL;
  97. MPNode *MPNode::deserialize(MemoryBuffer &src)
  98. {
  99. SocketEndpoint ep;
  100. ep.deserialize(src);
  101. if (NodeCache)
  102. return NodeCache->lookup(ep);
  103. return new MPNode(ep);
  104. }
  105. class CGroup: public CInterface, implements IGroup
  106. {
  107. protected: friend class CNodeIterator;
  108. rank_t count;
  109. mutable rank_t myrank;
  110. INode **nodes;
  111. public:
  112. IMPLEMENT_IINTERFACE;
  113. CGroup(rank_t _count,INode **_nodes=NULL)
  114. {
  115. count = _count;
  116. myrank = RANK_NULL;
  117. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  118. if (_nodes) {
  119. for (rank_t i=0; i<count; i++)
  120. nodes[i] = LINK(_nodes[i]);
  121. }
  122. }
  123. CGroup(rank_t _count,const SocketEndpoint *ep)
  124. {
  125. count = _count;
  126. myrank = RANK_NULL;
  127. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  128. for (rank_t i=0; i<count; i++) {
  129. if (NodeCache)
  130. nodes[i] = NodeCache->lookup(*ep);
  131. else
  132. nodes[i] = new MPNode(*ep);
  133. ep++;
  134. }
  135. }
  136. CGroup(SocketEndpointArray &epa)
  137. {
  138. count = epa.ordinality();
  139. myrank = RANK_NULL;
  140. nodes = count?(INode **)malloc(count*sizeof(INode *)):NULL;
  141. for (rank_t i=0; i<count; i++) {
  142. if (NodeCache)
  143. nodes[i] = NodeCache->lookup(epa.item(i));
  144. else
  145. nodes[i] = new MPNode(epa.item(i));
  146. }
  147. }
  148. ~CGroup()
  149. {
  150. for (rank_t i=0; i<count; i++)
  151. nodes[i]->Release();
  152. free(nodes);
  153. }
  154. rank_t ordinality() const { return count; }
  155. rank_t rank(const SocketEndpoint &ep) const
  156. {
  157. rank_t i=count;
  158. while (i) {
  159. i--;
  160. // a group is a list of IpAddresses or a list of full endpoints
  161. SocketEndpoint nep=nodes[i]->endpoint();
  162. if (nep.port) {
  163. if (ep.equals(nep))
  164. return i;
  165. }
  166. else if (ep.ipequals(nep)) // ip list so just check IP
  167. return i;
  168. }
  169. return RANK_NULL;
  170. }
  171. rank_t rank(INode *node) const { return node?rank(node->endpoint()):RANK_NULL; }
  172. rank_t rank() const { if (myrank==RANK_NULL) myrank = rank(MyNode); return myrank; }
  173. GroupRelation compare(const IGroup *grp) const
  174. {
  175. if (grp) {
  176. rank_t r1=ordinality();
  177. rank_t r2=grp->ordinality();
  178. rank_t r;
  179. if (r1==0) {
  180. if (r2==0)
  181. return GRidentical;
  182. return GRdisjoint;
  183. }
  184. if (r2==0)
  185. return GRdisjoint;
  186. bool notin1=false;
  187. bool notin2=false;
  188. bool somematch=false;
  189. if (r1==r2) { // check for identical
  190. r=r1;
  191. loop {
  192. r--;
  193. if (!nodes[r]->equals(&grp->queryNode(r)))
  194. break;
  195. somematch = true;
  196. if (r==0)
  197. return GRidentical;
  198. }
  199. }
  200. else if (r2>r1) {
  201. for (r=0;r<r1;r++)
  202. if (!nodes[r]->equals(&grp->queryNode(r)))
  203. break;
  204. if (r==r1)
  205. return GRbasesubset;
  206. }
  207. else {
  208. for (r=0;r<r1;r++)
  209. if (!nodes[r]->equals(&grp->queryNode(r%r2)))
  210. break;
  211. if (r==r1)
  212. return GRwrappedsuperset;
  213. }
  214. // the following could be improved
  215. bool *matched2=(bool *)calloc(r2,sizeof(bool));
  216. bool anymatched = false;
  217. bool allmatched1 = true;
  218. do {
  219. r1--;
  220. r=r2;
  221. loop {
  222. r--;
  223. if (!matched2[r]) {
  224. if (nodes[r1]->equals(&grp->queryNode(r))) {
  225. matched2[r] = true;
  226. anymatched = true;
  227. break;
  228. }
  229. }
  230. if (r==0) {
  231. allmatched1 = false;
  232. break;
  233. }
  234. }
  235. } while (r1);
  236. bool allmatched2 = true;
  237. do {
  238. r2--;
  239. if (!matched2[r2])
  240. allmatched2 = false;
  241. } while (r2);
  242. free(matched2);
  243. if (allmatched1) {
  244. if (allmatched2)
  245. return GRdifferentorder;
  246. return GRsubset;
  247. }
  248. if (allmatched2)
  249. return GRsuperset;
  250. if (anymatched)
  251. return GRintersection;
  252. }
  253. return GRdisjoint;
  254. }
  255. bool equals(const IGroup *grp) const
  256. {
  257. if (!grp)
  258. return false;
  259. rank_t r1=ordinality();
  260. if (r1!=grp->ordinality())
  261. return false;
  262. for (rank_t r=0;r<r1;r++)
  263. if (!nodes[r]->equals(&grp->queryNode(r)))
  264. return false;
  265. return true;
  266. }
  267. void translate(const IGroup *othergroup, rank_t nranks, const rank_t *otherranks, rank_t *resranks ) const
  268. {
  269. while (nranks) {
  270. *resranks = rank(&othergroup->queryNode(*otherranks));
  271. nranks--;
  272. resranks++;
  273. otherranks++;
  274. }
  275. }
  276. IGroup *subset(rank_t start,rank_t num) const
  277. {
  278. CGroup *newgrp = new CGroup(num);
  279. while (num) {
  280. num--;
  281. newgrp->nodes[num] = LINK(nodes[start+num]);
  282. }
  283. return newgrp;
  284. }
  285. virtual IGroup *subset(const rank_t *order,rank_t num) const
  286. {
  287. CGroup *newgrp = new CGroup(num);
  288. for( rank_t i=0; i<num; i++ ) {
  289. newgrp->nodes[i] = LINK(nodes[*order]);
  290. order++;
  291. }
  292. return newgrp;
  293. }
  294. virtual IGroup *combine(const IGroup *grp) const
  295. {
  296. rank_t g2ord = grp->ordinality();
  297. rank_t j = 0;
  298. INode **tmp = (INode **)malloc(g2ord*sizeof(INode *));
  299. rank_t i;
  300. for (i=0; i<g2ord; i++) {
  301. if (rank(&grp->queryNode(i))==RANK_NULL) {
  302. tmp[j] = grp->getNode(i);
  303. j++;
  304. }
  305. }
  306. CGroup *newgrp = new CGroup(count+j);
  307. for (i=0; i<count; i++)
  308. newgrp->nodes[i] = LINK(nodes[i]);
  309. for (rank_t k=0;k<j; k++)
  310. newgrp->nodes[i++] = tmp[k];
  311. free(tmp);
  312. return newgrp;
  313. }
  314. bool isMember(INode *node) const
  315. {
  316. if (!node)
  317. return false;
  318. return rank(node->endpoint())!=RANK_NULL;
  319. }
  320. bool isMember() const
  321. {
  322. assertex(MyNode);
  323. return rank()!=RANK_NULL;
  324. }
  325. unsigned distance(const IpAddress &ip) const
  326. {
  327. unsigned bestdist = (unsigned)-1;
  328. for (unsigned i=0;i<count;i++) {
  329. unsigned d = ip.ipdistance(nodes[i]->endpoint());
  330. if (d<bestdist)
  331. bestdist = d;
  332. }
  333. return bestdist;
  334. }
  335. unsigned distance(const IGroup *grp) const
  336. {
  337. if (!grp)
  338. return (unsigned)-1;
  339. unsigned c2 = grp->ordinality();
  340. if (c2<count)
  341. return grp->distance(this);
  342. unsigned ret = c2;
  343. unsigned bestdist = (unsigned)-1;
  344. for (unsigned i=0;i<count;i++) {
  345. INode &node1 = *nodes[i];
  346. if (node1.equals(&grp->queryNode(i)))
  347. ret--;
  348. else {
  349. unsigned d = grp->distance(node1.endpoint());
  350. if (d<bestdist)
  351. bestdist = d;
  352. }
  353. }
  354. if (bestdist!=(unsigned)-1)
  355. ret += bestdist;
  356. return ret;
  357. }
  358. IGroup *diff(const IGroup *g) const
  359. {
  360. PointerArray toadd;
  361. ForEachNodeInGroup(i,*this) {
  362. INode *node = &queryNode(i);
  363. if (node&&!g->isMember(node))
  364. toadd.append(node);
  365. }
  366. ForEachNodeInGroup(j,*g) {
  367. INode *node = &g->queryNode(j);
  368. if (node&&!isMember(node))
  369. toadd.append(node);
  370. }
  371. unsigned num = toadd.ordinality();
  372. CGroup *newgrp = new CGroup(num);
  373. while (num) {
  374. num--;
  375. newgrp->nodes[num] = LINK(((INode *)toadd.item(num)));
  376. }
  377. return newgrp;
  378. }
  379. bool overlaps(const IGroup *grp) const
  380. {
  381. if (grp) {
  382. ForEachNodeInGroup(i,*grp) {
  383. if (isMember(&grp->queryNode(i)))
  384. return true;
  385. }
  386. }
  387. return false;
  388. }
  389. IGroup *intersect(const IGroup *g) const
  390. {
  391. PointerArray toadd;
  392. ForEachNodeInGroup(i,*this) {
  393. INode *node = &queryNode(i);
  394. if (node&&g->isMember(node))
  395. toadd.append(node);
  396. }
  397. unsigned num = toadd.ordinality();
  398. CGroup *newgrp = new CGroup(num);
  399. while (num) {
  400. num--;
  401. newgrp->nodes[num] = LINK(((INode *)toadd.item(num)));
  402. }
  403. return newgrp;
  404. }
  405. IGroup *swap(rank_t r1,rank_t r2) const
  406. {
  407. CGroup *newgrp = new CGroup(count);
  408. rank_t i;
  409. for(i=0; i<count; i++ ) {
  410. if ((i==r1)&&(r2<count))
  411. newgrp->nodes[i] = LINK(nodes[r2]);
  412. if ((i==r2)&&(r1<count))
  413. newgrp->nodes[i] = LINK(nodes[r1]);
  414. else
  415. newgrp->nodes[i] = LINK(nodes[i]);
  416. }
  417. return newgrp;
  418. }
  419. virtual IGroup *add(INode *node) const
  420. {
  421. CGroup *newgrp = new CGroup(count+1);
  422. rank_t i;
  423. for(i=0; i<count; i++ ) {
  424. newgrp->nodes[i] = LINK(nodes[i]);
  425. }
  426. newgrp->nodes[i] = LINK(node);
  427. return newgrp;
  428. }
  429. virtual IGroup *add(INode *node,unsigned pos) const
  430. {
  431. CGroup *newgrp = new CGroup(count+1);
  432. unsigned j = 0;
  433. for( rank_t i=0; i<count; i++ ) {
  434. if (j==pos) {
  435. newgrp->nodes[j++] = LINK(node);
  436. }
  437. newgrp->nodes[j++] = LINK(nodes[i]);
  438. }
  439. return newgrp;
  440. }
  441. virtual IGroup *remove(unsigned pos) const
  442. {
  443. assertex(pos<count);
  444. CGroup *newgrp = new CGroup(count-1);
  445. unsigned j=0;
  446. for( rank_t i=0; i<count; i++ ) {
  447. if (i!=pos)
  448. newgrp->nodes[j++] = LINK(nodes[i]);
  449. }
  450. return newgrp;
  451. }
  452. virtual IGroup *rotate(int num) const
  453. {
  454. CGroup *newgrp = new CGroup(count);
  455. bool neg = false;
  456. if (num<0) {
  457. num=-num;
  458. neg = true;
  459. }
  460. unsigned j=(num%count);
  461. for( rank_t i=0; i<count; i++ ) {
  462. if (neg)
  463. newgrp->nodes[i] = LINK(nodes[j]);
  464. else
  465. newgrp->nodes[j] = LINK(nodes[i]);
  466. j++;
  467. if (j==count)
  468. j = 0;
  469. }
  470. return newgrp;
  471. }
  472. INodeIterator *getIterator(rank_t start=0,rank_t num=RANK_NULL) const ;
  473. INode &queryNode(rank_t r) const
  474. {
  475. assertex(r<count);
  476. return *nodes[r];
  477. }
  478. INode *getNode(rank_t r) const
  479. {
  480. assertex(r<count);
  481. return LINK(nodes[r]);
  482. }
  483. StringBuffer &getText(StringBuffer &text) const
  484. {
  485. if (!count)
  486. return text;
  487. if (count==1)
  488. return nodes[0]->endpoint().getUrlStr(text);
  489. // following is rather slow maybe could do with more direct method with pointers TBD
  490. SocketEndpointArray epa;
  491. for(unsigned i=0;i<count;i++) {
  492. SocketEndpoint ep(nodes[i]->endpoint()); // pretty horrible
  493. epa.append(ep);
  494. }
  495. return epa.getText(text);
  496. }
  497. static IGroup *fromText(const char *s,unsigned defport)
  498. {
  499. SocketEndpointArray epa;
  500. epa.fromText(s,defport);
  501. return createIGroup(epa);
  502. }
  503. void serialize(MemoryBuffer &tgt) const
  504. {
  505. tgt.append(count);
  506. for (rank_t i=0; i<count; i++)
  507. nodes[i]->serialize(tgt);
  508. }
  509. static CGroup *deserialize(MemoryBuffer &src)
  510. {
  511. CGroup *ret = new CGroup(0);
  512. src.read(ret->count);
  513. if (ret->count) {
  514. ret->nodes = ret->count?(INode **)malloc(ret->count*sizeof(INode *)):NULL;
  515. for (rank_t i=0; i<ret->count; i++)
  516. ret->nodes[i] = MPNode::deserialize(src);
  517. }
  518. return ret;
  519. }
  520. void getSocketEndpoints(SocketEndpointArray &sea) const
  521. {
  522. for (rank_t i=0; i<count; i++)
  523. sea.append((SocketEndpoint &)nodes[i]->endpoint());
  524. }
  525. };
  526. class CNodeIterator : public CInterface, implements INodeIterator
  527. {
  528. Linked<IGroup> parent;
  529. rank_t start;
  530. rank_t num;
  531. rank_t pos;
  532. public:
  533. IMPLEMENT_IINTERFACE;
  534. CNodeIterator(IGroup *_parent,rank_t _start,rank_t _num)
  535. : parent(_parent)
  536. {
  537. start = _start;
  538. num = _num;
  539. if ((num==RANK_NULL)||(num+start>parent->ordinality()))
  540. num = (start<parent->ordinality())?parent->ordinality()-start:0;
  541. }
  542. bool first()
  543. {
  544. pos = 0;
  545. return (pos<num);
  546. }
  547. bool next()
  548. {
  549. pos++;
  550. return (pos<num);
  551. }
  552. bool isValid()
  553. {
  554. return (pos<num);
  555. }
  556. INode &query()
  557. {
  558. return parent->queryNode(start+pos);
  559. }
  560. INode &get()
  561. {
  562. return *parent->getNode(start+pos);
  563. }
  564. };
  565. INodeIterator *CGroup::getIterator(rank_t start,rank_t num) const
  566. {
  567. return new CNodeIterator((IGroup *)this,start,num);
  568. }
  569. INode *deserializeINode(MemoryBuffer &src)
  570. {
  571. return MPNode::deserialize(src);
  572. }
  573. INode *createINode(const SocketEndpoint &ep)
  574. {
  575. if (NodeCache)
  576. return NodeCache->lookup(ep);
  577. return new MPNode(ep);
  578. }
  579. INode *createINodeIP(const IpAddress &ip,unsigned short port=0)
  580. {
  581. SocketEndpoint ep(port,ip);
  582. return createINode(ep);
  583. }
  584. INode *createINode(const char *name,unsigned short port)
  585. {
  586. SocketEndpoint ep(name,port);
  587. return createINode(ep);
  588. }
  589. IGroup *createIGroup(rank_t num,INode **nodes)
  590. {
  591. return new CGroup(num,nodes);
  592. }
  593. IGroup *createIGroup(rank_t num,const SocketEndpoint *ep)
  594. {
  595. return new CGroup(num,ep);
  596. }
  597. IGroup *createIGroup(SocketEndpointArray &epa)
  598. {
  599. return new CGroup(epa);
  600. }
  601. IGroup *createIGroup(const char *endpointlist,unsigned short defport)
  602. {
  603. const char *s = endpointlist;
  604. bool oldform=false;
  605. if (s)
  606. while (*s) {
  607. if (*s=='|') {
  608. oldform = true;
  609. break;
  610. }
  611. if (*s==',') {
  612. while (isspace(*s))
  613. s++;
  614. if ((*s=='=')||(*s=='*')) {
  615. oldform = true;
  616. break;
  617. }
  618. }
  619. s++;
  620. }
  621. if (oldform) {
  622. SocketListParser list(endpointlist);
  623. SocketEndpointArray eparray;
  624. unsigned n = list.getSockets(eparray,defport);
  625. return createIGroup(eparray);
  626. }
  627. return CGroup::fromText(endpointlist,defport);
  628. }
  629. IGroup *deserializeIGroup(MemoryBuffer &src)
  630. {
  631. return CGroup::deserialize(src);
  632. }
  633. void initMyNode(unsigned short port)
  634. {
  635. setNodeCaching(port != 0);
  636. ::Release(MyNode);
  637. MyNode = NULL;
  638. if (port) {
  639. SocketEndpoint ep(port);
  640. MyNode = new MPNode(ep);
  641. if (ep.isLoopBack()) {
  642. DBGLOG("MP Warning - localhost used for MP host address, NIC adaptor not identified");
  643. }
  644. queryNullNode();
  645. }
  646. else
  647. {
  648. ::Release(NullNode);
  649. NullNode = NULL;
  650. }
  651. }
  652. INode *queryMyNode()
  653. {
  654. return MyNode;
  655. }
  656. INode *queryNullNode()
  657. {
  658. if (!NullNode) {
  659. SocketEndpoint ep;
  660. NullNode = new MPNode(ep);
  661. }
  662. return NullNode;
  663. }
  664. void setNodeCaching(bool on)
  665. {
  666. if (on) {
  667. if (!NodeCache)
  668. NodeCache = new MPNodeCache();
  669. }
  670. else {
  671. MPNodeCache *nc = NodeCache;
  672. NodeCache = NULL;
  673. delete nc;
  674. }
  675. }