wujobq.cpp 56 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <algorithm>
  15. #include "limits.h"
  16. #include "jlib.hpp"
  17. #include "jbuff.hpp"
  18. #include "dasess.hpp"
  19. #include "dautils.hpp"
  20. #include "portlist.h"
  21. #include "dacoven.hpp"
  22. #include "daclient.hpp"
  23. #include "dasds.hpp"
  24. #include "dasess.hpp"
  25. #include "wujobq.hpp"
  26. #ifdef _MSC_VER
  27. #pragma warning (disable : 4355)
  28. #endif
  29. #if 0
  30. JobQueues
  31. JobQueue @name= @count= @state=active|paused|stopped
  32. Edition <num>
  33. Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
  34. Item* @wuid @owner @node @port @priority @session
  35. #endif
  36. class CJobQueueItem: public CInterface, implements IJobQueueItem
  37. {
  38. StringAttr wu;
  39. StringAttr owner;
  40. int priority;
  41. SessionId sessid;
  42. SocketEndpoint ep;
  43. unsigned port;
  44. CDateTime enqueuedt;
  45. public:
  46. IMPLEMENT_IINTERFACE;
  47. CJobQueueItem(MemoryBuffer &src)
  48. {
  49. deserialize(src);
  50. }
  51. CJobQueueItem(const char *_wu)
  52. : wu(_wu)
  53. {
  54. priority = 0;
  55. ep = queryMyNode()->endpoint();
  56. port = 0;
  57. sessid = myProcessSession();
  58. }
  59. CJobQueueItem(IPropertyTree *item)
  60. {
  61. const char * wuid = item->queryProp("@wuid");
  62. if (*wuid=='~')
  63. wuid++;
  64. wu.set(wuid);
  65. owner.set(item->queryProp("@owner"));
  66. sessid = (SessionId)item->getPropInt64("@session");
  67. priority = item->getPropInt("@priority");
  68. ep.set(item->queryProp("@node"));
  69. port = (unsigned)item->getPropInt("@port");
  70. StringBuffer dts;
  71. if (item->getProp("@enqueuedt",dts))
  72. enqueuedt.setString(dts.str());
  73. }
  74. static void assignBranch(IPropertyTree *item,IJobQueueItem *qi)
  75. {
  76. item->setPropInt64("@session",qi->getSessionId());
  77. item->setPropInt("@priority",qi->getPriority());
  78. item->setPropInt("@port",qi->getPort());
  79. item->setProp("@wuid",qi->queryWUID());
  80. item->setProp("@owner",qi->queryOwner());
  81. StringBuffer eps;
  82. qi->queryEndpoint().getUrlStr(eps);
  83. item->setProp("@node",eps.str());
  84. StringBuffer dts;
  85. qi->queryEnqueuedTime().getString(dts);
  86. if (dts.length()==0) {
  87. CDateTime dt;
  88. dt.setNow();
  89. dt.getString(dts);
  90. qi->setEnqueuedTime(dt);
  91. }
  92. item->setProp("@enqueuedt",dts.str());
  93. }
  94. IPropertyTree *createBranch(CJobQueueItem)
  95. {
  96. IPropertyTree *item = createPTree("Item");
  97. assignBranch(item,this);
  98. return item;
  99. }
  100. const char *queryWUID()
  101. {
  102. return wu.get();
  103. }
  104. int getPriority()
  105. {
  106. return priority;
  107. }
  108. unsigned getPort()
  109. {
  110. return port;
  111. }
  112. SessionId getSessionId()
  113. {
  114. return sessid;
  115. }
  116. SocketEndpoint &queryEndpoint()
  117. {
  118. return ep;
  119. }
  120. const char *queryOwner()
  121. {
  122. return owner.get();
  123. }
  124. bool equals(IJobQueueItem *other)
  125. {
  126. // work unit is primary key
  127. return strcmp(wu.get(),other->queryWUID())==0;
  128. }
  129. CDateTime &queryEnqueuedTime()
  130. {
  131. return enqueuedt;
  132. }
  133. void setEnqueuedTime(const CDateTime &dt)
  134. {
  135. enqueuedt.set(dt);
  136. }
  137. void serialize(MemoryBuffer &tgt)
  138. {
  139. tgt.append(priority).append(port).append(wu).append(sessid);
  140. ep.serialize(tgt);
  141. StringBuffer dts;
  142. enqueuedt.getString(dts);
  143. tgt.append(owner).append(dts);
  144. }
  145. void deserialize(MemoryBuffer &src)
  146. {
  147. src.read(priority).read(port).read(wu).read(sessid);
  148. ep.deserialize(src);
  149. StringBuffer dts;
  150. src.read(owner).read(dts);
  151. enqueuedt.setString(dts.str());
  152. }
  153. IJobQueueItem* clone()
  154. {
  155. IJobQueueItem* ret = new CJobQueueItem(wu);
  156. ret->setPriority(priority);
  157. ret->setPriority(port);
  158. ret->setEndpoint(ep);
  159. ret->setSessionId(sessid);
  160. return ret;
  161. }
  162. void setPriority(int _priority)
  163. {
  164. priority = _priority;
  165. }
  166. void setPort(unsigned _port)
  167. {
  168. port = _port;
  169. }
  170. void setEndpoint(const SocketEndpoint &_ep)
  171. {
  172. ep = _ep;
  173. }
  174. void setSessionId(SessionId _id)
  175. {
  176. if (_id)
  177. sessid = _id;
  178. else
  179. sessid = myProcessSession();
  180. }
  181. void setOwner(const char *_owner)
  182. {
  183. owner.set(_owner);
  184. }
  185. bool isValidSession()
  186. {
  187. Owned<INode> node = createINode(ep);
  188. return (querySessionManager().lookupProcessSession(node)==sessid);
  189. }
  190. };
  191. class CJobQueueIterator: public CInterface, implements IJobQueueIterator
  192. {
  193. public:
  194. CJobQueueContents &items;
  195. unsigned idx;
  196. IMPLEMENT_IINTERFACE;
  197. CJobQueueIterator(CJobQueueContents &_items)
  198. : items(_items)
  199. {
  200. idx = 0;
  201. }
  202. bool isValid()
  203. {
  204. return idx<items.ordinality();
  205. }
  206. bool first()
  207. {
  208. idx = 0;
  209. return isValid();
  210. }
  211. bool next()
  212. {
  213. idx++;
  214. return isValid();
  215. }
  216. IJobQueueItem & query()
  217. {
  218. return items.item(idx);
  219. }
  220. };
  221. IJobQueueIterator *CJobQueueContents::getIterator()
  222. {
  223. return new CJobQueueIterator(*this);
  224. }
  225. IJobQueueItem *createJobQueueItem(const char *wuid)
  226. {
  227. if (!wuid||!*wuid)
  228. throw MakeStringException(-1,"createJobQueueItem empty WUID");
  229. return new CJobQueueItem(wuid);;
  230. }
  231. IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb)
  232. {
  233. return new CJobQueueItem(mb);
  234. }
// Iterates qd over the linked list of sQueueData headed by `qdata`, which must
// be in scope at the point of use (i.e. inside CJobQueue).
#define ForEachQueue(qd) for (sQueueData *qd = qdata; qd!=NULL; qd=qd->next)
// As ForEachQueue, but walks the list headed by `parent.qdata`.
#define ForEachQueueIn(parent,qd) for (sQueueData *qd = parent.qdata; qd!=NULL; qd=qd->next)
// Per-queue state; CJobQueue keeps one node per named queue in a singly
// linked list (see ForEachQueue).
struct sQueueData
{
    sQueueData *next;               // next queue in the list, NULL at the end
    IRemoteConnection *conn;        // SDS connection held while locked (set in connlock, released in connunlock)
    StringAttr qname;               // queue name, used in the /JobQueues/Queue[@name=...] path
    IPropertyTree *root;            // conn->queryRoot() while connected, otherwise NULL
    SubscriptionId subscriberid;    // subscription on the queue's Edition; 0 when not subscribed
    unsigned lastWaitEdition;       // last Edition value observed by haschanged()
};
  246. class CJobQueue: public CInterface, implements IJobQueue
  247. {
  248. class cOrderedIterator
  249. {
  250. CJobQueue &parent;
  251. unsigned numqueues;
  252. unsigned *queueidx;
  253. sQueueData **queues;
  254. IPropertyTree **queuet;
  255. MemoryAttr ma;
  256. unsigned current;
  257. public:
  258. cOrderedIterator(CJobQueue &_parent)
  259. : parent(_parent)
  260. {
  261. numqueues=0;
  262. ForEachQueueIn(parent,qd1)
  263. if (qd1->root)
  264. numqueues++;
  265. queueidx = (unsigned *)ma.allocate(numqueues*(sizeof(unsigned)+sizeof(sQueueData *)+sizeof(IPropertyTree *)));
  266. queues = (sQueueData **)(queueidx+numqueues);
  267. queuet = (IPropertyTree **)(queues+numqueues);
  268. unsigned i = 0;
  269. ForEachQueueIn(parent,qd2) {
  270. if (qd2->root)
  271. queues[i++] = qd2;
  272. }
  273. current = (unsigned)-1;
  274. }
  275. bool first()
  276. {
  277. StringBuffer path;
  278. parent.getItemPath(path,0U);
  279. current = (unsigned)-1;
  280. for (unsigned i = 0; i<numqueues;i++) {
  281. queueidx[i] = 0;
  282. queuet[i] = queues[i]->root->queryPropTree(path.str());
  283. if (queuet[i])
  284. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  285. current = i;
  286. }
  287. return current!=(unsigned)-1;
  288. }
  289. bool next()
  290. {
  291. if (current==(unsigned)-1)
  292. return false;
  293. queueidx[current]++;
  294. StringBuffer path;
  295. parent.getItemPath(path,queueidx[current]);
  296. queuet[current] = queues[current]->root->queryPropTree(path.str());
  297. current = (unsigned)-1;
  298. for (unsigned i = 0; i<numqueues;i++) {
  299. if (queuet[i])
  300. if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
  301. current = i;
  302. }
  303. return current!=(unsigned)-1;
  304. }
  305. bool isValid()
  306. {
  307. return current!=(unsigned)-1;
  308. }
  309. void item(sQueueData *&qd, IPropertyTree *&t,unsigned &idx)
  310. {
  311. assertex(current!=(unsigned)-1);
  312. qd = queues[current];
  313. t = queuet[current];
  314. idx = queueidx[current];
  315. }
  316. sQueueData &queryQueue()
  317. {
  318. assertex(current!=(unsigned)-1);
  319. return *queues[current];
  320. }
  321. IPropertyTree &queryTree()
  322. {
  323. assertex(current!=(unsigned)-1);
  324. return *queuet[current];
  325. }
  326. };
  327. public:
sQueueData *qdata;                  // head of the linked list of queues built by the constructor
sQueueData *activeq;                // initialised to qdata; presumably the queue used by the single-queue API - verify against callers
SessionId sessionid;                // myProcessSession() at construction; keys this client's Client branch
unsigned locknest;                  // nesting depth of connlock()/connunlock()
bool writemode;                     // true while the held connections are in write-lock mode
Semaphore notifysem;                // signalled by csubs::notify, waited on in dodequeue
CriticalSection crit;               // taken by Cconnlockblock and by csubs::notify
bool connected;                     // set by connect(), cleared by disconnect()
Owned<IConversation> initiateconv;  // not used in the code visible here
StringAttr initiatewu;              // not used in the code visible here
bool dequeuestop;                   // when set, the dodequeue loop exits
bool cancelwaiting;                 // not used in the code visible here
bool validateitemsessions;          // value last passed to connect()
  341. class csubs: public CInterface, implements ISDSSubscription
  342. {
  343. CJobQueue *parent;
  344. public:
  345. IMPLEMENT_IINTERFACE;
  346. csubs(CJobQueue *_parent)
  347. {
  348. parent = _parent;
  349. }
  350. void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
  351. {
  352. CriticalBlock block(parent->crit);
  353. parent->notifysem.signal();
  354. }
  355. } subs;
  356. IMPLEMENT_IINTERFACE;
  357. CJobQueue(const char *_qname)
  358. : subs(this)
  359. {
  360. StringArray qlist;
  361. qlist.appendListUniq(_qname, ",");
  362. sQueueData *last = NULL;
  363. ForEachItemIn(i,qlist) {
  364. sQueueData *qd = new sQueueData;
  365. qd->next = NULL;
  366. qd->qname.set(qlist.item(i));
  367. qd->conn = NULL;
  368. qd->root = NULL;
  369. qd->lastWaitEdition = 0;
  370. qd->subscriberid = 0;
  371. if (last)
  372. last->next = qd;
  373. else
  374. qdata = qd;
  375. last = qd;
  376. }
  377. validateitemsessions = false;
  378. activeq = qdata;
  379. writemode = false;
  380. locknest = 0;
  381. sessionid = myProcessSession();
  382. connected = false;
  383. dequeuestop = false;
  384. cancelwaiting = false;
  385. Cconnlockblock block(this,false); // this just checks queue exists
  386. }
  387. virtual ~CJobQueue()
  388. {
  389. try {
  390. while (locknest)
  391. connunlock(true); // auto rollback
  392. if (connected)
  393. disconnect();
  394. }
  395. catch (IException *e) {
  396. // server error
  397. EXCLOG(e, "~CJobQueue");
  398. e->Release();
  399. }
  400. try { // must attempt to remove subscription before object destroyed.
  401. dounsubscribe();
  402. }
  403. catch (IException *e) {
  404. EXCLOG(e, "~CJobQueue calling dounsubscribe");
  405. e->Release();
  406. }
  407. while (qdata)
  408. {
  409. sQueueData * next = qdata->next;
  410. delete qdata;
  411. qdata = next;
  412. }
  413. }
  414. void connlock(bool exclusive)
  415. { // must be in sect
  416. if (locknest++==0) {
  417. unsigned wait = qdata&&qdata->next?5000:INFINITE;
  418. ForEachQueue(qd) {
  419. loop {
  420. StringBuffer path;
  421. path.appendf("/JobQueues/Queue[@name=\"%s\"]",qd->qname.get());
  422. bool timeout;
  423. loop {
  424. timeout=false;
  425. try {
  426. qd->conn = querySDS().connect(path.str(),myProcessSession(),exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ,wait);
  427. if (qd->conn)
  428. break;
  429. }
  430. catch (ISDSException *e) {
  431. if (SDSExcpt_LockTimeout != e->errorCode())
  432. throw;
  433. e->Release();
  434. timeout = true;
  435. }
  436. // create queue
  437. Owned<IRemoteConnection> pconn;
  438. try {
  439. pconn.setown(querySDS().connect("/JobQueues",myProcessSession(),RTM_LOCK_WRITE|RTM_CREATE_QUERY,wait));
  440. if (!pconn)
  441. throw MakeStringException(-1,"CJobQueue could not create JobQueues");
  442. IPropertyTree *proot = pconn->queryRoot();
  443. StringBuffer cpath;
  444. cpath.appendf("Queue[@name=\"%s\"]",qd->qname.get());
  445. if (!proot->hasProp(cpath.str())) {
  446. IPropertyTree *pt = proot->addPropTree("Queue",createPTree("Queue"));
  447. pt->setProp("@name",qd->qname.get());
  448. pt->setProp("@state","active");
  449. pt->setPropInt("@count", 0);
  450. pt->setPropInt("Edition", 1);
  451. }
  452. }
  453. catch (ISDSException *e) {
  454. if (SDSExcpt_LockTimeout != e->errorCode())
  455. throw;
  456. e->Release();
  457. timeout = true;
  458. }
  459. }
  460. if (!timeout)
  461. break;
  462. sQueueData *qd2 = qdata;
  463. do {
  464. ::Release(qd2->conn);
  465. qd2->conn = NULL;
  466. qd2->root = NULL;
  467. }
  468. while (qd2!=qd);
  469. PROGLOG("Job Queue contention - delaying before retrying");
  470. Sleep(getRandom()%5000); // dining philosopher delay
  471. wait = getRandom()%4000+3000; // try and prevent sync
  472. qd = qdata;
  473. }
  474. qd->root = qd->conn->queryRoot();
  475. }
  476. writemode = exclusive;
  477. }
  478. else {
  479. if (exclusive&&!writemode) {
  480. ForEachQueue(qd) {
  481. assertex(qd->conn);
  482. writemode = exclusive;
  483. bool lockreleased;
  484. safeChangeModeWrite(qd->conn,qd->qname.get(),lockreleased);
  485. qd->root = qd->conn->queryRoot();
  486. }
  487. }
  488. }
  489. }
  490. void connunlock(bool rollback=false)
  491. { // should be in sect
  492. if (--locknest==0) {
  493. ForEachQueue(qd) {
  494. if (qd->conn) { // can occur if connection to dali threw exception
  495. if (writemode) {
  496. if (rollback)
  497. qd->conn->rollback();
  498. else {
  499. qd->root->setPropInt("Edition",qd->root->getPropInt("Edition")+1);
  500. qd->conn->commit();
  501. }
  502. }
  503. qd->conn->Release();
  504. qd->conn = NULL;
  505. }
  506. qd->root = NULL;
  507. }
  508. writemode = false;
  509. }
  510. }
  511. void conncommit() // doesn't set edition
  512. { // called within sect
  513. if (writemode) {
  514. ForEachQueue(qd) {
  515. if (qd->conn)
  516. qd->conn->commit();
  517. }
  518. }
  519. }
  520. class Cconnlockblock: public CriticalBlock
  521. {
  522. CJobQueue *parent;
  523. bool rollback;
  524. public:
  525. Cconnlockblock(CJobQueue *_parent,bool exclusive)
  526. : CriticalBlock(_parent->crit)
  527. {
  528. parent = _parent;
  529. parent->connlock(exclusive);
  530. rollback = false;
  531. }
  532. ~Cconnlockblock()
  533. {
  534. parent->connunlock(rollback);
  535. }
  536. void setRollback(bool set=true)
  537. {
  538. rollback = set;
  539. }
  540. void commit()
  541. {
  542. parent->conncommit();
  543. }
  544. };
  545. StringBuffer &getItemPath(StringBuffer &path,const char *wuid)
  546. {
  547. if (!wuid||!*wuid)
  548. return getItemPath(path,0U);
  549. return path.appendf("Item[@wuid=\"%s\"]",wuid);
  550. }
  551. StringBuffer &getItemPath(StringBuffer &path,unsigned idx)
  552. {
  553. path.appendf("Item[@num=\"%d\"]",idx+1);
  554. return path;
  555. }
  556. void removeItem(sQueueData &qd,IPropertyTree *item, bool cache)
  557. { // does not adjust or use @count
  558. unsigned n = item->getPropInt("@num");
  559. if (!n)
  560. return;
  561. if (cache) {
  562. StringBuffer s;
  563. item->getProp("@wuid",s.clear());
  564. qd.root->setProp("@prevwuid",s.str());
  565. item->getProp("@enqueuedt",s.clear());
  566. qd.root->setProp("@prevenqueuedt",s.str());
  567. qd.root->setPropInt("@prevpriority",item->getPropInt("@priority"));
  568. }
  569. item->setPropInt("@num",-1);
  570. StringBuffer path;
  571. loop {
  572. IPropertyTree *item2 = qd.root->queryPropTree(getItemPath(path.clear(),n).str());
  573. if (!item2)
  574. break;
  575. item2->setPropInt("@num",n);
  576. n++;
  577. }
  578. qd.root->removeTree(item);
  579. }
  580. IPropertyTree *addItem(sQueueData &qd,IPropertyTree *item,unsigned idx,unsigned count)
  581. {
  582. // does not set any values other than num
  583. StringBuffer path;
  584. // first move following up
  585. unsigned n=count;
  586. while (n>idx) {
  587. n--;
  588. qd.root->queryPropTree(getItemPath(path.clear(),n).str())->setPropInt("@num",n+2);
  589. }
  590. item->setPropInt("@num",idx+1);
  591. return qd.root->addPropTree("Item",item);
  592. }
  593. IPropertyTree *queryClientRoot(sQueueData &qd,unsigned idx=(unsigned)-1)
  594. {
  595. StringBuffer path;
  596. if (idx==(unsigned)-1)
  597. path.appendf("Client[@session=\"%"I64F"d\"]",sessionid);
  598. else
  599. path.appendf("Client[%d]",idx+1);
  600. IPropertyTree *ret = qd.root->queryPropTree(path.str());
  601. if (!ret&&(idx==(unsigned)-1)) {
  602. Cconnlockblock block(this,true);
  603. ret = createPTree("Client");
  604. ret = qd.root->addPropTree("Client",ret);
  605. ret->setPropInt64("@session",sessionid);
  606. StringBuffer eps;
  607. ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
  608. }
  609. return ret;
  610. }
  611. void dosubscribe()
  612. { // called in crit section
  613. ForEachQueue(qd) {
  614. if (qd->subscriberid) {
  615. querySDS().unsubscribe(qd->subscriberid);
  616. qd->subscriberid = 0;
  617. }
  618. StringBuffer path;
  619. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  620. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  621. }
  622. }
  623. bool haschanged() // returns if any changed
  624. {
  625. bool changed = false;
  626. ForEachQueue(qd) {
  627. if (!qd->subscriberid) {
  628. StringBuffer path;
  629. path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
  630. qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
  631. }
  632. unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
  633. if (e!=qd->lastWaitEdition) {
  634. qd->lastWaitEdition = e;
  635. changed = true;
  636. break;
  637. }
  638. }
  639. return changed;
  640. }
  641. void dounsubscribe()
  642. {
  643. // called in crit section
  644. ForEachQueue(qd) {
  645. if (qd->subscriberid) {
  646. querySDS().unsubscribe(qd->subscriberid);
  647. qd->subscriberid = 0;
  648. }
  649. }
  650. }
  651. void connect(bool _validateitemsessions)
  652. {
  653. Cconnlockblock block(this,true);
  654. validateitemsessions = _validateitemsessions;
  655. if (connected)
  656. disconnect();
  657. dosubscribe();
  658. ForEachQueue(qd) {
  659. unsigned connected;
  660. unsigned waiting;
  661. unsigned count;
  662. getStats(*qd,connected,waiting,count); // clear any duff clients
  663. IPropertyTree *croot = queryClientRoot(*qd);
  664. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
  665. }
  666. connected = true;
  667. }
  668. void disconnect() // signal no longer wil be dequeing (optional - done automatically on release)
  669. {
  670. Cconnlockblock block(this,true);
  671. if (connected) {
  672. dounsubscribe();
  673. ForEachQueue(qd) {
  674. IPropertyTree *croot = queryClientRoot(*qd);
  675. croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
  676. }
  677. connected = false;
  678. }
  679. }
  680. bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
  681. {
  682. // if this ever becomes time critical thne could cache enqueued values
  683. StringBuffer d1s;
  684. if (qt1)
  685. qt1->getProp("@enqueuedt",d1s);
  686. StringBuffer d2s;
  687. if (qt2)
  688. qt2->getProp("@enqueuedt",d2s);
  689. return (strcmp(d1s.str(),d2s.str())<0);
  690. }
  691. sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
  692. {
  693. if (numqueues==0)
  694. return NULL;
  695. if (numqueues==1)
  696. return *queues;
  697. sQueueData *best = NULL;
  698. IPropertyTree *bestt = NULL;
  699. for (unsigned i=0;i<numqueues;i++) {
  700. sQueueData *qd = queues[i];
  701. unsigned count = qd->root->getPropInt("@count");
  702. if (count) {
  703. int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
  704. if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
  705. StringBuffer path;
  706. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,0U).str());
  707. if (!item)
  708. continue;
  709. if (item->getPropInt("@num",0)<=0)
  710. continue;
  711. CDateTime dt;
  712. StringBuffer enqueued;
  713. if (!best||itemOlder(item,bestt)) {
  714. best = qd;
  715. bestt = item;
  716. }
  717. }
  718. }
  719. }
  720. return best;
  721. }
  722. void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
  723. {
  724. for (unsigned i=0; i<numqueues; i++) {
  725. IPropertyTree *croot = queryClientRoot(*queues[i]);
  726. croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
  727. }
  728. }
  729. // 'simple' queuing
// Core dequeue loop shared by dequeue() and prioDequeue().
//
// Repeatedly locks the queues, picks the best active (neither stopped nor
// paused) queue via findbestqueue() and tries to take its head item.  If
// nothing is available it marks this client as waiting and blocks on
// notifysem (at most 5 minutes at a time) until woken or timed out.
//
//   minprio  - minimum acceptable priority; INT_MIN means no minimum
//   timeout  - maximum time to wait in ms (INFINITE to wait indefinitely)
//   useprev  - also require at least the queue's cached @prevpriority
//   timedout - if non-NULL, set to true only when the timeout expired with
//              nothing dequeued
//
// Returns the dequeued item (caller owns it), or NULL if all queues are
// stopped, dequeuestop was set, or the timeout expired.
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
{
    bool hasminprio=(minprio!=INT_MIN);
    if (timedout)
        *timedout = false;
    IJobQueueItem *ret=NULL;
    bool waitingset = false;    // true once @waiting has been incremented for this call
    while (!dequeuestop) {
        unsigned t;             // start tick; only set and read when timeout is finite
        if (timeout!=(unsigned)INFINITE)
            t = msTick();
        {
            Cconnlockblock block(this,true);
            block.setRollback(true);    // assume not going to update
            // now cycle through queues looking at state
            unsigned total = 0;
            unsigned stopped = 0;
            PointerArray active;
            ForEachQueue(qd) {
                total++;
                const char *state = qd->root->queryProp("@state");
                if (state) {
                    if (strcmp(state,"stopped")==0)
                        stopped++;
                    else if (strcmp(state,"paused")!=0)
                        active.append(qd);
                }
                else
                    active.append(qd);  // no @state is treated as active
            }
            // NOTE(review): if waitingset is already true here, this early
            // return leaves @waiting incremented - confirm whether intended.
            if (stopped==total)
                return NULL; // all stopped
            sQueueData **activeqds = (sQueueData **)active.getArray();
            unsigned activenum = active.ordinality();
            if (activenum) {
                sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
                unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
                // load minp from cache
                if (count) {
                    int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
                    if (!hasminprio||checkprio(*bestqd,mpr)) {
                        block.setRollback(false);   // about to modify the queue
                        ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
                        if (ret) // think it must be!
                            timeout = 0; // so mark that done
                        else if (!hasminprio) {
                            // @count says there are items but the head could not be taken
                            WARNLOG("Resetting queue %s",bestqd->qname.get());
                            clear(*bestqd); // reset queue as seems to have become out of sync
                        }
                    }
                }
                if (timeout!=0) { // more to do
                    if (!connected) { // if connect already done non-zero
                        connect(validateitemsessions);
                        block.setRollback(false);
                    }
                    if (!waitingset) {
                        setWaiting(activenum,activeqds,true);
                        block.commit();
                        waitingset = true;
                    }
                }
            }
            if (timeout==0) {
                // finished (item taken or time expired): undo the waiting mark
                if (waitingset) {
                    setWaiting(activenum,activeqds,false);
                    block.commit();
                }
                if (timedout)
                    *timedout = (ret==NULL);
                break;
            }
        }
        unsigned to = 5*60*1000;
        // check every 5 mins independent of notify (in case subscription lost for some reason)
        if (to>timeout)
            to = timeout;
        notifysem.wait(to);
        if (timeout!=(unsigned)INFINITE) {
            // deduct the time spent in this iteration from the remaining timeout
            t = msTick()-t;
            if (t<timeout)
                timeout -= t;
            else
                timeout = 0;
        }
    }
    return ret;
}
  818. IJobQueueItem *dequeue(unsigned timeout=INFINITE)
  819. {
  820. return dodequeue(INT_MIN,timeout);
  821. }
  822. IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
  823. {
  824. return dodequeue(minprio,timeout);
  825. }
// Inserts qitem into qd at zero-based position idx, or at the position
// implied by its priority when idx is (unsigned)-1.  Any existing entry for
// the same work unit is removed first.  When an explicit position is given,
// the item's priority is clamped so that it is no higher than the item before
// it and no lower than the item after it.  Increments @count.
// Expects qd.root to be valid (the callers visible here hold a write lock).
void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
    Owned<IJobQueueItem> qi = qitem;
    remove(qi->queryWUID()); // just in case trying to put on twice!
    int priority = qi->getPriority();
    unsigned count = qd.root->getPropInt("@count");
    StringBuffer path;
    if (count&&(idx!=(unsigned)-1)) { // need to check before and after
        if (idx) {
            // item that will precede the new one
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority>pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;     // fall back to priority-based placement
        }
        if (idx<count) {
            // item that will follow the new one
            IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx).str());
            if (pt) {
                int pp = pt->getPropInt("@priority");
                if (priority<pp) {
                    qi->setPriority(pp);
                    priority = pp;
                }
            }
            else // what happened here?
                idx = (unsigned)-1;     // fall back to priority-based placement
        }
    }
    if (idx==(unsigned)-1) {
        // scan back from the tail to just after the last item whose priority
        // is at least the new item's
        idx = count;
        while (idx) {
            IPropertyTree *previtem = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
            if (previtem) {
                if (previtem->getPropInt("@priority")>=priority) {
                    break;
                }
            }
            else
                count--; // how did that happen?
            idx--;
        }
    }
    CJobQueueItem::assignBranch(addItem(qd,createPTree("Item"),idx,count),qi);
    qd.root->setPropInt("@count",count+1);
}
  876. void enqueue(sQueueData &qd,IJobQueueItem *qitem) // takes ownership of qitem
  877. {
  878. Cconnlockblock block(this,true);
  879. placeonqueue(qd,qitem,(unsigned)-1);
  880. }
  881. void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  882. {
  883. Cconnlockblock block(this,true);
  884. placeonqueue(qd,qitem,findRank(qd,wuid));
  885. }
  886. void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
  887. {
  888. Cconnlockblock block(this,true);
  889. unsigned idx = findRank(qd,wuid);
  890. if (idx!=(unsigned)-1)
  891. idx++;
  892. placeonqueue(qd,qitem,idx);
  893. }
  894. void enqueueTail(sQueueData &qd,IJobQueueItem *qitem)
  895. {
  896. Cconnlockblock block(this,true);
  897. Owned<IJobQueueItem> qi = getTail(qd);
  898. if (qi)
  899. enqueueAfter(qd,qitem,qi->queryWUID());
  900. else
  901. enqueue(qd,qitem);
  902. }
  903. void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
  904. {
  905. Cconnlockblock block(this,true);
  906. Owned<IJobQueueItem> qi = getHead(qd);
  907. if (qi)
  908. enqueueBefore(qd,qitem,qi->queryWUID());
  909. else
  910. enqueue(qd,qitem);
  911. }
  912. unsigned ordinality(sQueueData &qd)
  913. {
  914. Cconnlockblock block(this,false);
  915. return qd.root->getPropInt("@count");
  916. }
  917. unsigned waiting(sQueueData &qd)
  918. {
  919. Cconnlockblock block(this,false);
  920. unsigned ret = 0;
  921. for (unsigned i=0;;i++) {
  922. IPropertyTree *croot = queryClientRoot(qd,i);
  923. if (!croot)
  924. break;
  925. ret += croot->getPropInt("@waiting");
  926. }
  927. return ret;
  928. }
  929. IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
  930. {
  931. Cconnlockblock block(this,false);
  932. if (idx==(unsigned)-1) {
  933. idx = qd.root->getPropInt("@count");
  934. if (!idx)
  935. return NULL;
  936. idx--;
  937. }
  938. StringBuffer path;
  939. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,idx).str());
  940. if (!item)
  941. return NULL;
  942. return new CJobQueueItem(item);
  943. }
  944. IJobQueueItem *getHead(sQueueData &qd)
  945. {
  946. return getItem(qd,0);
  947. }
  948. IJobQueueItem *getTail(sQueueData &qd)
  949. {
  950. return getItem(qd,(unsigned)-1);
  951. }
  952. unsigned doFindRank(sQueueData &qd,const char *wuid)
  953. {
  954. StringBuffer path;
  955. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  956. if (!item)
  957. return (unsigned)-1;
  958. return item->getPropInt("@num")-1;
  959. }
  960. unsigned findRank(sQueueData &qd,const char *wuid)
  961. {
  962. Cconnlockblock block(this,false);
  963. return doFindRank(qd,wuid);
  964. }
  965. IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
  966. {
  967. Cconnlockblock block(this,false);
  968. StringBuffer path;
  969. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,qi->queryWUID()).str());
  970. if (!item)
  971. return NULL;
  972. bool cached = item->getPropInt("@num",0)<=0;
  973. if (cached)
  974. return NULL; // don't want cached value
  975. return new CJobQueueItem(item);
  976. }
  977. IJobQueueItem *find(sQueueData &qd,const char *wuid)
  978. {
  979. Cconnlockblock block(this,false);
  980. StringBuffer path;
  981. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  982. if (!item)
  983. return NULL;
  984. bool cached = item->getPropInt("@num",0)<=0;
  985. if (wuid&&cached)
  986. return NULL; // don't want cached value unless explicit
  987. return new CJobQueueItem(item);
  988. }
  989. bool checkprio(sQueueData &qd,int minprio=0)
  990. {
  991. StringBuffer path;
  992. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,0U).str());
  993. if (!item)
  994. return false;
  995. return (item->getPropInt("@priority")>=minprio);
  996. }
  997. IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
  998. {
  999. StringBuffer path;
  1000. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
  1001. if (!item)
  1002. return NULL;
  1003. if (item->getPropInt("@num",0)<=0)
  1004. return NULL; // don't want (old) cached value
  1005. if (hasminprio&&(item->getPropInt("@priority")<minprio))
  1006. return NULL;
  1007. IJobQueueItem *ret = new CJobQueueItem(item);
  1008. removeItem(qd,item,saveitem);
  1009. unsigned count = qd.root->getPropInt("@count");
  1010. assertex(count);
  1011. qd.root->setPropInt("@count",count-1);
  1012. return ret;
  1013. }
  1014. IJobQueueItem *take(sQueueData &qd,const char *wuid)
  1015. {
  1016. Cconnlockblock block(this,true);
  1017. return dotake(qd,wuid,false);
  1018. }
  1019. unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
  1020. {
  1021. unsigned ret=0;
  1022. StringBuffer path;
  1023. for (unsigned i=0;;i++) {
  1024. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  1025. if (!item)
  1026. break;
  1027. ret++;
  1028. dest.append(*new CJobQueueItem(item));
  1029. }
  1030. return ret;
  1031. }
  1032. unsigned copyItems(sQueueData &qd,CJobQueueContents &dest)
  1033. {
  1034. Cconnlockblock block(this,false);
  1035. return copyItemsImpl(qd,dest);
  1036. }
  1037. void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state)
  1038. {
  1039. assertex(qdata);
  1040. Cconnlockblock block(this,false);
  1041. assertex(qdata->root);
  1042. copyItemsImpl(*qdata,contents);
  1043. const char *st = qdata->root->queryProp("@state");
  1044. if (st&&*st)
  1045. state.set(st);
  1046. }
  1047. unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
  1048. {
  1049. Cconnlockblock block(this,true);
  1050. unsigned ret = copyItems(qd,dest);
  1051. clear(qd);
  1052. return ret;
  1053. }
  1054. void enqueueItems(sQueueData &qd,CJobQueueContents &items)
  1055. {
  1056. unsigned n=items.ordinality();
  1057. if (n) {
  1058. Cconnlockblock block(this,true);
  1059. for (unsigned i=0;i<n;i++)
  1060. enqueue(qd,items.item(i).clone());
  1061. }
  1062. }
  1063. sQueueData *findQD(const char *wuid)
  1064. {
  1065. if (wuid&&*wuid) {
  1066. ForEachQueue(qd) {
  1067. unsigned idx = doFindRank(*qd,wuid);
  1068. if (idx!=(unsigned)-1)
  1069. return qd;
  1070. }
  1071. }
  1072. return NULL;
  1073. }
  1074. void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
  1075. {
  1076. Cconnlockblock block(this,true);
  1077. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1078. enqueueBefore(*qd,qitem,wuid);
  1079. }
  1080. void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
  1081. {
  1082. Cconnlockblock block(this,true);
  1083. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1084. enqueueAfter(*qd,qitem,wuid);
  1085. }
  1086. bool moveBefore(const char *wuid,const char *nextwuid)
  1087. {
  1088. if (!qdata)
  1089. return false;
  1090. Cconnlockblock block(this,true);
  1091. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1092. if (!qd)
  1093. return false;
  1094. IJobQueueItem *qi=take(*qd,wuid);
  1095. if (!qi)
  1096. return false;
  1097. sQueueData *qdd = NULL;
  1098. if (qdata->next)
  1099. qdd = findQD(nextwuid);
  1100. if (!qdd)
  1101. qdd = qd;
  1102. enqueueBefore(*qdd,qi,nextwuid);
  1103. return true;
  1104. }
  1105. bool moveAfter(const char *wuid,const char *prevwuid)
  1106. {
  1107. if (!qdata)
  1108. return false;
  1109. Cconnlockblock block(this,true);
  1110. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1111. if (!qd)
  1112. return false;
  1113. IJobQueueItem *qi=take(*qd,wuid);
  1114. if (!qi)
  1115. return false;
  1116. sQueueData *qdd = NULL;
  1117. if (qdata->next)
  1118. qdd = findQD(prevwuid);
  1119. if (!qdd)
  1120. qdd = qd;
  1121. enqueueAfter(*qdd,qi,prevwuid);
  1122. return true;
  1123. }
  1124. bool moveToHead(const char *wuid)
  1125. {
  1126. if (!qdata)
  1127. return false;
  1128. Cconnlockblock block(this,true);
  1129. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1130. if (!qd)
  1131. return false;
  1132. IJobQueueItem *qi=take(*qd,wuid);
  1133. if (!qi)
  1134. return false;
  1135. enqueueHead(*qd,qi);
  1136. return true;
  1137. }
  1138. bool moveToTail(const char *wuid)
  1139. {
  1140. if (!qdata)
  1141. return false;
  1142. Cconnlockblock block(this,true);
  1143. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1144. if (!qd)
  1145. return false;
  1146. IJobQueueItem *qi=take(*qd,wuid);
  1147. if (!qi)
  1148. return false;
  1149. enqueueTail(*qd,qi);
  1150. return true;
  1151. }
  1152. bool remove(const char *wuid)
  1153. {
  1154. if (!qdata)
  1155. return false;
  1156. Cconnlockblock block(this,true);
  1157. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1158. if (!qd)
  1159. return false;
  1160. StringBuffer path;
  1161. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,wuid).str());
  1162. if (!item)
  1163. return false;
  1164. bool cached = item->getPropInt("@num",0)<=0; // old cached (bwd compat)
  1165. removeItem(*qd,item,false);
  1166. if (!cached) {
  1167. unsigned count = qd->root->getPropInt("@count");
  1168. assertex(count);
  1169. qd->root->setPropInt("@count",count-1);
  1170. }
  1171. return true;
  1172. }
  1173. bool changePriority(const char *wuid,int value)
  1174. {
  1175. if (!qdata)
  1176. return false;
  1177. Cconnlockblock block(this,true);
  1178. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1179. if (!qd)
  1180. return false;
  1181. IJobQueueItem *qi=take(*qd,wuid);
  1182. if (!qi) {
  1183. StringBuffer ws("~"); // change cached item
  1184. ws.append(wuid);
  1185. StringBuffer path;
  1186. IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,ws.str()).str());
  1187. if (item) {
  1188. item->setPropInt("@priority",value);
  1189. return true;
  1190. }
  1191. return false;
  1192. }
  1193. qi->setPriority(value);
  1194. enqueue(*qd,qi);
  1195. return true;
  1196. }
  1197. void clear(sQueueData &qd)
  1198. {
  1199. Cconnlockblock block(this,true);
  1200. qd.root->setPropInt("@count",0);
  1201. loop {
  1202. IPropertyTree *item = qd.root->queryPropTree("Item[1]");
  1203. if (!item)
  1204. break;
  1205. qd.root->removeTree(item);
  1206. }
  1207. }
  1208. void lock()
  1209. {
  1210. connlock(false); // sub functions will change to exclusive if needed
  1211. }
  1212. void unlock(bool rollback=false)
  1213. {
  1214. connunlock(rollback);
  1215. }
  1216. void pause(sQueueData &qd)
  1217. {
  1218. Cconnlockblock block(this,true);
  1219. qd.root->setProp("@state","paused");
  1220. }
  1221. void resume(sQueueData &qd)
  1222. {
  1223. Cconnlockblock block(this,true);
  1224. qd.root->setProp("@state","active");
  1225. }
  1226. bool paused(sQueueData &qd)
  1227. {
  1228. Cconnlockblock block(this,false);
  1229. const char *state = qd.root->queryProp("@state");
  1230. return (state&&(strcmp(state,"paused")==0));
  1231. }
  1232. void stop(sQueueData &qd)
  1233. {
  1234. Cconnlockblock block(this,true);
  1235. qd.root->setProp("@state","stopped");
  1236. }
  1237. bool stopped(sQueueData &qd)
  1238. {
  1239. Cconnlockblock block(this,false);
  1240. const char *state = qd.root->queryProp("@state");
  1241. return (state&&(strcmp(state,"stopped")==0));
  1242. }
  1243. void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1244. {
  1245. Cconnlockblock block(this,false);
  1246. connected = 0;
  1247. waiting = 0;
  1248. unsigned i=0;
  1249. loop {
  1250. IPropertyTree *croot = queryClientRoot(qd,i);
  1251. if (!croot)
  1252. break;
  1253. if (!validSession(croot)) {
  1254. Cconnlockblock block(this,true);
  1255. qd.root->removeTree(croot);
  1256. }
  1257. else {
  1258. waiting += croot->getPropInt("@waiting");
  1259. connected += croot->getPropInt("@connected");
  1260. i++;
  1261. }
  1262. }
  1263. // now remove any duff queue items
  1264. unsigned count = qd.root->getPropInt("@count");
  1265. if (!validateitemsessions) {
  1266. enqueued = count;
  1267. return;
  1268. }
  1269. i=0;
  1270. StringBuffer path;
  1271. loop {
  1272. IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
  1273. if (!item)
  1274. break;
  1275. if (!validSession(item)) {
  1276. Cconnlockblock block(this,true);
  1277. item = qd.root->queryPropTree(path.str());
  1278. if (!item)
  1279. break;
  1280. // PROGLOG("WUJOBQ: Removing %s as session %"I64F"x not active",item->queryProp("@wuid"),item->getPropInt64("@session"));
  1281. removeItem(qd,item,false);
  1282. }
  1283. else
  1284. i++;
  1285. }
  1286. if (count!=i) {
  1287. Cconnlockblock block(this,true);
  1288. qd.root->setPropInt("@count",i);
  1289. }
  1290. enqueued = i;
  1291. }
  1292. void getStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1293. {
  1294. Cconnlockblock block(this,false);
  1295. doGetStats(qd,connected,waiting,enqueued);
  1296. }
  1297. void getStats(unsigned &connected,unsigned &waiting,unsigned &enqueued)
  1298. {
  1299. // multi queue
  1300. Cconnlockblock block(this,false);
  1301. connected=0;
  1302. waiting=0;
  1303. enqueued=0;
  1304. ForEachQueue(qd) {
  1305. unsigned c;
  1306. unsigned w;
  1307. unsigned e;
  1308. doGetStats(*qd,c,w,e);
  1309. connected+=c;
  1310. waiting+=w;
  1311. enqueued+=e;
  1312. }
  1313. }
  1314. unsigned waiting()
  1315. {
  1316. Cconnlockblock block(this,false);
  1317. unsigned ret = 0;
  1318. ForEachQueue(qd) {
  1319. for (unsigned i=0;;i++) {
  1320. IPropertyTree *croot = queryClientRoot(*qd,i);
  1321. if (!croot)
  1322. break;
  1323. ret += croot->getPropInt("@waiting");
  1324. }
  1325. }
  1326. return ret;
  1327. }
  1328. unsigned findRank(const char *wuid)
  1329. {
  1330. assertex(qdata);
  1331. if (!qdata->next)
  1332. return findRank(*qdata,wuid);
  1333. Cconnlockblock block(this,false);
  1334. cOrderedIterator it(*this);
  1335. unsigned i = 0;
  1336. ForEach(it) {
  1337. const char *twuid = it.queryTree().queryProp("@wuid");
  1338. if (twuid&&(strcmp(twuid,wuid)==0))
  1339. return i;
  1340. i++;
  1341. }
  1342. return (unsigned)-1;
  1343. }
  1344. unsigned copyItems(CJobQueueContents &dest)
  1345. {
  1346. assertex(qdata);
  1347. if (!qdata->next)
  1348. return copyItems(*qdata,dest);
  1349. Cconnlockblock block(this,false);
  1350. cOrderedIterator it(*this);
  1351. unsigned ret = 0;
  1352. ForEach(it) {
  1353. dest.append(*new CJobQueueItem(&it.queryTree()));
  1354. ret++;
  1355. }
  1356. return ret;
  1357. }
  1358. IJobQueueItem *take(const char *wuid)
  1359. {
  1360. assertex(qdata);
  1361. if (!qdata->next)
  1362. return take(*qdata,wuid);
  1363. Cconnlockblock block(this,true);
  1364. ForEachQueue(qd) {
  1365. IJobQueueItem *ret = dotake(*qd,wuid,false);
  1366. if (ret)
  1367. return ret;
  1368. }
  1369. return NULL;
  1370. }
  1371. unsigned takeItems(CJobQueueContents &dest)
  1372. {
  1373. assertex(qdata);
  1374. if (!qdata->next)
  1375. return takeItems(*qdata,dest);
  1376. Cconnlockblock block(this,true);
  1377. unsigned ret = 0;
  1378. ForEachQueue(qd) {
  1379. ret += copyItems(*qd,dest);
  1380. clear(*qd);
  1381. }
  1382. return ret;
  1383. }
  1384. void enqueueItems(CJobQueueContents &items)
  1385. { // enqueues to firs sub-queue (not sure that useful)
  1386. assertex(qdata);
  1387. return enqueueItems(*qdata,items);
  1388. }
  1389. void clear()
  1390. {
  1391. ForEachQueue(qd) {
  1392. clear(*qd);
  1393. }
  1394. }
  1395. bool validSession(IPropertyTree *item)
  1396. {
  1397. Owned<INode> node = createINode(item->queryProp("@node"),DALI_SERVER_PORT); // port should always be present
  1398. return (querySessionManager().lookupProcessSession(node)==(SessionId)item->getPropInt64("@session"));
  1399. }
  1400. IConversation *initiateConversation(sQueueData &qd,IJobQueueItem *item)
  1401. {
  1402. CriticalBlock block(crit);
  1403. assertex(!initiateconv.get());
  1404. SocketEndpoint ep = item->queryEndpoint();
  1405. unsigned short port = (unsigned short)item->getPort();
  1406. initiateconv.setown(createSingletonSocketConnection(port));
  1407. if (!port)
  1408. item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
  1409. initiatewu.set(item->queryWUID());
  1410. enqueue(qd,item);
  1411. bool ok;
  1412. {
  1413. CriticalUnblock unblock(crit);
  1414. ok = initiateconv->accept(INFINITE);
  1415. }
  1416. if (!ok)
  1417. initiateconv.clear();
  1418. return initiateconv.getClear();
  1419. }
  1420. IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
  1421. {
  1422. CriticalBlock block(crit);
  1423. retitem = NULL;
  1424. assertex(connected); // must be connected
  1425. int curmp = maxp?maxp->get():0;
  1426. int nextmp = curmp;
  1427. loop {
  1428. bool timedout = false;
  1429. Owned<IJobQueueItem> item;
  1430. {
  1431. CriticalUnblock unblock(crit);
  1432. // this is a bit complicated with multi-thor
  1433. if (prioritytransitiondelay||maxp) {
  1434. item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
  1435. prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
  1436. // if dynamic priority check every minute
  1437. if (!prioritytransitiondelay) {
  1438. curmp = nextmp; // using max above is a bit devious to allow transition
  1439. nextmp = maxp->get();
  1440. }
  1441. }
  1442. else
  1443. item.setown(dequeue(INFINITE));
  1444. }
  1445. if (item.get()) {
  1446. if (item->isValidSession()) {
  1447. SocketEndpoint ep = item->queryEndpoint();
  1448. ep.port = item->getPort();
  1449. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1450. if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
  1451. retitem = item.getClear();
  1452. return acceptconv.getClear();
  1453. }
  1454. }
  1455. }
  1456. else if (prioritytransitiondelay)
  1457. prioritytransitiondelay = 0;
  1458. else if (!timedout)
  1459. break;
  1460. }
  1461. return NULL;
  1462. }
  1463. void cancelInitiateConversation(sQueueData &qd)
  1464. {
  1465. CriticalBlock block(crit);
  1466. if (initiatewu.get())
  1467. remove(initiatewu);
  1468. if (initiateconv.get())
  1469. initiateconv->cancel();
  1470. }
  1471. void cancelAcceptConversation()
  1472. {
  1473. CriticalBlock block(crit);
  1474. dequeuestop = true;
  1475. notifysem.signal();
  1476. }
  1477. bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
  1478. {
  1479. Cconnlockblock block(this,true);
  1480. loop {
  1481. Owned<IJobQueueItem> item = dotake(qd,wuid,false);
  1482. if (!item.get())
  1483. break;
  1484. if (item->isValidSession()) {
  1485. SocketEndpoint ep = item->queryEndpoint();
  1486. ep.port = item->getPort();
  1487. Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
  1488. acceptconv->connect(3*60*1000); // connect then close should close other end
  1489. return true;
  1490. }
  1491. }
  1492. return false;
  1493. }
  1494. bool waitStatsChange(unsigned timeout)
  1495. {
  1496. assertex(!connected); // not allowed to call this while connected
  1497. cancelwaiting = false;
  1498. while(!cancelwaiting) {
  1499. {
  1500. Cconnlockblock block(this,false);
  1501. if (haschanged())
  1502. return true;
  1503. }
  1504. if (!notifysem.wait(timeout))
  1505. break;
  1506. }
  1507. return false;
  1508. }
  1509. void cancelWaitStatsChange()
  1510. {
  1511. CriticalBlock block(crit);
  1512. cancelwaiting = true;
  1513. notifysem.signal();
  1514. }
  1515. virtual void enqueue(IJobQueueItem *qitem)
  1516. {
  1517. enqueue(*activeq,qitem);
  1518. }
  1519. void enqueueHead(IJobQueueItem *qitem)
  1520. {
  1521. enqueueHead(*activeq,qitem);
  1522. }
  1523. void enqueueTail(IJobQueueItem *qitem)
  1524. {
  1525. enqueueTail(*activeq,qitem);
  1526. }
  1527. IJobQueueItem *getItem(unsigned idx)
  1528. {
  1529. if (!qdata)
  1530. return NULL;
  1531. if (!qdata->next)
  1532. return getItem(*qdata,idx);
  1533. Cconnlockblock block(this,false);
  1534. cOrderedIterator it(*this);
  1535. unsigned i = 0;
  1536. IPropertyTree *ret = NULL;
  1537. ForEach(it) {
  1538. if (i==idx) {
  1539. ret = &it.queryTree();
  1540. break;
  1541. }
  1542. else if (idx==(unsigned)-1) // -1 means return last
  1543. ret = &it.queryTree();
  1544. i++;
  1545. }
  1546. if (ret)
  1547. return new CJobQueueItem(ret);
  1548. return NULL;
  1549. }
  1550. IJobQueueItem *getHead()
  1551. {
  1552. if (!qdata)
  1553. return NULL;
  1554. if (!qdata->next)
  1555. return getHead(*qdata);
  1556. return getItem(0);
  1557. }
  1558. IJobQueueItem *getTail()
  1559. {
  1560. if (!qdata)
  1561. return NULL;
  1562. if (!qdata->next)
  1563. return getHead(*qdata);
  1564. return getItem((unsigned)-1);
  1565. }
  1566. IJobQueueItem *find(const char *wuid)
  1567. {
  1568. if (!qdata)
  1569. return NULL;
  1570. sQueueData *qd = qdata->next?findQD(wuid):qdata;
  1571. if (!qd)
  1572. return NULL;
  1573. return find(*qd,wuid);
  1574. }
  1575. unsigned ordinality()
  1576. {
  1577. Cconnlockblock block(this,false);
  1578. unsigned ret = 0;
  1579. ForEachQueue(qd) {
  1580. if (qd->root)
  1581. ret += qd->root->getPropInt("@count");
  1582. }
  1583. return ret;
  1584. }
  1585. void pause()
  1586. {
  1587. Cconnlockblock block(this,true);
  1588. ForEachQueue(qd) {
  1589. if (qd->root)
  1590. qd->root->setProp("@state","paused");
  1591. }
  1592. }
  1593. bool paused()
  1594. {
  1595. // true if all paused
  1596. Cconnlockblock block(this,false);
  1597. ForEachQueue(qd) {
  1598. if (qd->root) {
  1599. const char *state = qd->root->queryProp("@state");
  1600. if (state&&(strcmp(state,"paused")!=0))
  1601. return false;
  1602. }
  1603. }
  1604. return true;
  1605. }
  1606. void stop()
  1607. {
  1608. Cconnlockblock block(this,true);
  1609. ForEachQueue(qd) {
  1610. if (qd->root)
  1611. qd->root->setProp("@state","stopped");
  1612. }
  1613. }
  1614. bool stopped()
  1615. {
  1616. // true if all stopped
  1617. Cconnlockblock block(this,false);
  1618. ForEachQueue(qd) {
  1619. if (qd->root) {
  1620. const char *state = qd->root->queryProp("@state");
  1621. if (state&&(strcmp(state,"stopped")!=0))
  1622. return false;
  1623. }
  1624. }
  1625. return true;
  1626. }
  1627. void resume()
  1628. {
  1629. Cconnlockblock block(this,true);
  1630. ForEachQueue(qd) {
  1631. if (qd->root)
  1632. qd->root->setProp("@state","active");
  1633. }
  1634. }
  1635. IConversation *initiateConversation(IJobQueueItem *item)
  1636. {
  1637. return initiateConversation(*activeq,item);
  1638. }
  1639. void cancelInitiateConversation()
  1640. {
  1641. return cancelInitiateConversation(*activeq);
  1642. }
  1643. bool cancelInitiateConversation(const char *wuid)
  1644. {
  1645. return cancelInitiateConversation(*activeq,wuid);
  1646. }
  1647. const char * queryActiveQueueName()
  1648. {
  1649. return activeq->qname;
  1650. }
  1651. void setActiveQueue(const char *name)
  1652. {
  1653. ForEachQueue(qd) {
  1654. if (!name||(strcmp(qd->qname.get(),name)==0)) {
  1655. activeq = qd;
  1656. return;
  1657. }
  1658. }
  1659. if (name)
  1660. throw MakeStringException (-1,"queue %s not found",name);
  1661. }
  1662. const char *nextQueueName(const char *last)
  1663. {
  1664. ForEachQueue(qd) {
  1665. if (!last||(strcmp(qd->qname.get(),last)==0)) {
  1666. if (qd->next)
  1667. return qd->next->qname.get();
  1668. break;
  1669. }
  1670. }
  1671. return NULL;
  1672. }
  1673. bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
  1674. {
  1675. priority = 0;
  1676. if (!activeq)
  1677. return false;
  1678. const char *w = activeq->root->queryProp("@prevwuid");
  1679. if (!w||!*w)
  1680. return false;
  1681. wuid.set(w);
  1682. StringBuffer dts;
  1683. if (activeq->root->getProp("@prevenqueuedt",dts))
  1684. enqueuedt.setString(dts.str());
  1685. priority = activeq->root->getPropInt("@prevpriority");
  1686. return true;
  1687. }
  1688. };
  1689. IJobQueue *createJobQueue(const char *name)
  1690. {
  1691. if (!name||!*name)
  1692. throw MakeStringException(-1,"createJobQueue empty name");
  1693. return new CJobQueue(name);
  1694. }
  1695. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster)
  1696. {
  1697. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  1698. if (!clusterInfo.get())
  1699. return false;
  1700. SCMStringBuffer agentQueue;
  1701. clusterInfo->getAgentQueue(agentQueue);
  1702. if (!agentQueue.length())
  1703. return false;
  1704. Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
  1705. if (!queue.get())
  1706. throw MakeStringException(-1, "Could not create workunit queue");
  1707. IJobQueueItem *item = createJobQueueItem(wuid);
  1708. queue->enqueue(item);
  1709. PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
  1710. return true;
  1711. }
  1712. extern bool WORKUNIT_API runWorkUnit(const char *wuid)
  1713. {
  1714. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1715. Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid, false);
  1716. if (w)
  1717. {
  1718. SCMStringBuffer clusterName;
  1719. w->getClusterName(clusterName);
  1720. w.clear();
  1721. return runWorkUnit(wuid,clusterName.str());
  1722. }
  1723. else
  1724. return false;
  1725. }
  1726. extern WORKUNIT_API StringBuffer &getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList)
  1727. {
  1728. Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 5000);
  1729. if (!conn)
  1730. return queueList;
  1731. VStringBuffer xpath("Queue[Item/@wuid='%s']", wuid);
  1732. Owned<IPropertyTreeIterator> it = conn->getElements(xpath.str());
  1733. ForEach(*it)
  1734. {
  1735. if (queueList.length())
  1736. queueList.append(',');
  1737. queueList.append(it->query().queryProp("@name"));
  1738. }
  1739. return queueList;
  1740. }
  1741. extern void WORKUNIT_API removeWorkUnitFromAllQueues(const char *wuid)
  1742. {
  1743. StringBuffer queueList;
  1744. if (!getQueuesContainingWorkUnit(wuid, queueList).length())
  1745. return;
  1746. Owned<IJobQueue> q = createJobQueue(queueList.str());
  1747. if (q)
  1748. while(q->remove(wuid));
  1749. }
  1750. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
  1751. {
  1752. if (!wu)
  1753. return false;
  1754. class cQswitcher: public CInterface, implements IQueueSwitcher
  1755. {
  1756. public:
  1757. IMPLEMENT_IINTERFACE;
  1758. void * getQ(const char * qname, const char * wuid)
  1759. {
  1760. Owned<IJobQueue> q = createJobQueue(qname);
  1761. return q->take(wuid);
  1762. }
  1763. void putQ(const char * qname, const char * wuid, void * qitem)
  1764. {
  1765. Owned<IJobQueue> q = createJobQueue(qname);
  1766. q->enqueue((IJobQueueItem *)qitem);
  1767. }
  1768. bool isAuto()
  1769. {
  1770. return false;
  1771. }
  1772. } switcher;
  1773. return wu->switchThorQueue(cluster, &switcher);
  1774. }