scheduleread.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "scheduleread.hpp"
  14. #include "schedulectrl.ipp"
  15. #include "jptree.hpp"
  16. #include "jexcept.hpp"
  17. #include "dasds.hpp"
  18. class CScheduleWuidIterator : public CInterface
  19. {
  20. public:
  21. CScheduleWuidIterator(IPropertyTree * textBranch) : branch(textBranch)
  22. {
  23. iter.setown(branch->getElements("*"));
  24. iter->first();
  25. }
  26. bool isValidWuid() const { return iter->isValid(); }
  27. bool nextWuid() { return iter->next(); }
  28. StringBuffer & getWuid(StringBuffer & out) const { return ncnameUnescape(iter->query().queryName(), out); }
  29. bool queryUpdateLatest(char const * baseXpath, CDateTime const & dtNow) const
  30. {
  31. StringBuffer xpath;
  32. xpath.append(baseXpath).append('/').append(iter->query().queryName());
  33. Owned<IRemoteConnection> connection = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE | RTM_CREATE_QUERY, connectionTimeout);
  34. IPropertyTree * root = connection->queryRoot();
  35. if(root->hasProp("@latest"))
  36. {
  37. CDateTime dtThen;
  38. dtThen.setString(root->queryProp("@latest"));
  39. if(dtNow <= dtThen)
  40. return false;
  41. }
  42. StringBuffer timestr;
  43. dtNow.getString(timestr);
  44. root->setProp("@latest", timestr.str());
  45. return true;
  46. }
  47. private:
  48. Owned<IPropertyTree> branch;
  49. Owned<IPropertyTreeIterator> iter;
  50. };
  51. class CScheduleEventTextIterator : public CInterface
  52. {
  53. public:
  54. CScheduleEventTextIterator(IPropertyTree * nameBranch) : branch(nameBranch)
  55. {
  56. iter.setown(branch->getElements("*"));
  57. iter->first();
  58. if(iter->isValid())
  59. child.setown(new CScheduleWuidIterator(&iter->get()));
  60. }
  61. bool isValidWuid() const { return (child && child->isValidWuid()); }
  62. bool nextWuid() { return (child && child->nextWuid()); }
  63. bool isValidEventText() const { return iter->isValid(); }
  64. bool nextEventText()
  65. {
  66. if(iter->next())
  67. {
  68. child.setown(new CScheduleWuidIterator(&iter->get()));
  69. return true;
  70. }
  71. else
  72. {
  73. child.clear();
  74. return false;
  75. }
  76. }
  77. StringBuffer & getWuid(StringBuffer & out) const { return child->getWuid(out); }
  78. StringBuffer & getEventText(StringBuffer & out) const { return ncnameUnescape(iter->query().queryName(), out); }
  79. bool queryUpdateLatest(char const * baseXpath, CDateTime const & dt) const
  80. {
  81. StringBuffer xpath;
  82. xpath.append(baseXpath).append('/').append(iter->query().queryName());
  83. return child->queryUpdateLatest(xpath.str(), dt);
  84. }
  85. private:
  86. Owned<IPropertyTree> branch;
  87. Owned<IPropertyTreeIterator> iter;
  88. Owned<CScheduleWuidIterator> child;
  89. };
  90. class ScheduleReaderBase : implements IInterface
  91. {
  92. public:
  93. class SubscriptionProxy : public CInterface, implements ISDSSubscription
  94. {
  95. public:
  96. SubscriptionProxy(ScheduleReaderBase * _owner) : owner(_owner) {}
  97. IMPLEMENT_IINTERFACE;
  98. virtual void notify(SubscriptionId id, char const * xpath, SDSNotifyFlags flags, unsigned valueLen = 0, const void * valueData = NULL)
  99. {
  100. owner->notify();
  101. }
  102. private:
  103. ScheduleReaderBase * owner;
  104. };
  105. ScheduleReaderBase(IScheduleSubscriber * _subscriber) : uptodate(true), linkCount(0), subscriber(_subscriber), subsId(0) {}
  106. virtual ~ScheduleReaderBase() { if(subsProxy) querySDS().unsubscribe(subsId); }
  107. void link()
  108. {
  109. CriticalBlock block(crit);
  110. linkCount++;
  111. }
  112. void unlink()
  113. {
  114. CriticalBlock block(crit);
  115. linkCount--;
  116. if(!uptodate)
  117. {
  118. read(false);
  119. uptodate = true;
  120. }
  121. }
  122. void notify()
  123. {
  124. CriticalBlock block(crit);
  125. if(linkCount==0)
  126. {
  127. read(false);
  128. uptodate = true;
  129. if(subscriber)
  130. subscriber->notify();
  131. }
  132. else
  133. {
  134. uptodate = false;
  135. }
  136. }
  137. protected:
  138. virtual void read(bool subscribeAfter) = 0;
  139. void subscribe(char const * xpath)
  140. {
  141. assertex(subsId == 0);
  142. subsProxy.setown(new SubscriptionProxy(this));
  143. subsId = querySDS().subscribe(xpath, *subsProxy.get(), true);
  144. }
  145. private:
  146. bool uptodate;
  147. CriticalSection crit;
  148. unsigned linkCount;
  149. Owned<IScheduleSubscriber> subscriber;
  150. Owned<SubscriptionProxy> subsProxy;
  151. SubscriptionId subsId;
  152. };
  153. class CScheduleEventNameIterator : public CInterface, implements IScheduleReaderIterator
  154. {
  155. public:
  156. CScheduleEventNameIterator(IPropertyTree * scheduleBranch, char const * _baseXpath, ScheduleReaderBase * _owner) : branch(scheduleBranch), baseXpath(_baseXpath), owner(_owner)
  157. {
  158. owner->link();
  159. iter.setown(branch->getElements("*"));
  160. iter->first();
  161. if(iter->isValid())
  162. child.setown(new CScheduleEventTextIterator(&iter->get()));
  163. }
  164. virtual ~CScheduleEventNameIterator() { owner->unlink(); }
  165. IMPLEMENT_IINTERFACE;
  166. virtual bool isValidWuid() const { return (child && child->isValidWuid()); }
  167. virtual bool nextWuid() { return (child && child->nextWuid()); }
  168. virtual bool isValidEventText() const { return (child && child->isValidEventText()); }
  169. virtual bool nextEventText() { return (child && child->nextEventText()); }
  170. virtual bool isValidEventName() const { return iter->isValid(); }
  171. virtual bool nextEventName()
  172. {
  173. if(iter->next())
  174. {
  175. child.setown(new CScheduleEventTextIterator(&iter->get()));
  176. return true;
  177. }
  178. else
  179. {
  180. child.clear();
  181. return false;
  182. }
  183. }
  184. virtual StringBuffer & getWuid(StringBuffer & out) const { return child->getWuid(out); }
  185. virtual StringBuffer & getEventText(StringBuffer & out) const { return child->getEventText(out); }
  186. virtual StringBuffer & getEventName(StringBuffer & out) const { return ncnameUnescape(iter->query().queryName(), out); }
  187. virtual bool queryUpdateLatest(CDateTime const & dt) const
  188. {
  189. StringBuffer xpath;
  190. xpath.append(baseXpath).append('/').append(iter->query().queryName());
  191. return child->queryUpdateLatest(xpath.str(), dt);
  192. }
  193. private:
  194. Owned<IPropertyTree> branch;
  195. StringAttr baseXpath; // /Schedule/servername
  196. Owned<IPropertyTreeIterator> iter;
  197. Owned<CScheduleEventTextIterator> child;
  198. Owned<ScheduleReaderBase> owner;
  199. };
  200. class CScheduleSingleEventNameIterator : public CInterface, implements IScheduleReaderIterator
  201. {
  202. public:
  203. CScheduleSingleEventNameIterator(char const * eventName, IPropertyTree * nameBranch, char const * _baseXpath, ScheduleReaderBase * _owner) : name(eventName), owner(_owner)
  204. {
  205. owner->link();
  206. child.setown(new CScheduleEventTextIterator(nameBranch));
  207. }
  208. ~CScheduleSingleEventNameIterator() { owner->unlink(); }
  209. IMPLEMENT_IINTERFACE;
  210. virtual bool isValidWuid() const { return (child && child->isValidWuid()); }
  211. virtual bool nextWuid() { return (child && child->nextWuid()); }
  212. virtual bool isValidEventText() const { return (child && child->isValidEventText()); }
  213. virtual bool nextEventText() { return (child && child->nextEventText()); }
  214. virtual bool isValidEventName() const { return child.get() != NULL; }
  215. virtual bool nextEventName() { child.clear(); return false; }
  216. virtual StringBuffer & getWuid(StringBuffer & out) const { return child->getWuid(out); }
  217. virtual StringBuffer & getEventText(StringBuffer & out) const { return child->getEventText(out); }
  218. virtual StringBuffer & getEventName(StringBuffer & out) const { if(child) out.append(name.get()); return out; }
  219. virtual bool queryUpdateLatest(CDateTime const & dt) const { return child->queryUpdateLatest(baseXpath.get(), dt); }
  220. private:
  221. StringAttr name;
  222. Owned<ScheduleReaderBase> owner;
  223. StringAttr baseXpath; // /Schedule/server/name
  224. Owned<CScheduleEventTextIterator> child;
  225. };
  226. class CScheduleSingleEventTextIterator : public CInterface, implements IScheduleReaderIterator
  227. {
  228. public:
  229. CScheduleSingleEventTextIterator(char const * eventName, char const * eventText, IPropertyTree * textBranch, char const * _baseXpath, ScheduleReaderBase * _owner) : name(eventName), text(eventText), baseXpath(_baseXpath), owner(_owner)
  230. {
  231. owner->link();
  232. child.setown(new CScheduleWuidIterator(textBranch));
  233. }
  234. ~CScheduleSingleEventTextIterator() { owner->unlink(); }
  235. IMPLEMENT_IINTERFACE;
  236. virtual bool isValidWuid() const { return (child && child->isValidWuid()); }
  237. virtual bool nextWuid() { return (child && child->nextWuid()); }
  238. virtual bool isValidEventText() const { return child.get() != NULL; }
  239. virtual bool nextEventText() { child.clear(); return false; }
  240. virtual bool isValidEventName() const { return child.get() != NULL; }
  241. virtual bool nextEventName() { child.clear(); return false; }
  242. virtual StringBuffer & getWuid(StringBuffer & out) const { return child->getWuid(out); }
  243. virtual StringBuffer & getEventText(StringBuffer & out) const { if(child) out.append(text.get()); return out; }
  244. virtual StringBuffer & getEventName(StringBuffer & out) const { if(child) out.append(name.get()); return out; }
  245. virtual bool queryUpdateLatest(CDateTime const & dt) const { return child->queryUpdateLatest(baseXpath.get(), dt); }
  246. private:
  247. StringAttr name;
  248. StringAttr text;
  249. StringAttr baseXpath; // /Scheduler/server/name/text
  250. Owned<ScheduleReaderBase> owner;
  251. Owned<CScheduleWuidIterator> child;
  252. };
  253. class CScheduleNullEventNameIterator : public CInterface, implements IScheduleReaderIterator
  254. {
  255. public:
  256. CScheduleNullEventNameIterator() {}
  257. IMPLEMENT_IINTERFACE;
  258. virtual bool isValidEventName() const { return false; }
  259. virtual bool isValidEventText() const { return false; }
  260. virtual bool isValidWuid() const { return false; }
  261. virtual bool nextEventName() { return false; }
  262. virtual bool nextEventText() { return false; }
  263. virtual bool nextWuid() { return false; }
  264. virtual StringBuffer & getEventName(StringBuffer & out) const { throwUnexpected(); return out; }
  265. virtual StringBuffer & getEventText(StringBuffer & out) const { throwUnexpected(); return out; }
  266. virtual StringBuffer & getWuid(StringBuffer & out) const { throwUnexpected(); return out; }
  267. virtual bool queryUpdateLatest(CDateTime const & dt) const { throwUnexpected(); return false; }
  268. };
  269. class CRootScheduleReader : public CInterface, public ScheduleReaderBase, implements IScheduleReader
  270. {
  271. public:
  272. CRootScheduleReader(char const * _serverName, bool subscribe, IScheduleSubscriber * _subscriber) : ScheduleReaderBase(_subscriber), serverName(_serverName)
  273. {
  274. rootPath.append("/Schedule/").append(serverName.get());
  275. read(subscribe);
  276. }
  277. IMPLEMENT_IINTERFACE;
  278. virtual IScheduleReaderIterator * getIterator(char const * eventName, char const * eventText)
  279. {
  280. Owned<IPropertyTree> safeScheduleBranch;
  281. {
  282. CriticalBlock block(treeCrit);
  283. safeScheduleBranch.set(scheduleBranch);
  284. }
  285. if(eventName)
  286. {
  287. StringBuffer xpath;
  288. ncnameEscape(eventName, xpath);
  289. StringBuffer childPath;
  290. childPath.append(rootPath).append('/').append(xpath.str());
  291. Owned<IPropertyTree> nameBranch(safeScheduleBranch->getPropTree(xpath.str()));
  292. if(!nameBranch)
  293. return new CScheduleNullEventNameIterator();
  294. if(eventText)
  295. {
  296. ncnameEscape(eventText, xpath.clear());
  297. Owned<IPropertyTree> textBranch(nameBranch->getPropTree(xpath.str()));
  298. if(!textBranch)
  299. return new CScheduleNullEventNameIterator();
  300. childPath.append('/').append(xpath);
  301. return new CScheduleSingleEventTextIterator(eventName, eventText, textBranch.getLink(), childPath.str(), LINK(this));
  302. }
  303. else
  304. {
  305. return new CScheduleSingleEventNameIterator(eventName, nameBranch.getLink(), childPath.str(), LINK(this));
  306. }
  307. }
  308. else
  309. {
  310. return new CScheduleEventNameIterator(safeScheduleBranch.getClear(), rootPath.str(), LINK(this));
  311. }
  312. }
  313. protected:
  314. void read(bool subscribeAfter)
  315. {
  316. Owned<IRemoteConnection> connection = querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_READ | RTM_CREATE_QUERY, connectionTimeout);
  317. Owned<IPropertyTree> root(connection->queryRoot()->getBranch("."));
  318. if(root)
  319. {
  320. Owned<IPropertyTree> cloned = createPTreeFromIPT(root);
  321. CriticalBlock block(treeCrit);
  322. scheduleBranch.setown(cloned.getClear());
  323. }
  324. if(subscribeAfter)
  325. subscribe(rootPath.str());
  326. }
  327. private:
  328. StringAttr serverName;
  329. StringBuffer rootPath;
  330. CriticalSection treeCrit;
  331. Owned<IPropertyTree> scheduleBranch;
  332. };
  333. class CBranchScheduleReader : public CInterface, public ScheduleReaderBase, implements IScheduleReader
  334. {
  335. public:
  336. CBranchScheduleReader(char const * _serverName, char const * _eventName, bool subscribe, IScheduleSubscriber * _subscriber) : ScheduleReaderBase(_subscriber), serverName(_serverName), eventName(_eventName)
  337. {
  338. rootPath.append("/Schedule/").append(serverName.get());
  339. ncnameEscape(_eventName, xpath);
  340. fullPath.append(rootPath).append('/').append(xpath.str());
  341. read(subscribe);
  342. }
  343. IMPLEMENT_IINTERFACE;
  344. virtual IScheduleReaderIterator * getIterator(char const * _eventName, char const * eventText)
  345. {
  346. Owned<IPropertyTree> safeNameBranch;
  347. {
  348. CriticalBlock block(treeCrit);
  349. safeNameBranch.set(nameBranch);
  350. }
  351. if((!safeNameBranch) || (_eventName && (strcmp(_eventName, eventName.get())!=0)))
  352. return new CScheduleNullEventNameIterator();
  353. if(eventText)
  354. {
  355. StringBuffer xpath;
  356. ncnameEscape(eventText, xpath);
  357. Owned<IPropertyTree> textBranch(safeNameBranch->getPropTree(xpath.str()));
  358. if(!textBranch)
  359. return new CScheduleNullEventNameIterator();
  360. StringBuffer childPath;
  361. childPath.append(fullPath).append('/').append(xpath);
  362. return new CScheduleSingleEventTextIterator(_eventName, eventText, textBranch.getLink(), childPath.str(), LINK(this));
  363. }
  364. else
  365. {
  366. return new CScheduleSingleEventNameIterator(eventName.get(), safeNameBranch.getLink(), fullPath.str(), LINK(this));
  367. }
  368. }
  369. protected:
  370. void read(bool subscribeAfter)
  371. {
  372. Owned<IRemoteConnection> connection = querySDS().connect(rootPath.str(), myProcessSession(), RTM_LOCK_READ | RTM_CREATE_QUERY, connectionTimeout);
  373. Owned<IPropertyTree> root(connection->queryRoot()->getBranch(xpath.str()));
  374. if(root)
  375. {
  376. Owned<IPropertyTree> cloned = createPTreeFromIPT(root);
  377. CriticalBlock block(treeCrit);
  378. nameBranch.setown(cloned.getClear());
  379. }
  380. if(subscribeAfter)
  381. subscribe(fullPath.str());
  382. }
  383. private:
  384. StringAttr serverName;
  385. StringAttr eventName;
  386. StringBuffer rootPath; // /Schedule/server
  387. StringBuffer xpath; // name
  388. StringBuffer fullPath; // /Schedule/server/name
  389. Owned<IPropertyTree> nameBranch;
  390. CriticalSection treeCrit;
  391. };
  392. IScheduleReader * getScheduleReader(char const * serverName, char const * eventName)
  393. {
  394. if(eventName && *eventName)
  395. return new CBranchScheduleReader(serverName, eventName, false, NULL);
  396. else
  397. return new CRootScheduleReader(serverName, false, NULL);
  398. }
  399. IScheduleReader * getSubscribingScheduleReader(char const * serverName, IScheduleSubscriber * subscriber, char const * eventName)
  400. {
  401. if(eventName && *eventName)
  402. return new CBranchScheduleReader(serverName, eventName, true, subscriber);
  403. else
  404. return new CRootScheduleReader(serverName, true, subscriber);
  405. }
  406. class CSchedulerListIterator : public CInterface, implements ISchedulerListIterator
  407. {
  408. public:
  409. CSchedulerListIterator()
  410. {
  411. conn.setown(querySDS().connect("/Schedulers", myProcessSession(), RTM_LOCK_READ, connectionTimeout));
  412. if(conn)
  413. {
  414. root.setown(conn->queryRoot()->getBranch("."));
  415. iter.setown(root->getElements("*"));
  416. }
  417. }
  418. IMPLEMENT_IINTERFACE;
  419. virtual void first() { if(iter) iter->first(); }
  420. virtual bool isValid() const { return (iter && iter->isValid()); }
  421. virtual bool next() { return (iter && iter->next()); }
  422. virtual char const * query() const { return (iter ? iter->query().queryName() : NULL); }
  423. private:
  424. Owned<IRemoteConnection> conn;
  425. Owned<IPropertyTree> root;
  426. Owned<IPropertyTreeIterator> iter;
  427. };
  428. ISchedulerListIterator * getSchedulerList()
  429. {
  430. return new CSchedulerListIterator();
  431. }