wujobq.cpp 63 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <algorithm>
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jbuff.hpp"
  18. #include "dasess.hpp"
  19. #include "dautils.hpp"
  20. #include "portlist.h"
  21. #include "dacoven.hpp"
  22. #include "daclient.hpp"
  23. #include "dasds.hpp"
  24. #include "dasess.hpp"
  25. #include "wujobq.hpp"
  26. #ifdef _MSC_VER
  27. #pragma warning (disable : 4355)
  28. #endif
  29. #if 0
  30. JobQueues
  31. JobQueue @name= @count= @state=active|paused|stopped
  32. Edition <num>
  33. Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
  34. Item* @wuid @owner @node @port @priority @session
  35. #endif
  36. class CJobQueueItem: public CInterface, implements IJobQueueItem
  37. {
  38. StringAttr wu;
  39. StringAttr owner;
  40. int priority;
  41. SessionId sessid;
  42. SocketEndpoint ep;
  43. unsigned port;
  44. CDateTime enqueuedt;
  45. public:
  46. IMPLEMENT_IINTERFACE;
  47. CJobQueueItem(MemoryBuffer &src)
  48. {
  49. deserialize(src);
  50. }
  51. CJobQueueItem(const char *_wu)
  52. : wu(_wu)
  53. {
  54. priority = 0;
  55. ep = queryMyNode()->endpoint();
  56. port = 0;
  57. sessid = myProcessSession();
  58. }
  59. CJobQueueItem(IPropertyTree *item)
  60. {
  61. const char * wuid = item->queryProp("@wuid");
  62. if (*wuid=='~')
  63. wuid++;
  64. wu.set(wuid);
  65. owner.set(item->queryProp("@owner"));
  66. sessid = (SessionId)item->getPropInt64("@session");
  67. priority = item->getPropInt("@priority");
  68. ep.set(item->queryProp("@node"));
  69. port = (unsigned)item->getPropInt("@port");
  70. StringBuffer dts;
  71. if (item->getProp("@enqueuedt",dts))
  72. enqueuedt.setString(dts.str());
  73. }
  74. static void assignBranch(IPropertyTree *item,IJobQueueItem *qi)
  75. {
  76. item->setPropInt64("@session",qi->getSessionId());
  77. item->setPropInt("@priority",qi->getPriority());
  78. item->setPropInt("@port",qi->getPort());
  79. item->setProp("@wuid",qi->queryWUID());
  80. item->setProp("@owner",qi->queryOwner());
  81. StringBuffer eps;
  82. qi->queryEndpoint().getUrlStr(eps);
  83. item->setProp("@node",eps.str());
  84. StringBuffer dts;
  85. qi->queryEnqueuedTime().getString(dts);
  86. if (dts.length()==0) {
  87. CDateTime dt;
  88. dt.setNow();
  89. dt.getString(dts);
  90. qi->setEnqueuedTime(dt);
  91. }
  92. item->setProp("@enqueuedt",dts.str());
  93. }
  94. IPropertyTree *createBranch(CJobQueueItem)
  95. {
  96. IPropertyTree *item = createPTree("Item");
  97. assignBranch(item,this);
  98. return item;
  99. }
  100. const char *queryWUID()
  101. {
  102. return wu.get();
  103. }
  104. int getPriority()
  105. {
  106. return priority;
  107. }
  108. unsigned getPort()
  109. {
  110. return port;
  111. }
  112. SessionId getSessionId()
  113. {
  114. return sessid;
  115. }
  116. SocketEndpoint &queryEndpoint()
  117. {
  118. return ep;
  119. }
  120. const char *queryOwner()
  121. {
  122. return owner.get();
  123. }
  124. bool equals(IJobQueueItem *other)
  125. {
  126. // work unit is primary key
  127. return strcmp(wu.get(),other->queryWUID())==0;
  128. }
  129. CDateTime &queryEnqueuedTime()
  130. {
  131. return enqueuedt;
  132. }
  133. void setEnqueuedTime(const CDateTime &dt)
  134. {
  135. enqueuedt.set(dt);
  136. }
  137. void serialize(MemoryBuffer &tgt)
  138. {
  139. tgt.append(priority).append(port).append(wu).append(sessid);
  140. ep.serialize(tgt);
  141. StringBuffer dts;
  142. enqueuedt.getString(dts);
  143. tgt.append(owner).append(dts);
  144. }
  145. void deserialize(MemoryBuffer &src)
  146. {
  147. src.read(priority).read(port).read(wu).read(sessid);
  148. ep.deserialize(src);
  149. StringBuffer dts;
  150. src.read(owner).read(dts);
  151. enqueuedt.setString(dts.str());
  152. }
  153. IJobQueueItem* clone()
  154. {
  155. IJobQueueItem* ret = new CJobQueueItem(wu);
  156. ret->setPriority(priority);
  157. ret->setPriority(port);
  158. ret->setEndpoint(ep);
  159. ret->setSessionId(sessid);
  160. return ret;
  161. }
  162. void setPriority(int _priority)
  163. {
  164. priority = _priority;
  165. }
  166. void setPort(unsigned _port)
  167. {
  168. port = _port;
  169. }
  170. void setEndpoint(const SocketEndpoint &_ep)
  171. {
  172. ep = _ep;
  173. }
  174. void setSessionId(SessionId _id)
  175. {
  176. if (_id)
  177. sessid = _id;
  178. else
  179. sessid = myProcessSession();
  180. }
  181. void setOwner(const char *_owner)
  182. {
  183. owner.set(_owner);
  184. }
  185. bool isValidSession()
  186. {
  187. Owned<INode> node = createINode(ep);
  188. return (querySessionManager().lookupProcessSession(node)==sessid);
  189. }
  190. };
  191. class CJobQueueIterator: public CInterface, implements IJobQueueIterator
  192. {
  193. public:
  194. CJobQueueContents &items;
  195. unsigned idx;
  196. IMPLEMENT_IINTERFACE;
  197. CJobQueueIterator(CJobQueueContents &_items)
  198. : items(_items)
  199. {
  200. idx = 0;
  201. }
  202. bool isValid()
  203. {
  204. return idx<items.ordinality();
  205. }
  206. bool first()
  207. {
  208. idx = 0;
  209. return isValid();
  210. }
  211. bool next()
  212. {
  213. idx++;
  214. return isValid();
  215. }
  216. IJobQueueItem & query()
  217. {
  218. return items.item(idx);
  219. }
  220. };
  221. IJobQueueIterator *CJobQueueContents::getIterator()
  222. {
  223. return new CJobQueueIterator(*this);
  224. }
  225. IJobQueueItem *createJobQueueItem(const char *wuid)
  226. {
  227. if (!wuid||!*wuid)
  228. throw MakeStringException(-1,"createJobQueueItem empty WUID");
  229. return new CJobQueueItem(wuid);;
  230. }
  231. IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb)
  232. {
  233. return new CJobQueueItem(mb);
  234. }
// Iterate 'qd' over every sQueueData in the current object's 'qdata' linked list.
#define ForEachQueue(qd) for (sQueueData *qd = qdata; qd!=NULL; qd=qd->next)
// As ForEachQueue, but over the 'qdata' list of the object 'parent'.
#define ForEachQueueIn(parent,qd) for (sQueueData *qd = parent.qdata; qd!=NULL; qd=qd->next)
// Per-queue state. A job queue object holds a singly linked list of these, one per
// queue name it was constructed with.
struct sQueueData
{
    sQueueData *next;               // next queue in the list, or NULL
    IRemoteConnection *conn;        // Dali connection to the queue branch while locked, else NULL
    StringAttr qname;               // queue name (matched against Queue/@name)
    IPropertyTree *root;            // queue's property tree while connected (or from a snapshot), else NULL
    SubscriptionId subscriberid;    // subscription on the queue's Edition node, 0 if none
    unsigned lastWaitEdition;       // Edition value last recorded by haschanged()
};
  246. class CJobQueueBase: public CInterface, implements IJobQueueConst
  247. {
  248. class cOrderedIterator
  249. {
  250. CJobQueueBase &parent;
  251. unsigned numqueues;
  252. unsigned *queueidx;
  253. sQueueData **queues;
  254. IPropertyTree **queuet;
  255. MemoryAttr ma;
  256. unsigned current;
  257. public:
  258. cOrderedIterator(CJobQueueBase&_parent) : parent(_parent)
  259. {
  260. numqueues=0;
  261. ForEachQueueIn(parent,qd1)
  262. if (qd1->root)
  263. numqueues++;
  264. queueidx = (unsigned *)ma.allocate(numqueues*(sizeof(unsigned)+sizeof(sQueueData *)+sizeof(IPropertyTree *)));
  265. queues = (sQueueData **)(queueidx+numqueues);
  266. queuet = (IPropertyTree **)(queues+numqueues);
  267. unsigned i = 0;
  268. ForEachQueueIn(parent,qd2)
  269. {
  270. if (qd2->root)
  271. queues[i++] = qd2;
  272. }
  273. current = (unsigned)-1;
  274. }
  275. bool first()
  276. {
  277. StringBuffer path;
  278. parent.getItemPath(path,0U);
  279. current = (unsigned)-1;
  280. for (unsigned i = 0; i<numqueues;i++)
  281. {
  282. queueidx[i] = 0;
  283. queuet[i] = queues[i]->root->queryPropTree(path.str());
  284. if (queuet[i])
  285. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  286. current = i;
  287. }
  288. return current!=(unsigned)-1;
  289. }
  290. bool next()
  291. {
  292. if (current==(unsigned)-1)
  293. return false;
  294. queueidx[current]++;
  295. StringBuffer path;
  296. parent.getItemPath(path,queueidx[current]);
  297. queuet[current] = queues[current]->root->queryPropTree(path.str());
  298. current = (unsigned)-1;
  299. for (unsigned i = 0; i<numqueues;i++)
  300. {
  301. if (queuet[i])
  302. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  303. current = i;
  304. }
  305. return current!=(unsigned)-1;
  306. }
  307. bool isValid()
  308. {
  309. return current!=(unsigned)-1;
  310. }
  311. void item(sQueueData *&qd, IPropertyTree *&t,unsigned &idx)
  312. {
  313. assertex(current!=(unsigned)-1);
  314. qd = queues[current];
  315. t = queuet[current];
  316. idx = queueidx[current];
  317. }
  318. sQueueData &queryQueue()
  319. {
  320. assertex(current!=(unsigned)-1);
  321. return *queues[current];
  322. }
  323. IPropertyTree &queryTree()
  324. {
  325. assertex(current!=(unsigned)-1);
  326. return *queuet[current];
  327. }
  328. };
  329. protected:
  330. bool doGetLastDequeuedInfo(sQueueData *qd, StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  331. {
  332. priority = 0;
  333. if (!qd)
  334. return false;
  335. const char *w = qd->root->queryProp("@prevwuid");
  336. if (!w||!*w)
  337. return false;
  338. wuid.set(w);
  339. StringBuffer dts;
  340. if (qd->root->getProp("@prevenqueuedt",dts))
  341. enqueuedt.setString(dts.str());
  342. priority = qd->root->getPropInt("@prevpriority");
  343. return true;
  344. }
  345. public:
  346. sQueueData *qdata;
  347. Semaphore notifysem;
  348. CriticalSection crit;
  349. IMPLEMENT_IINTERFACE;
  350. CJobQueueBase(const char *_qname)
  351. {
  352. StringArray qlist;
  353. qlist.appendListUniq(_qname, ",");
  354. sQueueData *last = NULL;
  355. ForEachItemIn(i,qlist)
  356. {
  357. sQueueData *qd = new sQueueData;
  358. qd->next = NULL;
  359. qd->qname.set(qlist.item(i));
  360. qd->conn = NULL;
  361. qd->root = NULL;
  362. qd->lastWaitEdition = 0;
  363. qd->subscriberid = 0;
  364. if (last)
  365. last->next = qd;
  366. else
  367. qdata = qd;
  368. last = qd;
  369. }
  370. };
  371. virtual ~CJobQueueBase()
  372. {
  373. while (qdata)
  374. {
  375. sQueueData * next = qdata->next;
  376. delete qdata;
  377. qdata = next;
  378. }
  379. }
  380. StringBuffer &getItemPath(StringBuffer &path,const char *wuid)
  381. {
  382. if (!wuid||!*wuid)
  383. return getItemPath(path,0U);
  384. return path.appendf("Item[@wuid=\"%s\"]",wuid);
  385. }
  386. StringBuffer &getItemPath(StringBuffer &path,unsigned idx)
  387. {
  388. path.appendf("Item[@num=\"%d\"]",idx+1);
  389. return path;
  390. }
  391. IPropertyTree *queryClientRootIndex(sQueueData &qd, unsigned idx)
  392. {
  393. VStringBuffer path("Client[%d]", idx+1);
  394. return qd.root->queryPropTree(path);
  395. }
  396. bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
  397. {
  398. // if this ever becomes time critical thne could cache enqueued values
  399. StringBuffer d1s;
  400. if (qt1)
  401. qt1->getProp("@enqueuedt",d1s);
  402. StringBuffer d2s;
  403. if (qt2)
  404. qt2->getProp("@enqueuedt",d2s);
  405. return (strcmp(d1s.str(),d2s.str())<0);
  406. }
  407. IJobQueueItem *doGetItem(sQueueData &qd,unsigned idx)
  408. {
  409. if (idx==(unsigned)-1)
  410. {
  411. idx = qd.root->getPropInt("@count");
  412. if (!idx)
  413. return NULL;
  414. idx--;
  415. }
  416. StringBuffer path;
  417. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,idx).str());
  418. if (!item)
  419. return NULL;
  420. return new CJobQueueItem(item);
  421. }
  422. IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
  423. {
  424. return doGetItem(qd, idx);
  425. }
  426. IJobQueueItem *getHead(sQueueData &qd)
  427. {
  428. return getItem(qd,0);
  429. }
  430. unsigned doFindRank(sQueueData &qd,const char *wuid)
  431. {
  432. StringBuffer path;
  433. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  434. if (!item)
  435. return (unsigned)-1;
  436. return item->getPropInt("@num")-1;
  437. }
  438. unsigned findRank(sQueueData &qd,const char *wuid)
  439. {
  440. return doFindRank(qd,wuid);
  441. }
  442. IJobQueueItem *find(sQueueData &qd,const char *wuid)
  443. {
  444. StringBuffer path;
  445. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  446. if (!item)
  447. return NULL;
  448. bool cached = item->getPropInt("@num",0)<=0;
  449. if (wuid&&cached)
  450. return NULL; // don't want cached value unless explicit
  451. return new CJobQueueItem(item);
  452. }
  453. unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
  454. {
  455. unsigned ret=0;
  456. StringBuffer path;
  457. for (unsigned i=0;;i++)
  458. {
  459. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  460. if (!item)
  461. break;
  462. ret++;
  463. dest.append(*new CJobQueueItem(item));
  464. }
  465. return ret;
  466. }
  467. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  468. {
  469. assertex(qdata);
  470. assertex(qdata->root);
  471. copyItemsImpl(*qdata,contents);
  472. const char *st = qdata->root->queryProp("@state");
  473. if (st&&*st)
  474. state.set(st);
  475. if (st && (strieq(st, "paused") || strieq(st, "stopped")))
  476. {
  477. const char *stDetails = qdata->root->queryProp("@stateDetails");
  478. if (stDetails&&*stDetails)
  479. stateDetails.set(stDetails);
  480. }
  481. }
  482. sQueueData *findQD(const char *wuid)
  483. {
  484. if (wuid&&*wuid)
  485. {
  486. ForEachQueue(qd)
  487. {
  488. unsigned idx = doFindRank(*qd,wuid);
  489. if (idx!=(unsigned)-1)
  490. return qd;
  491. }
  492. }
  493. return NULL;
  494. }
  495. virtual unsigned waiting()
  496. {
  497. unsigned ret = 0;
  498. ForEachQueue(qd)
  499. {
  500. for (unsigned i=0;;i++)
  501. {
  502. IPropertyTree *croot = queryClientRootIndex(*qd,i);
  503. if (!croot)
  504. break;
  505. ret += croot->getPropInt("@waiting");
  506. }
  507. }
  508. return ret;
  509. }
  510. virtual unsigned findRank(const char *wuid)
  511. {
  512. assertex(qdata);
  513. if (!qdata->next)
  514. return findRank(*qdata,wuid);
  515. cOrderedIterator it(*this);
  516. unsigned i = 0;
  517. ForEach(it)
  518. {
  519. const char *twuid = it.queryTree().queryProp("@wuid");
  520. if (twuid&&(strcmp(twuid,wuid)==0))
  521. return i;
  522. i++;
  523. }
  524. return (unsigned)-1;
  525. }
  526. virtual unsigned copyItems(CJobQueueContents &dest)
  527. {
  528. assertex(qdata);
  529. if (!qdata->next)
  530. return copyItemsImpl(*qdata,dest);
  531. cOrderedIterator it(*this);
  532. unsigned ret = 0;
  533. ForEach(it)
  534. {
  535. dest.append(*new CJobQueueItem(&it.queryTree()));
  536. ret++;
  537. }
  538. return ret;
  539. }
  540. virtual IJobQueueItem *getItem(unsigned idx)
  541. {
  542. if (!qdata)
  543. return NULL;
  544. if (!qdata->next)
  545. return getItem(*qdata,idx);
  546. cOrderedIterator it(*this);
  547. unsigned i = 0;
  548. IPropertyTree *ret = NULL;
  549. ForEach(it)
  550. {
  551. if (i==idx)
  552. {
  553. ret = &it.queryTree();
  554. break;
  555. }
  556. else if (idx==(unsigned)-1) // -1 means return last
  557. ret = &it.queryTree();
  558. i++;
  559. }
  560. if (ret)
  561. return new CJobQueueItem(ret);
  562. return NULL;
  563. }
  564. virtual IJobQueueItem *getHead()
  565. {
  566. if (!qdata)
  567. return NULL;
  568. if (!qdata->next)
  569. return getHead(*qdata);
  570. return getItem(0);
  571. }
  572. virtual IJobQueueItem *getTail()
  573. {
  574. if (!qdata)
  575. return NULL;
  576. if (!qdata->next)
  577. return getHead(*qdata);
  578. return getItem((unsigned)-1);
  579. }
  580. virtual IJobQueueItem *find(const char *wuid)
  581. {
  582. if (!qdata)
  583. return NULL;
  584. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  585. if (!qd)
  586. return NULL;
  587. return find(*qd,wuid);
  588. }
  589. virtual bool paused()
  590. {
  591. // true if all paused
  592. ForEachQueue(qd)
  593. {
  594. if (qd->root)
  595. {
  596. const char *state = qd->root->queryProp("@state");
  597. if (state&&(strcmp(state,"paused")!=0))
  598. return false;
  599. }
  600. }
  601. return true;
  602. }
  603. virtual bool paused(StringBuffer& info)
  604. {
  605. // true if all paused
  606. ForEachQueue(qd)
  607. {
  608. if (qd->root)
  609. {
  610. const char *state = qd->root->queryProp("@state");
  611. if (state&&(strcmp(state,"paused")!=0))
  612. return false;
  613. if (state&&!info.length())
  614. {
  615. const char *stateDetails = qd->root->queryProp("@stateDetails");
  616. if (stateDetails && *stateDetails)
  617. info.set(stateDetails);
  618. }
  619. }
  620. }
  621. return true;
  622. }
  623. virtual bool stopped()
  624. {
  625. // true if all stopped
  626. ForEachQueue(qd)
  627. {
  628. if (qd->root)
  629. {
  630. const char *state = qd->root->queryProp("@state");
  631. if (state&&(strcmp(state,"stopped")!=0))
  632. return false;
  633. }
  634. }
  635. return true;
  636. }
  637. virtual bool stopped(StringBuffer& info)
  638. {
  639. // true if all stopped
  640. ForEachQueue(qd)
  641. {
  642. if (qd->root)
  643. {
  644. const char *state = qd->root->queryProp("@state");
  645. if (state&&(strcmp(state,"stopped")!=0))
  646. return false;
  647. if (state&&!info.length())
  648. {
  649. const char *stateDetails = qd->root->queryProp("@stateDetails");
  650. if (stateDetails && *stateDetails)
  651. info.set(stateDetails);
  652. }
  653. }
  654. }
  655. return true;
  656. }
  657. virtual unsigned ordinality()
  658. {
  659. unsigned ret = 0;
  660. ForEachQueue(qd)
  661. {
  662. if (qd->root)
  663. ret += qd->root->getPropInt("@count");
  664. }
  665. return ret;
  666. }
  667. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  668. {
  669. return doGetLastDequeuedInfo(qdata, wuid, enqueuedt, priority);
  670. }
  671. //Similar to copyItemsAndState(), this method returns the state information for one queue.
  672. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  673. {
  674. if (!qdata->root)
  675. return;
  676. const char *st = qdata->root->queryProp("@state");
  677. if (!st || !*st)
  678. return;
  679. state.set(st);
  680. if ((strieq(st, "paused") || strieq(st, "stopped")))
  681. stateDetails.set(qdata->root->queryProp("@stateDetails"));
  682. }
  683. };
  684. class CJobQueueConst: public CJobQueueBase
  685. {
  686. Owned<IPropertyTree> jobQueueSnapshot;
  687. public:
  688. IMPLEMENT_IINTERFACE;
  689. CJobQueueConst(const char *_qname, IPropertyTree* _jobQueueSnapshot) : CJobQueueBase(_qname)
  690. {
  691. if (!_jobQueueSnapshot)
  692. throw MakeStringException(-1, "No job queue snapshot");
  693. jobQueueSnapshot.setown(_jobQueueSnapshot);
  694. ForEachQueue(qd)
  695. {
  696. VStringBuffer path("Queue[@name=\"%s\"]", qd->qname.get());
  697. qd->root = jobQueueSnapshot->queryPropTree(path.str());
  698. if (!qd->root)
  699. throw MakeStringException(-1, "No job queue found for %s", qd->qname.get());
  700. }
  701. };
  702. };
  703. class CJobQueue: public CJobQueueBase, implements IJobQueue
  704. {
public:
    sQueueData *activeq;            // set to the first queue (qdata) by the constructor
    SessionId sessionid;            // this process's session; identifies our Client branch
    unsigned locknest;              // nesting depth of connlock()/connunlock()
    bool writemode;                 // true while the outstanding lock is exclusive
    bool connected;                 // true between connect() and disconnect()
    Owned<IConversation> initiateconv;
    StringAttr initiatewu;
    bool dequeuestop;               // when set, dodequeue() stops looping
    bool cancelwaiting;
    bool validateitemsessions;      // value passed to the most recent connect()
  716. class csubs: public CInterface, implements ISDSSubscription
  717. {
  718. CJobQueue *parent;
  719. public:
  720. IMPLEMENT_IINTERFACE;
  721. csubs(CJobQueue *_parent)
  722. {
  723. parent = _parent;
  724. }
  725. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  726. {
  727. CriticalBlock block(parent->crit);
  728. parent->notifysem.signal();
  729. }
  730. } subs;
  731. IMPLEMENT_IINTERFACE;
  732. CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
  733. {
  734. activeq = qdata;
  735. sessionid = myProcessSession();
  736. validateitemsessions = false;
  737. writemode = false;
  738. locknest = 0;
  739. connected = false;
  740. dequeuestop = false;
  741. cancelwaiting = false;
  742. Cconnlockblock block(this,false); // this just checks queue exists
  743. }
    // Releases any locks still held (rolling back), disconnects if connected, then
    // removes Dali subscriptions. IException errors are logged and released rather
    // than propagated out of the destructor.
    virtual ~CJobQueue()
    {
        try {
            while (locknest)
                connunlock(true); // auto rollback
            if (connected)
                disconnect();
        }
        catch (IException *e) {
            // server error
            EXCLOG(e, "~CJobQueue");
            e->Release();
        }
        try { // must attempt to remove subscription before object destroyed.
            dounsubscribe();
        }
        catch (IException *e) {
            EXCLOG(e, "~CJobQueue calling dounsubscribe");
            e->Release();
        }
    }
  765. void connlock(bool exclusive)
  766. { // must be in sect
  767. if (locknest++==0) {
  768. unsigned wait = qdata&&qdata->next?5000:INFINITE;
  769. ForEachQueue(qd) {
  770. loop {
  771. StringBuffer path;
  772. path.appendf("/JobQueues/Queue[@name=\"%s\"]",qd->qname.get());
  773. bool timeout;
  774. loop {
  775. timeout=false;
  776. try {
  777. qd->conn = querySDS().connect(path.str(),myProcessSession(),exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ,wait);
  778. if (qd->conn)
  779. break;
  780. }
  781. catch (ISDSException *e) {
  782. if (SDSExcpt_LockTimeout != e->errorCode())
  783. throw;
  784. e->Release();
  785. timeout = true;
  786. }
  787. // create queue
  788. Owned<IRemoteConnection> pconn;
  789. try {
  790. pconn.setown(querySDS().connect("/JobQueues",myProcessSession(),RTM_LOCK_WRITE|RTM_CREATE_QUERY,wait));
  791. if (!pconn)
  792. throw MakeStringException(-1,"CJobQueue could not create JobQueues");
  793. IPropertyTree *proot = pconn->queryRoot();
  794. StringBuffer cpath;
  795. cpath.appendf("Queue[@name=\"%s\"]",qd->qname.get());
  796. if (!proot->hasProp(cpath.str())) {
  797. IPropertyTree *pt = proot->addPropTree("Queue",createPTree("Queue"));
  798. pt->setProp("@name",qd->qname.get());
  799. pt->setProp("@state","active");
  800. pt->setPropInt("@count", 0);
  801. pt->setPropInt("Edition", 1);
  802. }
  803. }
  804. catch (ISDSException *e) {
  805. if (SDSExcpt_LockTimeout != e->errorCode())
  806. throw;
  807. e->Release();
  808. timeout = true;
  809. }
  810. }
  811. if (!timeout)
  812. break;
  813. sQueueData *qd2 = qdata;
  814. do {
  815. ::Release(qd2->conn);
  816. qd2->conn = NULL;
  817. qd2->root = NULL;
  818. }
  819. while (qd2!=qd);
  820. PROGLOG("Job Queue contention - delaying before retrying");
  821. Sleep(getRandom()%5000); // dining philosopher delay
  822. wait = getRandom()%4000+3000; // try and prevent sync
  823. qd = qdata;
  824. }
  825. qd->root = qd->conn->queryRoot();
  826. }
  827. writemode = exclusive;
  828. }
  829. else {
  830. if (exclusive&&!writemode) {
  831. ForEachQueue(qd) {
  832. assertex(qd->conn);
  833. writemode = exclusive;
  834. bool lockreleased;
  835. safeChangeModeWrite(qd->conn,qd->qname.get(),lockreleased);
  836. qd->root = qd->conn->queryRoot();
  837. }
  838. }
  839. }
  840. }
  841. void connunlock(bool rollback=false)
  842. { // should be in sect
  843. if (--locknest==0) {
  844. ForEachQueue(qd) {
  845. if (qd->conn) { // can occur if connection to dali threw exception
  846. if (writemode) {
  847. if (rollback)
  848. qd->conn->rollback();
  849. else {
  850. qd->root->setPropInt("Edition",qd->root->getPropInt("Edition")+1);
  851. qd->conn->commit();
  852. }
  853. }
  854. qd->conn->Release();
  855. qd->conn = NULL;
  856. }
  857. qd->root = NULL;
  858. }
  859. writemode = false;
  860. }
  861. }
  862. void conncommit() // doesn't set edition
  863. { // called within sect
  864. if (writemode) {
  865. ForEachQueue(qd) {
  866. if (qd->conn)
  867. qd->conn->commit();
  868. }
  869. }
  870. }
  871. class Cconnlockblock: public CriticalBlock
  872. {
  873. CJobQueue *parent;
  874. bool rollback;
  875. public:
  876. Cconnlockblock(CJobQueue *_parent,bool exclusive)
  877. : CriticalBlock(_parent->crit)
  878. {
  879. parent = _parent;
  880. parent->connlock(exclusive);
  881. rollback = false;
  882. }
  883. ~Cconnlockblock()
  884. {
  885. parent->connunlock(rollback);
  886. }
  887. void setRollback(bool set=true)
  888. {
  889. rollback = set;
  890. }
  891. void commit()
  892. {
  893. parent->conncommit();
  894. }
  895. };
  896. void removeItem(sQueueData &qd,IPropertyTree *item, bool cache)
  897. { // does not adjust or use @count
  898. unsigned n = item->getPropInt("@num");
  899. if (!n)
  900. return;
  901. if (cache) {
  902. StringBuffer s;
  903. item->getProp("@wuid",s.clear());
  904. qd.root->setProp("@prevwuid",s.str());
  905. item->getProp("@enqueuedt",s.clear());
  906. qd.root->setProp("@prevenqueuedt",s.str());
  907. qd.root->setPropInt("@prevpriority",item->getPropInt("@priority"));
  908. }
  909. item->setPropInt("@num",-1);
  910. StringBuffer path;
  911. loop {
  912. IPropertyTree *item2 = qd.root->queryPropTree(getItemPath(path.clear(),n).str());
  913. if (!item2)
  914. break;
  915. item2->setPropInt("@num",n);
  916. n++;
  917. }
  918. qd.root->removeTree(item);
  919. }
  920. IPropertyTree *addItem(sQueueData &qd,IPropertyTree *item,unsigned idx,unsigned count)
  921. {
  922. // does not set any values other than num
  923. StringBuffer path;
  924. // first move following up
  925. unsigned n=count;
  926. while (n>idx) {
  927. n--;
  928. qd.root->queryPropTree(getItemPath(path.clear(),n).str())->setPropInt("@num",n+2);
  929. }
  930. item->setPropInt("@num",idx+1);
  931. return qd.root->addPropTree("Item",item);
  932. }
  933. void dosubscribe()
  934. { // called in crit section
  935. ForEachQueue(qd) {
  936. if (qd->subscriberid) {
  937. querySDS().unsubscribe(qd->subscriberid);
  938. qd->subscriberid = 0;
  939. }
  940. StringBuffer path;
  941. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  942. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  943. }
  944. }
  945. bool haschanged() // returns if any changed
  946. {
  947. bool changed = false;
  948. ForEachQueue(qd) {
  949. if (!qd->subscriberid) {
  950. StringBuffer path;
  951. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  952. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  953. }
  954. unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
  955. if (e!=qd->lastWaitEdition) {
  956. qd->lastWaitEdition = e;
  957. changed = true;
  958. break;
  959. }
  960. }
  961. return changed;
  962. }
  963. void dounsubscribe()
  964. {
  965. // called in crit section
  966. ForEachQueue(qd) {
  967. if (qd->subscriberid) {
  968. querySDS().unsubscribe(qd->subscriberid);
  969. qd->subscriberid = 0;
  970. }
  971. }
  972. }
  973. IPropertyTree *queryClientRootSession(sQueueData &qd)
  974. {
  975. VStringBuffer path("Client[@session=\"%"I64F"d\"]", sessionid);
  976. IPropertyTree *ret = qd.root->queryPropTree(path.str());
  977. if (!ret)
  978. {
  979. ret = createPTree("Client");
  980. ret = qd.root->addPropTree("Client",ret);
  981. ret->setPropInt64("@session",sessionid);
  982. StringBuffer eps;
  983. ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
  984. }
  985. return ret;
  986. }
  987. void connect(bool _validateitemsessions)
  988. {
  989. Cconnlockblock block(this,true);
  990. validateitemsessions = _validateitemsessions;
  991. if (connected)
  992. disconnect();
  993. dosubscribe();
  994. ForEachQueue(qd) {
  995. unsigned connected;
  996. unsigned waiting;
  997. unsigned count;
  998. getStats(*qd,connected,waiting,count); // clear any duff clients
  999. IPropertyTree *croot = queryClientRootSession(*qd);
  1000. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
  1001. }
  1002. connected = true;
  1003. }
  1004. void disconnect() // signal no longer wil be dequeing (optional - done automatically on release)
  1005. {
  1006. Cconnlockblock block(this,true);
  1007. if (connected) {
  1008. dounsubscribe();
  1009. ForEachQueue(qd) {
  1010. IPropertyTree *croot = queryClientRootSession(*qd);
  1011. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
  1012. }
  1013. connected = false;
  1014. }
  1015. }
  1016. sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
  1017. {
  1018. if (numqueues==0)
  1019. return NULL;
  1020. if (numqueues==1)
  1021. return *queues;
  1022. sQueueData *best = NULL;
  1023. IPropertyTree *bestt = NULL;
  1024. for (unsigned i=0;i<numqueues;i++) {
  1025. sQueueData *qd = queues[i];
  1026. unsigned count = qd->root->getPropInt("@count");
  1027. if (count) {
  1028. int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
  1029. if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
  1030. StringBuffer path;
  1031. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,0U).str());
  1032. if (!item)
  1033. continue;
  1034. if (item->getPropInt("@num",0)<=0)
  1035. continue;
  1036. CDateTime dt;
  1037. StringBuffer enqueued;
  1038. if (!best||itemOlder(item,bestt)) {
  1039. best = qd;
  1040. bestt = item;
  1041. }
  1042. }
  1043. }
  1044. }
  1045. return best;
  1046. }
  1047. void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
  1048. {
  1049. for (unsigned i=0; i<numqueues; i++) {
  1050. IPropertyTree *croot = queryClientRootSession(*queues[i]);
  1051. croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
  1052. }
  1053. }
    // 'simple' queuing
    // Waits for and takes the next item from the active (not paused/stopped) queues.
    //   minprio  - INT_MIN means no priority restriction; otherwise checkprio() must accept
    //              the chosen queue at minprio (or max(@prevpriority, minprio) if useprev).
    //   timeout  - milliseconds to wait (INFINITE waits until an item is taken or
    //              dequeuestop is set).
    //   timedout - if non-NULL, set true when the wait ended without an item.
    // Returns the item produced by dotake() or NULL (all queues stopped, timeout expired,
    // or dequeuestop set). While waiting, this session is registered as a waiting client
    // and the thread blocks on notifysem, signalled by Edition subscriptions, re-checking
    // at least every 5 minutes.
    IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
    {
        bool hasminprio=(minprio!=INT_MIN);
        if (timedout)
            *timedout = false;
        IJobQueueItem *ret=NULL;
        bool waitingset = false;
        while (!dequeuestop) {
            unsigned t = 0;
            if (timeout!=(unsigned)INFINITE)
                t = msTick();
            {
                Cconnlockblock block(this,true);
                block.setRollback(true); // assume not going to update
                // now cycle through queues looking at state
                unsigned total = 0;
                unsigned stopped = 0;
                PointerArray active;
                ForEachQueue(qd) {
                    total++;
                    const char *state = qd->root->queryProp("@state");
                    if (state) {
                        if (strcmp(state,"stopped")==0)
                            stopped++;
                        else if (strcmp(state,"paused")!=0)
                            active.append(qd);
                    }
                    else
                        active.append(qd);
                }
                // NOTE(review): if an earlier iteration set @waiting (waitingset), this return
                // does not decrement it again (the block also rolls back); *timedout stays false.
                if (stopped==total)
                    return NULL; // all stopped
                sQueueData **activeqds = (sQueueData **)active.getArray();
                unsigned activenum = active.ordinality();
                if (activenum) {
                    sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
                    unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
                    // load minp from cache
                    if (count) {
                        int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
                        if (!hasminprio||checkprio(*bestqd,mpr)) {
                            block.setRollback(false);
                            ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
                            if (ret) // think it must be!
                                timeout = 0; // so mark that done
                            else if (!hasminprio) {
                                WARNLOG("Resetting queue %s",bestqd->qname.get());
                                clear(*bestqd); // reset queue as seems to have become out of sync
                            }
                        }
                    }
                    if (timeout!=0) { // more to do
                        if (!connected) { // if connect already done non-zero
                            connect(validateitemsessions);
                            block.setRollback(false);
                        }
                        // register as waiting once, on the queues active at this moment
                        if (!waitingset) {
                            setWaiting(activenum,activeqds,true);
                            block.commit();
                            waitingset = true;
                        }
                    }
                }
                if (timeout==0) {
                    // NOTE(review): this decrements @waiting on the queues active *now*, which
                    // may differ from those incremented in an earlier iteration.
                    if (waitingset) {
                        setWaiting(activenum,activeqds,false);
                        block.commit();
                    }
                    if (timedout)
                        *timedout = (ret==NULL);
                    break;
                }
            }
            unsigned to = 5*60*1000;
            // check every 5 mins independant of notify (in case subscription lost for some reason)
            if (to>timeout)
                to = timeout;
            notifysem.wait(to);
            // deduct the time spent this iteration (lock + wait) from the remaining timeout
            if (timeout!=(unsigned)INFINITE) {
                t = msTick()-t;
                if (t<timeout)
                    timeout -= t;
                else
                    timeout = 0;
            }
        }
        // NOTE(review): leaving via dequeuestop does not clear a previously set @waiting.
        return ret;
    }
  1143. IJobQueueItem *dequeue(unsigned timeout=INFINITE)
  1144. {
  1145. return dodequeue(INT_MIN,timeout);
  1146. }
  1147. IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
  1148. {
  1149. return dodequeue(minprio,timeout);
  1150. }
/**
 * Insert qitem into sub-queue qd and bump qd's @count. Takes ownership of qitem.
 *
 * Any existing entry for the same wuid is first removed via remove(wuid). That call
 * searches all sub-queues, so the old entry may be on a queue other than qd.
 *
 * @param idx  requested position, or (unsigned)-1 for "priority order".
 *   - Explicit idx: the item's priority is clamped so it is not higher than the
 *     item at idx-1 and not lower than the item currently at idx. This keeps the
 *     queue in descending priority order. If either neighbour cannot be found,
 *     idx falls back to (unsigned)-1.
 *   - (unsigned)-1: walk back from the end past items with lower priority. The item
 *     is then placed after the last item whose @priority is >= its own.
 *
 * NOTE(review): callers compute idx (doFindRank) before the remove() below runs.
 * If this wuid was already queued ahead of idx, the removal shifts later items down
 * by one and idx may then be off by one.
 */
void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
    Owned<IJobQueueItem> qi = qitem;
    remove(qi->queryWUID()); // just in case trying to put on twice!
    int priority = qi->getPriority();
    unsigned count = qd.root->getPropInt("@count"); // read after remove() so it reflects that removal
    StringBuffer path;
    if (count&&(idx!=(unsigned)-1)) { // need to check before and after
        if (idx) {
            // must not outrank the item that will be in front of it
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority>pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;
        }
        if (idx<count) {
            // must not rank below the item it will be placed in front of
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority<pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;
        }
    }
    if (idx==(unsigned)-1) {
        // priority order: step back over lower-priority items from the tail
        idx = count;
        while (idx) {
            IPropertyTree *previtem = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (previtem) {
                if (previtem->getPropInt("@priority")>=priority) {
                    break;
                }
            }
            else
                count--; // how did that happen?
            idx--;
        }
    }
    CJobQueueItem::assignBranch(addItem(qd,createPTree("Item"),idx,count),qi);
    qd.root->setPropInt("@count",count+1);
}
  1201. void enqueue(sQueueData &qd,IJobQueueItem *qitem) // takes ownership of qitem
  1202. {
  1203. Cconnlockblock block(this,true);
  1204. placeonqueue(qd,qitem,(unsigned)-1);
  1205. }
  1206. void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1207. {
  1208. Cconnlockblock block(this,true);
  1209. placeonqueue(qd,qitem,doFindRank(qd,wuid));
  1210. }
  1211. void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  1212. {
  1213. Cconnlockblock block(this,true);
  1214. unsigned idx = doFindRank(qd,wuid);
  1215. if (idx!=(unsigned)-1)
  1216. idx++;
  1217. placeonqueue(qd,qitem,idx);
  1218. }
  1219. void enqueueTail(sQueueData &qd,IJobQueueItem *qitem)
  1220. {
  1221. Cconnlockblock block(this,true);
  1222. Owned<IJobQueueItem> qi = getTail(qd);
  1223. if (qi)
  1224. enqueueAfter(qd,qitem,qi->queryWUID());
  1225. else
  1226. enqueue(qd,qitem);
  1227. }
  1228. void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
  1229. {
  1230. Cconnlockblock block(this,true);
  1231. Owned<IJobQueueItem> qi = doGetItem(qd, 0);
  1232. if (qi)
  1233. enqueueBefore(qd,qitem,qi->queryWUID());
  1234. else
  1235. enqueue(qd,qitem);
  1236. }
  1237. unsigned ordinality(sQueueData &qd)
  1238. {
  1239. Cconnlockblock block(this,false);
  1240. return qd.root->getPropInt("@count");
  1241. }
  1242. IJobQueueItem *getTail(sQueueData &qd)
  1243. {
  1244. return doGetItem(qd,(unsigned)-1);
  1245. }
  1246. IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
  1247. {
  1248. Cconnlockblock block(this,false);
  1249. StringBuffer path;
  1250. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,qi->queryWUID()).str());
  1251. if (!item)
  1252. return NULL;
  1253. bool cached = item->getPropInt("@num",0)<=0;
  1254. if (cached)
  1255. return NULL; // don't want cached value
  1256. return new CJobQueueItem(item);
  1257. }
  1258. bool checkprio(sQueueData &qd,int minprio=0)
  1259. {
  1260. StringBuffer path;
  1261. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,0U).str());
  1262. if (!item)
  1263. return false;
  1264. return (item->getPropInt("@priority")>=minprio);
  1265. }
  1266. IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
  1267. {
  1268. StringBuffer path;
  1269. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  1270. if (!item)
  1271. return NULL;
  1272. if (item->getPropInt("@num",0)<=0)
  1273. return NULL; // don't want (old) cached value
  1274. if (hasminprio&&(item->getPropInt("@priority")<minprio))
  1275. return NULL;
  1276. IJobQueueItem *ret = new CJobQueueItem(item);
  1277. removeItem(qd,item,saveitem);
  1278. unsigned count = qd.root->getPropInt("@count");
  1279. assertex(count);
  1280. qd.root->setPropInt("@count",count-1);
  1281. return ret;
  1282. }
  1283. IJobQueueItem *take(sQueueData &qd,const char *wuid)
  1284. {
  1285. Cconnlockblock block(this,true);
  1286. return dotake(qd,wuid,false);
  1287. }
  1288. unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
  1289. {
  1290. Cconnlockblock block(this,true);
  1291. unsigned ret = copyItemsImpl(qd,dest);
  1292. clear(qd);
  1293. return ret;
  1294. }
  1295. void enqueueItems(sQueueData &qd,CJobQueueContents &items)
  1296. {
  1297. unsigned n=items.ordinality();
  1298. if (n) {
  1299. Cconnlockblock block(this,true);
  1300. for (unsigned i=0;i<n;i++)
  1301. enqueue(qd,items.item(i).clone());
  1302. }
  1303. }
  1304. void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
  1305. {
  1306. Cconnlockblock block(this,true);
  1307. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1308. enqueueBefore(*qd,qitem,wuid);
  1309. }
  1310. void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
  1311. {
  1312. Cconnlockblock block(this,true);
  1313. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1314. enqueueAfter(*qd,qitem,wuid);
  1315. }
  1316. bool moveBefore(const char *wuid,const char *nextwuid)
  1317. {
  1318. if (!qdata)
  1319. return false;
  1320. Cconnlockblock block(this,true);
  1321. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1322. if (!qd)
  1323. return false;
  1324. IJobQueueItem *qi=take(*qd,wuid);
  1325. if (!qi)
  1326. return false;
  1327. sQueueData *qdd = NULL;
  1328. if (qdata->next)
  1329. qdd = findQD(nextwuid);
  1330. if (!qdd)
  1331. qdd = qd;
  1332. enqueueBefore(*qdd,qi,nextwuid);
  1333. return true;
  1334. }
  1335. bool moveAfter(const char *wuid,const char *prevwuid)
  1336. {
  1337. if (!qdata)
  1338. return false;
  1339. Cconnlockblock block(this,true);
  1340. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1341. if (!qd)
  1342. return false;
  1343. IJobQueueItem *qi=take(*qd,wuid);
  1344. if (!qi)
  1345. return false;
  1346. sQueueData *qdd = NULL;
  1347. if (qdata->next)
  1348. qdd = findQD(prevwuid);
  1349. if (!qdd)
  1350. qdd = qd;
  1351. enqueueAfter(*qdd,qi,prevwuid);
  1352. return true;
  1353. }
  1354. bool moveToHead(const char *wuid)
  1355. {
  1356. if (!qdata)
  1357. return false;
  1358. Cconnlockblock block(this,true);
  1359. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1360. if (!qd)
  1361. return false;
  1362. IJobQueueItem *qi=take(*qd,wuid);
  1363. if (!qi)
  1364. return false;
  1365. enqueueHead(*qd,qi);
  1366. return true;
  1367. }
  1368. bool moveToTail(const char *wuid)
  1369. {
  1370. if (!qdata)
  1371. return false;
  1372. Cconnlockblock block(this,true);
  1373. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1374. if (!qd)
  1375. return false;
  1376. IJobQueueItem *qi=take(*qd,wuid);
  1377. if (!qi)
  1378. return false;
  1379. enqueueTail(*qd,qi);
  1380. return true;
  1381. }
  1382. bool remove(const char *wuid)
  1383. {
  1384. if (!qdata)
  1385. return false;
  1386. Cconnlockblock block(this,true);
  1387. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1388. if (!qd)
  1389. return false;
  1390. StringBuffer path;
  1391. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,wuid).str());
  1392. if (!item)
  1393. return false;
  1394. bool cached = item->getPropInt("@num",0)<=0; // old cached (bwd compat)
  1395. removeItem(*qd,item,false);
  1396. if (!cached) {
  1397. unsigned count = qd->root->getPropInt("@count");
  1398. assertex(count);
  1399. qd->root->setPropInt("@count",count-1);
  1400. }
  1401. return true;
  1402. }
  1403. bool changePriority(const char *wuid,int value)
  1404. {
  1405. if (!qdata)
  1406. return false;
  1407. Cconnlockblock block(this,true);
  1408. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1409. if (!qd)
  1410. return false;
  1411. IJobQueueItem *qi=take(*qd,wuid);
  1412. if (!qi) {
  1413. StringBuffer ws("~"); // change cached item
  1414. ws.append(wuid);
  1415. StringBuffer path;
  1416. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,ws.str()).str());
  1417. if (item) {
  1418. item->setPropInt("@priority",value);
  1419. return true;
  1420. }
  1421. return false;
  1422. }
  1423. qi->setPriority(value);
  1424. enqueue(*qd,qi);
  1425. return true;
  1426. }
  1427. void clear(sQueueData &qd)
  1428. {
  1429. Cconnlockblock block(this,true);
  1430. qd.root->setPropInt("@count",0);
  1431. loop {
  1432. IPropertyTree *item = qd.root->queryPropTree("Item[1]");
  1433. if (!item)
  1434. break;
  1435. qd.root->removeTree(item);
  1436. }
  1437. }
  1438. void lock()
  1439. {
  1440. connlock(false); // sub functions will change to exclusive if needed
  1441. }
  1442. void unlock(bool rollback=false)
  1443. {
  1444. connunlock(rollback);
  1445. }
  1446. void pause(sQueueData &qd)
  1447. {
  1448. Cconnlockblock block(this,true);
  1449. qd.root->setProp("@state","paused");
  1450. }
  1451. void resume(sQueueData &qd)
  1452. {
  1453. Cconnlockblock block(this,true);
  1454. qd.root->setProp("@state","active");
  1455. }
  1456. bool paused(sQueueData &qd)
  1457. {
  1458. Cconnlockblock block(this,false);
  1459. const char *state = qd.root->queryProp("@state");
  1460. return (state&&(strcmp(state,"paused")==0));
  1461. }
  1462. void stop(sQueueData &qd)
  1463. {
  1464. Cconnlockblock block(this,true);
  1465. qd.root->setProp("@state","stopped");
  1466. }
  1467. bool stopped(sQueueData &qd)
  1468. {
  1469. Cconnlockblock block(this,false);
  1470. const char *state = qd.root->queryProp("@state");
  1471. return (state&&(strcmp(state,"stopped")==0));
  1472. }
/**
 * Gather statistics for one sub-queue, pruning stale entries as a side effect.
 *
 * @param connected  out: sum of @connected over Client entries with a valid session.
 * @param waiting    out: sum of @waiting over Client entries with a valid session.
 * @param enqueued   out: @count when validateitemsessions is false. Otherwise, the
 *                   number of Items left after removing those with an invalid session.
 *                   @count is rewritten when it disagrees with that number.
 *
 * Client entries whose session is no longer valid are removed.
 * Each modification is made inside a nested Cconnlockblock(this,true); presumably that
 * is the exclusive form of the lock - confirm against Cconnlockblock.
 */
void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
    Cconnlockblock block(this,false);
    connected = 0;
    waiting = 0;
    unsigned i=0;
    loop {
        IPropertyTree *croot = queryClientRootIndex(qd,i);
        if (!croot)
            break;
        if (!validSession(croot)) {
            // dead client: drop its entry; i is not advanced because the next entry shifts into slot i
            Cconnlockblock block(this,true);
            qd.root->removeTree(croot);
        }
        else {
            waiting += croot->getPropInt("@waiting");
            connected += croot->getPropInt("@connected");
            i++;
        }
    }
    // now remove any duff queue items
    unsigned count = qd.root->getPropInt("@count");
    if (!validateitemsessions) {
        enqueued = count;
        return;
    }
    i=0;
    StringBuffer path;
    loop {
        IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
        if (!item)
            break;
        if (!validSession(item)) {
            Cconnlockblock block(this,true);
            // look the item up again now the lock is held (NOTE(review): presumably the tree may have changed)
            item = qd.root->queryPropTree(path.str());
            if (!item)
                break;
            // PROGLOG("WUJOBQ: Removing %s as session %" I64F "x not active",item->queryProp("@wuid"),item->getPropInt64("@session"));
            removeItem(qd,item,false);
        }
        else
            i++;
    }
    // i now equals the number of surviving items; repair @count if it drifted
    if (count!=i) {
        Cconnlockblock block(this,true);
        qd.root->setPropInt("@count",i);
    }
    enqueued = i;
}
  1522. void getStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1523. {
  1524. Cconnlockblock block(this,false);
  1525. doGetStats(qd,connected,waiting,enqueued);
  1526. }
  1527. void getStats(unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1528. {
  1529. // multi queue
  1530. Cconnlockblock block(this,false);
  1531. connected=0;
  1532. waiting=0;
  1533. enqueued=0;
  1534. ForEachQueue(qd) {
  1535. unsigned c;
  1536. unsigned w;
  1537. unsigned e;
  1538. doGetStats(*qd,c,w,e);
  1539. connected+=c;
  1540. waiting+=w;
  1541. enqueued+=e;
  1542. }
  1543. }
  1544. IJobQueueItem *take(const char *wuid)
  1545. {
  1546. assertex(qdata);
  1547. if (!qdata->next)
  1548. return take(*qdata,wuid);
  1549. Cconnlockblock block(this,true);
  1550. ForEachQueue(qd) {
  1551. IJobQueueItem *ret = dotake(*qd,wuid,false);
  1552. if (ret)
  1553. return ret;
  1554. }
  1555. return NULL;
  1556. }
  1557. unsigned takeItems(CJobQueueContents &dest)
  1558. {
  1559. assertex(qdata);
  1560. if (!qdata->next)
  1561. return takeItems(*qdata,dest);
  1562. Cconnlockblock block(this,true);
  1563. unsigned ret = 0;
  1564. ForEachQueue(qd) {
  1565. ret += copyItemsImpl(*qd,dest);
  1566. clear(*qd);
  1567. }
  1568. return ret;
  1569. }
  1570. void enqueueItems(CJobQueueContents &items)
  1571. { // enqueues to firs sub-queue (not sure that useful)
  1572. assertex(qdata);
  1573. return enqueueItems(*qdata,items);
  1574. }
  1575. void clear()
  1576. {
  1577. ForEachQueue(qd) {
  1578. clear(*qd);
  1579. }
  1580. }
  1581. bool validSession(IPropertyTree *item)
  1582. {
  1583. Owned<INode> node = createINode(item->queryProp("@node"),DALI_SERVER_PORT); // port should always be present
  1584. return (querySessionManager().lookupProcessSession(node)==(SessionId)item->getPropInt64("@session"));
  1585. }
  1586. IConversation *initiateConversation(sQueueData &qd,IJobQueueItem *item)
  1587. {
  1588. CriticalBlock block(crit);
  1589. assertex(!initiateconv.get());
  1590. SocketEndpoint ep = item->queryEndpoint();
  1591. unsigned short port = (unsigned short)item->getPort();
  1592. initiateconv.setown(createSingletonSocketConnection(port));
  1593. if (!port)
  1594. item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
  1595. initiatewu.set(item->queryWUID());
  1596. enqueue(qd,item);
  1597. bool ok;
  1598. {
  1599. CriticalUnblock unblock(crit);
  1600. ok = initiateconv->accept(INFINITE);
  1601. }
  1602. if (!ok)
  1603. initiateconv.clear();
  1604. return initiateconv.getClear();
  1605. }
/**
 * Wait for a queued item whose originator is still alive, connect back to the
 * originator, and hand both the conversation and the item to the caller.
 *
 * @param retitem                  out: the accepted item (caller owns it); NULL unless a conversation is returned.
 * @param prioritytransitiondelay  if non-zero, dequeue with this timeout (ms) and
 *                                 useprev=true, so the minimum priority is raised to the
 *                                 best queue's @prevpriority. Once such a dequeue yields
 *                                 no item, the delay is reset to 0.
 * @param maxp                     optional dynamic minimum priority; re-read after each
 *                                 60000ms dequeue when no transition delay is active.
 * @return the connected conversation, or NULL when a dequeue yields no item without
 *         timing out (e.g. all sub-queues stopped, or cancelAcceptConversation()).
 *
 * An item is dropped and the loop continues in two cases: its session is no longer
 * valid, or its originator cannot be reached within 3*60*1000 ms. The dequeue has
 * already removed it from the queue.
 * Must be called while connected; crit is released while blocked in the dequeue.
 */
IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
{
    CriticalBlock block(crit);
    retitem = NULL;
    assertex(connected); // must be connected
    int curmp = maxp?maxp->get():0;
    int nextmp = curmp;
    loop {
        bool timedout = false;
        Owned<IJobQueueItem> item;
        {
            CriticalUnblock unblock(crit);
            // this is a bit complicated with multi-thor
            if (prioritytransitiondelay||maxp) {
                // NOTE(review): C++ integer division truncates toward zero, so negative priorities are rounded up, not down.
                // With maxp==NULL this still passes a minimum of 0, so negative-priority items are skipped here.
                item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
                    prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
                // if dynamic priority check every minute
                if (!prioritytransitiondelay) {
                    curmp = nextmp; // using max above is a bit devious to allow transition
                    nextmp = maxp->get();
                }
            }
            else
                item.setown(dequeue(INFINITE));
        }
        if (item.get()) {
            if (item->isValidSession()) {
                // connect back to the originator's listening port (set by initiateConversation)
                SocketEndpoint ep = item->queryEndpoint();
                ep.port = item->getPort();
                Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
                if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
                    retitem = item.getClear();
                    return acceptconv.getClear();
                }
            }
            // otherwise the item is discarded when 'item' goes out of scope
        }
        else if (prioritytransitiondelay)
            prioritytransitiondelay = 0; // transition period over: stop using @prevpriority
        else if (!timedout)
            break;
    }
    return NULL;
}
  1649. void cancelInitiateConversation(sQueueData &qd)
  1650. {
  1651. CriticalBlock block(crit);
  1652. if (initiatewu.get())
  1653. remove(initiatewu);
  1654. if (initiateconv.get())
  1655. initiateconv->cancel();
  1656. }
  1657. void cancelAcceptConversation()
  1658. {
  1659. CriticalBlock block(crit);
  1660. dequeuestop = true;
  1661. notifysem.signal();
  1662. }
  1663. bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
  1664. {
  1665. Cconnlockblock block(this,true);
  1666. loop {
  1667. Owned<IJobQueueItem> item = dotake(qd,wuid,false);
  1668. if (!item.get())
  1669. break;
  1670. if (item->isValidSession()) {
  1671. SocketEndpoint ep = item->queryEndpoint();
  1672. ep.port = item->getPort();
  1673. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1674. acceptconv->connect(3*60*1000); // connect then close should close other end
  1675. return true;
  1676. }
  1677. }
  1678. return false;
  1679. }
  1680. bool waitStatsChange(unsigned timeout)
  1681. {
  1682. assertex(!connected); // not allowed to call this while connected
  1683. cancelwaiting = false;
  1684. while(!cancelwaiting) {
  1685. {
  1686. Cconnlockblock block(this,false);
  1687. if (haschanged())
  1688. return true;
  1689. }
  1690. if (!notifysem.wait(timeout))
  1691. break;
  1692. }
  1693. return false;
  1694. }
  1695. void cancelWaitStatsChange()
  1696. {
  1697. CriticalBlock block(crit);
  1698. cancelwaiting = true;
  1699. notifysem.signal();
  1700. }
  1701. virtual void enqueue(IJobQueueItem *qitem)
  1702. {
  1703. enqueue(*activeq,qitem);
  1704. }
  1705. void enqueueHead(IJobQueueItem *qitem)
  1706. {
  1707. enqueueHead(*activeq,qitem);
  1708. }
  1709. void enqueueTail(IJobQueueItem *qitem)
  1710. {
  1711. enqueueTail(*activeq,qitem);
  1712. }
  1713. void pause()
  1714. {
  1715. Cconnlockblock block(this,true);
  1716. ForEachQueue(qd) {
  1717. if (qd->root)
  1718. qd->root->setProp("@state","paused");
  1719. }
  1720. }
  1721. void pause(const char* info)
  1722. {
  1723. Cconnlockblock block(this,true);
  1724. ForEachQueue(qd) {
  1725. if (qd->root) {
  1726. qd->root->setProp("@state","paused");
  1727. if (info && *info)
  1728. qd->root->setProp("@stateDetails",info);
  1729. }
  1730. }
  1731. }
  1732. void stop()
  1733. {
  1734. Cconnlockblock block(this,true);
  1735. ForEachQueue(qd) {
  1736. if (qd->root)
  1737. qd->root->setProp("@state","stopped");
  1738. }
  1739. }
  1740. void stop(const char* info)
  1741. {
  1742. Cconnlockblock block(this,true);
  1743. ForEachQueue(qd) {
  1744. if (qd->root) {
  1745. qd->root->setProp("@state","stopped");
  1746. if (info && *info)
  1747. qd->root->setProp("@stateDetails",info);
  1748. }
  1749. }
  1750. }
  1751. void resume()
  1752. {
  1753. Cconnlockblock block(this,true);
  1754. ForEachQueue(qd) {
  1755. if (qd->root)
  1756. qd->root->setProp("@state","active");
  1757. }
  1758. }
  1759. void resume(const char* info)
  1760. {
  1761. Cconnlockblock block(this,true);
  1762. ForEachQueue(qd) {
  1763. if (qd->root) {
  1764. qd->root->setProp("@state","active");
  1765. if (info && *info)
  1766. qd->root->setProp("@stateDetails",info);
  1767. }
  1768. }
  1769. }
  1770. IConversation *initiateConversation(IJobQueueItem *item)
  1771. {
  1772. return initiateConversation(*activeq,item);
  1773. }
  1774. void cancelInitiateConversation()
  1775. {
  1776. return cancelInitiateConversation(*activeq);
  1777. }
  1778. bool cancelInitiateConversation(const char *wuid)
  1779. {
  1780. return cancelInitiateConversation(*activeq,wuid);
  1781. }
  1782. const char * queryActiveQueueName()
  1783. {
  1784. return activeq->qname;
  1785. }
  1786. void setActiveQueue(const char *name)
  1787. {
  1788. ForEachQueue(qd) {
  1789. if (!name||(strcmp(qd->qname.get(),name)==0)) {
  1790. activeq = qd;
  1791. return;
  1792. }
  1793. }
  1794. if (name)
  1795. throw MakeStringException (-1,"queue %s not found",name);
  1796. }
  1797. const char *nextQueueName(const char *last)
  1798. {
  1799. ForEachQueue(qd) {
  1800. if (!last||(strcmp(qd->qname.get(),last)==0)) {
  1801. if (qd->next)
  1802. return qd->next->qname.get();
  1803. break;
  1804. }
  1805. }
  1806. return NULL;
  1807. }
  1808. virtual bool paused()
  1809. {
  1810. Cconnlockblock block(this,false);
  1811. return CJobQueueBase::paused();
  1812. }
  1813. virtual bool paused(StringBuffer& info)
  1814. {
  1815. Cconnlockblock block(this,false);
  1816. return CJobQueueBase::paused(info);
  1817. }
  1818. virtual bool stopped()
  1819. {
  1820. Cconnlockblock block(this,false);
  1821. return CJobQueueBase::stopped();
  1822. }
  1823. virtual bool stopped(StringBuffer& info)
  1824. {
  1825. Cconnlockblock block(this,false);
  1826. return CJobQueueBase::stopped(info);
  1827. }
  1828. virtual unsigned ordinality()
  1829. {
  1830. Cconnlockblock block(this,false);
  1831. return CJobQueueBase::ordinality();
  1832. }
  1833. virtual unsigned waiting()
  1834. {
  1835. Cconnlockblock block(this,false);
  1836. return CJobQueueBase::waiting();
  1837. }
  1838. virtual IJobQueueItem *getItem(unsigned idx)
  1839. {
  1840. Cconnlockblock block(this,false);
  1841. return CJobQueueBase::getItem(idx);
  1842. }
  1843. virtual IJobQueueItem *getHead()
  1844. {
  1845. Cconnlockblock block(this,false);
  1846. return CJobQueueBase::getHead();
  1847. }
  1848. virtual IJobQueueItem *getTail()
  1849. {
  1850. Cconnlockblock block(this,false);
  1851. return CJobQueueBase::getTail();
  1852. }
  1853. virtual IJobQueueItem *find(const char *wuid)
  1854. {
  1855. Cconnlockblock block(this,false);
  1856. return CJobQueueBase::find(wuid);
  1857. }
  1858. virtual unsigned findRank(const char *wuid)
  1859. {
  1860. Cconnlockblock block(this,false);
  1861. return CJobQueueBase::findRank(wuid);
  1862. }
  1863. virtual unsigned copyItems(CJobQueueContents &dest)
  1864. {
  1865. Cconnlockblock block(this,false);
  1866. return CJobQueueBase::copyItems(dest);
  1867. }
  1868. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  1869. {
  1870. Cconnlockblock block(this,false);
  1871. return CJobQueueBase::doGetLastDequeuedInfo(activeq, wuid, enqueuedt, priority);
  1872. }
  1873. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
  1874. {
  1875. Cconnlockblock block(this,false);
  1876. CJobQueueBase::copyItemsAndState(contents, state, stateDetails);
  1877. }
  1878. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
  1879. {
  1880. Cconnlockblock block(this,false);
  1881. CJobQueueBase::getState(state, stateDetails);
  1882. }
  1883. };
  1884. class CJQSnapshot : public CInterface, implements IJQSnapshot
  1885. {
  1886. Owned<IPropertyTree> jobQueueInfo;
  1887. public:
  1888. IMPLEMENT_IINTERFACE;
  1889. CJQSnapshot()
  1890. {
  1891. Owned<IRemoteConnection> connJobQueues = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 30000);
  1892. if (!connJobQueues)
  1893. throw MakeStringException(-1, "CJQSnapshot::CJQSnapshot: /JobQueues not found");
  1894. jobQueueInfo.setown(createPTreeFromIPT(connJobQueues->queryRoot()));
  1895. }
  1896. IJobQueueConst* getJobQueue(const char *name)
  1897. {
  1898. if (!jobQueueInfo)
  1899. return NULL;
  1900. return new CJobQueueConst(name, jobQueueInfo.getLink());
  1901. }
  1902. };
  1903. IJQSnapshot *createJQSnapshot()
  1904. {
  1905. return new CJQSnapshot();
  1906. }
  1907. IJobQueue *createJobQueue(const char *name)
  1908. {
  1909. if (!name||!*name)
  1910. throw MakeStringException(-1,"createJobQueue empty name");
  1911. return new CJobQueue(name);
  1912. }
  1913. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster)
  1914. {
  1915. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  1916. if (!clusterInfo.get())
  1917. return false;
  1918. SCMStringBuffer agentQueue;
  1919. clusterInfo->getAgentQueue(agentQueue);
  1920. if (!agentQueue.length())
  1921. return false;
  1922. Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
  1923. if (!queue.get())
  1924. throw MakeStringException(-1, "Could not create workunit queue");
  1925. IJobQueueItem *item = createJobQueueItem(wuid);
  1926. queue->enqueue(item);
  1927. PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
  1928. return true;
  1929. }
  1930. extern bool WORKUNIT_API runWorkUnit(const char *wuid)
  1931. {
  1932. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1933. Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid, false);
  1934. if (w)
  1935. {
  1936. StringAttr clusterName = (w->queryClusterName());
  1937. w.clear();
  1938. return runWorkUnit(wuid, clusterName.str());
  1939. }
  1940. else
  1941. return false;
  1942. }
  1943. extern WORKUNIT_API StringBuffer &getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList)
  1944. {
  1945. Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 5000);
  1946. if (!conn)
  1947. return queueList;
  1948. VStringBuffer xpath("Queue[Item/@wuid='%s']", wuid);
  1949. Owned<IPropertyTreeIterator> it = conn->getElements(xpath.str());
  1950. ForEach(*it)
  1951. {
  1952. if (queueList.length())
  1953. queueList.append(',');
  1954. queueList.append(it->query().queryProp("@name"));
  1955. }
  1956. return queueList;
  1957. }
  1958. extern void WORKUNIT_API removeWorkUnitFromAllQueues(const char *wuid)
  1959. {
  1960. StringBuffer queueList;
  1961. if (!getQueuesContainingWorkUnit(wuid, queueList).length())
  1962. return;
  1963. Owned<IJobQueue> q = createJobQueue(queueList.str());
  1964. if (q)
  1965. while(q->remove(wuid));
  1966. }
  1967. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
  1968. {
  1969. if (!wu)
  1970. return false;
  1971. class cQswitcher: public CInterface, implements IQueueSwitcher
  1972. {
  1973. public:
  1974. IMPLEMENT_IINTERFACE;
  1975. void * getQ(const char * qname, const char * wuid)
  1976. {
  1977. Owned<IJobQueue> q = createJobQueue(qname);
  1978. return q->take(wuid);
  1979. }
  1980. void putQ(const char * qname, const char * wuid, void * qitem)
  1981. {
  1982. Owned<IJobQueue> q = createJobQueue(qname);
  1983. q->enqueue((IJobQueueItem *)qitem);
  1984. }
  1985. bool isAuto()
  1986. {
  1987. return false;
  1988. }
  1989. } switcher;
  1990. return wu->switchThorQueue(cluster, &switcher);
  1991. }