12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include <algorithm>
- #include "limits.h"
- #include "jlib.hpp"
- #include "jbuff.hpp"
- #include "dasess.hpp"
- #include "dautils.hpp"
- #include "portlist.h"
- #include "dacoven.hpp"
- #include "daclient.hpp"
- #include "dasds.hpp"
- #include "dasess.hpp"
- #include "wujobq.hpp"
- #ifdef _MSC_VER
- #pragma warning (disable : 4355)
- #endif
- #if 0
- JobQueues
- Queue @name= @count= @state=active|paused|stopped
- Edition <num>
- Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
- Item* @num @wuid @owner @node @port @priority @session @enqueuedt
- #endif
- class CJobQueueItem: public CInterface, implements IJobQueueItem
- {
- StringAttr wu;
- StringAttr owner;
- int priority;
- SessionId sessid;
- SocketEndpoint ep;
- unsigned port;
- CDateTime enqueuedt;
- public:
- IMPLEMENT_IINTERFACE;
- CJobQueueItem(MemoryBuffer &src)
- {
- deserialize(src);
- }
- CJobQueueItem(const char *_wu)
- : wu(_wu)
- {
- priority = 0;
- ep = queryMyNode()->endpoint();
- port = 0;
- sessid = myProcessSession();
- }
- CJobQueueItem(IPropertyTree *item)
- {
- const char * wuid = item->queryProp("@wuid");
- if (*wuid=='~')
- wuid++;
- wu.set(wuid);
- owner.set(item->queryProp("@owner"));
- sessid = (SessionId)item->getPropInt64("@session");
- priority = item->getPropInt("@priority");
- ep.set(item->queryProp("@node"));
- port = (unsigned)item->getPropInt("@port");
- StringBuffer dts;
- if (item->getProp("@enqueuedt",dts))
- enqueuedt.setString(dts.str());
- }
- static void assignBranch(IPropertyTree *item,IJobQueueItem *qi)
- {
- item->setPropInt64("@session",qi->getSessionId());
- item->setPropInt("@priority",qi->getPriority());
- item->setPropInt("@port",qi->getPort());
- item->setProp("@wuid",qi->queryWUID());
- item->setProp("@owner",qi->queryOwner());
- StringBuffer eps;
- qi->queryEndpoint().getUrlStr(eps);
- item->setProp("@node",eps.str());
- StringBuffer dts;
- qi->queryEnqueuedTime().getString(dts);
- if (dts.length()==0) {
- CDateTime dt;
- dt.setNow();
- dt.getString(dts);
- qi->setEnqueuedTime(dt);
- }
- item->setProp("@enqueuedt",dts.str());
- }
- IPropertyTree *createBranch(CJobQueueItem)
- {
- IPropertyTree *item = createPTree("Item");
- assignBranch(item,this);
- return item;
- }
- const char *queryWUID()
- {
- return wu.get();
- }
- int getPriority()
- {
- return priority;
- }
- unsigned getPort()
- {
- return port;
- }
- SessionId getSessionId()
- {
- return sessid;
- }
- SocketEndpoint &queryEndpoint()
- {
- return ep;
- }
- const char *queryOwner()
- {
- return owner.get();
- }
- bool equals(IJobQueueItem *other)
- {
- // work unit is primary key
- return strcmp(wu.get(),other->queryWUID())==0;
- }
- CDateTime &queryEnqueuedTime()
- {
- return enqueuedt;
- }
- void setEnqueuedTime(const CDateTime &dt)
- {
- enqueuedt.set(dt);
- }
- void serialize(MemoryBuffer &tgt)
- {
- tgt.append(priority).append(port).append(wu).append(sessid);
- ep.serialize(tgt);
- StringBuffer dts;
- enqueuedt.getString(dts);
- tgt.append(owner).append(dts);
- }
- void deserialize(MemoryBuffer &src)
- {
- src.read(priority).read(port).read(wu).read(sessid);
- ep.deserialize(src);
- StringBuffer dts;
- src.read(owner).read(dts);
- enqueuedt.setString(dts.str());
- }
- IJobQueueItem* clone()
- {
- IJobQueueItem* ret = new CJobQueueItem(wu);
- ret->setPriority(priority);
- ret->setPriority(port);
- ret->setEndpoint(ep);
- ret->setSessionId(sessid);
- return ret;
- }
- void setPriority(int _priority)
- {
- priority = _priority;
- }
- void setPort(unsigned _port)
- {
- port = _port;
- }
- void setEndpoint(const SocketEndpoint &_ep)
- {
- ep = _ep;
- }
- void setSessionId(SessionId _id)
- {
- if (_id)
- sessid = _id;
- else
- sessid = myProcessSession();
- }
- void setOwner(const char *_owner)
- {
- owner.set(_owner);
- }
- bool isValidSession()
- {
- Owned<INode> node = createINode(ep);
- return (querySessionManager().lookupProcessSession(node)==sessid);
- }
- };
- class CJobQueueIterator: public CInterface, implements IJobQueueIterator
- {
- public:
- CJobQueueContents &items;
- unsigned idx;
- IMPLEMENT_IINTERFACE;
- CJobQueueIterator(CJobQueueContents &_items)
- : items(_items)
- {
- idx = 0;
- }
- bool isValid()
- {
- return idx<items.ordinality();
- }
- bool first()
- {
- idx = 0;
- return isValid();
- }
- bool next()
- {
- idx++;
- return isValid();
- }
- IJobQueueItem & query()
- {
- return items.item(idx);
- }
- };
- IJobQueueIterator *CJobQueueContents::getIterator()
- {
- return new CJobQueueIterator(*this);
- }
- IJobQueueItem *createJobQueueItem(const char *wuid)
- {
- if (!wuid||!*wuid)
- throw MakeStringException(-1,"createJobQueueItem empty WUID");
- return new CJobQueueItem(wuid);;
- }
- IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb)
- {
- return new CJobQueueItem(mb);
- }
// Loop header: walks 'qd' along the singly-linked sQueueData list that starts
// at the identifier 'qdata' visible at the point of use.
- #define ForEachQueue(qd) for (sQueueData *qd = qdata; qd!=NULL; qd=qd->next)
// Same walk, but over the list held by another object (parent.qdata).
- #define ForEachQueueIn(parent,qd) for (sQueueData *qd = parent.qdata; qd!=NULL; qd=qd->next)
// Per-queue state. CJobQueue's constructor creates one node for each name in
// the comma-separated queue list and links them through 'next'.
- struct sQueueData
- {
- sQueueData *next; // following queue in the list, NULL at the tail
- IRemoteConnection *conn; // SDS connection; set by connlock(), released and NULLed by connunlock()
- StringAttr qname; // queue name, substituted into /JobQueues/Queue[@name="..."]
- IPropertyTree *root; // conn->queryRoot() while connected, NULL otherwise
- SubscriptionId subscriberid; // subscription on the queue's Edition node, 0 when not subscribed
- unsigned lastWaitEdition; // last Edition value recorded by haschanged()
- };
- class CJobQueue: public CInterface, implements IJobQueue
- {
- class cOrderedIterator
- {
- CJobQueue &parent;
- unsigned numqueues;
- unsigned *queueidx;
- sQueueData **queues;
- IPropertyTree **queuet;
- MemoryAttr ma;
- unsigned current;
- public:
- cOrderedIterator(CJobQueue &_parent)
- : parent(_parent)
- {
- numqueues=0;
- ForEachQueueIn(parent,qd1)
- if (qd1->root)
- numqueues++;
- queueidx = (unsigned *)ma.allocate(numqueues*(sizeof(unsigned)+sizeof(sQueueData *)+sizeof(IPropertyTree *)));
- queues = (sQueueData **)(queueidx+numqueues);
- queuet = (IPropertyTree **)(queues+numqueues);
- unsigned i = 0;
- ForEachQueueIn(parent,qd2) {
- if (qd2->root)
- queues[i++] = qd2;
- }
- current = (unsigned)-1;
- }
- bool first()
- {
- StringBuffer path;
- parent.getItemPath(path,0U);
- current = (unsigned)-1;
- for (unsigned i = 0; i<numqueues;i++) {
- queueidx[i] = 0;
- queuet[i] = queues[i]->root->queryPropTree(path.str());
- if (queuet[i])
- if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
- current = i;
- }
- return current!=(unsigned)-1;
- }
- bool next()
- {
- if (current==(unsigned)-1)
- return false;
- queueidx[current]++;
- StringBuffer path;
- parent.getItemPath(path,queueidx[current]);
- queuet[current] = queues[current]->root->queryPropTree(path.str());
- current = (unsigned)-1;
- for (unsigned i = 0; i<numqueues;i++) {
- if (queuet[i])
- if ((current==(unsigned)-1)||parent.itemOlder(queuet[i],queuet[current]))
- current = i;
- }
- return current!=(unsigned)-1;
- }
- bool isValid()
- {
- return current!=(unsigned)-1;
- }
- void item(sQueueData *&qd, IPropertyTree *&t,unsigned &idx)
- {
- assertex(current!=(unsigned)-1);
- qd = queues[current];
- t = queuet[current];
- idx = queueidx[current];
- }
- sQueueData &queryQueue()
- {
- assertex(current!=(unsigned)-1);
- return *queues[current];
- }
- IPropertyTree &queryTree()
- {
- assertex(current!=(unsigned)-1);
- return *queuet[current];
- }
- };
// State of the queue client. The comments below describe only how each field
// is used in the code that follows.
- public:
- sQueueData *qdata; // head of the queue list built by the constructor
- sQueueData *activeq; // set to qdata by the constructor
- SessionId sessionid; // myProcessSession() at construction; selects this client's "Client" branch
- unsigned locknest; // nesting depth of connlock()/connunlock()
- bool writemode; // true while the connections are held with a write lock
- Semaphore notifysem; // signalled by csubs::notify(), waited on in dodequeue()
- CriticalSection crit; // entered by Cconnlockblock and by csubs::notify()
- bool connected; // set by connect(), cleared by disconnect()
- Owned<IConversation> initiateconv;
- StringAttr initiatewu;
- bool dequeuestop; // dodequeue() leaves its loop once this is set
- bool cancelwaiting;
- bool validateitemsessions; // last value passed to connect()
- class csubs: public CInterface, implements ISDSSubscription
- {
- CJobQueue *parent;
- public:
- IMPLEMENT_IINTERFACE;
- csubs(CJobQueue *_parent)
- {
- parent = _parent;
- }
- void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
- {
- CriticalBlock block(parent->crit);
- parent->notifysem.signal();
- }
- } subs;
- IMPLEMENT_IINTERFACE;
- CJobQueue(const char *_qname)
- : subs(this)
- {
- StringArray qlist;
- qlist.appendListUniq(_qname, ",");
- sQueueData *last = NULL;
- ForEachItemIn(i,qlist) {
- sQueueData *qd = new sQueueData;
- qd->next = NULL;
- qd->qname.set(qlist.item(i));
- qd->conn = NULL;
- qd->root = NULL;
- qd->lastWaitEdition = 0;
- qd->subscriberid = 0;
- if (last)
- last->next = qd;
- else
- qdata = qd;
- last = qd;
- }
- validateitemsessions = false;
- activeq = qdata;
- writemode = false;
- locknest = 0;
- sessionid = myProcessSession();
- connected = false;
- dequeuestop = false;
- cancelwaiting = false;
- Cconnlockblock block(this,false); // this just checks queue exists
- }
- virtual ~CJobQueue()
- {
- try {
- while (locknest)
- connunlock(true); // auto rollback
- if (connected)
- disconnect();
- }
- catch (IException *e) {
- // server error
- EXCLOG(e, "~CJobQueue");
- e->Release();
- }
- try { // must attempt to remove subscription before object destroyed.
- dounsubscribe();
- }
- catch (IException *e) {
- EXCLOG(e, "~CJobQueue calling dounsubscribe");
- e->Release();
- }
- while (qdata)
- {
- sQueueData * next = qdata->next;
- delete qdata;
- qdata = next;
- }
- }
- void connlock(bool exclusive)
- { // must be in sect
- if (locknest++==0) {
- unsigned wait = qdata&&qdata->next?5000:INFINITE;
- ForEachQueue(qd) {
- loop {
- StringBuffer path;
- path.appendf("/JobQueues/Queue[@name=\"%s\"]",qd->qname.get());
- bool timeout;
- loop {
- timeout=false;
- try {
- qd->conn = querySDS().connect(path.str(),myProcessSession(),exclusive?RTM_LOCK_WRITE:RTM_LOCK_READ,wait);
- if (qd->conn)
- break;
- }
- catch (ISDSException *e) {
- if (SDSExcpt_LockTimeout != e->errorCode())
- throw;
- e->Release();
- timeout = true;
- }
- // create queue
- Owned<IRemoteConnection> pconn;
- try {
- pconn.setown(querySDS().connect("/JobQueues",myProcessSession(),RTM_LOCK_WRITE|RTM_CREATE_QUERY,wait));
- if (!pconn)
- throw MakeStringException(-1,"CJobQueue could not create JobQueues");
- IPropertyTree *proot = pconn->queryRoot();
- StringBuffer cpath;
- cpath.appendf("Queue[@name=\"%s\"]",qd->qname.get());
- if (!proot->hasProp(cpath.str())) {
- IPropertyTree *pt = proot->addPropTree("Queue",createPTree("Queue"));
- pt->setProp("@name",qd->qname.get());
- pt->setProp("@state","active");
- pt->setPropInt("@count", 0);
- pt->setPropInt("Edition", 1);
- }
- }
- catch (ISDSException *e) {
- if (SDSExcpt_LockTimeout != e->errorCode())
- throw;
- e->Release();
- timeout = true;
- }
- }
- if (!timeout)
- break;
- sQueueData *qd2 = qdata;
- do {
- ::Release(qd2->conn);
- qd2->conn = NULL;
- qd2->root = NULL;
- }
- while (qd2!=qd);
- PROGLOG("Job Queue contention - delaying before retrying");
- Sleep(getRandom()%5000); // dining philosopher delay
- wait = getRandom()%4000+3000; // try and prevent sync
- qd = qdata;
- }
- qd->root = qd->conn->queryRoot();
- }
- writemode = exclusive;
- }
- else {
- if (exclusive&&!writemode) {
- ForEachQueue(qd) {
- assertex(qd->conn);
- writemode = exclusive;
- bool lockreleased;
- safeChangeModeWrite(qd->conn,qd->qname.get(),lockreleased);
- qd->root = qd->conn->queryRoot();
- }
- }
- }
- }
- void connunlock(bool rollback=false)
- { // should be in sect
- if (--locknest==0) {
- ForEachQueue(qd) {
- if (qd->conn) { // can occur if connection to dali threw exception
- if (writemode) {
- if (rollback)
- qd->conn->rollback();
- else {
- qd->root->setPropInt("Edition",qd->root->getPropInt("Edition")+1);
- qd->conn->commit();
- }
- }
- qd->conn->Release();
- qd->conn = NULL;
- }
- qd->root = NULL;
- }
- writemode = false;
- }
- }
- void conncommit() // doesn't set edition
- { // called within sect
- if (writemode) {
- ForEachQueue(qd) {
- if (qd->conn)
- qd->conn->commit();
- }
- }
- }
- class Cconnlockblock: public CriticalBlock
- {
- CJobQueue *parent;
- bool rollback;
- public:
- Cconnlockblock(CJobQueue *_parent,bool exclusive)
- : CriticalBlock(_parent->crit)
- {
- parent = _parent;
- parent->connlock(exclusive);
- rollback = false;
- }
- ~Cconnlockblock()
- {
- parent->connunlock(rollback);
- }
- void setRollback(bool set=true)
- {
- rollback = set;
- }
- void commit()
- {
- parent->conncommit();
- }
- };
- StringBuffer &getItemPath(StringBuffer &path,const char *wuid)
- {
- if (!wuid||!*wuid)
- return getItemPath(path,0U);
- return path.appendf("Item[@wuid=\"%s\"]",wuid);
- }
- StringBuffer &getItemPath(StringBuffer &path,unsigned idx)
- {
- path.appendf("Item[@num=\"%d\"]",idx+1);
- return path;
- }
- void removeItem(sQueueData &qd,IPropertyTree *item, bool cache)
- { // does not adjust or use @count
- unsigned n = item->getPropInt("@num");
- if (!n)
- return;
- if (cache) {
- StringBuffer s;
- item->getProp("@wuid",s.clear());
- qd.root->setProp("@prevwuid",s.str());
- item->getProp("@enqueuedt",s.clear());
- qd.root->setProp("@prevenqueuedt",s.str());
- qd.root->setPropInt("@prevpriority",item->getPropInt("@priority"));
- }
- item->setPropInt("@num",-1);
- StringBuffer path;
- loop {
- IPropertyTree *item2 = qd.root->queryPropTree(getItemPath(path.clear(),n).str());
- if (!item2)
- break;
- item2->setPropInt("@num",n);
- n++;
- }
- qd.root->removeTree(item);
- }
- IPropertyTree *addItem(sQueueData &qd,IPropertyTree *item,unsigned idx,unsigned count)
- {
- // does not set any values other than num
- StringBuffer path;
- // first move following up
- unsigned n=count;
- while (n>idx) {
- n--;
- qd.root->queryPropTree(getItemPath(path.clear(),n).str())->setPropInt("@num",n+2);
- }
- item->setPropInt("@num",idx+1);
- return qd.root->addPropTree("Item",item);
- }
- IPropertyTree *queryClientRoot(sQueueData &qd,unsigned idx=(unsigned)-1)
- {
- StringBuffer path;
- if (idx==(unsigned)-1)
- path.appendf("Client[@session=\"%"I64F"d\"]",sessionid);
- else
- path.appendf("Client[%d]",idx+1);
- IPropertyTree *ret = qd.root->queryPropTree(path.str());
- if (!ret&&(idx==(unsigned)-1)) {
- Cconnlockblock block(this,true);
- ret = createPTree("Client");
- ret = qd.root->addPropTree("Client",ret);
- ret->setPropInt64("@session",sessionid);
- StringBuffer eps;
- ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
- }
- return ret;
- }
- void dosubscribe()
- { // called in crit section
- ForEachQueue(qd) {
- if (qd->subscriberid) {
- querySDS().unsubscribe(qd->subscriberid);
- qd->subscriberid = 0;
- }
- StringBuffer path;
- path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
- qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
- }
- }
- bool haschanged() // returns if any changed
- {
- bool changed = false;
- ForEachQueue(qd) {
- if (!qd->subscriberid) {
- StringBuffer path;
- path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
- qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
- }
- unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
- if (e!=qd->lastWaitEdition) {
- qd->lastWaitEdition = e;
- changed = true;
- break;
- }
- }
- return changed;
- }
- void dounsubscribe()
- {
- // called in crit section
- ForEachQueue(qd) {
- if (qd->subscriberid) {
- querySDS().unsubscribe(qd->subscriberid);
- qd->subscriberid = 0;
- }
- }
- }
- void connect(bool _validateitemsessions)
- {
- Cconnlockblock block(this,true);
- validateitemsessions = _validateitemsessions;
- if (connected)
- disconnect();
- dosubscribe();
- ForEachQueue(qd) {
- unsigned connected;
- unsigned waiting;
- unsigned count;
- getStats(*qd,connected,waiting,count); // clear any duff clients
- IPropertyTree *croot = queryClientRoot(*qd);
- croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
- }
- connected = true;
- }
- void disconnect() // signal no longer wil be dequeing (optional - done automatically on release)
- {
- Cconnlockblock block(this,true);
- if (connected) {
- dounsubscribe();
- ForEachQueue(qd) {
- IPropertyTree *croot = queryClientRoot(*qd);
- croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
- }
- connected = false;
- }
- }
- bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
- {
- // if this ever becomes time critical thne could cache enqueued values
- StringBuffer d1s;
- if (qt1)
- qt1->getProp("@enqueuedt",d1s);
- StringBuffer d2s;
- if (qt2)
- qt2->getProp("@enqueuedt",d2s);
- return (strcmp(d1s.str(),d2s.str())<0);
- }
- sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
- {
- if (numqueues==0)
- return NULL;
- if (numqueues==1)
- return *queues;
- sQueueData *best = NULL;
- IPropertyTree *bestt = NULL;
- for (unsigned i=0;i<numqueues;i++) {
- sQueueData *qd = queues[i];
- unsigned count = qd->root->getPropInt("@count");
- if (count) {
- int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
- if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
- StringBuffer path;
- IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,0U).str());
- if (!item)
- continue;
- if (item->getPropInt("@num",0)<=0)
- continue;
- CDateTime dt;
- StringBuffer enqueued;
- if (!best||itemOlder(item,bestt)) {
- best = qd;
- bestt = item;
- }
- }
- }
- }
- return best;
- }
- void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
- {
- for (unsigned i=0; i<numqueues; i++) {
- IPropertyTree *croot = queryClientRoot(*queues[i]);
- croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
- }
- }
- // 'simple' queuing
// Core dequeue loop used by dequeue() and prioDequeue().
//   minprio  - minimum priority an item must have; INT_MIN means no minimum
//   timeout  - time budget, measured with msTick(); INFINITE waits forever
//   useprev  - when true the effective minimum is the larger of the chosen
//              queue's @prevpriority and minprio
//   timedout - optional; cleared on entry, set to (ret==NULL) when the budget
//              reaches zero
// Returns the item taken, or NULL when every queue is in state "stopped",
// when dequeuestop is set, or when the time budget runs out.
// Each pass holds a write lock that is rolled back unless something was
// actually changed (item taken, client connected, or waiting count updated).
- IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
- {
- bool hasminprio=(minprio!=INT_MIN);
- if (timedout)
- *timedout = false;
- IJobQueueItem *ret=NULL;
- bool waitingset = false; // true once this client's @waiting counters have been incremented
- while (!dequeuestop) {
- unsigned t; // start tick of this pass; only assigned and read when timeout is finite
- if (timeout!=(unsigned)INFINITE)
- t = msTick();
- {
- Cconnlockblock block(this,true);
- block.setRollback(true); // assume not going to update
- // now cycle through queues looking at state
- unsigned total = 0;
- unsigned stopped = 0;
- PointerArray active; // queues that are neither "stopped" nor "paused"
- ForEachQueue(qd) {
- total++;
- const char *state = qd->root->queryProp("@state");
- if (state) {
- if (strcmp(state,"stopped")==0)
- stopped++;
- else if (strcmp(state,"paused")!=0)
- active.append(qd);
- }
- else
- active.append(qd); // no @state attribute counts as active
- }
- if (stopped==total)
- return NULL; // all stopped
- sQueueData **activeqds = (sQueueData **)active.getArray();
- unsigned activenum = active.ordinality();
- if (activenum) {
- sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
- unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
- // load minp from cache
- if (count) {
- int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
- if (!hasminprio||checkprio(*bestqd,mpr)) {
- block.setRollback(false);
- ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
- if (ret) // think it must be!
- timeout = 0; // so mark that done
- else if (!hasminprio) {
- WARNLOG("Resetting queue %s",bestqd->qname.get());
- clear(*bestqd); // reset queue as seems to have become out of sync
- }
- }
- }
- if (timeout!=0) { // more to do
- if (!connected) { // if connect already done non-zero
- connect(validateitemsessions);
- block.setRollback(false);
- }
- if (!waitingset) {
- setWaiting(activenum,activeqds,true);
- block.commit();
- waitingset = true;
- }
- }
- }
- if (timeout==0) { // either an item was taken or the time budget is spent
- if (waitingset) {
- setWaiting(activenum,activeqds,false);
- block.commit();
- }
- if (timedout)
- *timedout = (ret==NULL);
- break;
- }
- }
- unsigned to = 5*60*1000;
- // check every 5 mins independent of notify (in case subscription lost for some reason)
- if (to>timeout)
- to = timeout;
- notifysem.wait(to);
- if (timeout!=(unsigned)INFINITE) {
- t = msTick()-t; // elapsed time for this pass, including the wait
- if (t<timeout)
- timeout -= t;
- else
- timeout = 0;
- }
- }
- return ret;
- }
- IJobQueueItem *dequeue(unsigned timeout=INFINITE)
- {
- return dodequeue(INT_MIN,timeout);
- }
- IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
- {
- return dodequeue(minprio,timeout);
- }
// Puts qitem on queue qd, taking ownership of it.
//   idx - zero-based position to insert at, or (unsigned)-1 to position the
//         item by priority.
// Any existing entry for the same wuid is removed first (via remove()).
// With an explicit idx, the item's priority is adjusted to fit its
// neighbours: lowered to the predecessor's priority if it was higher, then
// raised to the successor's priority if it was lower. If an expected
// neighbour cannot be found, the code falls back to priority positioning.
// Priority positioning scans from the tail towards the head and stops behind
// the first item whose priority is >= the new item's.
// Finally the Item branch is added and @count is set to count+1.
- void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
- {
- Owned<IJobQueueItem> qi = qitem;
- remove(qi->queryWUID()); // just in case trying to put on twice!
- int priority = qi->getPriority();
- unsigned count = qd.root->getPropInt("@count"); // read after remove() above
- StringBuffer path;
- if (count&&(idx!=(unsigned)-1)) { // need to check before and after
- if (idx) {
- IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str()); // predecessor
- if (pt) {
- int pp = pt->getPropInt("@priority");
- if (priority>pp) {
- qi->setPriority(pp);
- priority = pp;
- }
- }
- else // what happened here?
- idx = (unsigned)-1;
- }
- if (idx<count) { // also false when idx was just reset to (unsigned)-1
- IPropertyTree *pt = qd.root->queryPropTree(getItemPath(path.clear(),idx).str()); // successor
- if (pt) {
- int pp = pt->getPropInt("@priority");
- if (priority<pp) {
- qi->setPriority(pp);
- priority = pp;
- }
- }
- else // what happened here?
- idx = (unsigned)-1;
- }
- }
- if (idx==(unsigned)-1) {
- idx = count;
- while (idx) {
- IPropertyTree *previtem = qd.root->queryPropTree(getItemPath(path.clear(),idx-1).str());
- if (previtem) {
- if (previtem->getPropInt("@priority")>=priority) {
- break;
- }
- }
- else
- count--; // how did that happen?
- idx--;
- }
- }
- CJobQueueItem::assignBranch(addItem(qd,createPTree("Item"),idx,count),qi);
- qd.root->setPropInt("@count",count+1);
- }
- void enqueue(sQueueData &qd,IJobQueueItem *qitem) // takes ownership of qitem
- {
- Cconnlockblock block(this,true);
- placeonqueue(qd,qitem,(unsigned)-1);
- }
- void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
- {
- Cconnlockblock block(this,true);
- placeonqueue(qd,qitem,findRank(qd,wuid));
- }
- void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
- {
- Cconnlockblock block(this,true);
- unsigned idx = findRank(qd,wuid);
- if (idx!=(unsigned)-1)
- idx++;
- placeonqueue(qd,qitem,idx);
- }
- void enqueueTail(sQueueData &qd,IJobQueueItem *qitem)
- {
- Cconnlockblock block(this,true);
- Owned<IJobQueueItem> qi = getTail(qd);
- if (qi)
- enqueueAfter(qd,qitem,qi->queryWUID());
- else
- enqueue(qd,qitem);
- }
- void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
- {
- Cconnlockblock block(this,true);
- Owned<IJobQueueItem> qi = getHead(qd);
- if (qi)
- enqueueBefore(qd,qitem,qi->queryWUID());
- else
- enqueue(qd,qitem);
- }
- unsigned ordinality(sQueueData &qd)
- {
- Cconnlockblock block(this,false);
- return qd.root->getPropInt("@count");
- }
- unsigned waiting(sQueueData &qd)
- {
- Cconnlockblock block(this,false);
- unsigned ret = 0;
- for (unsigned i=0;;i++) {
- IPropertyTree *croot = queryClientRoot(qd,i);
- if (!croot)
- break;
- ret += croot->getPropInt("@waiting");
- }
- return ret;
- }
- IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
- {
- Cconnlockblock block(this,false);
- if (idx==(unsigned)-1) {
- idx = qd.root->getPropInt("@count");
- if (!idx)
- return NULL;
- idx--;
- }
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,idx).str());
- if (!item)
- return NULL;
- return new CJobQueueItem(item);
- }
- IJobQueueItem *getHead(sQueueData &qd)
- {
- return getItem(qd,0);
- }
- IJobQueueItem *getTail(sQueueData &qd)
- {
- return getItem(qd,(unsigned)-1);
- }
- unsigned doFindRank(sQueueData &qd,const char *wuid)
- {
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
- if (!item)
- return (unsigned)-1;
- return item->getPropInt("@num")-1;
- }
- unsigned findRank(sQueueData &qd,const char *wuid)
- {
- Cconnlockblock block(this,false);
- return doFindRank(qd,wuid);
- }
- IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
- {
- Cconnlockblock block(this,false);
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,qi->queryWUID()).str());
- if (!item)
- return NULL;
- bool cached = item->getPropInt("@num",0)<=0;
- if (cached)
- return NULL; // don't want cached value
- return new CJobQueueItem(item);
- }
- IJobQueueItem *find(sQueueData &qd,const char *wuid)
- {
- Cconnlockblock block(this,false);
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
- if (!item)
- return NULL;
- bool cached = item->getPropInt("@num",0)<=0;
- if (wuid&&cached)
- return NULL; // don't want cached value unless explicit
- return new CJobQueueItem(item);
- }
- bool checkprio(sQueueData &qd,int minprio=0)
- {
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,0U).str());
- if (!item)
- return false;
- return (item->getPropInt("@priority")>=minprio);
- }
- IJobQueueItem *dotake(sQueueData &qd,const char *wuid,bool saveitem,bool hasminprio=false,int minprio=0)
- {
- StringBuffer path;
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path,wuid).str());
- if (!item)
- return NULL;
- if (item->getPropInt("@num",0)<=0)
- return NULL; // don't want (old) cached value
- if (hasminprio&&(item->getPropInt("@priority")<minprio))
- return NULL;
- IJobQueueItem *ret = new CJobQueueItem(item);
- removeItem(qd,item,saveitem);
- unsigned count = qd.root->getPropInt("@count");
- assertex(count);
- qd.root->setPropInt("@count",count-1);
- return ret;
- }
- IJobQueueItem *take(sQueueData &qd,const char *wuid)
- {
- Cconnlockblock block(this,true);
- return dotake(qd,wuid,false);
- }
- unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
- {
- unsigned ret=0;
- StringBuffer path;
- for (unsigned i=0;;i++) {
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
- if (!item)
- break;
- ret++;
- dest.append(*new CJobQueueItem(item));
- }
- return ret;
- }
- unsigned copyItems(sQueueData &qd,CJobQueueContents &dest)
- {
- Cconnlockblock block(this,false);
- return copyItemsImpl(qd,dest);
- }
- void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state)
- {
- assertex(qdata);
- Cconnlockblock block(this,false);
- assertex(qdata->root);
- copyItemsImpl(*qdata,contents);
- const char *st = qdata->root->queryProp("@state");
- if (st&&*st)
- state.set(st);
- }
- unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
- {
- Cconnlockblock block(this,true);
- unsigned ret = copyItems(qd,dest);
- clear(qd);
- return ret;
- }
- void enqueueItems(sQueueData &qd,CJobQueueContents &items)
- {
- unsigned n=items.ordinality();
- if (n) {
- Cconnlockblock block(this,true);
- for (unsigned i=0;i<n;i++)
- enqueue(qd,items.item(i).clone());
- }
- }
- sQueueData *findQD(const char *wuid)
- {
- if (wuid&&*wuid) {
- ForEachQueue(qd) {
- unsigned idx = doFindRank(*qd,wuid);
- if (idx!=(unsigned)-1)
- return qd;
- }
- }
- return NULL;
- }
- void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
- {
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- enqueueBefore(*qd,qitem,wuid);
- }
- void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
- {
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- enqueueAfter(*qd,qitem,wuid);
- }
- bool moveBefore(const char *wuid,const char *nextwuid)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- IJobQueueItem *qi=take(*qd,wuid);
- if (!qi)
- return false;
- sQueueData *qdd = NULL;
- if (qdata->next)
- qdd = findQD(nextwuid);
- if (!qdd)
- qdd = qd;
- enqueueBefore(*qdd,qi,nextwuid);
- return true;
- }
- bool moveAfter(const char *wuid,const char *prevwuid)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- IJobQueueItem *qi=take(*qd,wuid);
- if (!qi)
- return false;
- sQueueData *qdd = NULL;
- if (qdata->next)
- qdd = findQD(prevwuid);
- if (!qdd)
- qdd = qd;
- enqueueAfter(*qdd,qi,prevwuid);
- return true;
- }
- bool moveToHead(const char *wuid)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- IJobQueueItem *qi=take(*qd,wuid);
- if (!qi)
- return false;
- enqueueHead(*qd,qi);
- return true;
- }
- bool moveToTail(const char *wuid)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- IJobQueueItem *qi=take(*qd,wuid);
- if (!qi)
- return false;
- enqueueTail(*qd,qi);
- return true;
- }
- bool remove(const char *wuid)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- StringBuffer path;
- IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,wuid).str());
- if (!item)
- return false;
- bool cached = item->getPropInt("@num",0)<=0; // old cached (bwd compat)
- removeItem(*qd,item,false);
- if (!cached) {
- unsigned count = qd->root->getPropInt("@count");
- assertex(count);
- qd->root->setPropInt("@count",count-1);
- }
- return true;
- }
- bool changePriority(const char *wuid,int value)
- {
- if (!qdata)
- return false;
- Cconnlockblock block(this,true);
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return false;
- IJobQueueItem *qi=take(*qd,wuid);
- if (!qi) {
- StringBuffer ws("~"); // change cached item
- ws.append(wuid);
- StringBuffer path;
- IPropertyTree *item = qd->root->queryPropTree(getItemPath(path,ws.str()).str());
- if (item) {
- item->setPropInt("@priority",value);
- return true;
- }
- return false;
- }
- qi->setPriority(value);
- enqueue(*qd,qi);
- return true;
- }
- void clear(sQueueData &qd)
- {
- Cconnlockblock block(this,true);
- qd.root->setPropInt("@count",0);
- loop {
- IPropertyTree *item = qd.root->queryPropTree("Item[1]");
- if (!item)
- break;
- qd.root->removeTree(item);
- }
- }
- void lock()
- {
- connlock(false); // sub functions will change to exclusive if needed
- }
- void unlock(bool rollback=false)
- {
- connunlock(rollback);
- }
- void pause(sQueueData &qd)
- {
- Cconnlockblock block(this,true);
- qd.root->setProp("@state","paused");
- }
- void resume(sQueueData &qd)
- {
- Cconnlockblock block(this,true);
- qd.root->setProp("@state","active");
- }
- bool paused(sQueueData &qd)
- {
- Cconnlockblock block(this,false);
- const char *state = qd.root->queryProp("@state");
- return (state&&(strcmp(state,"paused")==0));
- }
- void stop(sQueueData &qd)
- {
- Cconnlockblock block(this,true);
- qd.root->setProp("@state","stopped");
- }
- bool stopped(sQueueData &qd)
- {
- Cconnlockblock block(this,false);
- const char *state = qd.root->queryProp("@state");
- return (state&&(strcmp(state,"stopped")==0));
- }
- void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
- {
- Cconnlockblock block(this,false);
- connected = 0;
- waiting = 0;
- unsigned i=0;
- loop {
- IPropertyTree *croot = queryClientRoot(qd,i);
- if (!croot)
- break;
- if (!validSession(croot)) {
- Cconnlockblock block(this,true);
- qd.root->removeTree(croot);
- }
- else {
- waiting += croot->getPropInt("@waiting");
- connected += croot->getPropInt("@connected");
- i++;
- }
- }
- // now remove any duff queue items
- unsigned count = qd.root->getPropInt("@count");
- if (!validateitemsessions) {
- enqueued = count;
- return;
- }
- i=0;
- StringBuffer path;
- loop {
- IPropertyTree *item = qd.root->queryPropTree(getItemPath(path.clear(),i).str());
- if (!item)
- break;
- if (!validSession(item)) {
- Cconnlockblock block(this,true);
- item = qd.root->queryPropTree(path.str());
- if (!item)
- break;
- // PROGLOG("WUJOBQ: Removing %s as session %"I64F"x not active",item->queryProp("@wuid"),item->getPropInt64("@session"));
- removeItem(qd,item,false);
- }
- else
- i++;
- }
- if (count!=i) {
- Cconnlockblock block(this,true);
- qd.root->setPropInt("@count",i);
- }
- enqueued = i;
- }
- void getStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
- {
- Cconnlockblock block(this,false);
- doGetStats(qd,connected,waiting,enqueued);
- }
- void getStats(unsigned &connected,unsigned &waiting,unsigned &enqueued)
- {
- // multi queue
- Cconnlockblock block(this,false);
- connected=0;
- waiting=0;
- enqueued=0;
- ForEachQueue(qd) {
- unsigned c;
- unsigned w;
- unsigned e;
- doGetStats(*qd,c,w,e);
- connected+=c;
- waiting+=w;
- enqueued+=e;
- }
- }
- unsigned waiting()
- {
- Cconnlockblock block(this,false);
- unsigned ret = 0;
- ForEachQueue(qd) {
- for (unsigned i=0;;i++) {
- IPropertyTree *croot = queryClientRoot(*qd,i);
- if (!croot)
- break;
- ret += croot->getPropInt("@waiting");
- }
- }
- return ret;
- }
- unsigned findRank(const char *wuid)
- {
- assertex(qdata);
- if (!qdata->next)
- return findRank(*qdata,wuid);
- Cconnlockblock block(this,false);
- cOrderedIterator it(*this);
- unsigned i = 0;
- ForEach(it) {
- const char *twuid = it.queryTree().queryProp("@wuid");
- if (twuid&&(strcmp(twuid,wuid)==0))
- return i;
- i++;
- }
- return (unsigned)-1;
- }
- unsigned copyItems(CJobQueueContents &dest)
- {
- assertex(qdata);
- if (!qdata->next)
- return copyItems(*qdata,dest);
- Cconnlockblock block(this,false);
- cOrderedIterator it(*this);
- unsigned ret = 0;
- ForEach(it) {
- dest.append(*new CJobQueueItem(&it.queryTree()));
- ret++;
- }
- return ret;
- }
- IJobQueueItem *take(const char *wuid)
- {
- assertex(qdata);
- if (!qdata->next)
- return take(*qdata,wuid);
- Cconnlockblock block(this,true);
- ForEachQueue(qd) {
- IJobQueueItem *ret = dotake(*qd,wuid,false);
- if (ret)
- return ret;
- }
- return NULL;
- }
- unsigned takeItems(CJobQueueContents &dest)
- {
- assertex(qdata);
- if (!qdata->next)
- return takeItems(*qdata,dest);
- Cconnlockblock block(this,true);
- unsigned ret = 0;
- ForEachQueue(qd) {
- ret += copyItems(*qd,dest);
- clear(*qd);
- }
- return ret;
- }
- void enqueueItems(CJobQueueContents &items)
- { // enqueues to firs sub-queue (not sure that useful)
- assertex(qdata);
- return enqueueItems(*qdata,items);
- }
- void clear()
- {
- ForEachQueue(qd) {
- clear(*qd);
- }
- }
- bool validSession(IPropertyTree *item)
- {
- Owned<INode> node = createINode(item->queryProp("@node"),DALI_SERVER_PORT); // port should always be present
- return (querySessionManager().lookupProcessSession(node)==(SessionId)item->getPropInt64("@session"));
- }
- IConversation *initiateConversation(sQueueData &qd,IJobQueueItem *item)
- {
- CriticalBlock block(crit);
- assertex(!initiateconv.get());
- SocketEndpoint ep = item->queryEndpoint();
- unsigned short port = (unsigned short)item->getPort();
- initiateconv.setown(createSingletonSocketConnection(port));
- if (!port)
- item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
- initiatewu.set(item->queryWUID());
- enqueue(qd,item);
- bool ok;
- {
- CriticalUnblock unblock(crit);
- ok = initiateconv->accept(INFINITE);
- }
- if (!ok)
- initiateconv.clear();
- return initiateconv.getClear();
- }
- IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
- {
- CriticalBlock block(crit);
- retitem = NULL;
- assertex(connected); // must be connected
- int curmp = maxp?maxp->get():0;
- int nextmp = curmp;
- loop {
- bool timedout = false;
- Owned<IJobQueueItem> item;
- {
- CriticalUnblock unblock(crit);
- // this is a bit complicated with multi-thor
- if (prioritytransitiondelay||maxp) {
- item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
- prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
- // if dynamic priority check every minute
- if (!prioritytransitiondelay) {
- curmp = nextmp; // using max above is a bit devious to allow transition
- nextmp = maxp->get();
- }
- }
- else
- item.setown(dequeue(INFINITE));
- }
- if (item.get()) {
- if (item->isValidSession()) {
- SocketEndpoint ep = item->queryEndpoint();
- ep.port = item->getPort();
- Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
- if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
- retitem = item.getClear();
- return acceptconv.getClear();
- }
- }
- }
- else if (prioritytransitiondelay)
- prioritytransitiondelay = 0;
- else if (!timedout)
- break;
- }
- return NULL;
- }
- void cancelInitiateConversation(sQueueData &qd)
- {
- CriticalBlock block(crit);
- if (initiatewu.get())
- remove(initiatewu);
- if (initiateconv.get())
- initiateconv->cancel();
- }
- void cancelAcceptConversation()
- {
- CriticalBlock block(crit);
- dequeuestop = true;
- notifysem.signal();
- }
- bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
- {
- Cconnlockblock block(this,true);
- loop {
- Owned<IJobQueueItem> item = dotake(qd,wuid,false);
- if (!item.get())
- break;
- if (item->isValidSession()) {
- SocketEndpoint ep = item->queryEndpoint();
- ep.port = item->getPort();
- Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
- acceptconv->connect(3*60*1000); // connect then close should close other end
- return true;
- }
- }
- return false;
- }
- bool waitStatsChange(unsigned timeout)
- {
- assertex(!connected); // not allowed to call this while connected
- cancelwaiting = false;
- while(!cancelwaiting) {
- {
- Cconnlockblock block(this,false);
- if (haschanged())
- return true;
- }
- if (!notifysem.wait(timeout))
- break;
- }
- return false;
- }
- void cancelWaitStatsChange()
- {
- CriticalBlock block(crit);
- cancelwaiting = true;
- notifysem.signal();
- }
- virtual void enqueue(IJobQueueItem *qitem)
- {
- enqueue(*activeq,qitem);
- }
- void enqueueHead(IJobQueueItem *qitem)
- {
- enqueueHead(*activeq,qitem);
- }
- void enqueueTail(IJobQueueItem *qitem)
- {
- enqueueTail(*activeq,qitem);
- }
- IJobQueueItem *getItem(unsigned idx)
- {
- if (!qdata)
- return NULL;
- if (!qdata->next)
- return getItem(*qdata,idx);
- Cconnlockblock block(this,false);
- cOrderedIterator it(*this);
- unsigned i = 0;
- IPropertyTree *ret = NULL;
- ForEach(it) {
- if (i==idx) {
- ret = &it.queryTree();
- break;
- }
- else if (idx==(unsigned)-1) // -1 means return last
- ret = &it.queryTree();
- i++;
- }
- if (ret)
- return new CJobQueueItem(ret);
- return NULL;
- }
- IJobQueueItem *getHead()
- {
- if (!qdata)
- return NULL;
- if (!qdata->next)
- return getHead(*qdata);
- return getItem(0);
- }
- IJobQueueItem *getTail()
- {
- if (!qdata)
- return NULL;
- if (!qdata->next)
- return getHead(*qdata);
- return getItem((unsigned)-1);
- }
- IJobQueueItem *find(const char *wuid)
- {
- if (!qdata)
- return NULL;
- sQueueData *qd = qdata->next?findQD(wuid):qdata;
- if (!qd)
- return NULL;
- return find(*qd,wuid);
- }
- unsigned ordinality()
- {
- Cconnlockblock block(this,false);
- unsigned ret = 0;
- ForEachQueue(qd) {
- if (qd->root)
- ret += qd->root->getPropInt("@count");
- }
- return ret;
- }
- void pause()
- {
- Cconnlockblock block(this,true);
- ForEachQueue(qd) {
- if (qd->root)
- qd->root->setProp("@state","paused");
- }
- }
- bool paused()
- {
- // true if all paused
- Cconnlockblock block(this,false);
- ForEachQueue(qd) {
- if (qd->root) {
- const char *state = qd->root->queryProp("@state");
- if (state&&(strcmp(state,"paused")!=0))
- return false;
- }
- }
- return true;
- }
- void stop()
- {
- Cconnlockblock block(this,true);
- ForEachQueue(qd) {
- if (qd->root)
- qd->root->setProp("@state","stopped");
- }
- }
- bool stopped()
- {
- // true if all stopped
- Cconnlockblock block(this,false);
- ForEachQueue(qd) {
- if (qd->root) {
- const char *state = qd->root->queryProp("@state");
- if (state&&(strcmp(state,"stopped")!=0))
- return false;
- }
- }
- return true;
- }
- void resume()
- {
- Cconnlockblock block(this,true);
- ForEachQueue(qd) {
- if (qd->root)
- qd->root->setProp("@state","active");
- }
- }
- IConversation *initiateConversation(IJobQueueItem *item)
- {
- return initiateConversation(*activeq,item);
- }
- void cancelInitiateConversation()
- {
- return cancelInitiateConversation(*activeq);
- }
- bool cancelInitiateConversation(const char *wuid)
- {
- return cancelInitiateConversation(*activeq,wuid);
- }
- const char * queryActiveQueueName()
- {
- return activeq->qname;
- }
-
- void setActiveQueue(const char *name)
- {
- ForEachQueue(qd) {
- if (!name||(strcmp(qd->qname.get(),name)==0)) {
- activeq = qd;
- return;
- }
- }
- if (name)
- throw MakeStringException (-1,"queue %s not found",name);
- }
-
- const char *nextQueueName(const char *last)
- {
- ForEachQueue(qd) {
- if (!last||(strcmp(qd->qname.get(),last)==0)) {
- if (qd->next)
- return qd->next->qname.get();
- break;
- }
- }
- return NULL;
- }
- bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
- {
- priority = 0;
- if (!activeq)
- return false;
- const char *w = activeq->root->queryProp("@prevwuid");
- if (!w||!*w)
- return false;
- wuid.set(w);
- StringBuffer dts;
- if (activeq->root->getProp("@prevenqueuedt",dts))
- enqueuedt.setString(dts.str());
- priority = activeq->root->getPropInt("@prevpriority");
- return true;
- }
- };
- IJobQueue *createJobQueue(const char *name)
- {
- if (!name||!*name)
- throw MakeStringException(-1,"createJobQueue empty name");
- return new CJobQueue(name);
- }
- extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster)
- {
- Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
- if (!clusterInfo.get())
- return false;
- SCMStringBuffer agentQueue;
- clusterInfo->getAgentQueue(agentQueue);
- if (!agentQueue.length())
- return false;
- Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
- if (!queue.get())
- throw MakeStringException(-1, "Could not create workunit queue");
- IJobQueueItem *item = createJobQueueItem(wuid);
- queue->enqueue(item);
- PROGLOG("Agent request '%s' enqueued on '%s'", wuid, agentQueue.str());
- return true;
- }
- extern bool WORKUNIT_API runWorkUnit(const char *wuid)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid, false);
- if (w)
- {
- SCMStringBuffer clusterName;
- w->getClusterName(clusterName);
- w.clear();
- return runWorkUnit(wuid,clusterName.str());
- }
- else
- return false;
- }
- extern WORKUNIT_API StringBuffer &getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList)
- {
- Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), RTM_LOCK_READ, 5000);
- if (!conn)
- return queueList;
- VStringBuffer xpath("Queue[Item/@wuid='%s']", wuid);
- Owned<IPropertyTreeIterator> it = conn->getElements(xpath.str());
- ForEach(*it)
- {
- if (queueList.length())
- queueList.append(',');
- queueList.append(it->query().queryProp("@name"));
- }
- return queueList;
- }
- extern void WORKUNIT_API removeWorkUnitFromAllQueues(const char *wuid)
- {
- StringBuffer queueList;
- if (!getQueuesContainingWorkUnit(wuid, queueList).length())
- return;
- Owned<IJobQueue> q = createJobQueue(queueList.str());
- if (q)
- while(q->remove(wuid));
- }
- extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
- {
- if (!wu)
- return false;
- class cQswitcher: public CInterface, implements IQueueSwitcher
- {
- public:
- IMPLEMENT_IINTERFACE;
- void * getQ(const char * qname, const char * wuid)
- {
- Owned<IJobQueue> q = createJobQueue(qname);
- return q->take(wuid);
- }
- void putQ(const char * qname, const char * wuid, void * qitem)
- {
- Owned<IJobQueue> q = createJobQueue(qname);
- q->enqueue((IJobQueueItem *)qitem);
- }
- bool isAuto()
- {
- return false;
- }
- } switcher;
- return wu->switchThorQueue(cluster, &switcher);
- }
|