wujobq.hpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef WUJOBQ_HPP
  14. #define WUJOBQ_HPP
  15. #include "jsocket.hpp"
  16. #include "dasess.hpp"
  17. #include "workunit.hpp"
  // IJobQueueItem: a single entry on a job queue. Carries a workunit id (WUID), an owner,
  // a priority, a session id, a client endpoint/port and an enqueue timestamp, each with
  // an accessor and (apart from the WUID) a setter. Extends `serializable`, which is
  // declared elsewhere.
  // NOTE(review): lifetime of the pointers/references returned by the query* methods is
  // not visible in this header - presumably tied to the item itself; confirm.
  18. interface IJobQueueItem: extends serializable
  19. {
  20. virtual const char *queryWUID()=0; // workunit id of this item
  21. virtual const char *queryOwner()=0; // owner string (see setOwner)
  22. virtual int getPriority()=0; // current priority (see setPriority)
  23. virtual SessionId getSessionId()=0; // session id (see setSessionId)
  24. virtual SocketEndpoint &queryEndpoint()=0; // dali client ep
  25. virtual unsigned getPort()=0; // conversation port (not used for DFU server)
  26. virtual bool equals(IJobQueueItem *other)=0; // NOTE(review): which fields are compared is not visible here - confirm
  27. virtual IJobQueueItem* clone()=0; // NOTE(review): presumably returns a new copy owned by the caller - confirm
  28. virtual void setPriority(int priority)=0;
  29. virtual void setOwner(const char *owner)=0;
  30. virtual void setEndpoint(const SocketEndpoint &ep)=0;
  31. virtual void setPort(unsigned)=0;
  32. virtual void setSessionId(SessionId id)=0;
  33. virtual bool isValidSession()=0; // used before conversation started
  34. virtual CDateTime &queryEnqueuedTime()=0; // when the item was enqueued
  35. virtual void setEnqueuedTime(const CDateTime &dt)=0;
  36. };
  // Iterator over IJobQueueItem entries; this is the type returned by
  // CJobQueueContents::getIterator().
  37. typedef IIteratorOf<IJobQueueItem> IJobQueueIterator;
  // CJobQueueContents: an IArrayOf<IJobQueueItem> used to hold a copy of queue items.
  // It is the destination of IJobQueueConst::copyItems / copyItemsAndState and
  // IJobQueue::takeItems, and the source for IJobQueue::enqueueItems.
  38. class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
  39. { // used as a 'snapshot' of queue items
  40. public:
  // Returns an iterator over the contained items.
  // NOTE(review): who releases the returned iterator is not visible here - confirm.
  41. IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents
  42. };
  // IDynamicPriority: caller-supplied source of an int value. In this header it is only
  // used as the optional 'maxp' argument (default NULL) of IJobQueue::acceptConversation.
  // Declared without extending IInterface, unlike the other interfaces in this header.
  // NOTE(review): how the value is applied during dequeue is not visible here - confirm.
  43. interface IDynamicPriority
  44. {
  45. virtual int get()=0; // presumably the priority value in effect at the time of the call - confirm
  46. };
  // IJobQueueConst: the query side of a job queue - counts, item lookup, snapshot copies
  // and paused/stopped state. No enqueue/dequeue operations are declared here; those are
  // added by IJobQueue, which extends this interface. IJQSnapshot::getJobQueue returns
  // this interface.
  // NOTE(review): for the IJobQueueItem* returns below, the result when nothing matches
  // (presumably NULL) and the ownership of the result are not visible here - confirm.
  47. interface IJobQueueConst: extends IInterface
  48. {
  49. virtual unsigned ordinality()=0; // number of items on queue
  50. virtual unsigned waiting()=0; // number currently waiting on dequeue
  51. virtual IJobQueueItem *getItem(unsigned idx)=0; // item at position idx
  52. virtual IJobQueueItem *getHead()=0;
  53. virtual IJobQueueItem *getTail()=0;
  54. virtual IJobQueueItem *find(const char *wuid)=0; // looks an item up by workunit id
  55. virtual unsigned findRank(const char *wuid)=0; // NOTE(review): value returned when wuid is not queued is not visible here - confirm
  56. virtual unsigned copyItems(CJobQueueContents &dest)=0; // takes a snapshot copy of the entire queue (returns number copied)
  // Outputs wuid, enqueue time and priority through the reference parameters.
  // NOTE(review): presumably returns false when there is no last-dequeued item - confirm.
  57. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
  // Combines a copy of the items with the state strings also reported by getState().
  58. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0;
  59. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)=0; // writes state text into the two buffers
  60. virtual bool paused()=0; // true if paused
  61. virtual bool paused(StringBuffer& info)=0; // true if paused; 'info' presumably receives the text given to IJobQueue::pause(info) - confirm
  62. virtual bool stopped()=0; // true if stopped
  63. virtual bool stopped(StringBuffer& info)=0; // true if stopped; 'info' presumably receives the text given to IJobQueue::stop(info) - confirm
  64. };
  // IJobQueue: the full job-queue interface. Adds enqueueing, dequeueing, item
  // manipulation, optional transactions, pause/stop/resume control and conversation
  // set-up on top of the query operations inherited from IJobQueueConst.
  // Items and neighbours are addressed by workunit id (wuid) strings.
  65. interface IJobQueue: extends IJobQueueConst
  66. {
  67. // enqueuing
  68. // the following enqueues all take ownership of qitem passed
  69. virtual void enqueue(IJobQueueItem *qitem)=0;
  70. virtual void enqueueHead(IJobQueueItem *qitem)=0;
  71. virtual void enqueueTail(IJobQueueItem *qitem)=0;
  72. virtual void enqueueBefore(IJobQueueItem *qitem,const char *nextwuid)=0;
  73. virtual void enqueueAfter(IJobQueueItem *qitem,const char *prevwuid)=0;
  74. // dequeueing
  75. virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing
  76. // validateitemsessions ensures that all queue items have a running session
  // NOTE(review): the result of the dequeue calls on timeout is not stated here
  // (presumably NULL, as documented for a stopped queue under stop()) - confirm.
  77. virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
  78. virtual IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE)=0;
  79. virtual void disconnect()=0; // signals that this client will no longer be dequeuing (optional - done automatically on release)
  80. virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // not quick, as it validates that clients are still running
  81. virtual bool waitStatsChange(unsigned timeout)=0; // NOTE(review): presumably false on timeout or cancel - confirm
  82. virtual void cancelWaitStatsChange()=0;
  83. //manipulation
  84. virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes
  85. virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue
  86. virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue
  // NOTE(review): the bool results of the move*/remove/changePriority calls presumably
  // report whether the wuid was found and the change applied - confirm.
  87. virtual bool moveBefore(const char *wuid,const char *nextwuid)=0;
  88. virtual bool moveAfter(const char *wuid,const char *prevwuid)=0;
  89. virtual bool moveToHead(const char *wuid)=0;
  90. virtual bool moveToTail(const char *wuid)=0;
  91. virtual bool remove(const char *wuid)=0;
  92. virtual bool changePriority(const char *wuid,int value)=0;
  93. virtual void clear()=0; // removes all items
  94. // transactions (optional)
  95. virtual void lock()=0;
  96. virtual void unlock(bool rollback=false)=0; // rollback defaults to false
  97. // control:
  98. virtual void pause()=0; // marks queue as paused - and subsequent dequeues block until resumed
  99. virtual void pause(const char *info)=0; // marks queue as paused - and subsequent dequeues block until resumed
  100. virtual void stop()=0; // sets stopped flags - all current and subsequent dequeues return NULL
  101. virtual void stop(const char *info)=0; // sets stopped flags - all current and subsequent dequeues return NULL
  102. virtual void resume()=0; // removes paused or stopped flag
  103. virtual void resume(const char *info)=0; // removes paused or stopped flag
  104. // conversations:
  105. virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - takes ownership of item
  106. virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0;
  107. // does dequeue - returns the dequeued queue item through 'item'
  108. virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress
  109. virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate
  110. virtual void cancelAcceptConversation()=0; // cancels acceptConversation in progress
  // Active-queue selection by name.
  // NOTE(review): the iteration protocol of nextQueueName (first call, end marker)
  // is not visible in this header - confirm against the implementation.
  111. virtual const char * queryActiveQueueName()=0;
  112. virtual void setActiveQueue(const char *name)=0;
  113. virtual const char *nextQueueName(const char *last)=0;
  114. };
  // IJQSnapshot: hands out IJobQueueConst (query-only) views of job queues by name.
  // Instances are obtained from createJQSnapshot().
  // NOTE(review): whether the data is captured once at creation time, and the result
  // for an unknown queue name, are not visible in this header - confirm.
  115. interface IJQSnapshot : extends IInterface
  116. {
  117. virtual IJobQueueConst *getJobQueue(const char *name)=0;
  118. };
  // Exported factory and utility functions (WORKUNIT_API). Only the signatures are
  // visible in this header; the behavioral notes below are assumptions to be confirmed
  // against the implementation.
  // Factory for an IJQSnapshot.
  119. extern WORKUNIT_API IJQSnapshot *createJQSnapshot();
  // Factory for a queue item associated with the given workunit id.
  120. extern WORKUNIT_API IJobQueueItem *createJobQueueItem(const char *wuid);
  // Builds a queue item from a MemoryBuffer - presumably the counterpart of the
  // item's `serializable` interface; confirm.
  121. extern WORKUNIT_API IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb);
  // Factory for an IJobQueue for the given queue name.
  122. extern WORKUNIT_API IJobQueue *createJobQueue(const char *name);
  // NOTE(review): meaning of the bool result, and which cluster the one-argument
  // overload uses, are not visible here - confirm.
  123. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster);
  124. extern bool WORKUNIT_API runWorkUnit(const char *wuid);
  // Takes and returns a StringBuffer reference - presumably fills queueList and
  // returns it; format of the list is not visible here.
  125. extern WORKUNIT_API StringBuffer & getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList);
  126. extern WORKUNIT_API void removeWorkUnitFromAllQueues(const char *wuid);
  // NOTE(review): presumably moves the workunit to the queue of the given cluster and
  // returns whether that succeeded - confirm.
  127. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster);
  128. #endif