wujobq.cpp 62 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <algorithm>
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jbuff.hpp"
  18. #include "dasess.hpp"
  19. #include "dautils.hpp"
  20. #include "portlist.h"
  21. #include "dacoven.hpp"
  22. #include "daclient.hpp"
  23. #include "dasds.hpp"
  24. #include "dasess.hpp"
  25. #include "wujobq.hpp"
  26. #ifdef _MSC_VER
  27. #pragma warning (disable : 4355)
  28. #endif
  29. #if 0
  30. JobQueues
  31. JobQueue @name= @count= @state=active|paused|stopped
  32. Edition <num>
  33. Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
  34. Item* @wuid @owner @node @port @priority @session
  35. #endif
  36. class CJobQueueItem: implements IJobQueueItem, public CInterface
  37. {
  38. int priority;
  39. StringAttr wu;
  40. StringAttr owner;
  41. SessionId sessid;
  42. SocketEndpoint ep;
  43. unsigned port;
  44. CDateTime enqueuedt;
  45. public:
  46. IMPLEMENT_IINTERFACE;
  47. CJobQueueItem(MemoryBuffer &src)
  48. {
  49. deserialize(src);
  50. }
  51. CJobQueueItem(const char *_wu)
  52. : wu(_wu)
  53. {
  54. priority = 0;
  55. ep = queryMyNode()->endpoint();
  56. port = 0;
  57. sessid = myProcessSession();
  58. }
  59. CJobQueueItem(IPropertyTree *item)
  60. {
  61. const char * wuid = item->queryProp("@wuid");
  62. if (*wuid=='~')
  63. wuid++;
  64. wu.set(wuid);
  65. owner.set(item->queryProp("@owner"));
  66. sessid = (SessionId)item->getPropInt64("@session");
  67. priority = item->getPropInt("@priority");
  68. ep.set(item->queryProp("@node"));
  69. port = (unsigned)item->getPropInt("@port");
  70. StringBuffer dts;
  71. if (item->getProp("@enqueuedt",dts))
  72. enqueuedt.setString(dts.str());
  73. }
  74. static void assignBranch(IPropertyTree *item,IJobQueueItem *qi)
  75. {
  76. item->setPropInt64("@session",qi->getSessionId());
  77. item->setPropInt("@priority",qi->getPriority());
  78. item->setPropInt("@port",qi->getPort());
  79. item->setProp("@wuid",qi->queryWUID());
  80. item->setProp("@owner",qi->queryOwner());
  81. StringBuffer eps;
  82. qi->queryEndpoint().getUrlStr(eps);
  83. item->setProp("@node",eps.str());
  84. StringBuffer dts;
  85. qi->queryEnqueuedTime().getString(dts);
  86. if (dts.length()==0) {
  87. CDateTime dt;
  88. dt.setNow();
  89. dt.getString(dts);
  90. qi->setEnqueuedTime(dt);
  91. }
  92. item->setProp("@enqueuedt",dts.str());
  93. }
  94. const char *queryWUID()
  95. {
  96. return wu.get();
  97. }
  98. int getPriority()
  99. {
  100. return priority;
  101. }
  102. unsigned getPort()
  103. {
  104. return port;
  105. }
  106. SessionId getSessionId()
  107. {
  108. return sessid;
  109. }
  110. SocketEndpoint &queryEndpoint()
  111. {
  112. return ep;
  113. }
  114. const char *queryOwner()
  115. {
  116. return owner.get();
  117. }
  118. bool equals(IJobQueueItem *other)
  119. {
  120. // work unit is primary key
  121. return strcmp(wu.get(),other->queryWUID())==0;
  122. }
  123. CDateTime &queryEnqueuedTime()
  124. {
  125. return enqueuedt;
  126. }
  127. void setEnqueuedTime(const CDateTime &dt)
  128. {
  129. enqueuedt.set(dt);
  130. }
  131. void serialize(MemoryBuffer &tgt)
  132. {
  133. tgt.append(priority).append(port).append(wu).append(sessid);
  134. ep.serialize(tgt);
  135. StringBuffer dts;
  136. enqueuedt.getString(dts);
  137. tgt.append(owner).append(dts);
  138. }
  139. void deserialize(MemoryBuffer &src)
  140. {
  141. src.read(priority).read(port).read(wu).read(sessid);
  142. ep.deserialize(src);
  143. StringBuffer dts;
  144. src.read(owner).read(dts);
  145. enqueuedt.setString(dts.str());
  146. }
  147. IJobQueueItem* clone()
  148. {
  149. IJobQueueItem* ret = new CJobQueueItem(wu);
  150. ret->setPriority(priority);
  151. ret->setPriority(port);
  152. ret->setEndpoint(ep);
  153. ret->setSessionId(sessid);
  154. return ret;
  155. }
  156. void setPriority(int _priority)
  157. {
  158. priority = _priority;
  159. }
  160. void setPort(unsigned _port)
  161. {
  162. port = _port;
  163. }
  164. void setEndpoint(const SocketEndpoint &_ep)
  165. {
  166. ep = _ep;
  167. }
  168. void setSessionId(SessionId _id)
  169. {
  170. if (_id)
  171. sessid = _id;
  172. else
  173. sessid = myProcessSession();
  174. }
  175. void setOwner(const char *_owner)
  176. {
  177. owner.set(_owner);
  178. }
  179. bool isValidSession()
  180. {
  181. Owned<INode> node = createINode(ep);
  182. return (querySessionManager().lookupProcessSession(node)==sessid);
  183. }
  184. };
  185. class CJobQueueIterator: implements IJobQueueIterator, public CInterface
  186. {
  187. public:
  188. CJobQueueContents &items;
  189. unsigned idx;
  190. IMPLEMENT_IINTERFACE;
  191. CJobQueueIterator(CJobQueueContents &_items)
  192. : items(_items)
  193. {
  194. idx = 0;
  195. }
  196. bool isValid()
  197. {
  198. return idx<items.ordinality();
  199. }
  200. bool first()
  201. {
  202. idx = 0;
  203. return isValid();
  204. }
  205. bool next()
  206. {
  207. idx++;
  208. return isValid();
  209. }
  210. IJobQueueItem & query()
  211. {
  212. return items.item(idx);
  213. }
  214. };
  215. IJobQueueIterator *CJobQueueContents::getIterator()
  216. {
  217. return new CJobQueueIterator(*this);
  218. }
  219. IJobQueueItem *createJobQueueItem(const char *wuid)
  220. {
  221. if (!wuid||!*wuid)
  222. throw MakeStringException(-1,"createJobQueueItem empty WUID");
  223. return new CJobQueueItem(wuid);;
  224. }
  225. IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb)
  226. {
  227. return new CJobQueueItem(mb);
  228. }
  229. #define ForEachQueue(qd) for (sQueueData *qd = qdata; qd!=NULL; qd=qd->next)
  230. #define ForEachQueueIn(parent,qd) for (sQueueData *qd = parent.qdata; qd!=NULL; qd=qd->next)
  231. struct sQueueData
  232. {
  233. sQueueData *next;
  234. IRemoteConnection *conn;
  235. StringAttr qname;
  236. IPropertyTree *root;
  237. SubscriptionId subscriberid;
  238. unsigned lastWaitEdition;
  239. };
  240. class CJobQueueBase: implements IJobQueueConst, public CInterface
  241. {
  242. class cOrderedIterator
  243. {
  244. CJobQueueBase &parent;
  245. unsigned numqueues;
  246. unsigned *queueidx;
  247. sQueueData **queues;
  248. IPropertyTree **queuet;
  249. MemoryAttr ma;
  250. unsigned current;
  251. public:
  252. cOrderedIterator(CJobQueueBase&_parent) : parent(_parent)
  253. {
  254. numqueues=0;
  255. ForEachQueueIn(parent,qd1)
  256. if (qd1->root)
  257. numqueues++;
  258. queueidx = (unsigned *)ma.allocate(numqueues*(sizeof(unsigned)+sizeof(sQueueData *)+sizeof(IPropertyTree *)));
  259. queues = (sQueueData **)(queueidx+numqueues);
  260. queuet = (IPropertyTree **)(queues+numqueues);
  261. unsigned i = 0;
  262. ForEachQueueIn(parent,qd2)
  263. {
  264. if (qd2->root)
  265. queues[i++] = qd2;
  266. }
  267. current = (unsigned)-1;
  268. }
  269. bool first()
  270. {
  271. StringBuffer path;
  272. parent.getItemPath(path,0U);
  273. current = (unsigned)-1;
  274. for (unsigned i = 0; i<numqueues;i++)
  275. {
  276. queueidx[i] = 0;
  277. queuet[i] = queues[i]->root->queryPropTree(path.str());
  278. if (queuet[i])
  279. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  280. current = i;
  281. }
  282. return current!=(unsigned)-1;
  283. }
  284. bool next()
  285. {
  286. if (current==(unsigned)-1)
  287. return false;
  288. queueidx[current]++;
  289. StringBuffer path;
  290. parent.getItemPath(path,queueidx[current]);
  291. queuet[current] = queues[current]->root->queryPropTree(path.str());
  292. current = (unsigned)-1;
  293. for (unsigned i = 0; i<numqueues;i++)
  294. {
  295. if (queuet[i])
  296. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  297. current = i;
  298. }
  299. return current!=(unsigned)-1;
  300. }
  301. bool isValid()
  302. {
  303. return current!=(unsigned)-1;
  304. }
  305. void item(sQueueData *&qd, IPropertyTree *&t,unsigned &idx)
  306. {
  307. assertex(current!=(unsigned)-1);
  308. qd = queues[current];
  309. t = queuet[current];
  310. idx = queueidx[current];
  311. }
  312. sQueueData &queryQueue()
  313. {
  314. assertex(current!=(unsigned)-1);
  315. return *queues[current];
  316. }
  317. IPropertyTree &queryTree()
  318. {
  319. assertex(current!=(unsigned)-1);
  320. return *queuet[current];
  321. }
  322. };
  323. protected:
  324. bool doGetLastDequeuedInfo(sQueueData *qd, StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  325. {
  326. priority = 0;
  327. if (!qd)
  328. return false;
  329. const char *w = qd->root->queryProp("@prevwuid");
  330. if (!w||!*w)
  331. return false;
  332. wuid.set(w);
  333. StringBuffer dts;
  334. if (qd->root->getProp("@prevenqueuedt",dts))
  335. enqueuedt.setString(dts.str());
  336. priority = qd->root->getPropInt("@prevpriority");
  337. return true;
  338. }
  339. public:
  340. sQueueData *qdata;
  341. Semaphore notifysem;
  342. CriticalSection crit;
  343. IMPLEMENT_IINTERFACE;
  344. CJobQueueBase(const char *_qname)
  345. {
  346. StringArray qlist;
  347. qlist.appendListUniq(_qname, ",");
  348. sQueueData *last = NULL;
  349. ForEachItemIn(i,qlist)
  350. {
  351. sQueueData *qd = new sQueueData;
  352. qd->next = NULL;
  353. qd->qname.set(qlist.item(i));
  354. qd->conn = NULL;
  355. qd->root = NULL;
  356. qd->lastWaitEdition = 0;
  357. qd->subscriberid = 0;
  358. if (last)
  359. last->next = qd;
  360. else
  361. qdata = qd;
  362. last = qd;
  363. }
  364. };
  365. virtual ~CJobQueueBase()
  366. {
  367. while (qdata)
  368. {
  369. sQueueData * next = qdata->next;
  370. delete qdata;
  371. qdata = next;
  372. }
  373. }
  374. StringBuffer &getItemPath(StringBuffer &path,const char *wuid)
  375. {
  376. if (!wuid||!*wuid)
  377. return getItemPath(path,0U);
  378. return path.appendf("Item[@wuid=\"%s\"]",wuid);
  379. }
  380. StringBuffer &getItemPath(StringBuffer &path,unsigned idx)
  381. {
  382. path.appendf("Item[@num=\"%d\"]",idx+1);
  383. return path;
  384. }
  385. IPropertyTree *queryClientRootIndex(sQueueData &qd, unsigned idx)
  386. {
  387. VStringBuffer path("Client[%d]", idx+1);
  388. return qd.root->queryPropTree(path);
  389. }
  390. bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
  391. {
  392. // if this ever becomes time critical thne could cache enqueued values
  393. StringBuffer d1s;
  394. if (qt1)
  395. qt1->getProp("@enqueuedt",d1s);
  396. StringBuffer d2s;
  397. if (qt2)
  398. qt2->getProp("@enqueuedt",d2s);
  399. return (strcmp(d1s.str(),d2s.str())<0);
  400. }
  401. IJobQueueItem *doGetItem(sQueueData &qd,unsigned idx)
  402. {
  403. if (idx==(unsigned)-1)
  404. {
  405. idx = qd.root->getPropInt("@count");
  406. if (!idx)
  407. return NULL;
  408. idx--;
  409. }
  410. StringBuffer path;
  411. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,idx).str());
  412. if (!item)
  413. return NULL;
  414. return new CJobQueueItem(item);
  415. }
  416. IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
  417. {
  418. return doGetItem(qd, idx);
  419. }
  420. IJobQueueItem *getHead(sQueueData &qd)
  421. {
  422. return getItem(qd,0);
  423. }
  424. unsigned doFindRank(sQueueData &qd,const char *wuid)
  425. {
  426. StringBuffer path;
  427. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  428. if (!item)
  429. return (unsigned)-1;
  430. return item->getPropInt("@num")-1;
  431. }
  432. unsigned findRank(sQueueData &qd,const char *wuid)
  433. {
  434. return doFindRank(qd,wuid);
  435. }
  436. IJobQueueItem *find(sQueueData &qd,const char *wuid)
  437. {
  438. StringBuffer path;
  439. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  440. if (!item)
  441. return NULL;
  442. bool cached = item->getPropInt("@num",0)<=0;
  443. if (wuid&&cached)
  444. return NULL; // don't want cached value unless explicit
  445. return new CJobQueueItem(item);
  446. }
  447. unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
  448. {
  449. unsigned ret=0;
  450. StringBuffer path;
  451. for (unsigned i=0;;i++)
  452. {
  453. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  454. if (!item)
  455. break;
  456. ret++;
  457. dest.append(*new CJobQueueItem(item));
  458. }
  459. return ret;
  460. }
  461. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  462. {
  463. assertex(qdata);
  464. assertex(qdata->root);
  465. copyItemsImpl(*qdata,contents);
  466. const char *st = qdata->root->queryProp("@state");
  467. if (st&&*st)
  468. state.set(st);
  469. if (st && (strieq(st, "paused") || strieq(st, "stopped")))
  470. {
  471. const char *stDetails = qdata->root->queryProp("@stateDetails");
  472. if (stDetails&&*stDetails)
  473. stateDetails.set(stDetails);
  474. }
  475. }
  476. sQueueData *findQD(const char *wuid)
  477. {
  478. if (wuid&&*wuid)
  479. {
  480. ForEachQueue(qd)
  481. {
  482. unsigned idx = doFindRank(*qd,wuid);
  483. if (idx!=(unsigned)-1)
  484. return qd;
  485. }
  486. }
  487. return NULL;
  488. }
  489. virtual unsigned waiting()
  490. {
  491. unsigned ret = 0;
  492. ForEachQueue(qd)
  493. {
  494. for (unsigned i=0;;i++)
  495. {
  496. IPropertyTree *croot = queryClientRootIndex(*qd,i);
  497. if (!croot)
  498. break;
  499. ret += croot->getPropInt("@waiting");
  500. }
  501. }
  502. return ret;
  503. }
  504. virtual unsigned findRank(const char *wuid)
  505. {
  506. assertex(qdata);
  507. if (!qdata->next)
  508. return findRank(*qdata,wuid);
  509. cOrderedIterator it(*this);
  510. unsigned i = 0;
  511. ForEach(it)
  512. {
  513. const char *twuid = it.queryTree().queryProp("@wuid");
  514. if (twuid&&(strcmp(twuid,wuid)==0))
  515. return i;
  516. i++;
  517. }
  518. return (unsigned)-1;
  519. }
  520. virtual unsigned copyItems(CJobQueueContents &dest)
  521. {
  522. assertex(qdata);
  523. if (!qdata->next)
  524. return copyItemsImpl(*qdata,dest);
  525. cOrderedIterator it(*this);
  526. unsigned ret = 0;
  527. ForEach(it)
  528. {
  529. dest.append(*new CJobQueueItem(&it.queryTree()));
  530. ret++;
  531. }
  532. return ret;
  533. }
  534. virtual IJobQueueItem *getItem(unsigned idx)
  535. {
  536. if (!qdata)
  537. return NULL;
  538. if (!qdata->next)
  539. return getItem(*qdata,idx);
  540. cOrderedIterator it(*this);
  541. unsigned i = 0;
  542. IPropertyTree *ret = NULL;
  543. ForEach(it)
  544. {
  545. if (i==idx)
  546. {
  547. ret = &it.queryTree();
  548. break;
  549. }
  550. else if (idx==(unsigned)-1) // -1 means return last
  551. ret = &it.queryTree();
  552. i++;
  553. }
  554. if (ret)
  555. return new CJobQueueItem(ret);
  556. return NULL;
  557. }
  558. virtual IJobQueueItem *getHead()
  559. {
  560. if (!qdata)
  561. return NULL;
  562. if (!qdata->next)
  563. return getHead(*qdata);
  564. return getItem(0);
  565. }
  566. virtual IJobQueueItem *getTail()
  567. {
  568. if (!qdata)
  569. return NULL;
  570. if (!qdata->next)
  571. return getHead(*qdata);
  572. return getItem((unsigned)-1);
  573. }
  574. virtual IJobQueueItem *find(const char *wuid)
  575. {
  576. if (!qdata)
  577. return NULL;
  578. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  579. if (!qd)
  580. return NULL;
  581. return find(*qd,wuid);
  582. }
  583. virtual bool paused()
  584. {
  585. // true if all paused
  586. ForEachQueue(qd)
  587. {
  588. if (qd->root)
  589. {
  590. const char *state = qd->root->queryProp("@state");
  591. if (state&&(strcmp(state,"paused")!=0))
  592. return false;
  593. }
  594. }
  595. return true;
  596. }
  597. virtual bool paused(StringBuffer& info)
  598. {
  599. // true if all paused
  600. ForEachQueue(qd)
  601. {
  602. if (qd->root)
  603. {
  604. const char *state = qd->root->queryProp("@state");
  605. if (state&&(strcmp(state,"paused")!=0))
  606. return false;
  607. if (state&&!info.length())
  608. {
  609. const char *stateDetails = qd->root->queryProp("@stateDetails");
  610. if (stateDetails && *stateDetails)
  611. info.set(stateDetails);
  612. }
  613. }
  614. }
  615. return true;
  616. }
  617. virtual bool stopped()
  618. {
  619. // true if all stopped
  620. ForEachQueue(qd)
  621. {
  622. if (qd->root)
  623. {
  624. const char *state = qd->root->queryProp("@state");
  625. if (state&&(strcmp(state,"stopped")!=0))
  626. return false;
  627. }
  628. }
  629. return true;
  630. }
  631. virtual bool stopped(StringBuffer& info)
  632. {
  633. // true if all stopped
  634. ForEachQueue(qd)
  635. {
  636. if (qd->root)
  637. {
  638. const char *state = qd->root->queryProp("@state");
  639. if (state&&(strcmp(state,"stopped")!=0))
  640. return false;
  641. if (state&&!info.length())
  642. {
  643. const char *stateDetails = qd->root->queryProp("@stateDetails");
  644. if (stateDetails && *stateDetails)
  645. info.set(stateDetails);
  646. }
  647. }
  648. }
  649. return true;
  650. }
  651. virtual unsigned ordinality()
  652. {
  653. unsigned ret = 0;
  654. ForEachQueue(qd)
  655. {
  656. if (qd->root)
  657. ret += qd->root->getPropInt("@count");
  658. }
  659. return ret;
  660. }
  661. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  662. {
  663. return doGetLastDequeuedInfo(qdata, wuid, enqueuedt, priority);
  664. }
  665. //Similar to copyItemsAndState(), this method returns the state information for one queue.
  666. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  667. {
  668. if (!qdata->root)
  669. return;
  670. const char *st = qdata->root->queryProp("@state");
  671. if (!st || !*st)
  672. return;
  673. state.set(st);
  674. if ((strieq(st, "paused") || strieq(st, "stopped")))
  675. stateDetails.set(qdata->root->queryProp("@stateDetails"));
  676. }
  677. };
  678. class CJobQueueConst: public CJobQueueBase
  679. {
  680. Owned<IPropertyTree> jobQueueSnapshot;
  681. public:
  682. CJobQueueConst(const char *_qname, IPropertyTree* _jobQueueSnapshot) : CJobQueueBase(_qname)
  683. {
  684. if (!_jobQueueSnapshot)
  685. throw MakeStringException(-1, "No job queue snapshot");
  686. jobQueueSnapshot.setown(_jobQueueSnapshot);
  687. ForEachQueue(qd)
  688. {
  689. VStringBuffer path("Queue[@name=\"%s\"]", qd->qname.get());
  690. qd->root = jobQueueSnapshot->queryPropTree(path.str());
  691. if (!qd->root)
  692. throw MakeStringException(-1, "No job queue found for %s", qd->qname.get());
  693. }
  694. };
  695. };
  696. class CJobQueue: public CJobQueueBase, implements IJobQueue
  697. {
  698. public:
  699. sQueueData *activeq;
  700. SessionId sessionid;
  701. unsigned locknest;
  702. bool writemode;
  703. bool connected;
  704. Owned<IConversation> initiateconv;
  705. StringAttr initiatewu;
  706. bool dequeuestop;
  707. bool cancelwaiting;
  708. bool validateitemsessions;
  709. class csubs: implements ISDSSubscription, public CInterface
  710. {
  711. CJobQueue *parent;
  712. public:
  713. IMPLEMENT_IINTERFACE;
  714. csubs(CJobQueue *_parent)
  715. {
  716. parent = _parent;
  717. }
  718. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  719. {
  720. CriticalBlock block(parent->crit);
  721. parent->notifysem.signal();
  722. }
  723. } subs;
  724. IMPLEMENT_IINTERFACE;
  725. CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
  726. {
  727. activeq = qdata;
  728. sessionid = myProcessSession();
  729. validateitemsessions = false;
  730. writemode = false;
  731. locknest = 0;
  732. connected = false;
  733. dequeuestop = false;
  734. cancelwaiting = false;
  735. Cconnlockblock block(this,false); // this just checks queue exists
  736. }
  737. virtual ~CJobQueue()
  738. {
  739. try {
  740. while (locknest)
  741. connunlock(true); // auto rollback
  742. if (connected)
  743. disconnect();
  744. }
  745. catch (IException *e) {
  746. // server error
  747. EXCLOG(e, "~CJobQueue");
  748. e->Release();
  749. }
  750. try { // must attempt to remove subscription before object destroyed.
  751. dounsubscribe();
  752. }
  753. catch (IException *e) {
  754. EXCLOG(e, "~CJobQueue calling dounsubscribe");
  755. e->Release();
  756. }
  757. }
  758. void connlock(bool exclusive)
  759. { // must be in sect
  760. if (locknest++==0) {
  761. unsigned wait = qdata&&qdata->next?5000:INFINITE;
  762. ForEachQueue(qd) {
  763. for (;;) {
  764. StringBuffer path;
  765. path.appendf("/JobQueues/Queue[@name=\"%s\"]",qd->qname.get());
  766. bool timeout;
  767. for (;;) {
  768. timeout=false;
  769. try {
  770. qd->conn = querySDS().connect(path.str(),myProcessSession(),exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ,wait);
  771. if (qd->conn)
  772. break;
  773. }
  774. catch (ISDSException *e) {
  775. if (SDSExcpt_LockTimeout != e->errorCode())
  776. throw;
  777. e->Release();
  778. timeout = true;
  779. }
  780. // create queue
  781. Owned<IRemoteConnection> pconn;
  782. try {
  783. pconn.setown(querySDS().connect("/JobQueues",myProcessSession(),RTM_LOCK_WRITE|RTM_CREATE_QUERY,wait));
  784. if (!pconn)
  785. throw MakeStringException(-1,"CJobQueue could not create JobQueues");
  786. IPropertyTree *proot = pconn->queryRoot();
  787. StringBuffer cpath;
  788. cpath.appendf("Queue[@name=\"%s\"]",qd->qname.get());
  789. if (!proot->hasProp(cpath.str())) {
  790. IPropertyTree *pt = proot->addPropTree("Queue");
  791. pt->setProp("@name",qd->qname.get());
  792. pt->setProp("@state","active");
  793. pt->setPropInt("@count", 0);
  794. pt->setPropInt("Edition", 1);
  795. }
  796. }
  797. catch (ISDSException *e) {
  798. if (SDSExcpt_LockTimeout != e->errorCode())
  799. throw;
  800. e->Release();
  801. timeout = true;
  802. }
  803. }
  804. if (!timeout)
  805. break;
  806. sQueueData *qd2 = qdata;
  807. do {
  808. ::Release(qd2->conn);
  809. qd2->conn = NULL;
  810. qd2->root = NULL;
  811. }
  812. while (qd2!=qd);
  813. PROGLOG("Job Queue contention - delaying before retrying");
  814. Sleep(getRandom()%5000); // dining philosopher delay
  815. wait = getRandom()%4000+3000; // try and prevent sync
  816. qd = qdata;
  817. }
  818. qd->root = qd->conn->queryRoot();
  819. }
  820. writemode = exclusive;
  821. }
  822. else {
  823. if (exclusive&&!writemode) {
  824. ForEachQueue(qd) {
  825. assertex(qd->conn);
  826. writemode = exclusive;
  827. bool lockreleased;
  828. safeChangeModeWrite(qd->conn,qd->qname.get(),lockreleased);
  829. qd->root = qd->conn->queryRoot();
  830. }
  831. }
  832. }
  833. }
  834. void connunlock(bool rollback=false)
  835. { // should be in sect
  836. if (--locknest==0) {
  837. ForEachQueue(qd) {
  838. if (qd->conn) { // can occur if connection to dali threw exception
  839. if (writemode) {
  840. if (rollback)
  841. qd->conn->rollback();
  842. else {
  843. qd->root->setPropInt("Edition",qd->root->getPropInt("Edition")+1);
  844. qd->conn->commit();
  845. }
  846. }
  847. qd->conn->Release();
  848. qd->conn = NULL;
  849. }
  850. qd->root = NULL;
  851. }
  852. writemode = false;
  853. }
  854. }
  855. void conncommit() // doesn't set edition
  856. { // called within sect
  857. if (writemode) {
  858. ForEachQueue(qd) {
  859. if (qd->conn)
  860. qd->conn->commit();
  861. }
  862. }
  863. }
  864. class Cconnlockblock: public CriticalBlock
  865. {
  866. CJobQueue *parent;
  867. bool rollback;
  868. public:
  869. Cconnlockblock(CJobQueue *_parent,bool exclusive)
  870. : CriticalBlock(_parent->crit)
  871. {
  872. parent = _parent;
  873. parent->connlock(exclusive);
  874. rollback = false;
  875. }
  876. ~Cconnlockblock()
  877. {
  878. parent->connunlock(rollback);
  879. }
  880. void setRollback(bool set=true)
  881. {
  882. rollback = set;
  883. }
  884. void commit()
  885. {
  886. parent->conncommit();
  887. }
  888. };
  889. void removeItem(sQueueData &qd,IPropertyTree *item, bool cache)
  890. { // does not adjust or use @count
  891. unsigned n = item->getPropInt("@num");
  892. if (!n)
  893. return;
  894. if (cache) {
  895. StringBuffer s;
  896. item->getProp("@wuid",s.clear());
  897. qd.root->setProp("@prevwuid",s.str());
  898. item->getProp("@enqueuedt",s.clear());
  899. qd.root->setProp("@prevenqueuedt",s.str());
  900. qd.root->setPropInt("@prevpriority",item->getPropInt("@priority"));
  901. }
  902. item->setPropInt("@num",-1);
  903. StringBuffer path;
  904. for (;;) {
  905. IPropertyTree *item2 = qd.root->queryPropTree(getItemPath(path.clear(),n).str());
  906. if (!item2)
  907. break;
  908. item2->setPropInt("@num",n);
  909. n++;
  910. }
  911. qd.root->removeTree(item);
  912. }
  913. IPropertyTree *addItem(sQueueData &qd,IPropertyTree *item,unsigned idx,unsigned count)
  914. {
  915. // does not set any values other than num
  916. StringBuffer path;
  917. // first move following up
  918. unsigned n=count;
  919. while (n>idx) {
  920. n--;
  921. qd.root->queryPropTree(getItemPath(path.clear(),n).str())->setPropInt("@num",n+2);
  922. }
  923. item->setPropInt("@num",idx+1);
  924. return qd.root->addPropTree("Item",item);
  925. }
  926. void dosubscribe()
  927. { // called in crit section
  928. ForEachQueue(qd) {
  929. if (qd->subscriberid) {
  930. querySDS().unsubscribe(qd->subscriberid);
  931. qd->subscriberid = 0;
  932. }
  933. StringBuffer path;
  934. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  935. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  936. }
  937. }
  938. bool haschanged() // returns if any changed
  939. {
  940. bool changed = false;
  941. ForEachQueue(qd) {
  942. if (!qd->subscriberid) {
  943. StringBuffer path;
  944. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  945. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  946. }
  947. unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
  948. if (e!=qd->lastWaitEdition) {
  949. qd->lastWaitEdition = e;
  950. changed = true;
  951. break;
  952. }
  953. }
  954. return changed;
  955. }
  956. void dounsubscribe()
  957. {
  958. // called in crit section
  959. ForEachQueue(qd) {
  960. if (qd->subscriberid) {
  961. querySDS().unsubscribe(qd->subscriberid);
  962. qd->subscriberid = 0;
  963. }
  964. }
  965. }
  966. IPropertyTree *queryClientRootSession(sQueueData &qd)
  967. {
  968. VStringBuffer path("Client[@session=\"%" I64F "d\"]", sessionid);
  969. IPropertyTree *ret = qd.root->queryPropTree(path.str());
  970. if (!ret)
  971. {
  972. ret = qd.root->addPropTree("Client");
  973. ret->setPropInt64("@session",sessionid);
  974. StringBuffer eps;
  975. ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
  976. }
  977. return ret;
  978. }
  979. void connect(bool _validateitemsessions)
  980. {
  981. Cconnlockblock block(this,true);
  982. validateitemsessions = _validateitemsessions;
  983. if (connected)
  984. disconnect();
  985. dosubscribe();
  986. ForEachQueue(qd)
  987. {
  988. if (validateitemsessions)
  989. {
  990. unsigned connected;
  991. unsigned waiting;
  992. unsigned count;
  993. getStats(*qd,connected,waiting,count); // clear any duff clients
  994. }
  995. IPropertyTree *croot = queryClientRootSession(*qd);
  996. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
  997. }
  998. connected = true;
  999. }
  1000. void disconnect() // signal no longer wil be dequeing (optional - done automatically on release)
  1001. {
  1002. Cconnlockblock block(this,true);
  1003. if (connected) {
  1004. dounsubscribe();
  1005. ForEachQueue(qd) {
  1006. IPropertyTree *croot = queryClientRootSession(*qd);
  1007. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
  1008. }
  1009. connected = false;
  1010. }
  1011. }
  1012. sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
  1013. {
  1014. if (numqueues==0)
  1015. return NULL;
  1016. if (numqueues==1)
  1017. return *queues;
  1018. sQueueData *best = NULL;
  1019. IPropertyTree *bestt = NULL;
  1020. for (unsigned i=0;i<numqueues;i++) {
  1021. sQueueData *qd = queues[i];
  1022. unsigned count = qd->root->getPropInt("@count");
  1023. if (count) {
  1024. int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
  1025. if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
  1026. StringBuffer path;
  1027. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,0U).str());
  1028. if (!item)
  1029. continue;
  1030. if (item->getPropInt("@num",0)<=0)
  1031. continue;
  1032. CDateTime dt;
  1033. StringBuffer enqueued;
  1034. if (!best||itemOlder(item,bestt)) {
  1035. best = qd;
  1036. bestt = item;
  1037. }
  1038. }
  1039. }
  1040. }
  1041. return best;
  1042. }
  1043. void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
  1044. {
  1045. for (unsigned i=0; i<numqueues; i++) {
  1046. IPropertyTree *croot = queryClientRootSession(*queues[i]);
  1047. croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
  1048. }
  1049. }
// 'simple' queuing
//
// Core dequeue loop: under the connection lock it inspects every sub-queue,
// tries to take the head item of the best active queue, and otherwise waits
// on notifysem before retrying.
//   minprio  - minimum acceptable priority; INT_MIN means "no minimum"
//   timeout  - overall wait in ms (INFINITE = keep waiting)
//   useprev  - when set, the queue's "@prevpriority" can raise the minimum
//   timedout - optional out flag; true only when the loop ends through the
//              timeout path without an item
// Returns the item taken (created by dotake, owned by the caller) or NULL if
// all queues are stopped, dequeuestop was set, or the timeout expired.
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
{
    bool hasminprio=(minprio!=INT_MIN);
    if (timedout)
        *timedout = false;
    IJobQueueItem *ret=NULL;
    bool waitingset = false;        // true once "@waiting" has been incremented for this call
    while (!dequeuestop) {
        unsigned t = 0;
        if (timeout!=(unsigned)INFINITE)
            t = msTick();           // start of this pass, used to shrink 'timeout' below
        {
            Cconnlockblock block(this,true);
            block.setRollback(true); // assume not going to update
            // now cycle through queues looking at state
            unsigned total = 0;
            unsigned stopped = 0;
            PointerArray active;    // queues that are neither "stopped" nor "paused"
            ForEachQueue(qd) {
                total++;
                const char *state = qd->root->queryProp("@state");
                if (state) {
                    if (strcmp(state,"stopped")==0)
                        stopped++;
                    else if (strcmp(state,"paused")!=0)
                        active.append(qd);
                }
                else
                    active.append(qd);  // no "@state" attribute is treated as active
            }
            // NOTE(review): this return (and leaving via dequeuestop) does not undo a
            // previous setWaiting(..,true) - confirm "@waiting" is cleaned up elsewhere
            if (stopped==total)
                return NULL; // all stopped
            sQueueData **activeqds = (sQueueData **)active.getArray();
            unsigned activenum = active.ordinality();
            if (activenum) {
                sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
                unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
                // load minp from cache
                if (count) {
                    int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
                    if (!hasminprio||checkprio(*bestqd,mpr)) {
                        block.setRollback(false);   // about to modify the queue
                        ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
                        if (ret) // think it must be!
                            timeout = 0; // so mark that done
                        else if (!hasminprio) {
                            WARNLOG("Resetting queue %s",bestqd->qname.get());
                            clear(*bestqd); // reset queue as seems to have become out of sync
                        }
                    }
                }
                if (timeout!=0) { // more to do
                    if (!connected) { // if connect already done non-zero
                        connect(validateitemsessions);
                        block.setRollback(false);
                    }
                    if (!waitingset) {
                        setWaiting(activenum,activeqds,true);
                        block.commit();
                        waitingset = true;
                    }
                }
            }
            if (timeout==0) {
                // finished: either an item was taken or the time ran out
                if (waitingset) {
                    // decrements on the queues active in THIS pass, which were
                    // recomputed since the increment was made
                    setWaiting(activenum,activeqds,false);
                    block.commit();
                }
                if (timedout)
                    *timedout = (ret==NULL);
                break;
            }
        }
        unsigned to = 5*60*1000;
        // check every 5 mins independent of notify (in case subscription lost for some reason)
        if (to>timeout)
            to = timeout;
        notifysem.wait(to);
        if (timeout!=(unsigned)INFINITE) {
            // subtract the time spent in this pass from the remaining timeout
            t = msTick()-t;
            if (t<timeout)
                timeout -= t;
            else
                timeout = 0;
        }
    }
    return ret;
}
  1139. IJobQueueItem *dequeue(unsigned timeout=INFINITE)
  1140. {
  1141. return dodequeue(INT_MIN,timeout);
  1142. }
  1143. IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
  1144. {
  1145. return dodequeue(minprio,timeout);
  1146. }
// Inserts qitem into queue 'qd' and increments "@count".
//   qitem - item to insert; ownership is taken
//   idx   - requested position, or (unsigned)-1 to position by priority
// Any existing entry with the same WUID is removed first. When a position is
// requested, the item's priority is clamped so that it is no higher than the
// item before it and no lower than the item currently at that position.
// Callers in this file hold an exclusive Cconnlockblock before calling.
void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
    Owned<IJobQueueItem> qi = qitem;
    remove(qi->queryWUID()); // just in case trying to put on twice!
    int priority = qi->getPriority();
    unsigned count = qd.root->getPropInt("@count");     // read after the remove above
    StringBuffer path;
    if (count&&(idx!=(unsigned)-1)) { // need to check before and after
        if (idx) {
            // neighbour in front: new item must not outrank it
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority>pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;     // fall back to priority-based placement
        }
        if (idx<count) {
            // item currently at idx (will follow the new one): must not outrank the new item
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority<pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;
        }
    }
    if (idx==(unsigned)-1) {
        // scan back from the tail to the first item whose priority is >= ours
        idx = count;
        while (idx) {
            IPropertyTree *previtem = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (previtem) {
                if (previtem->getPropInt("@priority")>=priority) {
                    break;
                }
            }
            else
                count--; // how did that happen?
            idx--;
        }
    }
    CJobQueueItem::assignBranch(addItem(qd,createPTree("Item"),idx,count),qi);
    qd.root->setPropInt("@count",count+1);
}
  1197. void enqueue(sQueueData &qd,IJobQueueItem *qitem) // takes ownership of qitem
  1198. {
  1199. Cconnlockblock block(this,true);
  1200. placeonqueue(qd,qitem,(unsigned)-1);
  1201. }
  1202. void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1203. {
  1204. Cconnlockblock block(this,true);
  1205. placeonqueue(qd,qitem,doFindRank(qd,wuid));
  1206. }
  1207. void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1208. {
  1209. Cconnlockblock block(this,true);
  1210. unsigned idx = doFindRank(qd,wuid);
  1211. if (idx!=(unsigned)-1)
  1212. idx++;
  1213. placeonqueue(qd,qitem,idx);
  1214. }
  1215. void enqueueTail(sQueueData &qd,IJobQueueItem *qitem)
  1216. {
  1217. Cconnlockblock block(this,true);
  1218. Owned<IJobQueueItem> qi = getTail(qd);
  1219. if (qi)
  1220. enqueueAfter(qd,qitem,qi->queryWUID());
  1221. else
  1222. enqueue(qd,qitem);
  1223. }
  1224. void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
  1225. {
  1226. Cconnlockblock block(this,true);
  1227. Owned<IJobQueueItem> qi = doGetItem(qd, 0);
  1228. if (qi)
  1229. enqueueBefore(qd,qitem,qi->queryWUID());
  1230. else
  1231. enqueue(qd,qitem);
  1232. }
  1233. unsigned ordinality(sQueueData &qd)
  1234. {
  1235. Cconnlockblock block(this,false);
  1236. return qd.root->getPropInt("@count");
  1237. }
  1238. IJobQueueItem *getTail(sQueueData &qd)
  1239. {
  1240. return doGetItem(qd,(unsigned)-1);
  1241. }
  1242. IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
  1243. {
  1244. Cconnlockblock block(this,false);
  1245. StringBuffer path;
  1246. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,qi->queryWUID()).str());
  1247. if (!item)
  1248. return NULL;
  1249. bool cached = item->getPropInt("@num",0)<=0;
  1250. if (cached)
  1251. return NULL; // don't want cached value
  1252. return new CJobQueueItem(item);
  1253. }
  1254. bool checkprio(sQueueData &qd,int minprio=0)
  1255. {
  1256. StringBuffer path;
  1257. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,0U).str());
  1258. if (!item)
  1259. return false;
  1260. return (item->getPropInt("@priority")>=minprio);
  1261. }
  1262. IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
  1263. {
  1264. StringBuffer path;
  1265. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  1266. if (!item)
  1267. return NULL;
  1268. if (item->getPropInt("@num",0)<=0)
  1269. return NULL; // don't want (old) cached value
  1270. if (hasminprio&&(item->getPropInt("@priority")<minprio))
  1271. return NULL;
  1272. IJobQueueItem *ret = new CJobQueueItem(item);
  1273. removeItem(qd,item,saveitem);
  1274. unsigned count = qd.root->getPropInt("@count");
  1275. assertex(count);
  1276. qd.root->setPropInt("@count",count-1);
  1277. return ret;
  1278. }
  1279. IJobQueueItem *take(sQueueData &qd,const char *wuid)
  1280. {
  1281. Cconnlockblock block(this,true);
  1282. return dotake(qd,wuid,false);
  1283. }
  1284. unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
  1285. {
  1286. Cconnlockblock block(this,true);
  1287. unsigned ret = copyItemsImpl(qd,dest);
  1288. clear(qd);
  1289. return ret;
  1290. }
  1291. void enqueueItems(sQueueData &qd,CJobQueueContents &items)
  1292. {
  1293. unsigned n=items.ordinality();
  1294. if (n) {
  1295. Cconnlockblock block(this,true);
  1296. for (unsigned i=0;i<n;i++)
  1297. enqueue(qd,items.item(i).clone());
  1298. }
  1299. }
  1300. void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
  1301. {
  1302. Cconnlockblock block(this,true);
  1303. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1304. enqueueBefore(*qd,qitem,wuid);
  1305. }
  1306. void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
  1307. {
  1308. Cconnlockblock block(this,true);
  1309. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1310. enqueueAfter(*qd,qitem,wuid);
  1311. }
  1312. bool moveBefore(const char *wuid,const char *nextwuid)
  1313. {
  1314. if (!qdata)
  1315. return false;
  1316. Cconnlockblock block(this,true);
  1317. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1318. if (!qd)
  1319. return false;
  1320. IJobQueueItem *qi=take(*qd,wuid);
  1321. if (!qi)
  1322. return false;
  1323. sQueueData *qdd = NULL;
  1324. if (qdata->next)
  1325. qdd = findQD(nextwuid);
  1326. if (!qdd)
  1327. qdd = qd;
  1328. enqueueBefore(*qdd,qi,nextwuid);
  1329. return true;
  1330. }
  1331. bool moveAfter(const char *wuid,const char *prevwuid)
  1332. {
  1333. if (!qdata)
  1334. return false;
  1335. Cconnlockblock block(this,true);
  1336. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1337. if (!qd)
  1338. return false;
  1339. IJobQueueItem *qi=take(*qd,wuid);
  1340. if (!qi)
  1341. return false;
  1342. sQueueData *qdd = NULL;
  1343. if (qdata->next)
  1344. qdd = findQD(prevwuid);
  1345. if (!qdd)
  1346. qdd = qd;
  1347. enqueueAfter(*qdd,qi,prevwuid);
  1348. return true;
  1349. }
  1350. bool moveToHead(const char *wuid)
  1351. {
  1352. if (!qdata)
  1353. return false;
  1354. Cconnlockblock block(this,true);
  1355. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1356. if (!qd)
  1357. return false;
  1358. IJobQueueItem *qi=take(*qd,wuid);
  1359. if (!qi)
  1360. return false;
  1361. enqueueHead(*qd,qi);
  1362. return true;
  1363. }
  1364. bool moveToTail(const char *wuid)
  1365. {
  1366. if (!qdata)
  1367. return false;
  1368. Cconnlockblock block(this,true);
  1369. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1370. if (!qd)
  1371. return false;
  1372. IJobQueueItem *qi=take(*qd,wuid);
  1373. if (!qi)
  1374. return false;
  1375. enqueueTail(*qd,qi);
  1376. return true;
  1377. }
  1378. bool remove(const char *wuid)
  1379. {
  1380. if (!qdata)
  1381. return false;
  1382. Cconnlockblock block(this,true);
  1383. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1384. if (!qd)
  1385. return false;
  1386. StringBuffer path;
  1387. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,wuid).str());
  1388. if (!item)
  1389. return false;
  1390. bool cached = item->getPropInt("@num",0)<=0; // old cached (bwd compat)
  1391. removeItem(*qd,item,false);
  1392. if (!cached) {
  1393. unsigned count = qd->root->getPropInt("@count");
  1394. assertex(count);
  1395. qd->root->setPropInt("@count",count-1);
  1396. }
  1397. return true;
  1398. }
  1399. bool changePriority(const char *wuid,int value)
  1400. {
  1401. if (!qdata)
  1402. return false;
  1403. Cconnlockblock block(this,true);
  1404. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1405. if (!qd)
  1406. return false;
  1407. IJobQueueItem *qi=take(*qd,wuid);
  1408. if (!qi) {
  1409. StringBuffer ws("~"); // change cached item
  1410. ws.append(wuid);
  1411. StringBuffer path;
  1412. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,ws.str()).str());
  1413. if (item) {
  1414. item->setPropInt("@priority",value);
  1415. return true;
  1416. }
  1417. return false;
  1418. }
  1419. qi->setPriority(value);
  1420. enqueue(*qd,qi);
  1421. return true;
  1422. }
  1423. void clear(sQueueData &qd)
  1424. {
  1425. Cconnlockblock block(this,true);
  1426. qd.root->setPropInt("@count",0);
  1427. for (;;) {
  1428. IPropertyTree *item = qd.root->queryPropTree("Item[1]");
  1429. if (!item)
  1430. break;
  1431. qd.root->removeTree(item);
  1432. }
  1433. }
  1434. void lock()
  1435. {
  1436. connlock(false); // sub functions will change to exclusive if needed
  1437. }
  1438. void unlock(bool rollback=false)
  1439. {
  1440. connunlock(rollback);
  1441. }
  1442. void pause(sQueueData &qd)
  1443. {
  1444. Cconnlockblock block(this,true);
  1445. qd.root->setProp("@state","paused");
  1446. }
  1447. void resume(sQueueData &qd)
  1448. {
  1449. Cconnlockblock block(this,true);
  1450. qd.root->setProp("@state","active");
  1451. }
  1452. bool paused(sQueueData &qd)
  1453. {
  1454. Cconnlockblock block(this,false);
  1455. const char *state = qd.root->queryProp("@state");
  1456. return (state&&(strcmp(state,"paused")==0));
  1457. }
  1458. void stop(sQueueData &qd)
  1459. {
  1460. Cconnlockblock block(this,true);
  1461. qd.root->setProp("@state","stopped");
  1462. }
  1463. bool stopped(sQueueData &qd)
  1464. {
  1465. Cconnlockblock block(this,false);
  1466. const char *state = qd.root->queryProp("@state");
  1467. return (state&&(strcmp(state,"stopped")==0));
  1468. }
// Collects statistics for queue 'qd':
//   connected - sum of "@connected" over the client branches
//   waiting   - sum of "@waiting" over the client branches
//   enqueued  - number of queued items
// When validateitemsessions is set this also prunes client branches and items
// whose recorded session is no longer valid, and rewrites "@count" if the
// number of surviving items differs from it.
void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
    Cconnlockblock block(this,false);
    connected = 0;
    waiting = 0;
    unsigned i=0;
    for (;;) {
        IPropertyTree *croot = queryClientRootIndex(qd,i);
        if (!croot)
            break;
        if (validateitemsessions && !validSession(croot)) {
            // stale client: switch to an exclusive lock to remove it; 'i' is not
            // advanced, so the same index is examined again
            Cconnlockblock block(this,true);
            qd.root->removeTree(croot);
        }
        else {
            waiting += croot->getPropInt("@waiting");
            connected += croot->getPropInt("@connected");
            i++;
        }
    }
    // now remove any duff queue items
    unsigned count = qd.root->getPropInt("@count");
    if (!validateitemsessions) {
        enqueued = count;   // no validation requested: trust the stored count
        return;
    }
    i=0;
    StringBuffer path;
    for (;;) {
        IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
        if (!item)
            break;
        if (!validSession(item)) {
            Cconnlockblock block(this,true);
            // look the item up again once the exclusive lock is held
            item = qd.root->queryPropTree(path.str());
            if (!item)
                break;
            // PROGLOG("WUJOBQ: Removing %s as session %" I64F "x not active",item->queryProp("@wuid"),item->getPropInt64("@session"));
            removeItem(qd,item,false);
        }
        else
            i++;
    }
    // 'i' is now the number of items that passed validation
    if (count!=i) {
        Cconnlockblock block(this,true);
        qd.root->setPropInt("@count",i);
    }
    enqueued = i;
}
  1518. void getStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1519. {
  1520. Cconnlockblock block(this,false);
  1521. doGetStats(qd,connected,waiting,enqueued);
  1522. }
  1523. void getStats(unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1524. {
  1525. // multi queue
  1526. Cconnlockblock block(this,false);
  1527. connected=0;
  1528. waiting=0;
  1529. enqueued=0;
  1530. ForEachQueue(qd) {
  1531. unsigned c;
  1532. unsigned w;
  1533. unsigned e;
  1534. doGetStats(*qd,c,w,e);
  1535. connected+=c;
  1536. waiting+=w;
  1537. enqueued+=e;
  1538. }
  1539. }
  1540. IJobQueueItem *take(const char *wuid)
  1541. {
  1542. assertex(qdata);
  1543. if (!qdata->next)
  1544. return take(*qdata,wuid);
  1545. Cconnlockblock block(this,true);
  1546. ForEachQueue(qd) {
  1547. IJobQueueItem *ret = dotake(*qd,wuid,false);
  1548. if (ret)
  1549. return ret;
  1550. }
  1551. return NULL;
  1552. }
  1553. unsigned takeItems(CJobQueueContents &dest)
  1554. {
  1555. assertex(qdata);
  1556. if (!qdata->next)
  1557. return takeItems(*qdata,dest);
  1558. Cconnlockblock block(this,true);
  1559. unsigned ret = 0;
  1560. ForEachQueue(qd) {
  1561. ret += copyItemsImpl(*qd,dest);
  1562. clear(*qd);
  1563. }
  1564. return ret;
  1565. }
  1566. void enqueueItems(CJobQueueContents &items)
  1567. { // enqueues to firs sub-queue (not sure that useful)
  1568. assertex(qdata);
  1569. return enqueueItems(*qdata,items);
  1570. }
  1571. void clear()
  1572. {
  1573. ForEachQueue(qd) {
  1574. clear(*qd);
  1575. }
  1576. }
  1577. bool validSession(IPropertyTree *item)
  1578. {
  1579. Owned<INode> node = createINode(item->queryProp("@node"),DALI_SERVER_PORT); // port should always be present
  1580. return (querySessionManager().lookupProcessSession(node)==(SessionId)item->getPropInt64("@session"));
  1581. }
// Enqueues 'item' on 'qd' and blocks until a conversation for it is accepted.
// A singleton socket connection is created on the item's port; if the port is
// 0 a random one in the WUJOBQ range is chosen and stored on the item before
// it is enqueued. Only one initiate may be pending at a time (asserted).
// Ownership of 'item' passes to the queue via enqueue().
// Returns the accepted conversation (caller owns it), or NULL if accept fails.
IConversation *initiateConversation(sQueueData &qd,IJobQueueItem *item)
{
    CriticalBlock block(crit);
    assertex(!initiateconv.get());
    SocketEndpoint ep = item->queryEndpoint();  // NOTE(review): 'ep' is not used below
    unsigned short port = (unsigned short)item->getPort();
    initiateconv.setown(createSingletonSocketConnection(port));
    if (!port)
        item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
    initiatewu.set(item->queryWUID());  // kept so cancelInitiateConversation can remove the item
    enqueue(qd,item);
    bool ok;
    {
        // release crit while blocked in accept so the cancel methods can take it
        CriticalUnblock unblock(crit);
        ok = initiateconv->accept(INFINITE);
    }
    if (!ok)
        initiateconv.clear();
    return initiateconv.getClear();
}
// Dequeues items until one can be connected back to its originator, and
// returns that conversation.
//   retitem                 - receives the dequeued item (caller owns it);
//                             NULL when no conversation is returned
//   prioritytransitiondelay - if non-zero, dequeue first with this timeout
//                             (ms) and useprev=true; cleared once a dequeue
//                             comes back empty
//   maxp                    - optional dynamic priority source, re-read after
//                             each 60s dequeue attempt
// Must be called while connected (asserted). Items whose session is invalid
// or whose connect fails are discarded and the loop continues. Returns NULL
// when a dequeue returns nothing without having timed out.
IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
{
    CriticalBlock block(crit);
    retitem = NULL;
    assertex(connected); // must be connected
    int curmp = maxp?maxp->get():0;
    int nextmp = curmp;
    for (;;) {
        bool timedout = false;
        Owned<IJobQueueItem> item;
        {
            // crit is released while blocked in the dequeue
            CriticalUnblock unblock(crit);
            // this is a bit complicated with multi-thor
            if (prioritytransitiondelay||maxp) {
                // integer division truncates, so negative values round toward zero
                item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
                    prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
                // if dynamic priority check every minute
                if (!prioritytransitiondelay) {
                    curmp = nextmp; // using max above is a bit devious to allow transition
                    nextmp = maxp->get();
                }
            }
            else
                item.setown(dequeue(INFINITE));
        }
        if (item.get()) {
            if (item->isValidSession()) {
                SocketEndpoint ep = item->queryEndpoint();
                ep.port = item->getPort();
                Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
                if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
                    retitem = item.getClear();
                    return acceptconv.getClear();
                }
            }
            // invalid session or failed connect: item is released, try the next one
        }
        else if (prioritytransitiondelay)
            prioritytransitiondelay = 0;    // transition period over; use normal dequeue from now on
        else if (!timedout)
            break;                          // nothing returned and not a timeout: give up
    }
    return NULL;
}
  1645. void cancelInitiateConversation(sQueueData &qd)
  1646. {
  1647. CriticalBlock block(crit);
  1648. if (initiatewu.get())
  1649. remove(initiatewu);
  1650. if (initiateconv.get())
  1651. initiateconv->cancel();
  1652. }
  1653. void cancelAcceptConversation()
  1654. {
  1655. CriticalBlock block(crit);
  1656. dequeuestop = true;
  1657. notifysem.signal();
  1658. }
  1659. bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
  1660. {
  1661. Cconnlockblock block(this,true);
  1662. for (;;) {
  1663. Owned<IJobQueueItem> item = dotake(qd,wuid,false);
  1664. if (!item.get())
  1665. break;
  1666. if (item->isValidSession()) {
  1667. SocketEndpoint ep = item->queryEndpoint();
  1668. ep.port = item->getPort();
  1669. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1670. acceptconv->connect(3*60*1000); // connect then close should close other end
  1671. return true;
  1672. }
  1673. }
  1674. return false;
  1675. }
  1676. bool waitStatsChange(unsigned timeout)
  1677. {
  1678. assertex(!connected); // not allowed to call this while connected
  1679. cancelwaiting = false;
  1680. while(!cancelwaiting) {
  1681. {
  1682. Cconnlockblock block(this,false);
  1683. if (haschanged())
  1684. return true;
  1685. }
  1686. if (!notifysem.wait(timeout))
  1687. break;
  1688. }
  1689. return false;
  1690. }
  1691. void cancelWaitStatsChange()
  1692. {
  1693. CriticalBlock block(crit);
  1694. cancelwaiting = true;
  1695. notifysem.signal();
  1696. }
  1697. virtual void enqueue(IJobQueueItem *qitem)
  1698. {
  1699. enqueue(*activeq,qitem);
  1700. }
  1701. void enqueueHead(IJobQueueItem *qitem)
  1702. {
  1703. enqueueHead(*activeq,qitem);
  1704. }
  1705. void enqueueTail(IJobQueueItem *qitem)
  1706. {
  1707. enqueueTail(*activeq,qitem);
  1708. }
  1709. void pause()
  1710. {
  1711. Cconnlockblock block(this,true);
  1712. ForEachQueue(qd) {
  1713. if (qd->root)
  1714. qd->root->setProp("@state","paused");
  1715. }
  1716. }
  1717. void pause(const char* info)
  1718. {
  1719. Cconnlockblock block(this,true);
  1720. ForEachQueue(qd) {
  1721. if (qd->root) {
  1722. qd->root->setProp("@state","paused");
  1723. if (info && *info)
  1724. qd->root->setProp("@stateDetails",info);
  1725. }
  1726. }
  1727. }
  1728. void stop()
  1729. {
  1730. Cconnlockblock block(this,true);
  1731. ForEachQueue(qd) {
  1732. if (qd->root)
  1733. qd->root->setProp("@state","stopped");
  1734. }
  1735. }
  1736. void stop(const char* info)
  1737. {
  1738. Cconnlockblock block(this,true);
  1739. ForEachQueue(qd) {
  1740. if (qd->root) {
  1741. qd->root->setProp("@state","stopped");
  1742. if (info && *info)
  1743. qd->root->setProp("@stateDetails",info);
  1744. }
  1745. }
  1746. }
  1747. void resume()
  1748. {
  1749. Cconnlockblock block(this,true);
  1750. ForEachQueue(qd) {
  1751. if (qd->root)
  1752. qd->root->setProp("@state","active");
  1753. }
  1754. }
  1755. void resume(const char* info)
  1756. {
  1757. Cconnlockblock block(this,true);
  1758. ForEachQueue(qd) {
  1759. if (qd->root) {
  1760. qd->root->setProp("@state","active");
  1761. if (info && *info)
  1762. qd->root->setProp("@stateDetails",info);
  1763. }
  1764. }
  1765. }
  1766. IConversation *initiateConversation(IJobQueueItem *item)
  1767. {
  1768. return initiateConversation(*activeq,item);
  1769. }
  1770. void cancelInitiateConversation()
  1771. {
  1772. return cancelInitiateConversation(*activeq);
  1773. }
  1774. bool cancelInitiateConversation(const char *wuid)
  1775. {
  1776. return cancelInitiateConversation(*activeq,wuid);
  1777. }
  1778. const char * queryActiveQueueName()
  1779. {
  1780. return activeq->qname;
  1781. }
  1782. void setActiveQueue(const char *name)
  1783. {
  1784. ForEachQueue(qd) {
  1785. if (!name||(strcmp(qd->qname.get(),name)==0)) {
  1786. activeq = qd;
  1787. return;
  1788. }
  1789. }
  1790. if (name)
  1791. throw MakeStringException (-1,"queue %s not found",name);
  1792. }
  1793. const char *nextQueueName(const char *last)
  1794. {
  1795. ForEachQueue(qd) {
  1796. if (!last||(strcmp(qd->qname.get(),last)==0)) {
  1797. if (qd->next)
  1798. return qd->next->qname.get();
  1799. break;
  1800. }
  1801. }
  1802. return NULL;
  1803. }
  1804. virtual bool paused()
  1805. {
  1806. Cconnlockblock block(this,false);
  1807. return CJobQueueBase::paused();
  1808. }
  1809. virtual bool paused(StringBuffer& info)
  1810. {
  1811. Cconnlockblock block(this,false);
  1812. return CJobQueueBase::paused(info);
  1813. }
  1814. virtual bool stopped()
  1815. {
  1816. Cconnlockblock block(this,false);
  1817. return CJobQueueBase::stopped();
  1818. }
  1819. virtual bool stopped(StringBuffer& info)
  1820. {
  1821. Cconnlockblock block(this,false);
  1822. return CJobQueueBase::stopped(info);
  1823. }
  1824. virtual unsigned ordinality()
  1825. {
  1826. Cconnlockblock block(this,false);
  1827. return CJobQueueBase::ordinality();
  1828. }
  1829. virtual unsigned waiting()
  1830. {
  1831. Cconnlockblock block(this,false);
  1832. return CJobQueueBase::waiting();
  1833. }
  1834. virtual IJobQueueItem *getItem(unsigned idx)
  1835. {
  1836. Cconnlockblock block(this,false);
  1837. return CJobQueueBase::getItem(idx);
  1838. }
  1839. virtual IJobQueueItem *getHead()
  1840. {
  1841. Cconnlockblock block(this,false);
  1842. return CJobQueueBase::getHead();
  1843. }
  1844. virtual IJobQueueItem *getTail()
  1845. {
  1846. Cconnlockblock block(this,false);
  1847. return CJobQueueBase::getTail();
  1848. }
  1849. virtual IJobQueueItem *find(const char *wuid)
  1850. {
  1851. Cconnlockblock block(this,false);
  1852. return CJobQueueBase::find(wuid);
  1853. }
  1854. virtual unsigned findRank(const char *wuid)
  1855. {
  1856. Cconnlockblock block(this,false);
  1857. return CJobQueueBase::findRank(wuid);
  1858. }
  1859. virtual unsigned copyItems(CJobQueueContents &dest)
  1860. {
  1861. Cconnlockblock block(this,false);
  1862. return CJobQueueBase::copyItems(dest);
  1863. }
  1864. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  1865. {
  1866. Cconnlockblock block(this,false);
  1867. return CJobQueueBase::doGetLastDequeuedInfo(activeq, wuid, enqueuedt, priority);
  1868. }
  1869. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  1870. {
  1871. Cconnlockblock block(this,false);
  1872. CJobQueueBase::copyItemsAndState(contents, state, stateDetails);
  1873. }
  1874. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  1875. {
  1876. Cconnlockblock block(this,false);
  1877. CJobQueueBase::getState(state, stateDetails);
  1878. }
  1879. };
  1880. class CJQSnapshot : public CInterface, implements IJQSnapshot
  1881. {
  1882. Owned<IPropertyTree> jobQueueInfo;
  1883. public:
  1884. IMPLEMENT_IINTERFACE;
  1885. CJQSnapshot()
  1886. {
  1887. Owned<IRemoteConnection> connJobQueues = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 30000);
  1888. if (!connJobQueues)
  1889. throw MakeStringException(-1, "CJQSnapshot::CJQSnapshot: /JobQueues not found");
  1890. jobQueueInfo.setown(createPTreeFromIPT(connJobQueues->queryRoot()));
  1891. }
  1892. IJobQueueConst* getJobQueue(const char *name)
  1893. {
  1894. if (!jobQueueInfo)
  1895. return NULL;
  1896. return new CJobQueueConst(name, jobQueueInfo.getLink());
  1897. }
  1898. };
  1899. IJQSnapshot *createJQSnapshot()
  1900. {
  1901. return new CJQSnapshot();
  1902. }
  1903. IJobQueue *createJobQueue(const char *name)
  1904. {
  1905. if (!name||!*name)
  1906. throw MakeStringException(-1,"createJobQueue empty name");
  1907. return new CJobQueue(name);
  1908. }
  1909. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster)
  1910. {
  1911. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  1912. if (!clusterInfo.get())
  1913. return false;
  1914. SCMStringBuffer agentQueue;
  1915. clusterInfo->getAgentQueue(agentQueue);
  1916. if (!agentQueue.length())
  1917. return false;
  1918. Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
  1919. if (!queue.get())
  1920. throw MakeStringException(-1, "Could not create workunit queue");
  1921. IJobQueueItem *item = createJobQueueItem(wuid);
  1922. queue->enqueue(item);
  1923. PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
  1924. return true;
  1925. }
  1926. extern bool WORKUNIT_API runWorkUnit(const char *wuid)
  1927. {
  1928. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1929. Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid);
  1930. if (w)
  1931. {
  1932. StringAttr clusterName = (w->queryClusterName());
  1933. w.clear();
  1934. return runWorkUnit(wuid, clusterName.str());
  1935. }
  1936. else
  1937. return false;
  1938. }
  1939. extern WORKUNIT_API StringBuffer &getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList)
  1940. {
  1941. Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 5000);
  1942. if (!conn)
  1943. return queueList;
  1944. VStringBuffer xpath("Queue[Item/@wuid='%s']", wuid);
  1945. Owned<IPropertyTreeIterator> it = conn->getElements(xpath.str());
  1946. ForEach(*it)
  1947. {
  1948. if (queueList.length())
  1949. queueList.append(',');
  1950. queueList.append(it->query().queryProp("@name"));
  1951. }
  1952. return queueList;
  1953. }
  1954. extern void WORKUNIT_API removeWorkUnitFromAllQueues(const char *wuid)
  1955. {
  1956. StringBuffer queueList;
  1957. if (!getQueuesContainingWorkUnit(wuid, queueList).length())
  1958. return;
  1959. Owned<IJobQueue> q = createJobQueue(queueList.str());
  1960. if (q)
  1961. while(q->remove(wuid));
  1962. }
  1963. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
  1964. {
  1965. if (!wu)
  1966. return false;
  1967. class cQswitcher: public CInterface, implements IQueueSwitcher
  1968. {
  1969. public:
  1970. IMPLEMENT_IINTERFACE;
  1971. void * getQ(const char * qname, const char * wuid)
  1972. {
  1973. Owned<IJobQueue> q = createJobQueue(qname);
  1974. return q->take(wuid);
  1975. }
  1976. void putQ(const char * qname, const char * wuid, void * qitem)
  1977. {
  1978. Owned<IJobQueue> q = createJobQueue(qname);
  1979. q->enqueue((IJobQueueItem *)qitem);
  1980. }
  1981. bool isAuto()
  1982. {
  1983. return false;
  1984. }
  1985. } switcher;
  1986. return wu->switchThorQueue(cluster, &switcher);
  1987. }