wujobq.cpp 63 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <algorithm>
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jbuff.hpp"
  18. #include "dasess.hpp"
  19. #include "dautils.hpp"
  20. #include "portlist.h"
  21. #include "dacoven.hpp"
  22. #include "daclient.hpp"
  23. #include "dasds.hpp"
  24. #include "dasess.hpp"
  25. #include "daqueue.hpp"
  26. #include "workunit.hpp"
  27. #include "wujobq.hpp"
  28. #ifndef _CONTAINERIZED
  29. #include "environment.hpp"
  30. #endif
  31. #ifdef _MSC_VER
  32. #pragma warning (disable : 4355)
  33. #endif
  34. #if 0
  35. JobQueues
  36. JobQueue @name= @count= @state=active|paused|stopped
  37. Edition <num>
  38. Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
  39. Item* @wuid @owner @node @port @priority @session
  40. #endif
  41. class CJobQueueItem: implements IJobQueueItem, public CInterface
  42. {
  43. int priority;
  44. StringAttr wu;
  45. StringAttr owner;
  46. SessionId sessid;
  47. SocketEndpoint ep;
  48. unsigned port;
  49. CDateTime enqueuedt;
  50. public:
  51. IMPLEMENT_IINTERFACE;
  52. CJobQueueItem(MemoryBuffer &src)
  53. {
  54. deserialize(src);
  55. }
  56. CJobQueueItem(const char *_wu)
  57. : wu(_wu)
  58. {
  59. priority = 0;
  60. ep = queryMyNode()->endpoint();
  61. port = 0;
  62. sessid = myProcessSession();
  63. }
  64. CJobQueueItem(IPropertyTree *item)
  65. {
  66. const char * wuid = item->queryProp("@wuid");
  67. if (*wuid=='~')
  68. wuid++;
  69. wu.set(wuid);
  70. owner.set(item->queryProp("@owner"));
  71. sessid = (SessionId)item->getPropInt64("@session");
  72. priority = item->getPropInt("@priority");
  73. ep.set(item->queryProp("@node"));
  74. port = (unsigned)item->getPropInt("@port");
  75. StringBuffer dts;
  76. if (item->getProp("@enqueuedt",dts))
  77. enqueuedt.setString(dts.str());
  78. }
  79. static void assignBranch(IPropertyTree *item,IJobQueueItem *qi)
  80. {
  81. item->setPropInt64("@session",qi->getSessionId());
  82. item->setPropInt("@priority",qi->getPriority());
  83. item->setPropInt("@port",qi->getPort());
  84. item->setProp("@wuid",qi->queryWUID());
  85. item->setProp("@owner",qi->queryOwner());
  86. StringBuffer eps;
  87. qi->queryEndpoint().getUrlStr(eps);
  88. item->setProp("@node",eps.str());
  89. StringBuffer dts;
  90. qi->queryEnqueuedTime().getString(dts);
  91. if (dts.length()==0) {
  92. CDateTime dt;
  93. dt.setNow();
  94. dt.getString(dts);
  95. qi->setEnqueuedTime(dt);
  96. }
  97. item->setProp("@enqueuedt",dts.str());
  98. }
  99. const char *queryWUID()
  100. {
  101. return wu.get();
  102. }
  103. int getPriority()
  104. {
  105. return priority;
  106. }
  107. unsigned getPort()
  108. {
  109. return port;
  110. }
  111. SessionId getSessionId()
  112. {
  113. return sessid;
  114. }
  115. SocketEndpoint &queryEndpoint()
  116. {
  117. return ep;
  118. }
  119. const char *queryOwner()
  120. {
  121. return owner.get();
  122. }
  123. bool equals(IJobQueueItem *other)
  124. {
  125. // work unit is primary key
  126. return strcmp(wu.get(),other->queryWUID())==0;
  127. }
  128. CDateTime &queryEnqueuedTime()
  129. {
  130. return enqueuedt;
  131. }
  132. void setEnqueuedTime(const CDateTime &dt)
  133. {
  134. enqueuedt.set(dt);
  135. }
  136. void serialize(MemoryBuffer &tgt)
  137. {
  138. tgt.append(priority).append(port).append(wu).append(sessid);
  139. ep.serialize(tgt);
  140. StringBuffer dts;
  141. enqueuedt.getString(dts);
  142. tgt.append(owner).append(dts);
  143. }
  144. void deserialize(MemoryBuffer &src)
  145. {
  146. src.read(priority).read(port).read(wu).read(sessid);
  147. ep.deserialize(src);
  148. StringBuffer dts;
  149. src.read(owner).read(dts);
  150. enqueuedt.setString(dts.str());
  151. }
  152. IJobQueueItem* clone()
  153. {
  154. IJobQueueItem* ret = new CJobQueueItem(wu);
  155. ret->setPriority(priority);
  156. ret->setPriority(port);
  157. ret->setEndpoint(ep);
  158. ret->setSessionId(sessid);
  159. return ret;
  160. }
  161. void setPriority(int _priority)
  162. {
  163. priority = _priority;
  164. }
  165. void setPort(unsigned _port)
  166. {
  167. port = _port;
  168. }
  169. void setEndpoint(const SocketEndpoint &_ep)
  170. {
  171. ep = _ep;
  172. }
  173. void setSessionId(SessionId _id)
  174. {
  175. if (_id)
  176. sessid = _id;
  177. else
  178. sessid = myProcessSession();
  179. }
  180. void setOwner(const char *_owner)
  181. {
  182. owner.set(_owner);
  183. }
  184. bool isValidSession()
  185. {
  186. Owned<INode> node = createINode(ep);
  187. return (querySessionManager().lookupProcessSession(node)==sessid);
  188. }
  189. };
  190. class CJobQueueIterator: implements IJobQueueIterator, public CInterface
  191. {
  192. public:
  193. CJobQueueContents &items;
  194. unsigned idx;
  195. IMPLEMENT_IINTERFACE;
  196. CJobQueueIterator(CJobQueueContents &_items)
  197. : items(_items)
  198. {
  199. idx = 0;
  200. }
  201. bool isValid()
  202. {
  203. return idx<items.ordinality();
  204. }
  205. bool first()
  206. {
  207. idx = 0;
  208. return isValid();
  209. }
  210. bool next()
  211. {
  212. idx++;
  213. return isValid();
  214. }
  215. IJobQueueItem & query()
  216. {
  217. return items.item(idx);
  218. }
  219. };
  220. IJobQueueIterator *CJobQueueContents::getIterator()
  221. {
  222. return new CJobQueueIterator(*this);
  223. }
  224. IJobQueueItem *createJobQueueItem(const char *wuid)
  225. {
  226. if (!wuid||!*wuid)
  227. throw MakeStringException(-1,"createJobQueueItem empty WUID");
  228. return new CJobQueueItem(wuid);;
  229. }
  230. IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb)
  231. {
  232. return new CJobQueueItem(mb);
  233. }
  // Walk the singly linked sQueueData list of the current object (member 'qdata').
  #define ForEachQueue(qd)          for (sQueueData *qd = qdata; qd != NULL; qd = qd->next)
  // Same walk, but over the list owned by another object 'parent'.
  #define ForEachQueueIn(parent,qd) for (sQueueData *qd = parent.qdata; qd != NULL; qd = qd->next)
  236. struct sQueueData
  237. {
  238. sQueueData *next;
  239. IRemoteConnection *conn;
  240. StringAttr qname;
  241. IPropertyTree *root;
  242. SubscriptionId subscriberid;
  243. unsigned lastWaitEdition;
  244. };
  245. class CJobQueueBase: implements IJobQueueConst, public CInterface
  246. {
  247. class cOrderedIterator
  248. {
  249. CJobQueueBase &parent;
  250. unsigned numqueues;
  251. unsigned *queueidx;
  252. sQueueData **queues;
  253. IPropertyTree **queuet;
  254. MemoryAttr ma;
  255. unsigned current;
  256. public:
  257. cOrderedIterator(CJobQueueBase&_parent) : parent(_parent)
  258. {
  259. numqueues=0;
  260. ForEachQueueIn(parent,qd1)
  261. if (qd1->root)
  262. numqueues++;
  263. queueidx = (unsigned *)ma.allocate(numqueues*(sizeof(unsigned)+sizeof(sQueueData *)+sizeof(IPropertyTree *)));
  264. queues = (sQueueData **)(queueidx+numqueues);
  265. queuet = (IPropertyTree **)(queues+numqueues);
  266. unsigned i = 0;
  267. ForEachQueueIn(parent,qd2)
  268. {
  269. if (qd2->root)
  270. queues[i++] = qd2;
  271. }
  272. current = (unsigned)-1;
  273. }
  274. bool first()
  275. {
  276. StringBuffer path;
  277. parent.getItemPath(path,0U);
  278. current = (unsigned)-1;
  279. for (unsigned i = 0; i<numqueues;i++)
  280. {
  281. queueidx[i] = 0;
  282. queuet[i] = queues[i]->root->queryPropTree(path.str());
  283. if (queuet[i])
  284. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  285. current = i;
  286. }
  287. return current!=(unsigned)-1;
  288. }
  289. bool next()
  290. {
  291. if (current==(unsigned)-1)
  292. return false;
  293. queueidx[current]++;
  294. StringBuffer path;
  295. parent.getItemPath(path,queueidx[current]);
  296. queuet[current] = queues[current]->root->queryPropTree(path.str());
  297. current = (unsigned)-1;
  298. for (unsigned i = 0; i<numqueues;i++)
  299. {
  300. if (queuet[i])
  301. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  302. current = i;
  303. }
  304. return current!=(unsigned)-1;
  305. }
  306. bool isValid()
  307. {
  308. return current!=(unsigned)-1;
  309. }
  310. void item(sQueueData *&qd, IPropertyTree *&t,unsigned &idx)
  311. {
  312. assertex(current!=(unsigned)-1);
  313. qd = queues[current];
  314. t = queuet[current];
  315. idx = queueidx[current];
  316. }
  317. sQueueData &queryQueue()
  318. {
  319. assertex(current!=(unsigned)-1);
  320. return *queues[current];
  321. }
  322. IPropertyTree &queryTree()
  323. {
  324. assertex(current!=(unsigned)-1);
  325. return *queuet[current];
  326. }
  327. };
  328. protected:
  329. bool doGetLastDequeuedInfo(sQueueData *qd, StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  330. {
  331. priority = 0;
  332. if (!qd)
  333. return false;
  334. const char *w = qd->root->queryProp("@prevwuid");
  335. if (!w||!*w)
  336. return false;
  337. wuid.set(w);
  338. StringBuffer dts;
  339. if (qd->root->getProp("@prevenqueuedt",dts))
  340. enqueuedt.setString(dts.str());
  341. priority = qd->root->getPropInt("@prevpriority");
  342. return true;
  343. }
  344. public:
  345. sQueueData *qdata;
  346. Semaphore notifysem;
  347. CriticalSection crit;
  348. IMPLEMENT_IINTERFACE;
  349. CJobQueueBase(const char *_qname)
  350. {
  351. StringArray qlist;
  352. qlist.appendListUniq(_qname, ",");
  353. sQueueData *last = NULL;
  354. ForEachItemIn(i,qlist)
  355. {
  356. sQueueData *qd = new sQueueData;
  357. qd->next = NULL;
  358. qd->qname.set(qlist.item(i));
  359. qd->conn = NULL;
  360. qd->root = NULL;
  361. qd->lastWaitEdition = 0;
  362. qd->subscriberid = 0;
  363. if (last)
  364. last->next = qd;
  365. else
  366. qdata = qd;
  367. last = qd;
  368. }
  369. };
  370. virtual ~CJobQueueBase()
  371. {
  372. while (qdata)
  373. {
  374. sQueueData * next = qdata->next;
  375. delete qdata;
  376. qdata = next;
  377. }
  378. }
  379. StringBuffer &getItemPath(StringBuffer &path,const char *wuid)
  380. {
  381. if (!wuid||!*wuid)
  382. return getItemPath(path,0U);
  383. return path.appendf("Item[@wuid=\"%s\"]",wuid);
  384. }
  385. StringBuffer &getItemPath(StringBuffer &path,unsigned idx)
  386. {
  387. path.appendf("Item[@num=\"%d\"]",idx+1);
  388. return path;
  389. }
  390. IPropertyTree *queryClientRootIndex(sQueueData &qd, unsigned idx)
  391. {
  392. VStringBuffer path("Client[%d]", idx+1);
  393. return qd.root->queryPropTree(path);
  394. }
  395. bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
  396. {
  397. // if this ever becomes time critical thne could cache enqueued values
  398. StringBuffer d1s;
  399. if (qt1)
  400. qt1->getProp("@enqueuedt",d1s);
  401. StringBuffer d2s;
  402. if (qt2)
  403. qt2->getProp("@enqueuedt",d2s);
  404. return (strcmp(d1s.str(),d2s.str())<0);
  405. }
  406. IJobQueueItem *doGetItem(sQueueData &qd,unsigned idx)
  407. {
  408. if (idx==(unsigned)-1)
  409. {
  410. idx = qd.root->getPropInt("@count");
  411. if (!idx)
  412. return NULL;
  413. idx--;
  414. }
  415. StringBuffer path;
  416. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,idx).str());
  417. if (!item)
  418. return NULL;
  419. return new CJobQueueItem(item);
  420. }
  421. IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
  422. {
  423. return doGetItem(qd, idx);
  424. }
  425. IJobQueueItem *getHead(sQueueData &qd)
  426. {
  427. return getItem(qd,0);
  428. }
  429. unsigned doFindRank(sQueueData &qd,const char *wuid)
  430. {
  431. StringBuffer path;
  432. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  433. if (!item)
  434. return (unsigned)-1;
  435. return item->getPropInt("@num")-1;
  436. }
  437. unsigned findRank(sQueueData &qd,const char *wuid)
  438. {
  439. return doFindRank(qd,wuid);
  440. }
  441. IJobQueueItem *find(sQueueData &qd,const char *wuid)
  442. {
  443. StringBuffer path;
  444. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  445. if (!item)
  446. return NULL;
  447. bool cached = item->getPropInt("@num",0)<=0;
  448. if (wuid&&cached)
  449. return NULL; // don't want cached value unless explicit
  450. return new CJobQueueItem(item);
  451. }
  452. unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
  453. {
  454. unsigned ret=0;
  455. StringBuffer path;
  456. for (unsigned i=0;;i++)
  457. {
  458. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  459. if (!item)
  460. break;
  461. ret++;
  462. dest.append(*new CJobQueueItem(item));
  463. }
  464. return ret;
  465. }
  466. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  467. {
  468. assertex(qdata);
  469. assertex(qdata->root);
  470. copyItemsImpl(*qdata,contents);
  471. const char *st = qdata->root->queryProp("@state");
  472. if (st&&*st)
  473. state.set(st);
  474. if (st && (strieq(st, "paused") || strieq(st, "stopped")))
  475. {
  476. const char *stDetails = qdata->root->queryProp("@stateDetails");
  477. if (stDetails&&*stDetails)
  478. stateDetails.set(stDetails);
  479. }
  480. }
  481. sQueueData *findQD(const char *wuid)
  482. {
  483. if (wuid&&*wuid)
  484. {
  485. ForEachQueue(qd)
  486. {
  487. unsigned idx = doFindRank(*qd,wuid);
  488. if (idx!=(unsigned)-1)
  489. return qd;
  490. }
  491. }
  492. return NULL;
  493. }
  494. virtual unsigned waiting()
  495. {
  496. unsigned ret = 0;
  497. ForEachQueue(qd)
  498. {
  499. for (unsigned i=0;;i++)
  500. {
  501. IPropertyTree *croot = queryClientRootIndex(*qd,i);
  502. if (!croot)
  503. break;
  504. ret += croot->getPropInt("@waiting");
  505. }
  506. }
  507. return ret;
  508. }
  509. virtual unsigned findRank(const char *wuid)
  510. {
  511. assertex(qdata);
  512. if (!qdata->next)
  513. return findRank(*qdata,wuid);
  514. cOrderedIterator it(*this);
  515. unsigned i = 0;
  516. ForEach(it)
  517. {
  518. const char *twuid = it.queryTree().queryProp("@wuid");
  519. if (twuid&&(strcmp(twuid,wuid)==0))
  520. return i;
  521. i++;
  522. }
  523. return (unsigned)-1;
  524. }
  525. virtual unsigned copyItems(CJobQueueContents &dest)
  526. {
  527. assertex(qdata);
  528. if (!qdata->next)
  529. return copyItemsImpl(*qdata,dest);
  530. cOrderedIterator it(*this);
  531. unsigned ret = 0;
  532. ForEach(it)
  533. {
  534. dest.append(*new CJobQueueItem(&it.queryTree()));
  535. ret++;
  536. }
  537. return ret;
  538. }
  539. virtual IJobQueueItem *getItem(unsigned idx)
  540. {
  541. if (!qdata)
  542. return NULL;
  543. if (!qdata->next)
  544. return getItem(*qdata,idx);
  545. cOrderedIterator it(*this);
  546. unsigned i = 0;
  547. IPropertyTree *ret = NULL;
  548. ForEach(it)
  549. {
  550. if (i==idx)
  551. {
  552. ret = &it.queryTree();
  553. break;
  554. }
  555. else if (idx==(unsigned)-1) // -1 means return last
  556. ret = &it.queryTree();
  557. i++;
  558. }
  559. if (ret)
  560. return new CJobQueueItem(ret);
  561. return NULL;
  562. }
  563. virtual IJobQueueItem *getHead()
  564. {
  565. if (!qdata)
  566. return NULL;
  567. if (!qdata->next)
  568. return getHead(*qdata);
  569. return getItem(0);
  570. }
  571. virtual IJobQueueItem *getTail()
  572. {
  573. if (!qdata)
  574. return NULL;
  575. if (!qdata->next)
  576. return getHead(*qdata);
  577. return getItem((unsigned)-1);
  578. }
  579. virtual IJobQueueItem *find(const char *wuid)
  580. {
  581. if (!qdata)
  582. return NULL;
  583. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  584. if (!qd)
  585. return NULL;
  586. return find(*qd,wuid);
  587. }
  588. virtual bool paused()
  589. {
  590. // true if all paused
  591. ForEachQueue(qd)
  592. {
  593. if (qd->root)
  594. {
  595. const char *state = qd->root->queryProp("@state");
  596. if (state&&(strcmp(state,"paused")!=0))
  597. return false;
  598. }
  599. }
  600. return true;
  601. }
  602. virtual bool paused(StringBuffer& info)
  603. {
  604. // true if all paused
  605. ForEachQueue(qd)
  606. {
  607. if (qd->root)
  608. {
  609. const char *state = qd->root->queryProp("@state");
  610. if (state&&(strcmp(state,"paused")!=0))
  611. return false;
  612. if (state&&!info.length())
  613. {
  614. const char *stateDetails = qd->root->queryProp("@stateDetails");
  615. if (stateDetails && *stateDetails)
  616. info.set(stateDetails);
  617. }
  618. }
  619. }
  620. return true;
  621. }
  622. virtual bool stopped()
  623. {
  624. // true if all stopped
  625. ForEachQueue(qd)
  626. {
  627. if (qd->root)
  628. {
  629. const char *state = qd->root->queryProp("@state");
  630. if (state&&(strcmp(state,"stopped")!=0))
  631. return false;
  632. }
  633. }
  634. return true;
  635. }
  636. virtual bool stopped(StringBuffer& info)
  637. {
  638. // true if all stopped
  639. ForEachQueue(qd)
  640. {
  641. if (qd->root)
  642. {
  643. const char *state = qd->root->queryProp("@state");
  644. if (state&&(strcmp(state,"stopped")!=0))
  645. return false;
  646. if (state&&!info.length())
  647. {
  648. const char *stateDetails = qd->root->queryProp("@stateDetails");
  649. if (stateDetails && *stateDetails)
  650. info.set(stateDetails);
  651. }
  652. }
  653. }
  654. return true;
  655. }
  656. virtual unsigned ordinality()
  657. {
  658. unsigned ret = 0;
  659. ForEachQueue(qd)
  660. {
  661. if (qd->root)
  662. ret += qd->root->getPropInt("@count");
  663. }
  664. return ret;
  665. }
  666. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  667. {
  668. return doGetLastDequeuedInfo(qdata, wuid, enqueuedt, priority);
  669. }
  670. //Similar to copyItemsAndState(), this method returns the state information for one queue.
  671. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  672. {
  673. if (!qdata->root)
  674. return;
  675. const char *st = qdata->root->queryProp("@state");
  676. if (!st || !*st)
  677. return;
  678. state.set(st);
  679. if ((strieq(st, "paused") || strieq(st, "stopped")))
  680. stateDetails.set(qdata->root->queryProp("@stateDetails"));
  681. }
  682. };
  683. class CJobQueueConst: public CJobQueueBase
  684. {
  685. Owned<IPropertyTree> jobQueueSnapshot;
  686. public:
  687. CJobQueueConst(const char *_qname, IPropertyTree* _jobQueueSnapshot) : CJobQueueBase(_qname)
  688. {
  689. if (!_jobQueueSnapshot)
  690. throw MakeStringException(-1, "No job queue snapshot");
  691. jobQueueSnapshot.setown(_jobQueueSnapshot);
  692. ForEachQueue(qd)
  693. {
  694. VStringBuffer path("Queue[@name=\"%s\"]", qd->qname.get());
  695. qd->root = jobQueueSnapshot->queryPropTree(path.str());
  696. if (!qd->root)
  697. throw MakeStringException(-1, "No job queue found for %s", qd->qname.get());
  698. }
  699. };
  700. };
  701. class CJobQueue: public CJobQueueBase, implements IJobQueue
  702. {
  703. public:
  704. sQueueData *activeq;
  705. SessionId sessionid;
  706. unsigned locknest;
  707. bool writemode;
  708. bool connected;
  709. Owned<IConversation> initiateconv;
  710. StringAttr initiatewu;
  711. bool dequeuestop;
  712. bool cancelwaiting;
  713. bool validateitemsessions;
  714. class csubs: implements ISDSSubscription, public CInterface
  715. {
  716. CJobQueue *parent;
  717. public:
  718. IMPLEMENT_IINTERFACE;
  719. csubs(CJobQueue *_parent)
  720. {
  721. parent = _parent;
  722. }
  723. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  724. {
  725. CriticalBlock block(parent->crit);
  726. parent->notifysem.signal();
  727. }
  728. } subs;
  729. IMPLEMENT_IINTERFACE;
  730. CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
  731. {
  732. activeq = qdata;
  733. sessionid = myProcessSession();
  734. validateitemsessions = false;
  735. writemode = false;
  736. locknest = 0;
  737. connected = false;
  738. dequeuestop = false;
  739. cancelwaiting = false;
  740. Cconnlockblock block(this,false); // this just checks queue exists
  741. }
  742. virtual ~CJobQueue()
  743. {
  744. try {
  745. while (locknest)
  746. connunlock(true); // auto rollback
  747. if (connected)
  748. disconnect();
  749. }
  750. catch (IException *e) {
  751. // server error
  752. EXCLOG(e, "~CJobQueue");
  753. e->Release();
  754. }
  755. try { // must attempt to remove subscription before object destroyed.
  756. dounsubscribe();
  757. }
  758. catch (IException *e) {
  759. EXCLOG(e, "~CJobQueue calling dounsubscribe");
  760. e->Release();
  761. }
  762. }
  763. void connlock(bool exclusive)
  764. { // must be in sect
  765. if (locknest++==0) {
  766. unsigned wait = qdata&&qdata->next?5000:INFINITE;
  767. ForEachQueue(qd) {
  768. for (;;) {
  769. StringBuffer path;
  770. path.appendf("/JobQueues/Queue[@name=\"%s\"]",qd->qname.get());
  771. bool timeout;
  772. for (;;) {
  773. timeout=false;
  774. try {
  775. qd->conn = querySDS().connect(path.str(),myProcessSession(),exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ,wait);
  776. if (qd->conn)
  777. break;
  778. }
  779. catch (ISDSException *e) {
  780. if (SDSExcpt_LockTimeout != e->errorCode())
  781. throw;
  782. e->Release();
  783. timeout = true;
  784. }
  785. // create queue
  786. Owned<IRemoteConnection> pconn;
  787. try {
  788. pconn.setown(querySDS().connect("/JobQueues",myProcessSession(),RTM_LOCK_WRITE|RTM_CREATE_QUERY,wait));
  789. if (!pconn)
  790. throw MakeStringException(-1,"CJobQueue could not create JobQueues");
  791. IPropertyTree *proot = pconn->queryRoot();
  792. StringBuffer cpath;
  793. cpath.appendf("Queue[@name=\"%s\"]",qd->qname.get());
  794. if (!proot->hasProp(cpath.str())) {
  795. IPropertyTree *pt = proot->addPropTree("Queue");
  796. pt->setProp("@name",qd->qname.get());
  797. pt->setProp("@state","active");
  798. pt->setPropInt("@count", 0);
  799. pt->setPropInt("Edition", 1);
  800. }
  801. }
  802. catch (ISDSException *e) {
  803. if (SDSExcpt_LockTimeout != e->errorCode())
  804. throw;
  805. e->Release();
  806. timeout = true;
  807. }
  808. }
  809. if (!timeout)
  810. break;
  811. sQueueData *qd2 = qdata;
  812. do {
  813. ::Release(qd2->conn);
  814. qd2->conn = NULL;
  815. qd2->root = NULL;
  816. }
  817. while (qd2!=qd);
  818. PROGLOG("Job Queue contention - delaying before retrying");
  819. Sleep(getRandom()%5000); // dining philosopher delay
  820. wait = getRandom()%4000+3000; // try and prevent sync
  821. qd = qdata;
  822. }
  823. qd->root = qd->conn->queryRoot();
  824. }
  825. writemode = exclusive;
  826. }
  827. else {
  828. if (exclusive&&!writemode) {
  829. ForEachQueue(qd) {
  830. assertex(qd->conn);
  831. writemode = exclusive;
  832. bool lockreleased;
  833. safeChangeModeWrite(qd->conn,qd->qname.get(),lockreleased);
  834. qd->root = qd->conn->queryRoot();
  835. }
  836. }
  837. }
  838. }
  839. void connunlock(bool rollback=false)
  840. { // should be in sect
  841. if (--locknest==0) {
  842. ForEachQueue(qd) {
  843. if (qd->conn) { // can occur if connection to dali threw exception
  844. if (writemode) {
  845. if (rollback)
  846. qd->conn->rollback();
  847. else {
  848. qd->root->setPropInt("Edition",qd->root->getPropInt("Edition")+1);
  849. qd->conn->commit();
  850. }
  851. }
  852. qd->conn->Release();
  853. qd->conn = NULL;
  854. }
  855. qd->root = NULL;
  856. }
  857. writemode = false;
  858. }
  859. }
  860. void conncommit() // doesn't set edition
  861. { // called within sect
  862. if (writemode) {
  863. ForEachQueue(qd) {
  864. if (qd->conn)
  865. qd->conn->commit();
  866. }
  867. }
  868. }
  869. class Cconnlockblock: public CriticalBlock
  870. {
  871. CJobQueue *parent;
  872. bool rollback;
  873. public:
  874. Cconnlockblock(CJobQueue *_parent,bool exclusive)
  875. : CriticalBlock(_parent->crit)
  876. {
  877. parent = _parent;
  878. parent->connlock(exclusive);
  879. rollback = false;
  880. }
  881. ~Cconnlockblock()
  882. {
  883. parent->connunlock(rollback);
  884. }
  885. void setRollback(bool set=true)
  886. {
  887. rollback = set;
  888. }
  889. void commit()
  890. {
  891. parent->conncommit();
  892. }
  893. };
  894. void removeItem(sQueueData &qd,IPropertyTree *item, bool cache)
  895. { // does not adjust or use @count
  896. unsigned n = item->getPropInt("@num");
  897. if (!n)
  898. return;
  899. if (cache) {
  900. StringBuffer s;
  901. item->getProp("@wuid",s.clear());
  902. qd.root->setProp("@prevwuid",s.str());
  903. item->getProp("@enqueuedt",s.clear());
  904. qd.root->setProp("@prevenqueuedt",s.str());
  905. qd.root->setPropInt("@prevpriority",item->getPropInt("@priority"));
  906. }
  907. item->setPropInt("@num",-1);
  908. StringBuffer path;
  909. for (;;) {
  910. IPropertyTree *item2 = qd.root->queryPropTree(getItemPath(path.clear(),n).str());
  911. if (!item2)
  912. break;
  913. item2->setPropInt("@num",n);
  914. n++;
  915. }
  916. qd.root->removeTree(item);
  917. }
  918. IPropertyTree *addItem(sQueueData &qd,IPropertyTree *item,unsigned idx,unsigned count)
  919. {
  920. // does not set any values other than num
  921. StringBuffer path;
  922. // first move following up
  923. unsigned n=count;
  924. while (n>idx) {
  925. n--;
  926. qd.root->queryPropTree(getItemPath(path.clear(),n).str())->setPropInt("@num",n+2);
  927. }
  928. item->setPropInt("@num",idx+1);
  929. return qd.root->addPropTree("Item",item);
  930. }
  931. void dosubscribe()
  932. { // called in crit section
  933. ForEachQueue(qd) {
  934. if (qd->subscriberid) {
  935. querySDS().unsubscribe(qd->subscriberid);
  936. qd->subscriberid = 0;
  937. }
  938. StringBuffer path;
  939. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  940. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  941. }
  942. }
  943. bool haschanged() // returns if any changed
  944. {
  945. bool changed = false;
  946. ForEachQueue(qd) {
  947. if (!qd->subscriberid) {
  948. StringBuffer path;
  949. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  950. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  951. }
  952. unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
  953. if (e!=qd->lastWaitEdition) {
  954. qd->lastWaitEdition = e;
  955. changed = true;
  956. break;
  957. }
  958. }
  959. return changed;
  960. }
  961. void dounsubscribe()
  962. {
  963. // called in crit section
  964. ForEachQueue(qd) {
  965. if (qd->subscriberid) {
  966. querySDS().unsubscribe(qd->subscriberid);
  967. qd->subscriberid = 0;
  968. }
  969. }
  970. }
  971. IPropertyTree *queryClientRootSession(sQueueData &qd)
  972. {
  973. VStringBuffer path("Client[@session=\"%" I64F "d\"]", sessionid);
  974. IPropertyTree *ret = qd.root->queryPropTree(path.str());
  975. if (!ret)
  976. {
  977. ret = qd.root->addPropTree("Client");
  978. ret->setPropInt64("@session",sessionid);
  979. StringBuffer eps;
  980. ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
  981. }
  982. return ret;
  983. }
  984. void connect(bool _validateitemsessions)
  985. {
  986. Cconnlockblock block(this,true);
  987. validateitemsessions = _validateitemsessions;
  988. if (connected)
  989. disconnect();
  990. dosubscribe();
  991. ForEachQueue(qd)
  992. {
  993. if (validateitemsessions)
  994. {
  995. unsigned connected;
  996. unsigned waiting;
  997. unsigned count;
  998. getStats(*qd,connected,waiting,count); // clear any duff clients
  999. }
  1000. IPropertyTree *croot = queryClientRootSession(*qd);
  1001. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
  1002. }
  1003. connected = true;
  1004. }
  1005. void disconnect() // signal no longer wil be dequeing (optional - done automatically on release)
  1006. {
  1007. Cconnlockblock block(this,true);
  1008. if (connected) {
  1009. dounsubscribe();
  1010. ForEachQueue(qd) {
  1011. IPropertyTree *croot = queryClientRootSession(*qd);
  1012. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
  1013. }
  1014. connected = false;
  1015. }
  1016. }
  1017. sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
  1018. {
  1019. if (numqueues==0)
  1020. return NULL;
  1021. if (numqueues==1)
  1022. return *queues;
  1023. sQueueData *best = NULL;
  1024. IPropertyTree *bestt = NULL;
  1025. for (unsigned i=0;i<numqueues;i++) {
  1026. sQueueData *qd = queues[i];
  1027. unsigned count = qd->root->getPropInt("@count");
  1028. if (count) {
  1029. int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
  1030. if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
  1031. StringBuffer path;
  1032. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,0U).str());
  1033. if (!item)
  1034. continue;
  1035. if (item->getPropInt("@num",0)<=0)
  1036. continue;
  1037. CDateTime dt;
  1038. StringBuffer enqueued;
  1039. if (!best||itemOlder(item,bestt)) {
  1040. best = qd;
  1041. bestt = item;
  1042. }
  1043. }
  1044. }
  1045. }
  1046. return best;
  1047. }
  1048. void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
  1049. {
  1050. for (unsigned i=0; i<numqueues; i++) {
  1051. IPropertyTree *croot = queryClientRootSession(*queues[i]);
  1052. croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
  1053. }
  1054. }
  1055. // 'simple' queuing
  1056. IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
  1057. {
  1058. bool hasminprio=(minprio!=INT_MIN);
  1059. if (timedout)
  1060. *timedout = false;
  1061. IJobQueueItem *ret=NULL;
  1062. bool waitingset = false;
  1063. while (!dequeuestop) {
  1064. unsigned t = 0;
  1065. if (timeout!=(unsigned)INFINITE)
  1066. t = msTick();
  1067. {
  1068. Cconnlockblock block(this,true);
  1069. block.setRollback(true); // assume not going to update
  1070. // now cycle through queues looking at state
  1071. unsigned total = 0;
  1072. unsigned stopped = 0;
  1073. PointerArray active;
  1074. ForEachQueue(qd) {
  1075. total++;
  1076. const char *state = qd->root->queryProp("@state");
  1077. if (state) {
  1078. if (strcmp(state,"stopped")==0)
  1079. stopped++;
  1080. else if (strcmp(state,"paused")!=0)
  1081. active.append(qd);
  1082. }
  1083. else
  1084. active.append(qd);
  1085. }
  1086. if (stopped==total)
  1087. return NULL; // all stopped
  1088. sQueueData **activeqds = (sQueueData **)active.getArray();
  1089. unsigned activenum = active.ordinality();
  1090. if (activenum) {
  1091. sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
  1092. unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
  1093. // load minp from cache
  1094. if (count) {
  1095. int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
  1096. if (!hasminprio||checkprio(*bestqd,mpr)) {
  1097. block.setRollback(false);
  1098. ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
  1099. if (ret) // think it must be!
  1100. timeout = 0; // so mark that done
  1101. else if (!hasminprio) {
  1102. WARNLOG("Resetting queue %s",bestqd->qname.get());
  1103. clear(*bestqd); // reset queue as seems to have become out of sync
  1104. }
  1105. }
  1106. }
  1107. if (timeout!=0) { // more to do
  1108. if (!connected) { // if connect already done non-zero
  1109. connect(validateitemsessions);
  1110. block.setRollback(false);
  1111. }
  1112. if (!waitingset) {
  1113. setWaiting(activenum,activeqds,true);
  1114. block.commit();
  1115. waitingset = true;
  1116. }
  1117. }
  1118. }
  1119. if (timeout==0) {
  1120. if (waitingset) {
  1121. setWaiting(activenum,activeqds,false);
  1122. block.commit();
  1123. }
  1124. if (timedout)
  1125. *timedout = (ret==NULL);
  1126. break;
  1127. }
  1128. }
  1129. unsigned to = 5*60*1000;
  1130. // check every 5 mins independant of notify (in case subscription lost for some reason)
  1131. if (to>timeout)
  1132. to = timeout;
  1133. notifysem.wait(to);
  1134. if (timeout!=(unsigned)INFINITE) {
  1135. t = msTick()-t;
  1136. if (t<timeout)
  1137. timeout -= t;
  1138. else
  1139. timeout = 0;
  1140. }
  1141. }
  1142. return ret;
  1143. }
  1144. IJobQueueItem *dequeue(unsigned timeout=INFINITE)
  1145. {
  1146. return dodequeue(INT_MIN,timeout);
  1147. }
  1148. IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
  1149. {
  1150. return dodequeue(minprio,timeout);
  1151. }
  1152. void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
  1153. {
  1154. Owned<IJobQueueItem> qi = qitem;
  1155. remove(qi->queryWUID()); // just in case trying to put on twice!
  1156. int priority = qi->getPriority();
  1157. unsigned count = qd.root->getPropInt("@count");
  1158. StringBuffer path;
  1159. if (count&&(idx!=(unsigned)-1)) { // need to check before and after
  1160. if (idx) {
  1161. IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
  1162. if (pt) {
  1163. int pp = pt->getPropInt("@priority");
  1164. if (priority>pp) {
  1165. qi->setPriority(pp);
  1166. priority = pp;
  1167. }
  1168. }
  1169. else // what happened here?
  1170. idx = (unsigned)-1;
  1171. }
  1172. if (idx<count) {
  1173. IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx).str());
  1174. if (pt) {
  1175. int pp = pt->getPropInt("@priority");
  1176. if (priority<pp) {
  1177. qi->setPriority(pp);
  1178. priority = pp;
  1179. }
  1180. }
  1181. else // what happened here?
  1182. idx = (unsigned)-1;
  1183. }
  1184. }
  1185. if (idx==(unsigned)-1) {
  1186. idx = count;
  1187. while (idx) {
  1188. IPropertyTree *previtem = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
  1189. if (previtem) {
  1190. if (previtem->getPropInt("@priority")>=priority) {
  1191. break;
  1192. }
  1193. }
  1194. else
  1195. count--; // how did that happen?
  1196. idx--;
  1197. }
  1198. }
  1199. CJobQueueItem::assignBranch(addItem(qd,createPTree("Item"),idx,count),qi);
  1200. qd.root->setPropInt("@count",count+1);
  1201. }
  1202. void enqueue(sQueueData &qd,IJobQueueItem *qitem) // takes ownership of qitem
  1203. {
  1204. Cconnlockblock block(this,true);
  1205. placeonqueue(qd,qitem,(unsigned)-1);
  1206. }
  1207. void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1208. {
  1209. Cconnlockblock block(this,true);
  1210. placeonqueue(qd,qitem,doFindRank(qd,wuid));
  1211. }
  1212. void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1213. {
  1214. Cconnlockblock block(this,true);
  1215. unsigned idx = doFindRank(qd,wuid);
  1216. if (idx!=(unsigned)-1)
  1217. idx++;
  1218. placeonqueue(qd,qitem,idx);
  1219. }
  1220. void enqueueTail(sQueueData &qd,IJobQueueItem *qitem)
  1221. {
  1222. Cconnlockblock block(this,true);
  1223. Owned<IJobQueueItem> qi = getTail(qd);
  1224. if (qi)
  1225. enqueueAfter(qd,qitem,qi->queryWUID());
  1226. else
  1227. enqueue(qd,qitem);
  1228. }
  1229. void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
  1230. {
  1231. Cconnlockblock block(this,true);
  1232. Owned<IJobQueueItem> qi = doGetItem(qd, 0);
  1233. if (qi)
  1234. enqueueBefore(qd,qitem,qi->queryWUID());
  1235. else
  1236. enqueue(qd,qitem);
  1237. }
  1238. unsigned ordinality(sQueueData &qd)
  1239. {
  1240. Cconnlockblock block(this,false);
  1241. return qd.root->getPropInt("@count");
  1242. }
  1243. IJobQueueItem *getTail(sQueueData &qd)
  1244. {
  1245. return doGetItem(qd,(unsigned)-1);
  1246. }
  1247. IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
  1248. {
  1249. Cconnlockblock block(this,false);
  1250. StringBuffer path;
  1251. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,qi->queryWUID()).str());
  1252. if (!item)
  1253. return NULL;
  1254. bool cached = item->getPropInt("@num",0)<=0;
  1255. if (cached)
  1256. return NULL; // don't want cached value
  1257. return new CJobQueueItem(item);
  1258. }
  1259. bool checkprio(sQueueData &qd,int minprio=0)
  1260. {
  1261. StringBuffer path;
  1262. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,0U).str());
  1263. if (!item)
  1264. return false;
  1265. return (item->getPropInt("@priority")>=minprio);
  1266. }
  1267. IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
  1268. {
  1269. StringBuffer path;
  1270. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  1271. if (!item)
  1272. return NULL;
  1273. if (item->getPropInt("@num",0)<=0)
  1274. return NULL; // don't want (old) cached value
  1275. if (hasminprio&&(item->getPropInt("@priority")<minprio))
  1276. return NULL;
  1277. IJobQueueItem *ret = new CJobQueueItem(item);
  1278. removeItem(qd,item,saveitem);
  1279. unsigned count = qd.root->getPropInt("@count");
  1280. assertex(count);
  1281. qd.root->setPropInt("@count",count-1);
  1282. return ret;
  1283. }
  1284. IJobQueueItem *take(sQueueData &qd,const char *wuid)
  1285. {
  1286. Cconnlockblock block(this,true);
  1287. return dotake(qd,wuid,false);
  1288. }
  1289. unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
  1290. {
  1291. Cconnlockblock block(this,true);
  1292. unsigned ret = copyItemsImpl(qd,dest);
  1293. clear(qd);
  1294. return ret;
  1295. }
  1296. void enqueueItems(sQueueData &qd,CJobQueueContents &items)
  1297. {
  1298. unsigned n=items.ordinality();
  1299. if (n) {
  1300. Cconnlockblock block(this,true);
  1301. for (unsigned i=0;i<n;i++)
  1302. enqueue(qd,items.item(i).clone());
  1303. }
  1304. }
  1305. void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
  1306. {
  1307. Cconnlockblock block(this,true);
  1308. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1309. enqueueBefore(*qd,qitem,wuid);
  1310. }
  1311. void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
  1312. {
  1313. Cconnlockblock block(this,true);
  1314. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1315. enqueueAfter(*qd,qitem,wuid);
  1316. }
  1317. bool moveBefore(const char *wuid,const char *nextwuid)
  1318. {
  1319. if (!qdata)
  1320. return false;
  1321. Cconnlockblock block(this,true);
  1322. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1323. if (!qd)
  1324. return false;
  1325. IJobQueueItem *qi=take(*qd,wuid);
  1326. if (!qi)
  1327. return false;
  1328. sQueueData *qdd = NULL;
  1329. if (qdata->next)
  1330. qdd = findQD(nextwuid);
  1331. if (!qdd)
  1332. qdd = qd;
  1333. enqueueBefore(*qdd,qi,nextwuid);
  1334. return true;
  1335. }
  1336. bool moveAfter(const char *wuid,const char *prevwuid)
  1337. {
  1338. if (!qdata)
  1339. return false;
  1340. Cconnlockblock block(this,true);
  1341. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1342. if (!qd)
  1343. return false;
  1344. IJobQueueItem *qi=take(*qd,wuid);
  1345. if (!qi)
  1346. return false;
  1347. sQueueData *qdd = NULL;
  1348. if (qdata->next)
  1349. qdd = findQD(prevwuid);
  1350. if (!qdd)
  1351. qdd = qd;
  1352. enqueueAfter(*qdd,qi,prevwuid);
  1353. return true;
  1354. }
  1355. bool moveToHead(const char *wuid)
  1356. {
  1357. if (!qdata)
  1358. return false;
  1359. Cconnlockblock block(this,true);
  1360. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1361. if (!qd)
  1362. return false;
  1363. IJobQueueItem *qi=take(*qd,wuid);
  1364. if (!qi)
  1365. return false;
  1366. enqueueHead(*qd,qi);
  1367. return true;
  1368. }
  1369. bool moveToTail(const char *wuid)
  1370. {
  1371. if (!qdata)
  1372. return false;
  1373. Cconnlockblock block(this,true);
  1374. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1375. if (!qd)
  1376. return false;
  1377. IJobQueueItem *qi=take(*qd,wuid);
  1378. if (!qi)
  1379. return false;
  1380. enqueueTail(*qd,qi);
  1381. return true;
  1382. }
  1383. bool remove(const char *wuid)
  1384. {
  1385. if (!qdata)
  1386. return false;
  1387. Cconnlockblock block(this,true);
  1388. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1389. if (!qd)
  1390. return false;
  1391. StringBuffer path;
  1392. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,wuid).str());
  1393. if (!item)
  1394. return false;
  1395. bool cached = item->getPropInt("@num",0)<=0; // old cached (bwd compat)
  1396. removeItem(*qd,item,false);
  1397. if (!cached) {
  1398. unsigned count = qd->root->getPropInt("@count");
  1399. assertex(count);
  1400. qd->root->setPropInt("@count",count-1);
  1401. }
  1402. return true;
  1403. }
  1404. bool changePriority(const char *wuid,int value)
  1405. {
  1406. if (!qdata)
  1407. return false;
  1408. Cconnlockblock block(this,true);
  1409. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1410. if (!qd)
  1411. return false;
  1412. IJobQueueItem *qi=take(*qd,wuid);
  1413. if (!qi) {
  1414. StringBuffer ws("~"); // change cached item
  1415. ws.append(wuid);
  1416. StringBuffer path;
  1417. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,ws.str()).str());
  1418. if (item) {
  1419. item->setPropInt("@priority",value);
  1420. return true;
  1421. }
  1422. return false;
  1423. }
  1424. qi->setPriority(value);
  1425. enqueue(*qd,qi);
  1426. return true;
  1427. }
  1428. void clear(sQueueData &qd)
  1429. {
  1430. Cconnlockblock block(this,true);
  1431. qd.root->setPropInt("@count",0);
  1432. for (;;) {
  1433. IPropertyTree *item = qd.root->queryPropTree("Item[1]");
  1434. if (!item)
  1435. break;
  1436. qd.root->removeTree(item);
  1437. }
  1438. }
  1439. void lock()
  1440. {
  1441. connlock(false); // sub functions will change to exclusive if needed
  1442. }
  1443. void unlock(bool rollback=false)
  1444. {
  1445. connunlock(rollback);
  1446. }
  1447. void pause(sQueueData &qd)
  1448. {
  1449. Cconnlockblock block(this,true);
  1450. qd.root->setProp("@state","paused");
  1451. }
  1452. void resume(sQueueData &qd)
  1453. {
  1454. Cconnlockblock block(this,true);
  1455. qd.root->setProp("@state","active");
  1456. }
  1457. bool paused(sQueueData &qd)
  1458. {
  1459. Cconnlockblock block(this,false);
  1460. const char *state = qd.root->queryProp("@state");
  1461. return (state&&(strcmp(state,"paused")==0));
  1462. }
  1463. void stop(sQueueData &qd)
  1464. {
  1465. Cconnlockblock block(this,true);
  1466. qd.root->setProp("@state","stopped");
  1467. }
  1468. bool stopped(sQueueData &qd)
  1469. {
  1470. Cconnlockblock block(this,false);
  1471. const char *state = qd.root->queryProp("@state");
  1472. return (state&&(strcmp(state,"stopped")==0));
  1473. }
  1474. void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1475. {
  1476. Cconnlockblock block(this,false);
  1477. connected = 0;
  1478. waiting = 0;
  1479. unsigned i=0;
  1480. for (;;) {
  1481. IPropertyTree *croot = queryClientRootIndex(qd,i);
  1482. if (!croot)
  1483. break;
  1484. if (validateitemsessions && !validSession(croot)) {
  1485. Cconnlockblock block(this,true);
  1486. qd.root->removeTree(croot);
  1487. }
  1488. else {
  1489. waiting += croot->getPropInt("@waiting");
  1490. connected += croot->getPropInt("@connected");
  1491. i++;
  1492. }
  1493. }
  1494. // now remove any duff queue items
  1495. unsigned count = qd.root->getPropInt("@count");
  1496. if (!validateitemsessions) {
  1497. enqueued = count;
  1498. return;
  1499. }
  1500. i=0;
  1501. StringBuffer path;
  1502. for (;;) {
  1503. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  1504. if (!item)
  1505. break;
  1506. if (!validSession(item)) {
  1507. Cconnlockblock block(this,true);
  1508. item = qd.root->queryPropTree(path.str());
  1509. if (!item)
  1510. break;
  1511. // PROGLOG("WUJOBQ: Removing %s as session %" I64F "x not active",item->queryProp("@wuid"),item->getPropInt64("@session"));
  1512. removeItem(qd,item,false);
  1513. }
  1514. else
  1515. i++;
  1516. }
  1517. if (count!=i) {
  1518. Cconnlockblock block(this,true);
  1519. qd.root->setPropInt("@count",i);
  1520. }
  1521. enqueued = i;
  1522. }
  1523. void getStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1524. {
  1525. Cconnlockblock block(this,false);
  1526. doGetStats(qd,connected,waiting,enqueued);
  1527. }
  1528. void getStats(unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1529. {
  1530. // multi queue
  1531. Cconnlockblock block(this,false);
  1532. connected=0;
  1533. waiting=0;
  1534. enqueued=0;
  1535. ForEachQueue(qd) {
  1536. unsigned c;
  1537. unsigned w;
  1538. unsigned e;
  1539. doGetStats(*qd,c,w,e);
  1540. connected+=c;
  1541. waiting+=w;
  1542. enqueued+=e;
  1543. }
  1544. }
  1545. IJobQueueItem *take(const char *wuid)
  1546. {
  1547. assertex(qdata);
  1548. if (!qdata->next)
  1549. return take(*qdata,wuid);
  1550. Cconnlockblock block(this,true);
  1551. ForEachQueue(qd) {
  1552. IJobQueueItem *ret = dotake(*qd,wuid,false);
  1553. if (ret)
  1554. return ret;
  1555. }
  1556. return NULL;
  1557. }
  1558. unsigned takeItems(CJobQueueContents &dest)
  1559. {
  1560. assertex(qdata);
  1561. if (!qdata->next)
  1562. return takeItems(*qdata,dest);
  1563. Cconnlockblock block(this,true);
  1564. unsigned ret = 0;
  1565. ForEachQueue(qd) {
  1566. ret += copyItemsImpl(*qd,dest);
  1567. clear(*qd);
  1568. }
  1569. return ret;
  1570. }
  1571. void enqueueItems(CJobQueueContents &items)
  1572. { // enqueues to firs sub-queue (not sure that useful)
  1573. assertex(qdata);
  1574. return enqueueItems(*qdata,items);
  1575. }
  1576. void clear()
  1577. {
  1578. ForEachQueue(qd) {
  1579. clear(*qd);
  1580. }
  1581. }
  1582. bool validSession(IPropertyTree *item)
  1583. {
  1584. Owned<INode> node = createINode(item->queryProp("@node"),DALI_SERVER_PORT); // port should always be present
  1585. return (querySessionManager().lookupProcessSession(node)==(SessionId)item->getPropInt64("@session"));
  1586. }
  1587. IConversation *initiateConversation(sQueueData &qd,IJobQueueItem *item)
  1588. {
  1589. CriticalBlock block(crit);
  1590. assertex(!initiateconv.get());
  1591. SocketEndpoint ep = item->queryEndpoint();
  1592. unsigned short port = (unsigned short)item->getPort();
  1593. initiateconv.setown(createSingletonSocketConnection(port));
  1594. if (!port)
  1595. item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
  1596. initiatewu.set(item->queryWUID());
  1597. enqueue(qd,item);
  1598. bool ok;
  1599. {
  1600. CriticalUnblock unblock(crit);
  1601. ok = initiateconv->accept(INFINITE);
  1602. }
  1603. if (!ok)
  1604. initiateconv.clear();
  1605. return initiateconv.getClear();
  1606. }
  1607. IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
  1608. {
  1609. CriticalBlock block(crit);
  1610. retitem = NULL;
  1611. assertex(connected); // must be connected
  1612. int curmp = maxp?maxp->get():0;
  1613. int nextmp = curmp;
  1614. for (;;) {
  1615. bool timedout = false;
  1616. Owned<IJobQueueItem> item;
  1617. {
  1618. CriticalUnblock unblock(crit);
  1619. // this is a bit complicated with multi-thor
  1620. if (prioritytransitiondelay||maxp) {
  1621. item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
  1622. prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
  1623. // if dynamic priority check every minute
  1624. if (!prioritytransitiondelay) {
  1625. curmp = nextmp; // using max above is a bit devious to allow transition
  1626. nextmp = maxp->get();
  1627. }
  1628. }
  1629. else
  1630. item.setown(dequeue(INFINITE));
  1631. }
  1632. if (item.get()) {
  1633. if (item->isValidSession()) {
  1634. SocketEndpoint ep = item->queryEndpoint();
  1635. ep.port = item->getPort();
  1636. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1637. if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
  1638. retitem = item.getClear();
  1639. return acceptconv.getClear();
  1640. }
  1641. }
  1642. }
  1643. else if (prioritytransitiondelay)
  1644. prioritytransitiondelay = 0;
  1645. else if (!timedout)
  1646. break;
  1647. }
  1648. return NULL;
  1649. }
  1650. void cancelInitiateConversation(sQueueData &qd)
  1651. {
  1652. CriticalBlock block(crit);
  1653. if (initiatewu.get())
  1654. remove(initiatewu);
  1655. if (initiateconv.get())
  1656. initiateconv->cancel();
  1657. }
  1658. void cancelAcceptConversation()
  1659. {
  1660. CriticalBlock block(crit);
  1661. dequeuestop = true;
  1662. notifysem.signal();
  1663. }
  1664. bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
  1665. {
  1666. Cconnlockblock block(this,true);
  1667. for (;;) {
  1668. Owned<IJobQueueItem> item = dotake(qd,wuid,false);
  1669. if (!item.get())
  1670. break;
  1671. if (item->isValidSession()) {
  1672. SocketEndpoint ep = item->queryEndpoint();
  1673. ep.port = item->getPort();
  1674. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1675. acceptconv->connect(3*60*1000); // connect then close should close other end
  1676. return true;
  1677. }
  1678. }
  1679. return false;
  1680. }
  1681. bool waitStatsChange(unsigned timeout)
  1682. {
  1683. assertex(!connected); // not allowed to call this while connected
  1684. cancelwaiting = false;
  1685. while(!cancelwaiting) {
  1686. {
  1687. Cconnlockblock block(this,false);
  1688. if (haschanged())
  1689. return true;
  1690. }
  1691. if (!notifysem.wait(timeout))
  1692. break;
  1693. }
  1694. return false;
  1695. }
  1696. void cancelWaitStatsChange()
  1697. {
  1698. CriticalBlock block(crit);
  1699. cancelwaiting = true;
  1700. notifysem.signal();
  1701. }
  1702. virtual void enqueue(IJobQueueItem *qitem)
  1703. {
  1704. enqueue(*activeq,qitem);
  1705. }
  1706. void enqueueHead(IJobQueueItem *qitem)
  1707. {
  1708. enqueueHead(*activeq,qitem);
  1709. }
  1710. void enqueueTail(IJobQueueItem *qitem)
  1711. {
  1712. enqueueTail(*activeq,qitem);
  1713. }
  1714. void pause()
  1715. {
  1716. Cconnlockblock block(this,true);
  1717. ForEachQueue(qd) {
  1718. if (qd->root)
  1719. qd->root->setProp("@state","paused");
  1720. }
  1721. }
  1722. void pause(const char* info)
  1723. {
  1724. Cconnlockblock block(this,true);
  1725. ForEachQueue(qd) {
  1726. if (qd->root) {
  1727. qd->root->setProp("@state","paused");
  1728. if (info && *info)
  1729. qd->root->setProp("@stateDetails",info);
  1730. }
  1731. }
  1732. }
  1733. void stop()
  1734. {
  1735. Cconnlockblock block(this,true);
  1736. ForEachQueue(qd) {
  1737. if (qd->root)
  1738. qd->root->setProp("@state","stopped");
  1739. }
  1740. }
  1741. void stop(const char* info)
  1742. {
  1743. Cconnlockblock block(this,true);
  1744. ForEachQueue(qd) {
  1745. if (qd->root) {
  1746. qd->root->setProp("@state","stopped");
  1747. if (info && *info)
  1748. qd->root->setProp("@stateDetails",info);
  1749. }
  1750. }
  1751. }
  1752. void resume()
  1753. {
  1754. Cconnlockblock block(this,true);
  1755. ForEachQueue(qd) {
  1756. if (qd->root)
  1757. qd->root->setProp("@state","active");
  1758. }
  1759. }
  1760. void resume(const char* info)
  1761. {
  1762. Cconnlockblock block(this,true);
  1763. ForEachQueue(qd) {
  1764. if (qd->root) {
  1765. qd->root->setProp("@state","active");
  1766. if (info && *info)
  1767. qd->root->setProp("@stateDetails",info);
  1768. }
  1769. }
  1770. }
  1771. IConversation *initiateConversation(IJobQueueItem *item)
  1772. {
  1773. return initiateConversation(*activeq,item);
  1774. }
  1775. void cancelInitiateConversation()
  1776. {
  1777. return cancelInitiateConversation(*activeq);
  1778. }
  1779. bool cancelInitiateConversation(const char *wuid)
  1780. {
  1781. return cancelInitiateConversation(*activeq,wuid);
  1782. }
  1783. const char * queryActiveQueueName()
  1784. {
  1785. return activeq->qname;
  1786. }
  1787. void setActiveQueue(const char *name)
  1788. {
  1789. ForEachQueue(qd) {
  1790. if (!name||(strcmp(qd->qname.get(),name)==0)) {
  1791. activeq = qd;
  1792. return;
  1793. }
  1794. }
  1795. if (name)
  1796. throw MakeStringException (-1,"queue %s not found",name);
  1797. }
  1798. const char *nextQueueName(const char *last)
  1799. {
  1800. ForEachQueue(qd) {
  1801. if (!last||(strcmp(qd->qname.get(),last)==0)) {
  1802. if (qd->next)
  1803. return qd->next->qname.get();
  1804. break;
  1805. }
  1806. }
  1807. return NULL;
  1808. }
  1809. virtual bool paused()
  1810. {
  1811. Cconnlockblock block(this,false);
  1812. return CJobQueueBase::paused();
  1813. }
  1814. virtual bool paused(StringBuffer& info)
  1815. {
  1816. Cconnlockblock block(this,false);
  1817. return CJobQueueBase::paused(info);
  1818. }
  1819. virtual bool stopped()
  1820. {
  1821. Cconnlockblock block(this,false);
  1822. return CJobQueueBase::stopped();
  1823. }
  1824. virtual bool stopped(StringBuffer& info)
  1825. {
  1826. Cconnlockblock block(this,false);
  1827. return CJobQueueBase::stopped(info);
  1828. }
  1829. virtual unsigned ordinality()
  1830. {
  1831. Cconnlockblock block(this,false);
  1832. return CJobQueueBase::ordinality();
  1833. }
  1834. virtual unsigned waiting()
  1835. {
  1836. Cconnlockblock block(this,false);
  1837. return CJobQueueBase::waiting();
  1838. }
  1839. virtual IJobQueueItem *getItem(unsigned idx)
  1840. {
  1841. Cconnlockblock block(this,false);
  1842. return CJobQueueBase::getItem(idx);
  1843. }
  1844. virtual IJobQueueItem *getHead()
  1845. {
  1846. Cconnlockblock block(this,false);
  1847. return CJobQueueBase::getHead();
  1848. }
  1849. virtual IJobQueueItem *getTail()
  1850. {
  1851. Cconnlockblock block(this,false);
  1852. return CJobQueueBase::getTail();
  1853. }
  1854. virtual IJobQueueItem *find(const char *wuid)
  1855. {
  1856. Cconnlockblock block(this,false);
  1857. return CJobQueueBase::find(wuid);
  1858. }
  1859. virtual unsigned findRank(const char *wuid)
  1860. {
  1861. Cconnlockblock block(this,false);
  1862. return CJobQueueBase::findRank(wuid);
  1863. }
  1864. virtual unsigned copyItems(CJobQueueContents &dest)
  1865. {
  1866. Cconnlockblock block(this,false);
  1867. return CJobQueueBase::copyItems(dest);
  1868. }
  1869. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  1870. {
  1871. Cconnlockblock block(this,false);
  1872. return CJobQueueBase::doGetLastDequeuedInfo(activeq, wuid, enqueuedt, priority);
  1873. }
  1874. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  1875. {
  1876. Cconnlockblock block(this,false);
  1877. CJobQueueBase::copyItemsAndState(contents, state, stateDetails);
  1878. }
  1879. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  1880. {
  1881. Cconnlockblock block(this,false);
  1882. CJobQueueBase::getState(state, stateDetails);
  1883. }
  1884. };
  1885. class CJQSnapshot : public CInterface, implements IJQSnapshot
  1886. {
  1887. Owned<IPropertyTree> jobQueueInfo;
  1888. public:
  1889. IMPLEMENT_IINTERFACE;
  1890. CJQSnapshot()
  1891. {
  1892. Owned<IRemoteConnection> connJobQueues = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 30000);
  1893. if (!connJobQueues)
  1894. throw MakeStringException(-1, "CJQSnapshot::CJQSnapshot: /JobQueues not found");
  1895. jobQueueInfo.setown(createPTreeFromIPT(connJobQueues->queryRoot()));
  1896. }
  1897. IJobQueueConst* getJobQueue(const char *name)
  1898. {
  1899. if (!jobQueueInfo)
  1900. return NULL;
  1901. return new CJobQueueConst(name, jobQueueInfo.getLink());
  1902. }
  1903. };
  1904. IJQSnapshot *createJQSnapshot()
  1905. {
  1906. return new CJQSnapshot();
  1907. }
  1908. IJobQueue *createJobQueue(const char *name)
  1909. {
  1910. if (!name||!*name)
  1911. throw MakeStringException(-1,"createJobQueue empty name");
  1912. return new CJobQueue(name);
  1913. }
  1914. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName)
  1915. {
  1916. #ifdef _CONTAINERIZED
  1917. StringBuffer agentQueue;
  1918. getClusterEclAgentQueueName(agentQueue, queueName);
  1919. #else
  1920. //NB: In the non-container system the name of the roxie agent queue does not follow the convention for the containerized system
  1921. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(queueName);
  1922. if (!clusterInfo.get())
  1923. return false;
  1924. SCMStringBuffer agentQueue;
  1925. clusterInfo->getAgentQueue(agentQueue);
  1926. if (!agentQueue.length())
  1927. return false;
  1928. #endif
  1929. Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
  1930. if (!queue.get())
  1931. throw MakeStringException(-1, "Could not create workunit queue");
  1932. IJobQueueItem *item = createJobQueueItem(wuid);
  1933. queue->enqueue(item);
  1934. PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
  1935. return true;
  1936. }
  1937. extern bool WORKUNIT_API runWorkUnit(const char *wuid)
  1938. {
  1939. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1940. Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid);
  1941. if (w)
  1942. {
  1943. StringAttr clusterName = (w->queryClusterName());
  1944. w.clear();
  1945. return runWorkUnit(wuid, clusterName.str());
  1946. }
  1947. else
  1948. return false;
  1949. }
  1950. extern WORKUNIT_API StringBuffer &getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList)
  1951. {
  1952. Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 5000);
  1953. if (!conn)
  1954. return queueList;
  1955. VStringBuffer xpath("Queue[Item/@wuid='%s']", wuid);
  1956. Owned<IPropertyTreeIterator> it = conn->getElements(xpath.str());
  1957. ForEach(*it)
  1958. {
  1959. if (queueList.length())
  1960. queueList.append(',');
  1961. queueList.append(it->query().queryProp("@name"));
  1962. }
  1963. return queueList;
  1964. }
  1965. extern void WORKUNIT_API removeWorkUnitFromAllQueues(const char *wuid)
  1966. {
  1967. StringBuffer queueList;
  1968. if (!getQueuesContainingWorkUnit(wuid, queueList).length())
  1969. return;
  1970. Owned<IJobQueue> q = createJobQueue(queueList.str());
  1971. if (q)
  1972. while(q->remove(wuid));
  1973. }
  1974. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
  1975. {
  1976. if (!wu)
  1977. return false;
  1978. class cQswitcher: public CInterface, implements IQueueSwitcher
  1979. {
  1980. public:
  1981. IMPLEMENT_IINTERFACE;
  1982. void * getQ(const char * qname, const char * wuid)
  1983. {
  1984. Owned<IJobQueue> q = createJobQueue(qname);
  1985. return q->take(wuid);
  1986. }
  1987. void putQ(const char * qname, const char * wuid, void * qitem)
  1988. {
  1989. Owned<IJobQueue> q = createJobQueue(qname);
  1990. q->enqueue((IJobQueueItem *)qitem);
  1991. }
  1992. bool isAuto()
  1993. {
  1994. return false;
  1995. }
  1996. } switcher;
  1997. return wu->switchThorQueue(cluster, &switcher);
  1998. }