scheduleadmin.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <stdio.h>
  14. #include <stdlib.h>
  15. #include <string.h>
  16. #include "schedulectrl.hpp"
  17. #include "scheduleread.hpp"
  18. #include "eventqueue.hpp"
  19. #include "daclient.hpp"
  20. void usage(int exitval = 2)
  21. {
  22. printf("Usage:\n"
  23. "scheduleadmin help\n"
  24. "scheduleadmin <DALI> add <WUID>\n"
  25. "scheduleadmin <DALI> remove <WUID>\n"
  26. "scheduleadmin <DALI> removeall\n"
  27. "scheduleadmin <DALI> servers\n"
  28. "scheduleadmin <DALI> list <eclserver> (<event name>)\n"
  29. "scheduleadmin <DALI> monitor <eclserver> (<event name>)\n"
  30. "scheduleadmin <DALI> cleanup\n"
  31. "scheduleadmin <DALI> push eventname eventtext (<wuid>)\n"
  32. #if _DEBUG
  33. "scheduleadmin <DALI> testpull <eclserver queue>\n"
  34. #endif
  35. );
  36. releaseAtoms();
  37. ExitModuleObjects();
  38. exit(exitval);
  39. }
  40. class AdminScheduleMonitor : public CInterface
  41. {
  42. private:
  43. class SubscriptionProxy : public CInterface, implements IScheduleSubscriber
  44. {
  45. public:
  46. SubscriptionProxy(AdminScheduleMonitor const * _owner) : owner(_owner) {}
  47. IMPLEMENT_IINTERFACE;
  48. virtual void notify() { owner->notify(); }
  49. private:
  50. AdminScheduleMonitor const * owner;
  51. };
  52. public:
  53. AdminScheduleMonitor(char const * serverName, char const * eventName, bool subscribe)
  54. {
  55. if(subscribe)
  56. {
  57. subsProxy.setown(new SubscriptionProxy(this));
  58. reader.setown(getSubscribingScheduleReader(serverName, LINK(subsProxy), eventName));
  59. }
  60. else
  61. {
  62. reader.setown(getScheduleReader(serverName, eventName));
  63. }
  64. }
  65. ~AdminScheduleMonitor() { if(subsProxy) printf("...done\n"); }
  66. void dump() const
  67. {
  68. dumpTime();
  69. Owned<IScheduleReaderIterator> iter(reader->getIterator());
  70. StringBuffer buff;
  71. while(iter->isValidEventName())
  72. {
  73. printf("%s\n", iter->getEventName(buff.clear()).str());
  74. while(iter->isValidEventText())
  75. {
  76. printf(" %s\n", iter->getEventText(buff.clear()).str());
  77. while(iter->isValidWuid())
  78. {
  79. printf(" %s\n", iter->getWuid(buff.clear()).str());
  80. iter->nextWuid();
  81. }
  82. iter->nextEventText();
  83. }
  84. iter->nextEventName();
  85. }
  86. if(subsProxy) printf("monitoring...\n");
  87. }
  88. void notify() const
  89. {
  90. printf("\n----------------------------------------\n\n");
  91. dump();
  92. }
  93. private:
  94. void dumpTime() const
  95. {
  96. CDateTime time;
  97. StringBuffer tstr;
  98. time.setNow();
  99. time.getString(tstr, false);
  100. printf("%s\n\n", tstr.str());
  101. }
  102. private:
  103. Owned<SubscriptionProxy> subsProxy;
  104. Owned<IScheduleReader> reader;
  105. };
  106. class AdminScheduleEventTester : public CInterface, implements IExceptionHandler
  107. {
  108. public:
  109. class EventExecutor : public CInterface, implements IScheduleEventExecutor
  110. {
  111. public:
  112. EventExecutor() {}
  113. IMPLEMENT_IINTERFACE;
  114. virtual void execute(char const * wuid, char const * name, char const * text)
  115. {
  116. CDateTime nowdt;
  117. StringBuffer nowstr;
  118. nowdt.setNow();
  119. nowdt.getString(nowstr);
  120. printf("Pass event to workunit %s: name=%s text=%s (at %s)\n", wuid, name, text, nowstr.str());
  121. }
  122. };
  123. AdminScheduleEventTester(char const * _serverName) : serverName(_serverName) {}
  124. void run()
  125. {
  126. printf("watching event queue...\n");
  127. Owned<EventExecutor> executor(new EventExecutor);
  128. Owned<IScheduleEventProcessor> processor(getScheduleEventProcessor(serverName.get(), LINK(executor), this));
  129. processor->start();
  130. getchar();
  131. processor->stop();
  132. printf("...done\n");
  133. }
  134. virtual bool fireException(IException *e) { StringBuffer msg; IERRLOG("Scheduler error (skipping event): %d: %s", e->errorCode(), e->errorMessage(msg).str()); e->Release(); return true; }
  135. private:
  136. StringAttr serverName;
  137. };
  138. interface IScheduleTask : extends IInterface
  139. {
  140. public:
  141. virtual void doit() = 0;
  142. };
  143. class AddScheduleTask : public CInterface, implements IScheduleTask
  144. {
  145. public:
  146. AddScheduleTask(char const * _wuid) : wuid(_wuid) {}
  147. IMPLEMENT_IINTERFACE;
  148. virtual void doit () { scheduleWorkUnit(wuid.get()); }
  149. private:
  150. StringAttr wuid;
  151. };
  152. class RemoveScheduleTask : public CInterface, implements IScheduleTask
  153. {
  154. public:
  155. RemoveScheduleTask(char const * _wuid) : wuid(_wuid) {}
  156. IMPLEMENT_IINTERFACE;
  157. virtual void doit () { descheduleWorkUnit(wuid.get()); }
  158. private:
  159. StringAttr wuid;
  160. };
  161. class RemoveAllScheduleTask : public CInterface, implements IScheduleTask
  162. {
  163. public:
  164. RemoveAllScheduleTask() {}
  165. IMPLEMENT_IINTERFACE;
  166. virtual void doit () { descheduleAllWorkUnits(); }
  167. };
  168. class ListServersScheduleTask : public CInterface, implements IScheduleTask
  169. {
  170. public:
  171. ListServersScheduleTask() {}
  172. IMPLEMENT_IINTERFACE;
  173. virtual void doit()
  174. {
  175. Owned<ISchedulerListIterator> iter = getSchedulerList();
  176. for(iter->first(); iter->isValid(); iter->next())
  177. printf("%s\n", iter->query());
  178. }
  179. };
  180. class ReadNamedScheduleTask : public CInterface, implements IScheduleTask
  181. {
  182. public:
  183. ReadNamedScheduleTask(char const * _serverName, char const * _eventName, bool _subscribe) : serverName(_serverName), eventName(_eventName), subscribe(_subscribe) {}
  184. IMPLEMENT_IINTERFACE;
  185. virtual void doit() { Owned<AdminScheduleMonitor> monitor(new AdminScheduleMonitor(serverName.get(), eventName.get(), subscribe)); monitor->dump(); if(subscribe) getchar(); }
  186. private:
  187. StringAttr serverName;
  188. StringAttr eventName;
  189. bool subscribe;
  190. };
  191. class ReadFullScheduleTask : public CInterface, implements IScheduleTask
  192. {
  193. public:
  194. ReadFullScheduleTask(char const * _serverName, bool _subscribe) : serverName(_serverName), subscribe(_subscribe) {}
  195. IMPLEMENT_IINTERFACE;
  196. virtual void doit() { Owned<AdminScheduleMonitor> monitor(new AdminScheduleMonitor(serverName.get(), NULL, subscribe)); monitor->dump(); if(subscribe) getchar(); }
  197. private:
  198. StringAttr serverName;
  199. bool subscribe;
  200. };
  201. class CleanupScheduleTask : public CInterface, implements IScheduleTask
  202. {
  203. public:
  204. CleanupScheduleTask() {}
  205. IMPLEMENT_IINTERFACE;
  206. virtual void doit() { cleanupWorkUnitSchedule(); }
  207. };
  208. class PushScheduleTask : public CInterface, implements IScheduleTask
  209. {
  210. public:
  211. PushScheduleTask(char const * _name, char const * _text, const char * _target)
  212. : name(_name), text(_text), target(_target) {}
  213. IMPLEMENT_IINTERFACE;
  214. virtual void doit()
  215. {
  216. Owned<IScheduleEventPusher> pusher(getScheduleEventPusher());
  217. unsigned count = pusher->push(name.get(), text.get(), target.get());
  218. PROGLOG("Pushed event to %u active schedulers", count);
  219. }
  220. private:
  221. StringAttr name;
  222. StringAttr text;
  223. StringAttr target;
  224. };
  #ifdef _DEBUG
  /**
   * Debug-only task behind the "testpull" command: runs an
   * AdminScheduleEventTester against the named server queue.
   */
  class TestPullScheduleTask : public CInterface, implements IScheduleTask
  {
  public:
      /** @param _serverName  Queue/server name handed to the tester; copied. */
      TestPullScheduleTask(char const * _serverName) : serverName(_serverName) {}
      IMPLEMENT_IINTERFACE;

      virtual void doit()
      {
          Owned<AdminScheduleEventTester> harness(new AdminScheduleEventTester(serverName.get()));
          harness->run();
      }

  private:
      StringAttr serverName;
  };
  #endif
  236. int main(int argc, char * const * argv)
  237. {
  238. InitModuleObjects();
  239. if((argc==2) && (stricmp(argv[1], "help")==0))
  240. usage(0);
  241. if(argc<3) usage();
  242. Owned<IScheduleTask> task;
  243. try
  244. {
  245. char const * cmd = argv[2];
  246. if(stricmp(cmd, "add")==0)
  247. if(argc==4)
  248. task.setown(new AddScheduleTask(argv[3]));
  249. else
  250. usage();
  251. else if(stricmp(cmd, "remove")==0)
  252. if(argc==4)
  253. task.setown(new RemoveScheduleTask(argv[3]));
  254. else
  255. usage();
  256. else if(stricmp(cmd, "removeall")==0)
  257. if(argc==3)
  258. task.setown(new RemoveAllScheduleTask());
  259. else
  260. usage();
  261. else if(stricmp(cmd, "servers")==0)
  262. if(argc==3)
  263. task.setown(new ListServersScheduleTask());
  264. else
  265. usage();
  266. else if(stricmp(cmd, "list")==0)
  267. {
  268. if(argc==5)
  269. task.setown(new ReadNamedScheduleTask(argv[3], argv[4], false));
  270. else if(argc==4)
  271. task.setown(new ReadFullScheduleTask(argv[3], false));
  272. else
  273. usage();
  274. }
  275. else if(stricmp(cmd, "monitor")==0)
  276. {
  277. if(argc==5)
  278. task.setown(new ReadNamedScheduleTask(argv[3], argv[4], true));
  279. else if(argc==4)
  280. task.setown(new ReadFullScheduleTask(argv[3], true));
  281. else
  282. usage();
  283. }
  284. else if(stricmp(cmd, "cleanup")==0)
  285. if(argc==3)
  286. task.setown(new CleanupScheduleTask());
  287. else
  288. usage();
  289. else if(stricmp(cmd, "push")==0)
  290. if(argc==5)
  291. task.setown(new PushScheduleTask(argv[3], argv[4], NULL));
  292. else if(argc==6)
  293. task.setown(new PushScheduleTask(argv[3], argv[4], argv[5]));
  294. else
  295. usage();
  296. #ifdef _DEBUG
  297. else if(stricmp(cmd, "testpull")==0)
  298. if(argc==4)
  299. task.setown(new TestPullScheduleTask(argv[3]));
  300. else
  301. usage();
  302. #endif
  303. else
  304. usage();
  305. Owned<IGroup> serverGroup = createIGroup(argv[1], DALI_SERVER_PORT);
  306. initClientProcess(serverGroup, DCR_ScheduleAdmin);
  307. task->doit();
  308. closedownClientProcess();
  309. }
  310. catch(IException * e)
  311. {
  312. EXCLOG(e);
  313. e->Release();
  314. }
  315. releaseAtoms();
  316. ExitModuleObjects();
  317. return 0;
  318. }