dacsds.cpp 72 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define da_decl DECL_EXPORT
  14. #include "platform.h"
  15. #include <typeinfo>
  16. #include "jlib.hpp"
  17. #include "jfile.hpp"
  18. #include "javahash.hpp"
  19. #include "javahash.tpp"
  20. #include "jptree.ipp"
  21. #include "mpbuff.hpp"
  22. #include "mpcomm.hpp"
  23. #include "mputil.hpp"
  24. #include "dacoven.hpp"
  25. #include "daserver.hpp"
  26. #include "dasess.hpp"
  27. #include "daclient.hpp"
  28. #include "dadfs.hpp"
  29. #include "dautils.hpp"
  30. #include "dasds.ipp" // common header for client/server sds
  31. #include "dacsds.ipp"
  32. static unsigned clientThrottleLimit;
  33. static unsigned clientThrottleDelay;
  34. static ISDSManager *SDSManager=NULL;
  35. static CriticalSection SDScrit;
  36. #define CHECK_CONNECTED(XSTR) \
  37. if (!connected) \
  38. { \
  39. LOG(MCerror, unknownJob, XSTR": Closed connection (xpath=%s, sessionId=%" I64F "d)", xpath.get(), sessionId); \
  40. return; \
  41. }
  42. ////////////////////
  43. class MonitoredChildMap : public ChildMap
  44. {
  45. CClientRemoteTree &owner;
  46. public:
  47. MonitoredChildMap(CClientRemoteTree &_owner) : ChildMap(), owner(_owner) { }
  48. virtual bool replace(const char *name, IPropertyTree *tree)
  49. {
  50. // suppress notification of old node - old node has been preserved.
  51. bool changes = owner.queryConnection().queryStateChanges();
  52. owner.queryConnection().setStateChanges(false);
  53. bool res = ChildMap::replace(name, tree);
  54. owner.queryConnection().setStateChanges(changes);
  55. return res;
  56. }
  57. virtual void onRemove(void *e)
  58. {
  59. if (owner.queryStateChanges())
  60. {
  61. CClientRemoteTree *child = (CClientRemoteTree *)((IPropertyTree *)e);
  62. assertex(child);
  63. __int64 sId = child->queryServerId();
  64. if (sId)
  65. owner.registerDeleted(child->queryName(), 0, sId);
  66. else
  67. {
  68. IPTArrayValue *value = child->queryValue();
  69. if (value)
  70. {
  71. if (value->isArray())
  72. {
  73. unsigned i = value->elements();
  74. while (i--)
  75. {
  76. CClientRemoteTree &child = *(CClientRemoteTree *)value->queryElement(i);
  77. sId = child.queryServerId();
  78. if (sId)
  79. owner.registerDeleted(child.queryName(), i, sId);
  80. }
  81. }
  82. }
  83. }
  84. }
  85. ChildMap::onRemove(e);
  86. }
  87. };
  88. bool collectChildless(CClientRemoteTree *parent, const char *xpath, CRTArray *childLessList, StringArray *headArr, StringArray *tailArr)
  89. {
  90. bool res = false;
  91. StringBuffer head;
  92. const char *tail;
  93. if (xpath && '/' == *xpath && '/' == *(xpath+1))
  94. {
  95. head.append("*");
  96. tail = xpath;
  97. }
  98. else
  99. {
  100. tail = xpath?queryHead(xpath, head):NULL;
  101. if (!tail && xpath)
  102. head.append(xpath);
  103. }
  104. if (parent->queryChildren())
  105. {
  106. if (head.length())
  107. {
  108. Owned<IPropertyTreeIterator> iter = parent->getElements(head.str());
  109. ForEach (*iter)
  110. {
  111. res |= collectChildless((CClientRemoteTree *)&iter->query(), tail, childLessList, headArr, tailArr);
  112. if (NULL == childLessList && res)
  113. break;
  114. }
  115. }
  116. else
  117. {
  118. Owned<IPropertyTreeIterator> iter = parent->getElements("*");
  119. ForEach (*iter)
  120. {
  121. res |= collectChildless((CClientRemoteTree *)&iter->query(), NULL, childLessList, headArr, tailArr);
  122. if (NULL == childLessList && res)
  123. break;
  124. }
  125. }
  126. }
  127. else if (parent->hasChildren()) // i.e. no local children, but server flagged as having
  128. {
  129. if (childLessList)
  130. {
  131. childLessList->append(*parent);
  132. if (headArr)
  133. headArr->append(head.length()?head.str():"");
  134. if (tailArr)
  135. tailArr->append(tail?tail:"");
  136. }
  137. res = true;
  138. }
  139. return res;
  140. }
  141. ////////////////////
  142. CRemoteConnection::CRemoteConnection(ISDSConnectionManager &manager, ConnectionId connectionId, const char *xpath, SessionId sessionId, unsigned mode, unsigned timeout)
  143. : CConnectionBase(manager, connectionId, xpath, sessionId, mode, timeout)
  144. {
  145. INIT_NAMEDCOUNT;
  146. lazyFetch = true;
  147. stateChanges = true;
  148. connected = true;
  149. serverIterAvailable = querySDS().queryProperties().getPropBool("Client/@serverIterAvailable");
  150. serverIter = querySDS().queryProperties().getPropBool("Client/@serverIter");
  151. useAppendOpt = querySDS().queryProperties().getPropBool("Client/@useAppendOpt");
  152. serverGetIdsAvailable = querySDS().queryProperties().getPropBool("Client/@serverGetIdsAvailable");
  153. lockCount = 0;
  154. }
  155. void CRemoteConnection::clearCommitChanges()
  156. {
  157. CClientRemoteTree *tree = (CClientRemoteTree *) queryRoot();
  158. bool lazyFetch = setLazyFetch(false);
  159. tree->clearCommitChanges();
  160. setLazyFetch(lazyFetch);
  161. }
  162. void CRemoteConnection::getDetails(MemoryBuffer &mb)
  163. {
  164. mb.append(connectionId);
  165. mb.append(sessionId);
  166. mb.append(mode);
  167. mb.append(timeout);
  168. }
  169. // IRemoteConnection impl.
  170. void CRemoteConnection::changeMode(unsigned mode, unsigned timeout, bool suppressReloads)
  171. {
  172. CHECK_CONNECTED("changeMode");
  173. manager.changeMode(*this, mode, timeout, suppressReloads);
  174. }
  175. void CRemoteConnection::rollback()
  176. {
  177. CConnectionLock b(*this);
  178. CHECK_CONNECTED("rollback");
  179. CDisableFetchChangeBlock block(*this);
  180. if (((CClientRemoteTree *)root.get())->queryState())
  181. reload(); // all
  182. else
  183. {
  184. class Cop : implements IIteratorOperator
  185. {
  186. public:
  187. virtual bool applyTop(IPropertyTree &_tree) { return true; }
  188. virtual bool applyChild(IPropertyTree &parent, IPropertyTree &_child, bool &levelBreak)
  189. {
  190. CClientRemoteTree &child = (CClientRemoteTree &)_child;
  191. if (child.queryState())
  192. {
  193. ((CClientRemoteTree &)parent).clearChildren(); // wipe children - SDS will lazy fetch them again as needed.
  194. levelBreak = true;
  195. return false;
  196. }
  197. return true;
  198. }
  199. } op;
  200. CIterationOperation iop(op);
  201. iop.iterate(*root);
  202. }
  203. }
  204. void CRemoteConnection::_rollbackChildren(IPropertyTree *_parent, bool force)
  205. {
  206. if (force)
  207. {
  208. CRemoteTreeBase *parent = QUERYINTERFACE(_parent, CRemoteTreeBase);
  209. if (parent)
  210. parent->clearChildren();
  211. }
  212. else
  213. {
  214. class CRollback
  215. {
  216. public:
  217. void apply(IPropertyTree &_parent)
  218. {
  219. CClientRemoteTree *parent = QUERYINTERFACE(&_parent, CClientRemoteTree);
  220. if (!parent)
  221. return;
  222. if (parent->queryState())
  223. parent->clearChildren();
  224. else
  225. {
  226. Owned<IPropertyTreeIterator> iter = parent->getElements("*");
  227. doit(*iter);
  228. }
  229. }
  230. void doit(IPropertyTreeIterator &iter)
  231. {
  232. ForEach (iter)
  233. apply(iter.query());
  234. }
  235. } op;
  236. op.apply(*_parent);
  237. }
  238. }
  239. void CRemoteConnection::rollbackChildren(IPropertyTree *parent, bool force)
  240. {
  241. CConnectionLock b(*this);
  242. CHECK_CONNECTED("rollbackChildren");
  243. CDisableFetchChangeBlock block(*this);
  244. _rollbackChildren(parent, force);
  245. }
  246. void CRemoteConnection::rollbackChildren(const char *_xpath, bool force)
  247. {
  248. CConnectionLock b(*this);
  249. CHECK_CONNECTED("rollbackChildren");
  250. CDisableFetchChangeBlock block(*this);
  251. Owned<IPropertyTreeIterator> iter = root->getElements(_xpath);
  252. ForEach (*iter)
  253. _rollbackChildren(&iter->query(), force);
  254. }
  255. void CRemoteConnection::reload(const char *_xpath)
  256. {
  257. CConnectionLock b(*this);
  258. CHECK_CONNECTED("close");
  259. CDisableFetchChangeBlock block(*this);
  260. // NB: any linked client trees will still be active.
  261. if (_xpath == NULL || '\0' == *_xpath)
  262. {
  263. clearChanges(*root);
  264. __int64 serverId = root->queryServerId();
  265. CRemoteTreeBase *newTree = manager.get(*this, serverId);
  266. if (NULL == newTree) throw MakeSDSException(SDSExcpt_Reload);
  267. root.setown(newTree);
  268. }
  269. else
  270. {
  271. ICopyArrayOf<CClientRemoteTree> parents;
  272. IArrayOf<CClientRemoteTree> children;
  273. if ('/' == *_xpath) ++_xpath;
  274. StringBuffer head;
  275. const char *tail = splitXPath(_xpath, head);
  276. Owned<IPropertyTreeIterator> iter = root->getElements(head.str());
  277. ForEach (*iter)
  278. {
  279. CClientRemoteTree &parent = (CClientRemoteTree &)iter->query();
  280. Owned<IPropertyTreeIterator> childIter = parent.getElements(tail);
  281. ForEach (*childIter)
  282. {
  283. parents.append(parent);
  284. children.append(*LINK((CClientRemoteTree *)&childIter->query()));
  285. }
  286. ForEachItemIn(c, children)
  287. {
  288. CClientRemoteTree &child = children.item(c);
  289. clearChanges(child);
  290. parent.removeTree(&child);
  291. }
  292. }
  293. ForEachItemIn(e, children)
  294. {
  295. CClientRemoteTree &child = (CClientRemoteTree &)children.item(e);
  296. CClientRemoteTree &parent = (CClientRemoteTree &)parents.item(e);
  297. if (child.queryServerId())
  298. {
  299. IPropertyTree *newChild = manager.get(*this, child.queryServerId());
  300. if (newChild)
  301. parent.addPropTree(child.queryName(), newChild);
  302. }
  303. }
  304. }
  305. }
  306. void CRemoteConnection::commit()
  307. {
  308. CConnectionLock b(*this);
  309. CHECK_CONNECTED("commit");
  310. manager.commit(*this, NULL);
  311. }
  312. void CRemoteConnection::close(bool deleteRoot)
  313. {
  314. CConnectionLock b(*this);
  315. CHECK_CONNECTED("close");
  316. manager.commit(*this, &deleteRoot);
  317. connected=false;
  318. }
  319. SubscriptionId CRemoteConnection::subscribe(ISDSConnectionSubscription &notify)
  320. {
  321. CSDSConnectionSubscriberProxy *subscriber = new CSDSConnectionSubscriberProxy(notify, connectionId);
  322. querySubscriptionManager(SDSCONN_PUBLISHER)->add(subscriber, subscriber->getId());
  323. return subscriber->getId();
  324. }
  325. void CRemoteConnection::unsubscribe(SubscriptionId id)
  326. {
  327. querySubscriptionManager(SDSCONN_PUBLISHER)->remove(id);
  328. }
  329. class CClientXPathIterator : public CXPathIterator
  330. {
  331. CRemoteConnection &connection;
  332. public:
  333. CClientXPathIterator(CRemoteConnection &_connection, IPropertyTree *root, IPropertyTree *matchTree, IPTIteratorCodes flags) : CXPathIterator(root, matchTree, flags), connection(_connection)
  334. {
  335. connection.Link();
  336. }
  337. ~CClientXPathIterator()
  338. {
  339. connection.Release();
  340. }
  341. virtual IPropertyTree *queryChild(IPropertyTree *parent, const char *path)
  342. {
  343. // NB: this is going to fetch into local cache what is necessary to satisfy prop[X]
  344. CSetServerIterBlock b(connection, false);
  345. return parent->queryPropTree(path);
  346. }
  347. };
  348. void mergeXPathPTree(IPropertyTree *target, IPropertyTree *toMerge)
  349. {
  350. Owned<IPropertyTreeIterator> iter = toMerge->getElements("*");
  351. ForEach (*iter)
  352. {
  353. IPropertyTree &e = iter->query();
  354. StringBuffer path(e.queryName());
  355. IPropertyTree *existing = target->queryPropTree(path.append("[@pos=\"").append(e.queryProp("@pos")).append("\"]").str());
  356. if (existing)
  357. mergeXPathPTree(existing, &e);
  358. else
  359. target->addPropTree(e.queryName(), LINK(&e));
  360. }
  361. }
  362. bool removeLocals(CRemoteConnection &connection, CRemoteTreeBase *tree, IPropertyTree *match)
  363. {
  364. if (0 == tree->queryServerId())
  365. return false;
  366. Owned<IPropertyTreeIterator> iter = match->getElements("*");
  367. bool res = false;
  368. StringArray toDelete;
  369. ForEach (*iter)
  370. {
  371. IPropertyTree &child = iter->query();
  372. StringBuffer childPath(child.queryName());
  373. CRemoteTreeBase *storeChild;
  374. {
  375. CSetServerIterBlock b(connection, false);
  376. CDisableLazyFetchBlock b2(connection);
  377. storeChild = (CRemoteTreeBase *)tree->queryPropTree(childPath.append('[').append(child.queryProp("@pos")).append(']').str());
  378. }
  379. if (storeChild)
  380. {
  381. if (0 != storeChild->queryServerId())
  382. {
  383. bool childRes = removeLocals(connection, storeChild, &child);
  384. if (childRes)
  385. {
  386. unsigned c = 0;
  387. Owned<IPropertyTreeIterator> iter = child.getElements("*");
  388. ForEach (*iter) c++;
  389. if (0 == c)
  390. {
  391. toDelete.append(childPath.str());
  392. res = true;
  393. }
  394. }
  395. }
  396. else
  397. {
  398. toDelete.append(childPath.str());
  399. res = true;
  400. }
  401. }
  402. }
  403. ForEachItemIn(d, toDelete)
  404. match->removeProp(toDelete.item(d));
  405. return res;
  406. }
  407. void extractServerIds(IPropertyTree &tree, MemoryBuffer &mb, bool completeTailBranch)
  408. {
  409. __int64 serverId = tree.getPropInt64("@serverId");
  410. assertex(serverId);
  411. mb.append(serverId);
  412. Owned<IPropertyTreeIterator> iter = tree.getElements("*");
  413. if (iter->first())
  414. {
  415. mb.append((unsigned) 1);
  416. do
  417. {
  418. extractServerIds(iter->query(), mb, completeTailBranch);
  419. }
  420. while (iter->next());
  421. }
  422. else
  423. mb.append(completeTailBranch ? (unsigned)0 : (unsigned)1);
  424. }
  425. static void walkAndFill(IPropertyTree &tree, CClientRemoteTree &parent, MemoryBuffer &mb, bool childrenCanBeMissing)
  426. {
  427. parent.createChildMap();
  428. bool r;
  429. if (childrenCanBeMissing)
  430. mb.read(r);
  431. else
  432. r = true;
  433. if (r)
  434. parent.deserializeChildrenRT(mb);
  435. Owned<IPropertyTreeIterator> iter = tree.getElements("*");
  436. ForEach (*iter)
  437. {
  438. IPropertyTree &elem = iter->query();
  439. StringBuffer path(elem.queryName());
  440. path.append("[").append(elem.queryProp("@pos")).append("]");
  441. CClientRemoteTree *child = (CClientRemoteTree *)parent.queryPropTree(path.str());
  442. assertex(child);
  443. walkAndFill(elem, *child, mb, childrenCanBeMissing);
  444. }
  445. }
  446. IPropertyTreeIterator *CRemoteConnection::doGetElements(CClientRemoteTree *tree, const char *xpath, IPTIteratorCodes flags)
  447. {
  448. CConnectionLock b(*this);
  449. CSetServerIterBlock b2(*this, false);
  450. Owned<IPropertyTree> matchTree, serverMatchTree;
  451. StringAttr path;
  452. if (xpath)
  453. {
  454. unsigned l = strlen(xpath);
  455. if ('/' == *(xpath+l-1))
  456. path.set(xpath, l-1);
  457. else
  458. path.set(xpath);
  459. }
  460. bool remoteGet = queryServerGetIdsAvailable() && iptiter_remoteget == (flags & iptiter_remoteget);
  461. {
  462. CDisableLazyFetchBlock b(*this);
  463. if (collectChildless(tree, path.get(), NULL, NULL, NULL))
  464. {
  465. serverMatchTree.setown(queryManager().getXPaths(tree->queryServerId(), xpath, remoteGet));
  466. if (serverMatchTree && removeLocals(*this, tree, serverMatchTree))
  467. serverMatchTree.clear();
  468. }
  469. // if all nodes had server-side children, then there'd be no point in this
  470. matchTree.setown(getXPathMatchTree(*tree, xpath));
  471. }
  472. if (matchTree && serverMatchTree)
  473. mergeXPathPTree(matchTree, serverMatchTree);
  474. else if (serverMatchTree)
  475. matchTree.setown(LINK(serverMatchTree));
  476. else if (!matchTree)
  477. return createNullPTreeIterator();
  478. if (remoteGet && serverMatchTree)
  479. queryManager().ensureLocal(*this, *tree, serverMatchTree, flags);
  480. return new CClientXPathIterator(*this, tree, matchTree, flags & ~iptiter_remote);
  481. }
  482. IPropertyTreeIterator *CRemoteConnection::getElements(const char *xpath, IPTIteratorCodes flags)
  483. {
  484. if (!serverIterAvailable)
  485. throw MakeSDSException(SDSExcpt_VersionMismatch, "Server-side getElements not supported by server versions prior to " SDS_SVER_MIN_GETXPATHS_CONNECT);
  486. flags |= iptiter_remote;
  487. return root->getElements(xpath, flags);
  488. }
  489. /////////////////
  490. CClientRemoteTree::CClientRemoteTree(CRemoteConnection &conn, CPState _state)
  491. : CRemoteTreeBase(NULL, NULL, NULL), connection(conn), serverTreeInfo(0), state(_state)
  492. {
  493. INIT_NAMEDCOUNT;
  494. assertex(!isnocase());
  495. }
  496. CClientRemoteTree::CClientRemoteTree(const char *name, IPTArrayValue *value, ChildMap *children, CRemoteConnection &conn, CPState _state)
  497. : CRemoteTreeBase(name, value, children), connection(conn), serverTreeInfo(0), state(_state)
  498. {
  499. INIT_NAMEDCOUNT;
  500. assertex(!isnocase());
  501. }
  502. void CClientRemoteTree::beforeDispose()
  503. {
  504. if (queryStateChanges())
  505. connection.clearChanges(*this);
  506. }
  507. void CClientRemoteTree::Link() const
  508. {
  509. connection.Link(); // inc ref count on connection
  510. CRemoteTreeBase::Link();
  511. }
  512. bool CClientRemoteTree::Release() const
  513. {
  514. //Note: getLinkCount() is not thread safe.
  515. if (1 < getLinkCount()) //NH -> JCS - you sure this is best way to do this?
  516. {
  517. bool res = CRemoteTreeBase::Release();
  518. connection.Release(); // if this tree is not being destroyed then decrement usage count on connection
  519. return res;
  520. }
  521. else
  522. return CRemoteTreeBase::Release();
  523. }
  524. void CClientRemoteTree::deserializeSelfRT(MemoryBuffer &mb)
  525. {
  526. CRemoteTreeBase::deserializeSelfRT(mb);
  527. mb.read(serverTreeInfo);
  528. }
  529. void CClientRemoteTree::deserializeChildrenRT(MemoryBuffer &src)
  530. {
  531. // if and only if there are children, must create monitored map here otherwise a non-monitored map could be create in base
  532. if (!children)
  533. {
  534. StringAttr eName;
  535. size32_t pos = src.getPos();
  536. src.read(eName);
  537. if (eName.length())
  538. createChildMap();
  539. src.reset(pos);
  540. }
  541. CRemoteTreeBase::deserializeChildrenRT(src);
  542. }
  543. bool CClientRemoteTree::renameTree(IPropertyTree *child, const char *newName)
  544. {
  545. class DisableStateChanges // supress reset that would result from ownPTree via addPropTree below
  546. {
  547. bool changes;
  548. CRemoteConnection &c;
  549. public:
  550. DisableStateChanges(CRemoteConnection &_c) : c(_c) { changes = c.queryStateChanges(); c.setStateChanges(false); }
  551. ~DisableStateChanges() { reset(); }
  552. void reset() { c.setStateChanges(changes); }
  553. } dc(connection);
  554. Linked<IPropertyTree> tmp = child;
  555. StringAttr oldName = child->queryName();
  556. if (removeTree(child))
  557. {
  558. addPropTree(newName, child);
  559. tmp.getClear(); // addPropTree has taken ownership.
  560. dc.reset();
  561. __int64 id = ((CClientRemoteTree *)child)->queryServerId();
  562. if (id)
  563. {
  564. unsigned pos = findChild(child);
  565. registerRenamed(((CClientRemoteTree *)child)->queryName(), oldName, pos+1, id); // flag new element as changed.
  566. }
  567. return true;
  568. }
  569. return false;
  570. }
  571. IPropertyTree *CClientRemoteTree::queryBranch(const char *xpath) const
  572. {
  573. return const_cast<CClientRemoteTree *>(this)->_queryBranch(xpath);
  574. }
  575. IPropertyTree *CClientRemoteTree::_queryBranch(const char *xpath)
  576. {
  577. if (queryLazyFetch())
  578. {
  579. CRTArray childLessList;
  580. StringArray headArr, tailArr;
  581. StringAttr path;
  582. if (xpath)
  583. {
  584. unsigned l = strlen(xpath);
  585. if ('/' == *(xpath+l-1))
  586. path.set(xpath, l-1);
  587. else
  588. path.set(xpath);
  589. }
  590. CConnectionLock b(connection);
  591. bool r;
  592. { CDisableLazyFetchBlock b2(connection);
  593. r = collectChildless(this, xpath, &childLessList, &headArr, &tailArr);
  594. }
  595. if (r)
  596. {
  597. bool getAll = true;
  598. ForEachItemIn(c, childLessList)
  599. {
  600. IPropertyTree &child = childLessList.item(c);
  601. const char *tail = tailArr.item(c);
  602. if (!*tail) tail = NULL;
  603. const char *head = headArr.item(c);
  604. if (!*head) head = NULL;
  605. if (head || tail)
  606. {
  607. getAll = false;
  608. break;
  609. }
  610. }
  611. if (getAll)
  612. queryManager().getChildrenFor(childLessList, connection, 0);
  613. else
  614. {
  615. bool useServerIter = connection.queryServerGetIdsAvailable();
  616. // bug in matching tree creation caused failure if path pointed to self
  617. if (useServerIter && queryDaliServerVersion().compare("3.7") < 0)
  618. {
  619. CDisableLazyFetchBlock b2(connection);
  620. const IPropertyTree *me = queryPropTree(xpath);
  621. if (me && me==this)
  622. useServerIter = false;
  623. }
  624. if (useServerIter)
  625. {
  626. CDisableLazyFetchBlock b2(connection);
  627. Owned<IPropertyTree> serverMatchTree = queryManager().getXPaths(queryServerId(), xpath, true);
  628. if (serverMatchTree && !removeLocals(connection, this, serverMatchTree))
  629. queryManager().ensureLocal(connection, *this, serverMatchTree);
  630. return queryPropTree(xpath); // intentionally inside disabled lazy fetching block, as now all local
  631. }
  632. else
  633. {
  634. queryManager().getChildrenFor(childLessList, connection, 1); // get 1 level of all parents without children that matched xpath
  635. ForEachItemIn(c, childLessList)
  636. {
  637. IPropertyTree &child = childLessList.item(c);
  638. const char *tail = tailArr.item(c);
  639. const char *head = headArr.item(c);
  640. if (!tail) // no children _below_ the tail or no match for tail portion of xpath
  641. {
  642. // get all nodes below
  643. CRTArray list;
  644. Owned<IPropertyTreeIterator> iter = child.getElements(head);
  645. ForEach (*iter)
  646. {
  647. IPropertyTree &c = iter->query();
  648. list.append((CRemoteTreeBase &)iter->query());
  649. }
  650. queryManager().getChildrenFor(list, connection, 0);
  651. }
  652. else
  653. {
  654. // couldn't fully match match against partial cache, request more missing children
  655. Owned<IPropertyTreeIterator> iter = child.getElements(head);
  656. ForEach (*iter)
  657. {
  658. IPropertyTree &e = iter->query();
  659. e.queryBranch(tail);
  660. }
  661. }
  662. }
  663. }
  664. }
  665. }
  666. }
  667. return queryPropTree(xpath);
  668. }
  669. ChildMap *CClientRemoteTree::checkChildren() const
  670. {
  671. return const_cast<CClientRemoteTree *>(this)->_checkChildren();
  672. }
  673. ChildMap *CClientRemoteTree::_checkChildren()
  674. {
  675. CConnectionLock b(connection);
  676. if (!children && STI_HaveChildren & serverTreeInfo)
  677. {
  678. if (queryLazyFetch())
  679. {
  680. serverTreeInfo &= ~STI_HaveChildren;
  681. createChildMap();
  682. if (serverId)
  683. queryManager().getChildren(*this, connection);
  684. }
  685. }
  686. return children;
  687. }
  688. IPropertyTree *CClientRemoteTree::ownPTree(IPropertyTree *tree)
  689. {
  690. // if taking ownership of an orphaned clientremote tree need to reset it's attributes.
  691. if ((connection.queryStateChanges()) && isEquivalent(tree) && (!QUERYINTERFACE(tree, CClientRemoteTree)->IsShared()))
  692. {
  693. CClientRemoteTree *_tree = QUERYINTERFACE(tree, CClientRemoteTree);
  694. if (_tree->queryServerId())
  695. ((CClientRemoteTree *)tree)->resetState(CPS_Changed, true);
  696. return tree;
  697. }
  698. else
  699. return PARENT::ownPTree(tree);
  700. }
  701. IPropertyTree *CClientRemoteTree::create(const char *name, IPTArrayValue *value, ChildMap *children, bool existing)
  702. {
  703. CClientRemoteTree *newTree = new CClientRemoteTree(name, value, children, connection);
  704. if (existing)
  705. {
  706. newTree->setServerId(queryServerId());
  707. setServerId(0);
  708. }
  709. return newTree;
  710. }
  711. IPropertyTree *CClientRemoteTree::create(MemoryBuffer &mb)
  712. {
  713. unsigned pos = mb.getPos();
  714. StringAttr name;
  715. mb.read(name);
  716. mb.reset(pos);
  717. CClientRemoteTree *tree = new CClientRemoteTree(connection);
  718. tree->deserializeSelfRT(mb);
  719. return tree;
  720. }
  721. void CClientRemoteTree::createChildMap()
  722. {
  723. children = new MonitoredChildMap(*this);
  724. }
  725. ChangeInfo *CClientRemoteTree::queryChanges()
  726. {
  727. return connection.queryChangeInfo(*this);
  728. }
  729. void CClientRemoteTree::setLocal(size32_t size, const void *data, bool _binary)
  730. {
  731. clearState(CPS_PropAppend);
  732. mergeState(CPS_Changed);
  733. PARENT::setLocal(size, data, _binary);
  734. }
  735. void CClientRemoteTree::appendLocal(size32_t size, const void *data, bool binary)
  736. {
  737. if (0 == size) return;
  738. if (0 != serverId)
  739. {
  740. if (0 != (CPS_PropAppend & state))
  741. {
  742. PARENT::appendLocal(size, data, binary);
  743. return;
  744. }
  745. else if (0 == (CPS_Changed & state))
  746. {
  747. if (value)
  748. {
  749. size32_t sz = value->queryValueSize();
  750. if (!binary && sz) --sz;
  751. if (sz)
  752. {
  753. mergeState(CPS_PropAppend);
  754. registerPropAppend(sz);
  755. PARENT::appendLocal(size, data, binary);
  756. return;
  757. }
  758. }
  759. else
  760. {
  761. if (STI_External & serverTreeInfo) // if it has, change will only be fetched on a get call
  762. {
  763. mergeState(CPS_PropAppend);
  764. registerPropAppend(0); // whole value on commit to be sent for external append.
  765. PARENT::appendLocal(size, data, binary);
  766. return;
  767. }
  768. }
  769. }
  770. }
  771. mergeState(CPS_Changed);
  772. PARENT::appendLocal(size, data, binary);
  773. }
  774. void CClientRemoteTree::addingNewElement(IPropertyTree &child, int pos)
  775. {
  776. ((CClientRemoteTree &)child).setState(CPS_New);
  777. #ifdef ENABLE_INSPOS
  778. if (pos >= 0)
  779. ((CRemoteTreeBase &)child).mergeState(CPS_InsPos);
  780. #endif
  781. PARENT::addingNewElement(child, pos);
  782. }
  783. void CClientRemoteTree::removingElement(IPropertyTree *tree, unsigned pos)
  784. {
  785. CRemoteTreeBase *child = QUERYINTERFACE(tree, CRemoteTreeBase); assertex(child);
  786. registerDeleted(child->queryName(), pos, child->queryServerId());
  787. PARENT::removingElement(tree, pos);
  788. }
  789. void CClientRemoteTree::setAttribute(const char *attr, const char *val)
  790. {
  791. PARENT::setAttribute(attr, val);
  792. mergeState(CPS_AttrChanges);
  793. registerAttrChange(attr);
  794. }
  795. bool CClientRemoteTree::removeAttribute(const char *attr)
  796. {
  797. if (PARENT::removeAttribute(attr))
  798. {
  799. registerDeletedAttr(attr);
  800. return true;
  801. }
  802. else
  803. return false;
  804. }
  805. void CClientRemoteTree::registerRenamed(const char *newName, const char *oldName, unsigned pos, __int64 id)
  806. {
  807. mergeState(CPS_Renames);
  808. if (queryStateChanges())
  809. connection.registerRenamed(*this, newName, oldName, pos, id);
  810. }
  811. void CClientRemoteTree::registerDeleted(const char *name, unsigned position, __int64 id)
  812. {
  813. if (id)
  814. {
  815. mergeState(CPS_Deletions);
  816. if (queryStateChanges())
  817. connection.registerDeleted(*this, name, position, id);
  818. }
  819. }
  820. void CClientRemoteTree::registerAttrChange(const char *attr)
  821. {
  822. mergeState(CPS_AttrChanges);
  823. if (queryStateChanges())
  824. connection.registerAttrChange(*this, attr);
  825. }
  826. void CClientRemoteTree::registerDeletedAttr(const char *attr)
  827. {
  828. mergeState(CPS_AttrDeletions);
  829. if (queryStateChanges())
  830. connection.registerDeletedAttr(*this, attr);
  831. }
  832. void CClientRemoteTree::registerPropAppend(size32_t l)
  833. {
  834. mergeState(CPS_PropAppend);
  835. if (queryStateChanges())
  836. connection.registerPropAppend(*this, l);
  837. }
  838. void CClientRemoteTree::clearChanges()
  839. {
  840. if (0 != (STI_External & serverTreeInfo) && 0 != (CPS_PropAppend & state))
  841. setProp(NULL, (char *)NULL);
  842. connection.clearChanges(*this);
  843. }
  844. // block these ops on other threads during a commit, otherwise can lead to internal sds inconsistency.
  845. void CClientRemoteTree::addProp(const char *xpath, const char *val)
  846. {
  847. CConnectionLock b(connection);
  848. CRemoteTreeBase::addProp(xpath, val);
  849. }
  850. void CClientRemoteTree::setProp(const char *xpath, const char *val)
  851. {
  852. CConnectionLock b(connection);
  853. CRemoteTreeBase::setProp(xpath, val);
  854. }
  855. void CClientRemoteTree::addPropInt64(const char *xpath, __int64 val)
  856. {
  857. CConnectionLock b(connection);
  858. CRemoteTreeBase::addPropInt64(xpath, val);
  859. }
  860. void CClientRemoteTree::setPropInt64(const char *xpath, __int64 val)
  861. {
  862. CConnectionLock b(connection);
  863. CRemoteTreeBase::setPropInt64(xpath, val);
  864. }
  865. void CClientRemoteTree::setPropBin(const char *xpath, size32_t size, const void *data)
  866. {
  867. CConnectionLock b(connection);
  868. CRemoteTreeBase::setPropBin(xpath, size, data);
  869. }
  870. IPropertyTree *CClientRemoteTree::setPropTree(const char *xpath, IPropertyTree *val)
  871. {
  872. CConnectionLock b(connection);
  873. return CRemoteTreeBase::setPropTree(xpath, val);
  874. }
  875. IPropertyTree *CClientRemoteTree::addPropTree(const char *xpath, IPropertyTree *val)
  876. {
  877. CConnectionLock b(connection);
  878. return CRemoteTreeBase::addPropTree(xpath, val);
  879. }
  880. bool CClientRemoteTree::removeProp(const char *xpath)
  881. {
  882. CConnectionLock b(connection);
  883. return CRemoteTreeBase::removeProp(xpath);
  884. }
  885. bool CClientRemoteTree::removeTree(IPropertyTree *child)
  886. {
  887. CConnectionLock b(connection);
  888. return CRemoteTreeBase::removeTree(child);
  889. }
  890. void CClientRemoteTree::checkExt() const
  891. {
  892. if (!connection.queryUseAppendOpt()) return;
  893. if (!value)
  894. {
  895. if (STI_External & serverTreeInfo)
  896. {
  897. MemoryBuffer mb;
  898. queryManager().getExternalValueFromServerId(serverId, mb);
  899. if (mb.length())
  900. {
  901. bool binary = IptFlagTst(flags, ipt_binary);
  902. const_cast<CClientRemoteTree *>(this)->setValue(new CPTValue(mb), binary);
  903. }
  904. else
  905. serverTreeInfo &= ~STI_External;
  906. }
  907. }
  908. else if (0 != (CPS_PropAppend & state))
  909. {
  910. if (STI_External & serverTreeInfo)
  911. {
  912. MemoryBuffer mb;
  913. bool binary = IptFlagTst(flags, ipt_binary);
  914. queryManager().getExternalValueFromServerId(serverId, mb);
  915. if (mb.length())
  916. {
  917. const_cast<CClientRemoteTree *>(this)->setValue(new CPTValue(mb), binary);
  918. assertex(queryStateChanges());
  919. connection.registerPropAppend(*const_cast<CClientRemoteTree *>(this), mb.length());
  920. if (value)
  921. value->getValue(mb, binary);
  922. }
  923. else
  924. serverTreeInfo &= ~STI_External;
  925. }
  926. }
  927. }
  928. bool CClientRemoteTree::isCompressed(const char *xpath) const
  929. {
  930. if (!xpath) checkExt();
  931. return CRemoteTreeBase::isCompressed(xpath);
  932. }
  933. bool CClientRemoteTree::getProp(const char *xpath, StringBuffer &ret) const
  934. {
  935. if (!xpath) checkExt();
  936. return CRemoteTreeBase::getProp(xpath, ret);
  937. }
  938. const char *CClientRemoteTree::queryProp(const char * xpath) const
  939. {
  940. if (!xpath) checkExt();
  941. return CRemoteTreeBase::queryProp(xpath);
  942. }
  943. bool CClientRemoteTree::getPropBool(const char *xpath, bool dft) const
  944. {
  945. if (!xpath) checkExt();
  946. return CRemoteTreeBase::getPropBool(xpath, dft);
  947. }
  948. __int64 CClientRemoteTree::getPropInt64(const char *xpath, __int64 dft) const
  949. {
  950. if (!xpath) checkExt();
  951. return CRemoteTreeBase::getPropInt64(xpath, dft);
  952. }
  953. bool CClientRemoteTree::getPropBin(const char *xpath, MemoryBuffer &ret) const
  954. {
  955. if (!xpath) checkExt();
  956. return CRemoteTreeBase::getPropBin(xpath, ret);
  957. }
  958. IPropertyTreeIterator *CClientRemoteTree::getElements(const char *xpath, IPTIteratorCodes flags) const
  959. {
  960. if (!serverId || !queryLazyFetch()
  961. || !xpath || '\0' == *xpath || ('*' == *xpath && '\0' == *(xpath+1)) || !connection.queryServerIterAvailable() || (0 == (flags & iptiter_remote) && !connection.queryServerIter()) )
  962. return CRemoteTreeBase::getElements(xpath, flags);
  963. if (!hasChildren()) // not necessarily local yet.
  964. return createNullPTreeIterator();
  965. // if it's a single id, then not worth getting matches from server as level either present or needed.
  966. const char *xxpath = xpath;
  967. if (isValidXPathStartChr(*xxpath))
  968. {
  969. do { ++xxpath; }
  970. while (isValidXPathChr(*xxpath));
  971. }
  972. if ('\0' == *xxpath || ('/' == *xxpath && '/' != *(xxpath+1)))
  973. return CRemoteTreeBase::getElements(xpath, flags);
  974. return connection.doGetElements(const_cast<CClientRemoteTree *>(this), xpath, flags);
  975. }
  976. void CClientRemoteTree::localizeElements(const char *xpath, bool allTail)
  977. {
  978. if (!serverId || !queryLazyFetch() || !connection.queryServerGetIdsAvailable()
  979. || !xpath || '\0' == *xpath || ('*' == *xpath && '\0' == *(xpath+1)) || !hasChildren())
  980. return;
  981. IPTIteratorCodes flags = iptiter_remoteget;
  982. if (allTail)
  983. flags = iptiter_remotegetbranch;
  984. Owned<IPropertyTreeIterator> iter = connection.doGetElements(this, xpath, flags);
  985. return;
  986. }
  987. void CClientRemoteTree::resetState(unsigned _state, bool sub)
  988. {
  989. state = _state;
  990. serverId = 0;
  991. if (sub)
  992. {
  993. IPropertyTreeIterator *iter = getElements("*");
  994. ForEach(*iter)
  995. {
  996. CClientRemoteTree &child = (CClientRemoteTree &)iter->query();
  997. child.resetState(state, sub);
  998. }
  999. iter->Release();
  1000. }
  1001. }
  1002. IPropertyTree *CClientRemoteTree::collateData()
  1003. {
  1004. ChangeInfo *changes = queryChanges();
  1005. struct ChangeTree
  1006. {
  1007. ChangeTree(IPropertyTree *donor=NULL) { ptree = LINK(donor); }
  1008. ~ChangeTree() { ::Release(ptree); }
  1009. inline void createTree() { assertex(!ptree); ptree = createPTree(RESERVED_CHANGE_NODE); }
  1010. inline IPropertyTree *queryTree() { return ptree; }
  1011. inline IPropertyTree *getTree() { return LINK(ptree); }
  1012. inline IPropertyTree *queryCreateTree()
  1013. {
  1014. if (!ptree)
  1015. ptree = createPTree(RESERVED_CHANGE_NODE);
  1016. return ptree;
  1017. }
  1018. private:
  1019. StringAttr name;
  1020. IPropertyTree *ptree;
  1021. } ct(changes?changes->tree:NULL);
  1022. if (changes) changes->tree.clear();
  1023. if (0 == serverId)
  1024. {
  1025. ct.createTree();
  1026. Owned<IAttributeIterator> iter = getAttributes();
  1027. if (iter->count())
  1028. {
  1029. IPropertyTree *t = createPTree();
  1030. ForEach(*iter)
  1031. t->setProp(iter->queryName(), queryProp(iter->queryName()));
  1032. ct.queryTree()->addPropTree(ATTRCHANGE_TAG, t);
  1033. }
  1034. ct.queryTree()->setPropBool("@new", true);
  1035. }
  1036. else
  1037. {
  1038. if (ct.queryTree())
  1039. {
  1040. Linked<IPropertyTree> ac = ct.queryTree()->queryPropTree(ATTRCHANGE_TAG);
  1041. if (ac)
  1042. {
  1043. ct.queryTree()->removeTree(ac);
  1044. Owned<IAttributeIterator> iter = ac->getAttributes();
  1045. IPropertyTree *t = createPTree();
  1046. ForEach(*iter)
  1047. t->setProp(iter->queryName(), queryProp(iter->queryName()));
  1048. ct.queryTree()->addPropTree(ATTRCHANGE_TAG, t);
  1049. }
  1050. }
  1051. }
  1052. if ((CPS_Changed & state) || (0 == serverId && queryValue()))
  1053. {
  1054. ct.queryCreateTree()->setPropBool("@localValue", true);
  1055. if (queryValue())
  1056. {
  1057. bool binary=isBinary(NULL);
  1058. ((PTree *)ct.queryTree())->setValue(new CPTValue(queryValue()->queryValueRawSize(), queryValue()->queryValueRaw(), binary, true, isCompressed(NULL)), binary);
  1059. }
  1060. else
  1061. ((PTree *)ct.queryTree())->setValue(new CPTValue(0, NULL, false, true, false), false);
  1062. }
  1063. else if (CPS_PropAppend & state)
  1064. {
  1065. assertex(serverId);
  1066. IPropertyTree *pa = ct.queryTree()->queryPropTree(APPEND_TAG);
  1067. assertex(pa);
  1068. unsigned from = pa->getPropInt(NULL);
  1069. ct.queryTree()->removeTree(pa);
  1070. ct.queryCreateTree()->setPropBool("@appendValue", true);
  1071. MemoryBuffer mb;
  1072. bool binary=isBinary(NULL);
  1073. queryValue()->getValue(mb, true);
  1074. ((PTree *)ct.queryTree())->setValue(new CPTValue(mb.length()-from, mb.toByteArray()+from, binary), binary);
  1075. }
  1076. Owned<IPropertyTree> childTree;
  1077. Owned<IPropertyTreeIterator> _iter = getElements("*");
  1078. IPropertyTreeIterator *iter = _iter;
  1079. if (iter->first())
  1080. {
  1081. while (iter->isValid())
  1082. {
  1083. CClientRemoteTree *child = (CClientRemoteTree *) &iter->query();
  1084. childTree.setown(child->collateData());
  1085. if (childTree)
  1086. {
  1087. if (0 == child->queryServerId())
  1088. {
  1089. if (CPS_InsPos & child->queryState())
  1090. {
  1091. int pos = findChild(child);
  1092. assertex(NotFound != pos);
  1093. childTree->setPropInt("@pos", pos+1);
  1094. }
  1095. }
  1096. else
  1097. {
  1098. int pos = findChild(child);
  1099. assertex(NotFound != pos);
  1100. childTree->setPropInt("@pos", pos+1);
  1101. childTree->setPropInt64("@id", child->queryServerId());
  1102. }
  1103. }
  1104. if (childTree)
  1105. ct.queryCreateTree()->addPropTree(RESERVED_CHANGE_NODE, childTree.getClear());
  1106. iter->next();
  1107. }
  1108. }
  1109. if (ct.queryTree())
  1110. ct.queryTree()->setProp("@name", queryName());
  1111. return ct.getTree();
  1112. }
  1113. void CClientRemoteTree::clearCommitChanges(MemoryBuffer *mb)
  1114. {
  1115. class Cop : implements IIteratorOperator
  1116. {
  1117. public:
  1118. Cop(MemoryBuffer *_mb=NULL) : mb(_mb) { }
  1119. virtual bool applyTop(IPropertyTree &_tree)
  1120. {
  1121. CClientRemoteTree &tree = (CClientRemoteTree &) _tree;
  1122. tree.clearChanges();
  1123. if (tree.queryState())
  1124. tree.setState(0);
  1125. return true;
  1126. }
  1127. virtual bool applyChild(IPropertyTree &parent, IPropertyTree &child, bool &levelBreak)
  1128. {
  1129. CClientRemoteTree &tree = (CClientRemoteTree &) child;
  1130. if (mb && 0==tree.queryServerId())
  1131. {
  1132. __int64 serverId;
  1133. mb->read(serverId);
  1134. tree.setServerId(serverId);
  1135. }
  1136. return true;
  1137. }
  1138. private:
  1139. MemoryBuffer *mb;
  1140. } op(mb);
  1141. CIterationOperation iop(op);
  1142. iop.iterate(*this);
  1143. }
  1144. /////////////////////
  1145. CClientSDSManager::CClientSDSManager()
  1146. {
  1147. CDaliVersion serverVersionNeeded("2.1"); // to ensure backward compatibility
  1148. childrenCanBeMissing = queryDaliServerVersion().compare(serverVersionNeeded) >= 0;
  1149. CDaliVersion serverVersionNeeded2("3.4"); // to ensure backward compatibility
  1150. lazyExtFlag = queryDaliServerVersion().compare(serverVersionNeeded2) >= 0 ? DAMP_SDSCMD_LAZYEXT : 0;
  1151. properties = NULL;
  1152. IPropertyTree &props = queryProperties();
  1153. CDaliVersion serverVersionNeeded3(SDS_SVER_MIN_GETXPATHS_CONNECT);
  1154. if (queryDaliServerVersion().compare(serverVersionNeeded3) < 0)
  1155. props.removeProp("Client/@serverIter");
  1156. else
  1157. props.setPropBool("Client/@serverIterAvailable", true);
  1158. clientThrottleLimit = props.getPropInt("Client/Throttle/@limit", CLIENT_THROTTLE_LIMIT);
  1159. clientThrottleDelay = props.getPropInt("Client/Throttle/@delay", CLIENT_THROTTLE_DELAY);
  1160. CDaliVersion appendOptVersionNeeded(SDS_SVER_MIN_APPEND_OPT); // min version for append optimization
  1161. props.setPropBool("Client/@useAppendOpt", queryDaliServerVersion().compare(appendOptVersionNeeded) >= 0);
  1162. CDaliVersion serverVersionNeeded4(SDS_SVER_MIN_GETIDS); // min version for get xpath with server ids
  1163. if (queryDaliServerVersion().compare(serverVersionNeeded4) >= 0)
  1164. props.setPropBool("Client/@serverGetIdsAvailable", true);
  1165. concurrentRequests.signal(clientThrottleLimit);
  1166. }
  1167. CClientSDSManager::~CClientSDSManager()
  1168. {
  1169. CriticalBlock block(connections.crit);
  1170. SuperHashIteratorOf<CConnectionBase> iter(connections.queryBaseTable());
  1171. ForEach(iter)
  1172. {
  1173. CRemoteConnection &conn = (CRemoteConnection &) iter.query();
  1174. conn.setConnected(false);
  1175. }
  1176. ::Release(properties);
  1177. }
  1178. bool CClientSDSManager::sendRequest(CMessageBuffer &mb, bool throttle)
  1179. {
  1180. if (throttle)
  1181. {
  1182. bool avail = concurrentRequests.wait(clientThrottleDelay);
  1183. if (!avail)
  1184. WARNLOG("Excessive concurrent Dali SDS client transactions. Transaction delayed.");
  1185. bool res;
  1186. try { res = queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST); }
  1187. catch (IException *)
  1188. {
  1189. if (avail)
  1190. concurrentRequests.signal();
  1191. throw;
  1192. }
  1193. if (avail)
  1194. concurrentRequests.signal();
  1195. return res;
  1196. }
  1197. else
  1198. return queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST);
  1199. }
  1200. CRemoteTreeBase *CClientSDSManager::get(CRemoteConnection &connection, __int64 serverId)
  1201. {
  1202. CMessageBuffer mb;
  1203. if (childrenCanBeMissing)
  1204. mb.append((int)DAMP_SDSCMD_GET2 | lazyExtFlag);
  1205. else
  1206. mb.append((int)DAMP_SDSCMD_GET | lazyExtFlag);
  1207. mb.append(connection.queryConnectionId());
  1208. mb.append(serverId);
  1209. if (!sendRequest(mb))
  1210. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1211. SdsReply replyMsg;
  1212. mb.read((int &)replyMsg);
  1213. CClientRemoteTree *tree = NULL;
  1214. switch (replyMsg)
  1215. {
  1216. case DAMP_SDSREPLY_OK:
  1217. {
  1218. CDisableFetchChangeBlock block(connection);
  1219. tree = new CClientRemoteTree(connection);
  1220. tree->deserializeSelfRT(mb);
  1221. break;
  1222. }
  1223. case DAMP_SDSREPLY_EMPTY:
  1224. break;
  1225. default:
  1226. throwMbException("SDS Reply Error ", mb);
  1227. }
  1228. return tree;
  1229. }
  1230. void CClientSDSManager::getChildren(CRemoteTreeBase &parent, CRemoteConnection &connection, unsigned levels)
  1231. {
  1232. CMessageBuffer mb;
  1233. if (childrenCanBeMissing)
  1234. mb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
  1235. else
  1236. mb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
  1237. mb.append(connection.queryConnectionId());
  1238. mb.append(parent.queryServerId());
  1239. mb.append(levels);
  1240. mb.append((__int64)0); // terminator
  1241. if (!sendRequest(mb))
  1242. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "fetching SDS branch");
  1243. SdsReply replyMsg;
  1244. mb.read((int &)replyMsg);
  1245. switch (replyMsg)
  1246. {
  1247. case DAMP_SDSREPLY_OK:
  1248. break;
  1249. case DAMP_SDSREPLY_EMPTY:
  1250. return;
  1251. case DAMP_SDSREPLY_ERROR:
  1252. throwMbException("SDS Reply Error ", mb);
  1253. default:
  1254. assertex(false);
  1255. }
  1256. CDisableFetchChangeBlock block(connection);
  1257. if (childrenCanBeMissing)
  1258. {
  1259. bool r;
  1260. mb.read(r);
  1261. if (!r) return;
  1262. }
  1263. parent.deserializeChildrenRT(mb);
  1264. }
  1265. static void matchServerTree(CClientRemoteTree *local, IPropertyTree &matchTree, ICopyArrayOf<CClientRemoteTree> &matchedLocals, ICopyArrayOf<IPropertyTree> &matched, bool allTail, MemoryBuffer &mb)
  1266. {
  1267. Owned<IPropertyTreeIterator> matchIter = matchTree.getElements("*");
  1268. if (matchIter->first())
  1269. {
  1270. if (!local || (local->hasChildren() && NULL == local->queryChildren()))
  1271. {
  1272. if (local)
  1273. {
  1274. matchedLocals.append(*local);
  1275. matched.append(matchTree);
  1276. }
  1277. mb.append(matchTree.getPropInt64("@serverId"));
  1278. mb.append((unsigned)1);
  1279. }
  1280. do
  1281. {
  1282. IPropertyTree &elem = matchIter->query();
  1283. StringBuffer path(elem.queryName());
  1284. path.append('[').append(elem.getPropInt("@pos")).append(']');
  1285. CClientRemoteTree *child = local ? (CClientRemoteTree *)local->queryPropTree(path.str()) : NULL;
  1286. matchServerTree(child, elem, matchedLocals, matched, allTail, mb);
  1287. }
  1288. while (matchIter->next());
  1289. }
  1290. else
  1291. {
  1292. if (!local || (local->hasChildren() && NULL == local->queryChildren()))
  1293. {
  1294. if (local)
  1295. {
  1296. matchedLocals.append(*local);
  1297. matched.append(matchTree);
  1298. }
  1299. mb.append(matchTree.getPropInt64("@serverId"));
  1300. mb.append(allTail ? (unsigned)0 : (unsigned)1);
  1301. }
  1302. }
  1303. }
  1304. void CClientSDSManager::ensureLocal(CRemoteConnection &connection, CRemoteTreeBase &_parent, IPropertyTree *serverMatchTree, IPTIteratorCodes flags)
  1305. {
  1306. CClientRemoteTree &parent = (CClientRemoteTree &)_parent;
  1307. CMessageBuffer remoteGetMb;
  1308. if (childrenCanBeMissing)
  1309. remoteGetMb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
  1310. else
  1311. remoteGetMb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
  1312. remoteGetMb.append(connection.queryConnectionId());
  1313. ICopyArrayOf<CClientRemoteTree> matchedLocals;
  1314. ICopyArrayOf<IPropertyTree> matched;
  1315. bool getLeaves = iptiter_remotegetbranch == (flags & iptiter_remotegetbranch);
  1316. CDisableFetchChangeBlock block(connection);
  1317. matchServerTree(&parent, *serverMatchTree, matchedLocals, matched, getLeaves, remoteGetMb);
  1318. if (0 == matched.ordinality())
  1319. return;
  1320. remoteGetMb.append((__int64)0);
  1321. if (!sendRequest(remoteGetMb))
  1322. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "ensureLocal");
  1323. SdsReply replyMsg;
  1324. remoteGetMb.read((int &)replyMsg);
  1325. switch (replyMsg)
  1326. {
  1327. case DAMP_SDSREPLY_OK:
  1328. break;
  1329. case DAMP_SDSREPLY_EMPTY:
  1330. return;
  1331. case DAMP_SDSREPLY_ERROR:
  1332. throwMbException("SDS Reply Error ", remoteGetMb);
  1333. default:
  1334. assertex(false);
  1335. }
  1336. ForEachItemIn(m, matched)
  1337. walkAndFill(matched.item(m), matchedLocals.item(m), remoteGetMb, childrenCanBeMissing);
  1338. }
  1339. void CClientSDSManager::getChildrenFor(CRTArray &childLessList, CRemoteConnection &connection, unsigned levels)
  1340. {
  1341. CMessageBuffer mb;
  1342. if (childrenCanBeMissing)
  1343. mb.append((int)DAMP_SDSCMD_GETCHILDREN2 | lazyExtFlag);
  1344. else
  1345. mb.append((int)DAMP_SDSCMD_GETCHILDREN | lazyExtFlag);
  1346. mb.append(connection.queryConnectionId());
  1347. ForEachItemIn(f, childLessList)
  1348. {
  1349. CRemoteTreeBase &parent = childLessList.item(f);
  1350. if (parent.queryServerId())
  1351. {
  1352. mb.append(parent.queryServerId());
  1353. mb.append(levels);
  1354. }
  1355. }
  1356. mb.append((__int64)0); // terminator
  1357. if (!sendRequest(mb))
  1358. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getChildrenFor");
  1359. SdsReply replyMsg;
  1360. mb.read((int &)replyMsg);
  1361. switch (replyMsg)
  1362. {
  1363. case DAMP_SDSREPLY_OK:
  1364. break;
  1365. case DAMP_SDSREPLY_EMPTY:
  1366. return;
  1367. case DAMP_SDSREPLY_ERROR:
  1368. throwMbException("SDS Reply Error ", mb);
  1369. default:
  1370. assertex(false);
  1371. }
  1372. CDisableFetchChangeBlock block(connection);
  1373. ForEachItemIn(f2, childLessList)
  1374. {
  1375. CRemoteTreeBase &parent = childLessList.item(f2);
  1376. parent.createChildMap();
  1377. if (parent.queryServerId())
  1378. {
  1379. bool r;
  1380. if (childrenCanBeMissing)
  1381. mb.read(r);
  1382. else
  1383. r = true;
  1384. if (r)
  1385. parent.deserializeChildrenRT(mb);
  1386. }
  1387. }
  1388. }
  1389. IPropertyTreeIterator *CClientSDSManager::getElements(CRemoteConnection &connection, const char *xpath)
  1390. {
  1391. CMessageBuffer mb;
  1392. mb.append((int)DAMP_SDSCMD_GETELEMENTS | lazyExtFlag);
  1393. mb.append(connection.queryConnectionId());
  1394. mb.append(xpath);
  1395. if (!sendRequest(mb))
  1396. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1397. SdsReply replyMsg;
  1398. mb.read((int &)replyMsg);
  1399. CClientRemoteTree *tree = NULL;
  1400. switch (replyMsg)
  1401. {
  1402. case DAMP_SDSREPLY_OK:
  1403. {
  1404. unsigned count;
  1405. mb.read(count);
  1406. CDisableFetchChangeBlock block(connection);
  1407. Owned<DaliPTArrayIterator> iter = new DaliPTArrayIterator();
  1408. while (count--)
  1409. {
  1410. CClientRemoteTree *tree = new CClientRemoteTree(connection);
  1411. iter->array.append(*tree);
  1412. tree->deserializeSelfRT(mb);
  1413. }
  1414. return LINK(iter);
  1415. }
  1416. default:
  1417. throwMbException("SDS Reply Error ", mb);
  1418. }
  1419. return NULL;
  1420. }
  1421. void CClientSDSManager::noteDisconnected(CRemoteConnection &connection)
  1422. {
  1423. connection.setConnected(false);
  1424. connections.removeExact(&connection);
  1425. }
  1426. void CClientSDSManager::commit(CRemoteConnection &connection, bool *disconnectDeleteRoot)
  1427. {
  1428. CriticalBlock b(crit); // if >1 commit per client concurrently would cause problems with serverId.
  1429. CClientRemoteTree *tree = (CClientRemoteTree *) connection.queryRoot();
  1430. try
  1431. {
  1432. CMessageBuffer mb;
  1433. mb.append((int)DAMP_SDSCMD_DATA);
  1434. mb.append(connection.queryConnectionId());
  1435. if (disconnectDeleteRoot)
  1436. {
  1437. mb.append((byte)(0x80 + 1)); // kludge, high bit to indicate new client format. (for backward compat.)
  1438. mb.append(*disconnectDeleteRoot);
  1439. }
  1440. else
  1441. mb.append((byte)0x80); // kludge, high bit to indicate new client format. (for backward compat.)
  1442. bool lazyFetch = connection.setLazyFetch(false);
  1443. Owned<IPropertyTree> changes = tree->collateData();
  1444. connection.setLazyFetch(lazyFetch);
  1445. if (NULL == disconnectDeleteRoot && !changes) return;
  1446. if (changes) changes->serialize(mb);
  1447. try
  1448. {
  1449. if (!sendRequest(mb))
  1450. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "committing");
  1451. }
  1452. catch (IDaliClient_Exception *e)
  1453. {
  1454. if (DCERR_server_closed == e->errorCode())
  1455. {
  1456. if (changes)
  1457. WARNLOG("Dali server disconnect, failed to commit data");
  1458. e->Release();
  1459. if (disconnectDeleteRoot)
  1460. noteDisconnected(connection);
  1461. return; // JCSMORE does this really help, shouldn't it just throw?
  1462. }
  1463. else
  1464. throw;
  1465. }
  1466. SdsReply replyMsg;
  1467. mb.read((int &)replyMsg);
  1468. switch (replyMsg)
  1469. {
  1470. case DAMP_SDSREPLY_OK:
  1471. {
  1472. bool lazyFetch = connection.setLazyFetch(false);
  1473. // NOTE: this means that send collated data order and the following order have to match!
  1474. // JCSMORE - true but.. hmm.. (could possibly have alternative lookup scheme)
  1475. tree->clearCommitChanges(&mb);
  1476. assertex(mb.getPos() == mb.length()); // must have read it all
  1477. connection.setLazyFetch(lazyFetch);
  1478. break;
  1479. }
  1480. case DAMP_SDSREPLY_EMPTY:
  1481. break;
  1482. case DAMP_SDSREPLY_ERROR:
  1483. throwMbException("SDS Reply Error ", mb);
  1484. default:
  1485. assertex(false);
  1486. }
  1487. }
  1488. catch (IException *)
  1489. {
  1490. if (disconnectDeleteRoot)
  1491. noteDisconnected(connection);
  1492. throw;
  1493. }
  1494. if (disconnectDeleteRoot)
  1495. noteDisconnected(connection);
  1496. }
  1497. void CClientSDSManager::changeMode(CRemoteConnection &connection, unsigned mode, unsigned timeout, bool suppressReloads)
  1498. {
  1499. CConnectionLock b(connection);
  1500. if (mode & RTM_CREATE_MASK)
  1501. throw MakeSDSException(SDSExcpt_BadMode, "calling changeMode");
  1502. unsigned prevMode = connection.queryMode();
  1503. if (RTM_MODE(prevMode, RTM_LOCK_WRITE) && !RTM_MODE(mode, RTM_LOCK_WRITE))
  1504. commit(connection, NULL);
  1505. CMessageBuffer mb;
  1506. mb.append((int)DAMP_SDSCMD_CHANGEMODE);
  1507. mb.append(connection.queryConnectionId());
  1508. mb.append(mode).append(timeout);
  1509. if (!sendRequest(mb))
  1510. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "changing mode");
  1511. SdsReply replyMsg;
  1512. mb.read((int &)replyMsg);
  1513. switch (replyMsg)
  1514. {
  1515. case DAMP_SDSREPLY_OK:
  1516. {
  1517. connection.setMode(mode);
  1518. if (!suppressReloads)
  1519. {
  1520. if (RTM_MODE(mode, RTM_LOCK_WRITE) && !RTM_MODE(prevMode, RTM_LOCK_WRITE) && !RTM_MODE(prevMode, RTM_LOCK_READ))
  1521. connection.reload();
  1522. }
  1523. break;
  1524. }
  1525. case DAMP_SDSREPLY_ERROR:
  1526. throwMbException("SDS Reply Error ", mb);
  1527. default:
  1528. assertex(false);
  1529. }
  1530. }
  1531. // ISDSManager impl.
  1532. #define MIN_MCONNECT_SVER "1.5"
  1533. IRemoteConnections *CClientSDSManager::connect(IMultipleConnector *mConnect, SessionId id, unsigned timeout)
  1534. {
  1535. CDaliVersion serverVersionNeeded(MIN_MCONNECT_SVER);
  1536. if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
  1537. throw MakeSDSException(SDSExcpt_VersionMismatch, "Multiple connect not supported by server versions prior to " MIN_MCONNECT_SVER);
  1538. if (0 == id || id != myProcessSession())
  1539. throw MakeSDSException(SDSExcpt_InvalidSessionId, ", in multi connect, sessionid=%" I64F "x", id);
  1540. CMessageBuffer mb;
  1541. mb.append((unsigned)DAMP_SDSCMD_MCONNECT | lazyExtFlag);
  1542. mb.append(id).append(timeout);
  1543. mConnect->serialize(mb);
  1544. if (!sendRequest(mb, true))
  1545. {
  1546. StringBuffer s(", making multiple connect to ");
  1547. getMConnectString(mConnect, s);
  1548. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "%s", s.str());
  1549. }
  1550. SdsReply replyMsg;
  1551. Owned<CRemoteConnections> remoteConnections = new CRemoteConnections();
  1552. unsigned c;
  1553. for (c=0; c<mConnect->queryConnections(); c++)
  1554. {
  1555. mb.read((int &)replyMsg);
  1556. switch (replyMsg)
  1557. {
  1558. case DAMP_SDSREPLY_OK:
  1559. {
  1560. ConnectionId connId;
  1561. mb.read(connId);
  1562. StringAttr xpath;
  1563. unsigned mode;
  1564. mConnect->getConnectionDetails(c, xpath, mode);
  1565. Owned<CRemoteConnection> conn = new CRemoteConnection(*this, connId, xpath, id, mode & ~RTM_CREATE_MASK, timeout);
  1566. assertex(conn.get());
  1567. if (queryProperties().getPropBool("Client/@LogConnection"))
  1568. DBGLOG("SDSManager::connect() - IMultipleConnector: RemoteConnection ID<%" I64F "x>, timeout<%d>", connId, timeout);
  1569. CClientRemoteTree *tree;
  1570. { CDisableFetchChangeBlock block(*conn);
  1571. tree = new CClientRemoteTree(*conn);
  1572. tree->deserializeRT(mb);
  1573. }
  1574. conn->setRoot(tree);
  1575. connections.replace(*conn);
  1576. remoteConnections->add(LINK(conn));
  1577. break;
  1578. }
  1579. case DAMP_SDSREPLY_ERROR:
  1580. throwMbException("SDS Reply Error ", mb);
  1581. default:
  1582. assertex(false);
  1583. }
  1584. }
  1585. return LINK(remoteConnections);
  1586. }
  1587. IRemoteConnection *CClientSDSManager::connect(const char *xpath, SessionId id, unsigned mode, unsigned timeout)
  1588. {
  1589. if (0 == id || id != myProcessSession())
  1590. throw MakeSDSException(SDSExcpt_InvalidSessionId, ", connecting to %s, sessionid=%" I64F "x", xpath, id);
  1591. CMessageBuffer mb;
  1592. mb.append((int)DAMP_SDSCMD_CONNECT | lazyExtFlag);
  1593. mb.append(id).append(mode).append(timeout);
  1594. mb.append(xpath);
  1595. if (!sendRequest(mb, true))
  1596. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, ", connecting to %s", xpath);
  1597. SdsReply replyMsg;
  1598. mb.read((int &)replyMsg);
  1599. CRemoteConnection *conn = NULL;
  1600. switch (replyMsg)
  1601. {
  1602. case DAMP_SDSREPLY_OK:
  1603. {
  1604. ConnectionId connId;
  1605. mb.read(connId);
  1606. conn = new CRemoteConnection(*this, connId, xpath, id, mode & ~RTM_CREATE_MASK, timeout);
  1607. assertex(conn);
  1608. if (queryProperties().getPropBool("Client/@LogConnection"))
  1609. DBGLOG("SDSManager::connect(): xpath<%s>, RemoteConnection ID<%" I64F "x>, mode<%x>, timeout<%d>", xpath, connId, mode, timeout);
  1610. CClientRemoteTree *tree;
  1611. { CDisableFetchChangeBlock block(*conn);
  1612. tree = new CClientRemoteTree(*conn);
  1613. tree->deserializeRT(mb);
  1614. }
  1615. conn->setRoot(tree);
  1616. connections.replace(*conn);
  1617. break;
  1618. }
  1619. case DAMP_SDSREPLY_EMPTY:
  1620. break;
  1621. case DAMP_SDSREPLY_ERROR:
  1622. throwMbException("SDS Reply Error ", mb);
  1623. default:
  1624. assertex(false);
  1625. }
  1626. return conn;
  1627. }
  1628. SubscriptionId CClientSDSManager::subscribe(const char *xpath, ISDSSubscription &notify, bool sub, bool sendValue)
  1629. {
  1630. assertex(xpath);
  1631. if (sub && sendValue)
  1632. throw MakeSDSException(SDSExcpt_Unsupported, "Subscription to sub elements, with sendValue option unsupported");
  1633. StringBuffer s;
  1634. if ('/' != *xpath)
  1635. {
  1636. s.append('/').append(xpath);
  1637. xpath = s.str();
  1638. }
  1639. CSDSSubscriberProxy *subscriber = new CSDSSubscriberProxy(xpath, sub, sendValue, notify);
  1640. querySubscriptionManager(SDS_PUBLISHER)->add(subscriber, subscriber->getId());
  1641. return subscriber->getId();
  1642. }
  1643. SubscriptionId CClientSDSManager::subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue)
  1644. {
  1645. if (queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) < 0)
  1646. throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " SDS_SVER_MIN_NODESUBSCRIBE " for subscribeExact");
  1647. assertex(xpath);
  1648. StringBuffer s;
  1649. if ('/' != *xpath)
  1650. {
  1651. s.append('/').append(xpath);
  1652. xpath = s.str();
  1653. }
  1654. CSDSNodeSubscriberProxy *subscriber = new CSDSNodeSubscriberProxy(xpath, sendValue, notify);
  1655. querySubscriptionManager(SDSNODE_PUBLISHER)->add(subscriber, subscriber->getId());
  1656. return subscriber->getId();
  1657. }
  1658. void CClientSDSManager::unsubscribe(SubscriptionId id)
  1659. {
  1660. querySubscriptionManager(SDS_PUBLISHER)->remove(id);
  1661. }
  1662. void CClientSDSManager::unsubscribeExact(SubscriptionId id)
  1663. {
  1664. if (queryDaliServerVersion().compare(SDS_SVER_MIN_NODESUBSCRIBE) < 0)
  1665. throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " SDS_SVER_MIN_NODESUBSCRIBE " for unsubscribeExact");
  1666. querySubscriptionManager(SDSNODE_PUBLISHER)->remove(id);
  1667. }
  1668. StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
  1669. {
  1670. CMessageBuffer mb;
  1671. mb.append((int)DAMP_SDSCMD_DIAGNOSTIC);
  1672. mb.append((int)cmd);
  1673. if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
  1674. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnositc info");
  1675. SdsReply replyMsg;
  1676. mb.read((int &)replyMsg);
  1677. switch (replyMsg)
  1678. {
  1679. case DAMP_SDSREPLY_OK:
  1680. {
  1681. switch (cmd)
  1682. {
  1683. case DIAG_CMD_STATS:
  1684. formatUsageStats(mb, out);
  1685. break;
  1686. case DIAG_CMD_CONNECTIONS:
  1687. formatConnections(mb, out);
  1688. break;
  1689. case DIAG_CMD_SUBSCRIBERS:
  1690. formatSubscribers(mb, out);
  1691. break;
  1692. }
  1693. break;
  1694. }
  1695. case DAMP_SDSREPLY_ERROR:
  1696. throwMbException("SDS Reply Error ", mb);
  1697. default:
  1698. assertex(false);
  1699. }
  1700. return out;
  1701. }
  1702. ILockInfoCollection *CClientSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
  1703. {
  1704. CMessageBuffer msg;
  1705. msg.append((int)DAMP_SDSCMD_DIAGNOSTIC);
  1706. msg.append((int)DIAG_CMD_LOCKINFO);
  1707. msg.append(ipPattern?ipPattern:"");
  1708. msg.append(xpathPattern?xpathPattern:"");
  1709. if (!queryCoven().sendRecv(msg, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
  1710. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getLocks");
  1711. SdsReply replyMsg;
  1712. msg.read((int &)replyMsg);
  1713. switch (replyMsg)
  1714. {
  1715. case DAMP_SDSREPLY_OK:
  1716. break;
  1717. case DAMP_SDSREPLY_ERROR:
  1718. throwMbException("SDS Reply Error ", msg);
  1719. default:
  1720. assertex(false);
  1721. }
  1722. return deserializeLockInfoCollection(msg);
  1723. }
  1724. StringBuffer &CClientSDSManager::getUsageStats(StringBuffer &out)
  1725. {
  1726. return getInfo(DIAG_CMD_STATS, out);
  1727. }
  1728. StringBuffer &CClientSDSManager::getConnections(StringBuffer &out)
  1729. {
  1730. return getInfo(DIAG_CMD_CONNECTIONS, out);
  1731. }
  1732. StringBuffer &CClientSDSManager::getSubscribers(StringBuffer &out)
  1733. {
  1734. return getInfo(DIAG_CMD_SUBSCRIBERS, out);
  1735. }
  1736. // TODO
  1737. StringBuffer &CClientSDSManager::getExternalReport(StringBuffer &out)
  1738. {
  1739. return out;
  1740. }
  1741. IPropertyTree &CClientSDSManager::queryProperties() const
  1742. {
  1743. if (properties) return *properties;
  1744. CDaliVersion serverVersionNeeded("3.1");
  1745. if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
  1746. throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= 3.1 for getProperties usage");
  1747. CMessageBuffer mb;
  1748. mb.append((int)DAMP_SDSCMD_GETPROPS);
  1749. if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
  1750. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnostic info");
  1751. SdsReply replyMsg;
  1752. mb.read((int &)replyMsg);
  1753. switch (replyMsg)
  1754. {
  1755. case DAMP_SDSREPLY_OK:
  1756. break;
  1757. case DAMP_SDSREPLY_ERROR:
  1758. throwMbException("SDS Reply Error ", mb);
  1759. default:
  1760. assertex(false);
  1761. }
  1762. properties = createPTree(mb);
  1763. if (!properties->hasProp("Client"))
  1764. properties->setPropTree("Client", createPTree());
  1765. return *properties;
  1766. }
  1767. IPropertyTree *CClientSDSManager::getXPaths(__int64 serverId, const char *xpath, bool getServerIds)
  1768. {
  1769. CMessageBuffer mb;
  1770. mb.append((int)(getServerIds?DAMP_SDSCMD_GETXPATHSPLUSIDS:DAMP_SDSCMD_GETXPATHS));
  1771. mb.append(serverId);
  1772. mb.append(xpath);
  1773. if (!sendRequest(mb))
  1774. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1775. SdsReply replyMsg;
  1776. mb.read((int &)replyMsg);
  1777. switch (replyMsg)
  1778. {
  1779. case DAMP_SDSREPLY_OK:
  1780. return createPTree(mb);
  1781. case DAMP_SDSREPLY_EMPTY:
  1782. return NULL;
  1783. case DAMP_SDSREPLY_ERROR:
  1784. throwMbException("SDS Reply Error ", mb);
  1785. }
  1786. throwUnexpected();
  1787. }
  1788. IPropertyTreeIterator *CClientSDSManager::getXPathsSortLimit(const char *baseXPath, const char *matchXPath, const char *sortBy, bool caseinsensitive, bool ascending, unsigned from, unsigned limit)
  1789. {
  1790. CMessageBuffer mb;
  1791. mb.append((int)DAMP_SDSCMD_GETXPATHSCRITERIA);
  1792. mb.append(baseXPath);
  1793. mb.append(matchXPath);
  1794. mb.append(sortBy);
  1795. mb.append(caseinsensitive);
  1796. mb.append(ascending);
  1797. mb.append(from);
  1798. mb.append(limit);
  1799. if (!sendRequest(mb))
  1800. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1801. SdsReply replyMsg;
  1802. mb.read((int &)replyMsg);
  1803. switch (replyMsg)
  1804. {
  1805. case DAMP_SDSREPLY_OK:
  1806. break;
  1807. case DAMP_SDSREPLY_EMPTY:
  1808. return createNullPTreeIterator();
  1809. case DAMP_SDSREPLY_ERROR:
  1810. throwMbException("SDS Reply Error ", mb);
  1811. default:
  1812. throwUnexpected();
  1813. }
  1814. Owned<IPropertyTree> matchTree = createPTree(mb);
  1815. Owned<CRemoteConnection> conn = (CRemoteConnection *)connect(baseXPath, myProcessSession(), RTM_LOCK_READ, INFINITE);
  1816. if (!conn)
  1817. return createNullPTreeIterator();
  1818. ensureLocal(*conn, (CRemoteTreeBase &)*conn->queryRoot(), matchTree);
  1819. return new CXPathIterator(conn->queryRoot(), matchTree, iptiter_null);
  1820. }
  1821. void CClientSDSManager::getExternalValueFromServerId(__int64 serverId, MemoryBuffer &resMb)
  1822. {
  1823. CMessageBuffer mb;
  1824. mb.append((int)DAMP_SDSCMD_GETEXTVALUE);
  1825. mb.append(serverId);
  1826. if (!sendRequest(mb))
  1827. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1828. SdsReply replyMsg;
  1829. mb.read((int &)replyMsg);
  1830. switch (replyMsg)
  1831. {
  1832. case DAMP_SDSREPLY_OK:
  1833. resMb.append(mb.length()-mb.getPos(), mb.toByteArray()+mb.getPos());
  1834. return;
  1835. case DAMP_SDSREPLY_EMPTY:
  1836. return;
  1837. case DAMP_SDSREPLY_ERROR:
  1838. throwMbException("SDS Reply Error ", mb);
  1839. }
  1840. throwUnexpected();
  1841. }
  1842. IPropertyTreeIterator *CClientSDSManager::getElementsRaw(const char *xpath, INode *remotedali, unsigned timeout)
  1843. {
  1844. CMessageBuffer mb;
  1845. mb.append((int)DAMP_SDSCMD_GETELEMENTSRAW);
  1846. mb.append(xpath);
  1847. bool ok;
  1848. if (remotedali) {
  1849. Owned<IGroup> grp = createIGroup(1,&remotedali);
  1850. Owned<ICommunicator> comm = createCommunicator(grp,false);
  1851. try {
  1852. ok = comm->sendRecv(mb,RANK_RANDOM, MPTAG_DALI_SDS_REQUEST, timeout);
  1853. }
  1854. catch (IMP_Exception *e)
  1855. {
  1856. if (e->errorCode()!=MPERR_link_closed)
  1857. throw;
  1858. e->Release();
  1859. throw createClientException(DCERR_server_closed);
  1860. }
  1861. }
  1862. else
  1863. ok = sendRequest(mb);
  1864. if (!ok)
  1865. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer);
  1866. SdsReply replyMsg;
  1867. mb.read((int &)replyMsg);
  1868. switch (replyMsg)
  1869. {
  1870. case DAMP_SDSREPLY_OK:
  1871. {
  1872. Owned<DaliPTArrayIterator> resultIterator = new DaliPTArrayIterator;
  1873. unsigned count, c;
  1874. mb.read(count);
  1875. for (c=0; c<count; c++)
  1876. {
  1877. Owned<IPropertyTree> item = createPTree(mb);
  1878. resultIterator->array.append(*LINK(item));
  1879. }
  1880. return LINK(resultIterator);
  1881. }
  1882. case DAMP_SDSREPLY_ERROR:
  1883. throwMbException("SDS Reply Error ", mb);
  1884. }
  1885. return NULL;
  1886. }
  1887. void CClientSDSManager::setConfigOpt(const char *opt, const char *value)
  1888. {
  1889. IPropertyTree &props = queryProperties();
  1890. if (props.hasProp(opt) && (0 == strcmp(value, props.queryProp(opt))))
  1891. return;
  1892. ensurePTree(&queryProperties(), opt);
  1893. queryProperties().setProp(opt, value);
  1894. if (0 == strcmp("Client/Throttle/@limit", opt))
  1895. {
  1896. unsigned newV = props.getPropInt(opt);
  1897. int diff = clientThrottleLimit-newV;
  1898. if (diff)
  1899. {
  1900. if (diff>0) // new limit is lower than old
  1901. {
  1902. PROGLOG("Reducing concurrentThrottleLimit from %d to %d", clientThrottleLimit, newV);
  1903. unsigned c=0;
  1904. for (;;)
  1905. {
  1906. // generally won't be waiting, as would expect this option to typically be called just after component startup time.
  1907. if (!concurrentRequests.wait(clientThrottleDelay))
  1908. WARNLOG("Waiting on active requests to lower clientThrottleLimit");
  1909. else
  1910. {
  1911. ++c;
  1912. if (c == diff)
  1913. break;
  1914. }
  1915. }
  1916. }
  1917. else
  1918. {
  1919. PROGLOG("Increasing clientThrottleLimit from %d to %d", clientThrottleLimit, newV);
  1920. concurrentRequests.signal(-diff); // new limit is higher than old
  1921. }
  1922. clientThrottleLimit = newV;
  1923. }
  1924. }
  1925. }
  1926. #define MIN_QUERYCOUNT_SVER "3.8"
  1927. unsigned CClientSDSManager::queryCount(const char *xpath)
  1928. {
  1929. CDaliVersion serverVersionNeeded(MIN_QUERYCOUNT_SVER);
  1930. if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
  1931. throw MakeSDSException(SDSExcpt_VersionMismatch, "Requires dali server version >= " MIN_QUERYCOUNT_SVER " for queryCount(<xpath>)");
  1932. CMessageBuffer mb;
  1933. mb.append((int)DAMP_SDSCMD_GETCOUNT);
  1934. mb.append(xpath);
  1935. if (!sendRequest(mb, true))
  1936. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, ", queryCount(%s)", xpath);
  1937. unsigned count=0;
  1938. SdsReply replyMsg;
  1939. mb.read((int &)replyMsg);
  1940. if (DAMP_SDSREPLY_OK == replyMsg)
  1941. mb.read(count);
  1942. else
  1943. {
  1944. assertex(replyMsg == DAMP_SDSREPLY_ERROR);
  1945. throwMbException("SDS Reply Error ", mb);
  1946. }
  1947. return count;
  1948. }
  1949. #define MIN_UPDTENV_SVER "3.9"
  1950. bool CClientSDSManager::updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response)
  1951. {
  1952. CDaliVersion serverVersionNeeded(MIN_QUERYCOUNT_SVER);
  1953. if (queryDaliServerVersion().compare(serverVersionNeeded) < 0)
  1954. {
  1955. // have to do the old fashioned way, from client
  1956. Owned<IRemoteConnection> conn = querySDS().connect("/",myProcessSession(),0, INFINITE);
  1957. if (conn)
  1958. {
  1959. Owned<IPropertyTree> root = conn->getRoot();
  1960. Owned<IPropertyTree> oldEnvironment = root->getPropTree("Environment");
  1961. if (oldEnvironment.get())
  1962. {
  1963. StringBuffer bakname;
  1964. Owned<IFileIO> io = createUniqueFile(NULL, "environment", "bak", bakname);
  1965. Owned<IFileIOStream> fstream = createBufferedIOStream(io);
  1966. toXML(oldEnvironment, *fstream); // formatted (default)
  1967. root->removeTree(oldEnvironment);
  1968. }
  1969. root->addPropTree("Environment", LINK(newEnv));
  1970. root.clear();
  1971. conn->commit();
  1972. conn->close();
  1973. StringBuffer messages;
  1974. initClusterGroups(forceGroupUpdate, messages, oldEnvironment);
  1975. if (messages.length())
  1976. PROGLOG("CClientSDSManager::updateEnvironment: %s", messages.str());
  1977. PROGLOG("Environment and node groups updated");
  1978. }
  1979. return true;
  1980. }
  1981. CMessageBuffer mb;
  1982. mb.append((int)DAMP_SDSCMD_UPDTENV);
  1983. newEnv->serialize(mb);
  1984. mb.append(forceGroupUpdate);
  1985. if (!queryCoven().sendRecv(mb, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
  1986. throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "querying sds diagnositc info");
  1987. bool result = false;
  1988. StringAttr resultStr;
  1989. SdsReply replyMsg;
  1990. mb.read((int &)replyMsg);
  1991. switch (replyMsg)
  1992. {
  1993. case DAMP_SDSREPLY_OK:
  1994. {
  1995. mb.read(result);
  1996. mb.read(resultStr);
  1997. response.append(resultStr);
  1998. break;
  1999. }
  2000. case DAMP_SDSREPLY_ERROR:
  2001. throwMbException("SDS Reply Error ", mb);
  2002. default:
  2003. assertex(false);
  2004. }
  2005. return result;
  2006. }
  2007. //////////////
  2008. ISDSManager &querySDS()
  2009. {
  2010. CriticalBlock block(SDScrit);
  2011. if (SDSManager)
  2012. return *SDSManager;
  2013. else if (!queryCoven().inCoven())
  2014. {
  2015. if (!SDSManager)
  2016. SDSManager = new CClientSDSManager();
  2017. return *SDSManager;
  2018. }
  2019. else
  2020. {
  2021. SDSManager = &querySDSServer();
  2022. return *SDSManager;
  2023. }
  2024. }
  2025. void closeSDS()
  2026. {
  2027. CriticalBlock block(SDScrit);
  2028. if (SDSManager) {
  2029. assertex(!queryCoven().inCoven()); // only called by client
  2030. try {
  2031. delete SDSManager;
  2032. }
  2033. catch (IMP_Exception *e)
  2034. {
  2035. if (e->errorCode()!=MPERR_link_closed)
  2036. throw;
  2037. EXCLOG(e, "closeSDS");
  2038. e->Release();
  2039. }
  2040. catch (IDaliClient_Exception *e) {
  2041. if (e->errorCode()!=DCERR_server_closed)
  2042. throw;
  2043. e->Release();
  2044. }
  2045. SDSManager = NULL;
  2046. }
  2047. }