wujobqtest.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "jfile.hpp"
  15. #include "jprop.hpp"
  16. #include "jsocket.hpp"
  17. #include "wujobq.hpp"
  18. #include "mpbase.hpp"
  19. #include "dllserver.hpp"
  20. #include "daclient.hpp"
  21. #include "dasds.hpp"
  22. #define DEFAULT_PORT 7171
  23. static void usage()
  24. {
  25. printf("Usage: WUJOBQTEST <dali-ip> <action> <jobq> <params>\n");
  26. printf("Actions:\n");
  27. printf(" list <jobq> -- prints queue contents\n");
  28. printf(" stats <jobq> -- prints queue contents\n");
  29. printf(" tofront <jobq> <WUID> -- puts queue item to front\n");
  30. printf(" toback <jobq> <WUID> -- puts queue item to back\n");
  31. printf(" setprio <jobq> <WUID> <priority) -- changes priority\n");
  32. printf(" movebefore <jobq> <WUID> <WUID2> -- moves before\n");
  33. printf(" moveafter <jobq> <WUID> <WUID2> -- moves after\n");
  34. printf(" remove <jobq> <WUID> -- removes item from queue\n");
  35. printf(" pause <jobq> -- paused queue\n");
  36. printf(" stop <jobq> -- stops queue\n");
  37. printf(" resume <jobq> -- resumes paused/stopped queue\n");
  38. printf(" clear <jobq> -- clears queue\n");
  39. printf(" prior <jobq> -- prints last item to be dequeued\n");
  40. printf(" switch <WUID> <cluster> -- switches WU to new cluster\n");
  41. printf(" usage <cluster> -- monitors job queue for cluster\n");
  42. printf("Testing:\n");
  43. printf(" add <jobq> <WUID> <PRIORITY> <EP>\n");
  44. printf(" dequeue <jobq>\n");
  45. printf(" accept <jobq>\n");
  46. printf(" initiate <jobq> <WUID> <PRIORITY>\n");
  47. printf(" xremove <jobq> <WUID> -- removes queue item (doesn't cancel conv)\n");
  48. exit(2);
  49. }
  50. extern bool switchWorkunitQueue(const char *wuid, const char *cluster);
  51. #if 0
  52. void testEnqueue(unsigned nthreads,const char *qname)
  53. {
  54. class casyncfor: public CAsyncFor
  55. {
  56. public:
  57. bool ok;
  58. casyncfor()
  59. {
  60. }
  61. void Do(unsigned i)
  62. {
  63. try {
  64. Owned<IJobQueue> jq = createJobQueue(loadGroup.str());
  65. IJobQueueItem* item = createJobQueueItem(wuid.str());
  66. item->setOwner(owner.str());
  67. item->setPriority(priority);
  68. class cPollThread: public Thread
  69. {
  70. Semaphore sem;
  71. bool stopped;
  72. unsigned starttime;
  73. IJobQueue *jq;
  74. IAgentContext *realagent;
  75. public:
  76. bool timedout;
  77. CTimeMon tm;
  78. cPollThread(IJobQueue *_jq,IAgentContext *_realagent,unsigned timelimit)
  79. : tm(timelimit)
  80. {
  81. stopped = false;
  82. jq = _jq;
  83. realagent = _realagent;
  84. timedout = false;
  85. }
  86. ~cPollThread()
  87. {
  88. stop();
  89. }
  90. int run()
  91. {
  92. while (!stopped) {
  93. sem.wait(ABORT_POLL_PERIOD);
  94. if (stopped)
  95. break;
  96. if (tm.timedout()) {
  97. timedout = true;
  98. stopped = true;
  99. jq->cancelInitiateConversation();
  100. }
  101. else if (realagent->queryWorkUnit()->aborting()) {
  102. stopped = true;
  103. jq->cancelInitiateConversation();
  104. }
  105. }
  106. return 0;
  107. }
  108. void stop()
  109. {
  110. stopped = true;
  111. sem.signal();
  112. }
  113. } pollthread(jq,realAgent,timelimit);
  114. pollthread.start();
  115. for (unsigned copy = 0; copy < 2; copy++)
  116. {
  117. unsigned idx=copy?((i+width/2)%width):i;
  118. Owned<IDistributedFilePart> part = file->getPart(idx);
  119. if (copy&&(copy>=part->numCopies()))
  120. continue;
  121. RemoteFilename rfn;
  122. part->getFilename(rfn,copy,false);
  123. if (grpfilter&&(grpfilter->rank(rfn.queryEndpoint())==RANK_NULL))
  124. continue;
  125. if (port)
  126. rfn.setPort(port); // if daliservix
  127. Owned<IFile> partfile = createIFile(rfn);
  128. StringBuffer eps;
  129. try
  130. {
  131. unsigned start = msTick();
  132. if (!partfile->remove()&&(copy==0)) // only warn about missing primary files
  133. LOG(MCwarning, unknownJob, "Failed to remove file part %s from %s", partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
  134. else {
  135. unsigned t = msTick()-start;
  136. if (t>5*1000)
  137. LOG(MCwarning, unknownJob, "Removing %s from %s took %ds", partfile->queryFilename(), rfn.queryEndpoint().getUrlStr(eps).str(), t/1000);
  138. }
  139. }
  140. catch (IException *e)
  141. {
  142. CriticalBlock block(errcrit);
  143. if (mexcept)
  144. mexcept->append(*e);
  145. else {
  146. StringBuffer s("Failed to remove file part ");
  147. s.append(partfile->queryFilename()).append(" from ");
  148. rfn.queryEndpoint().getUrlStr(s);
  149. EXCLOG(e, s.str());
  150. e->Release();
  151. }
  152. ok = false;
  153. }
  154. }
  155. }
  156. } afor(this,width,port,grpfilter,mexcept,errcrit);
  157. afor.For(width,10,false,true);
  158. }
  159. class cPollThread: public Thread
  160. {
  161. Semaphore sem;
  162. bool stopped;
  163. unsigned starttime;
  164. IJobQueue *jq;
  165. IAgentContext *realagent;
  166. public:
  167. bool timedout;
  168. CTimeMon tm;
  169. cPollThread(IJobQueue *_jq,IAgentContext *_realagent,unsigned timelimit)
  170. : tm(timelimit)
  171. {
  172. stopped = false;
  173. jq = _jq;
  174. realagent = _realagent;
  175. timedout = false;
  176. }
  177. ~cPollThread()
  178. {
  179. stop();
  180. }
  181. int run()
  182. {
  183. while (!stopped) {
  184. sem.wait(ABORT_POLL_PERIOD);
  185. if (stopped)
  186. break;
  187. if (tm.timedout()) {
  188. timedout = true;
  189. stopped = true;
  190. jq->cancelInitiateConversation();
  191. }
  192. else if (realagent->queryWorkUnit()->aborting()) {
  193. stopped = true;
  194. jq->cancelInitiateConversation();
  195. }
  196. }
  197. return 0;
  198. }
  199. void stop()
  200. {
  201. stopped = true;
  202. sem.signal();
  203. }
  204. } pollthread(jq,realAgent,timelimit);
  205. pollthread.start();
  206. PROGLOG("Enqueuing on %s to run wuid=%s, graph=%s, timelimit=%d, priority=%d", loadGroup.str(), wuid.str(), graphName, timelimit, priority);
  207. Owned<IConversation> conversation = jq->initiateConversation(item);
  208. bool got = conversation.get()!=NULL;
  209. pollthread.stop();
  210. pollthread.join();
  211. if (!got) {
  212. if (pollthread.timedout)
  213. throw MakeStringException(0, "Query %s failed to start within specified timelimit (%d)", wuid.str(), timelimit);
  214. throw MakeStringException(0, "Query %s cancelled (1)",wuid.str());
  215. }
  216. // get the thor ep from whoever picked up
  217. SocketEndpoint thorMaster;
  218. MemoryBuffer msg;
  219. if (!conversation->recv(msg,1000*60)) {
  220. throw MakeStringException(0, "Query %s cancelled (2)",wuid.str());
  221. }
  222. thorMaster.deserialize(msg);
  223. msg.clear().append(graphName);
  224. SocketEndpoint myep;
  225. myep.setLocalHost(0);
  226. myep.serialize(msg); // only used for tracing
  227. if (!conversation->send(msg)) {
  228. StringBuffer s("Failed to send query to Thor on ");
  229. thorMaster.getUrlStr(s);
  230. throw MakeStringException(-1, s.str()); // maybe retry?
  231. }
  232. #endif
  233. static void cmd_list(IJobQueue *queue)
  234. {
  235. CJobQueueContents contents;
  236. queue->copyItems(contents);
  237. Owned<IJobQueueIterator> iter = contents.getIterator();
  238. unsigned n=0;
  239. ForEach(*iter) {
  240. n++;
  241. IJobQueueItem &item = iter->query();
  242. StringBuffer eps;
  243. StringBuffer dts;
  244. printf("%3d: %s owner=%s priority=%d session=%" I64F "x ep=%s port=%d enqueuedt=%s\n",n,item.queryWUID(),item.queryOwner(),item.getPriority(),item.getSessionId(),item.queryEndpoint().getUrlStr(eps).str(),item.getPort(),item.queryEnqueuedTime().getString(dts).str());
  245. }
  246. }
  247. static void cmd_prior(IJobQueue *queue)
  248. {
  249. const char *qn = queue->nextQueueName(NULL);
  250. while (qn) {
  251. StringAttr wuid;
  252. CDateTime enqueuedt;
  253. int prio;
  254. queue->setActiveQueue(qn);
  255. if (queue->getLastDequeuedInfo(wuid,enqueuedt,prio)) {
  256. StringBuffer dts;
  257. enqueuedt.getString(dts);
  258. printf("%s: wuid=%s enqueuedt=%s priority=%d \n",qn,wuid.get(),dts.str(),prio);
  259. }
  260. else
  261. printf("%s: No prior item recorded\n");
  262. qn = queue->nextQueueName(qn);
  263. }
  264. }
  265. static void cmd_xremove(IJobQueue *queue,const char *wuid)
  266. {
  267. if (queue->remove(wuid))
  268. printf("%s removed\n",wuid);
  269. else
  270. printf("%s not removed\n",wuid);
  271. }
  272. static void cmd_tofront(IJobQueue *queue,const char *wuid)
  273. {
  274. if (queue->moveToHead(wuid))
  275. printf("%s moved to front\n",wuid);
  276. else
  277. printf("%s not moved\n",wuid);
  278. }
  279. static void cmd_toback(IJobQueue *queue,const char *wuid)
  280. {
  281. if (queue->moveToTail(wuid))
  282. printf("%s moved to back\n",wuid);
  283. else
  284. printf("%s not moved\n",wuid);
  285. }
  286. static void cmd_setprio(IJobQueue *queue,const char *wuid,int prio)
  287. {
  288. if (queue->changePriority(wuid,prio))
  289. printf("%s changed priority to %d\n",wuid,prio);
  290. else
  291. printf("%s could not change priority\n",wuid);
  292. }
  293. static void cmd_movebefore(IJobQueue *queue,const char *wuid,const char *wuid2)
  294. {
  295. if (queue->moveBefore(wuid,wuid2))
  296. printf("%s moved before %s\n",wuid,wuid2);
  297. else
  298. printf("%s not moved\n",wuid);
  299. }
  300. static void cmd_moveafter(IJobQueue *queue,const char *wuid,const char *wuid2)
  301. {
  302. if (queue->moveAfter(wuid,wuid))
  303. printf("%s moved after %s\n",wuid,wuid2);
  304. else
  305. printf("%s not moved\n",wuid);
  306. }
  307. static void cmd_pause(IJobQueue *queue)
  308. {
  309. queue->pause();
  310. }
  311. static void cmd_stop(IJobQueue *queue)
  312. {
  313. queue->stop();
  314. for (;;) {
  315. unsigned enqueued=0;
  316. unsigned connected=0;
  317. unsigned waiting=0;
  318. queue->getStats(connected,waiting,enqueued);
  319. printf("%d connected, waiting...\n",connected);
  320. if (connected==0) {
  321. queue->resume(); // auto resume
  322. break;
  323. }
  324. if (!queue->waitStatsChange(unsigned()-1))
  325. break;
  326. }
  327. }
  328. static void cmd_resume(IJobQueue *queue)
  329. {
  330. queue->resume();
  331. }
  332. static void cmd_clear(IJobQueue *queue)
  333. {
  334. queue->clear();
  335. }
  336. static void cmd_add(IJobQueue *queue,const char *wuid,int prio,const char *eps)
  337. {
  338. IJobQueueItem *item = createJobQueueItem(wuid);
  339. item->setPriority(prio);
  340. if (eps&&*eps) {
  341. SocketEndpoint ep(eps);
  342. item->setEndpoint(ep);
  343. }
  344. item->setOwner("testUser");
  345. queue->enqueue(item);
  346. }
  347. static void cmd_accept(IJobQueue *queue)
  348. {
  349. queue->connect();
  350. IJobQueueItem *item;
  351. Owned<IConversation> conv = queue->acceptConversation(item);
  352. if (!conv.get()) {
  353. printf("Initiate failed\n");
  354. queue->disconnect();
  355. return;
  356. }
  357. MemoryBuffer mb;
  358. mb.append("hello");
  359. conv->send(mb);
  360. printf("sent '%s'\n","hello");
  361. queue->disconnect();
  362. }
  363. static void cmd_dequeue(IJobQueue *queue)
  364. {
  365. queue->connect(false);
  366. Owned<IJobQueueItem> item = queue->dequeue(1000*60);
  367. if (!item) {
  368. printf("Timed out\n");
  369. return;
  370. }
  371. StringBuffer eps;
  372. printf("%s owner=%s priority=%d session=%" I64F "x ep=%s port=%d\n",item->queryWUID(),item->queryOwner(),item->getPriority(),item->getSessionId(),item->queryEndpoint().getUrlStr(eps).str(),item->getPort());
  373. queue->disconnect();
  374. }
  375. static void cmd_initiate(IJobQueue *queue,const char *wuid,int prio)
  376. {
  377. IJobQueueItem *item = createJobQueueItem(wuid);
  378. item->setPriority(prio);
  379. item->setPort(DEFAULT_PORT);
  380. Owned<IConversation> conv= queue->initiateConversation(item);
  381. if (!conv.get()) {
  382. printf("Initiate failed\n");
  383. return;
  384. }
  385. MemoryBuffer mb;
  386. if (!conv->recv(mb, 3*60*1000)) {
  387. printf("initial message not received\n");
  388. return;
  389. }
  390. StringAttr s;
  391. mb.read(s);
  392. printf("acceptor sent '%s'\n",s.get());
  393. }
  394. static void cmd_remove(IJobQueue *queue,const char *wuid)
  395. {
  396. if (queue->cancelInitiateConversation(wuid))
  397. printf("removed %s\n",wuid);
  398. else
  399. printf("failed to remove %s\n",wuid);
  400. }
  401. static void cmd_stats(IJobQueue *queue,bool wait)
  402. {
  403. for (;;) {
  404. unsigned enqueued=0;
  405. unsigned connected=0;
  406. unsigned waiting=0;
  407. queue->getStats(connected,waiting,enqueued);
  408. printf("%d item(s) on queue, %d client(s) connected, %d client(s) waiting\n",enqueued,connected,waiting);
  409. if (!wait)
  410. break;
  411. if (!queue->waitStatsChange(unsigned()-1))
  412. break;
  413. }
  414. }
  415. static void cmd_activity(IJobQueue *queue,const char *qname)
  416. {
  417. StringBuffer xpath;
  418. xpath.appendf("Server[@queue=\"%s\"]/WorkUnit",qname);
  419. Owned<IRemoteConnection> conn = querySDS().connect("Status/Servers", myProcessSession(), 0, 100000);
  420. if (!conn) {
  421. OERRLOG("cannot connect to Status/Servers");
  422. return;
  423. }
  424. for (;;) {
  425. conn->reload();
  426. Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(xpath.str());
  427. StringArray wuids;
  428. ForEach(*iter) {
  429. IPropertyTree &wu = iter->query();
  430. wuids.append(wu.queryProp(NULL));
  431. }
  432. unsigned enqueued=0;
  433. unsigned connected=0;
  434. unsigned waiting=0;
  435. queue->getStats(connected,waiting,enqueued);
  436. StringBuffer times;
  437. CDateTime time;
  438. time.setNow();
  439. time.getString(times);
  440. fprintf(stdout,"%s,%d,%d,%d,%d,%s,%s\n",times.str(),wuids.ordinality(),enqueued,waiting,connected,wuids.ordinality()>0?wuids.item(0):"---",wuids.ordinality()>1?wuids.item(1):"---");
  441. fflush(stdout);
  442. Sleep(60*1000);
  443. }
  444. }
  445. int main(int argc, const char *argv[])
  446. {
  447. InitModuleObjects();
  448. EnableSEHtoExceptionMapping();
  449. if (argc<4)
  450. usage();
  451. Owned<IGroup> serverGroup = createIGroup(argv[1], DALI_SERVER_PORT);
  452. initClientProcess(serverGroup,DCR_Other);
  453. try
  454. {
  455. const char *action = argv[2];
  456. const char *qname = argv[3];
  457. const char *wuid = (argc>4)?argv[4]:"";
  458. int prio = (argc>5)?atoi(argv[5]):0;
  459. const char *wuid2 = (argc>5)?argv[5]:"";
  460. Owned<IJobQueue> queue = (stricmp(action,"switch")!=0)?createJobQueue(qname):NULL;
  461. if (stricmp(action,"list")==0)
  462. cmd_list(queue);
  463. else if (stricmp(action,"xremove")==0)
  464. cmd_xremove(queue,wuid);
  465. else if (stricmp(action,"remove")==0)
  466. cmd_remove(queue,wuid);
  467. else if (stricmp(action,"tofront")==0)
  468. cmd_tofront(queue,wuid);
  469. else if (stricmp(action,"toback")==0)
  470. cmd_toback(queue,wuid);
  471. else if (stricmp(action,"setprio")==0)
  472. cmd_setprio(queue,wuid,prio);
  473. else if (stricmp(action,"movebefore")==0)
  474. cmd_movebefore(queue,wuid,wuid2);
  475. else if (stricmp(action,"moveafter")==0)
  476. cmd_moveafter(queue,wuid,wuid2);
  477. else if (stricmp(action,"pause")==0)
  478. cmd_pause(queue);
  479. else if (stricmp(action,"stop")==0)
  480. cmd_stop(queue);
  481. else if (stricmp(action,"resume")==0)
  482. cmd_resume(queue);
  483. else if (stricmp(action,"clear")==0)
  484. cmd_clear(queue);
  485. else if (stricmp(action,"add")==0)
  486. cmd_add(queue,wuid,prio,(argc>6)?argv[6]:NULL);
  487. else if (stricmp(action,"switch")==0) {
  488. // switchWorkunitQueue(argv[3],argv[4]);
  489. }
  490. else if (stricmp(action,"accept")==0)
  491. cmd_accept(queue);
  492. else if (stricmp(action,"dequeue")==0)
  493. cmd_dequeue(queue);
  494. else if (stricmp(action,"initiate")==0)
  495. cmd_initiate(queue,wuid,prio);
  496. else if (stricmp(action,"stats")==0)
  497. cmd_stats(queue,(argc>4)?(stricmp(argv[4],"loop")==0):false);
  498. else if (stricmp(action,"activity")==0)
  499. cmd_activity(queue,qname);
  500. else if (stricmp(action,"prior")==0)
  501. cmd_prior(queue);
  502. else
  503. usage();
  504. queue.clear();
  505. }
  506. catch (IException *e)
  507. {
  508. StringBuffer m;
  509. printf("Error: %s\n", e->errorMessage(m).str());
  510. e->Release();
  511. }
  512. closedownClientProcess(); // dali client closedown
  513. releaseAtoms();
  514. return 0;
  515. }