wujobq.hpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef WUJOBQ_HPP
  14. #define WUJOBQ_HPP
  15. #include "jsocket.hpp"
  16. #include "dasess.hpp"
  // IJobQueueItem: one entry on a job queue. All members are pure virtual and
  // expose the entry's WUID, owner, priority, session id, endpoint, port and
  // enqueue time, together with the matching setters.
  // NOTE(review): `interface`, `extends` and `serializable` are not defined in
  // this header; presumably they come from the included project headers.
  17. interface IJobQueueItem: extends serializable
  18. {
  19. virtual const char *queryWUID()=0; // WUID string for this item (cf. createJobQueueItem(wuid))
  20. virtual const char *queryOwner()=0; // owner string (set via setOwner)
  21. virtual int getPriority()=0; // priority value (set via setPriority)
  22. virtual SessionId getSessionId()=0; // session id (set via setSessionId)
  23. virtual SocketEndpoint &queryEndpoint()=0; // dali client endpoint
  24. virtual unsigned getPort()=0; // conversation port (not used for DFU server)
  25. virtual bool equals(IJobQueueItem *other)=0; // compares against another item; the criteria are up to the implementation
  26. virtual IJobQueueItem* clone()=0; // presumably returns a copy of this item - TODO confirm who owns the result
  27. virtual void setPriority(int priority)=0;
  28. virtual void setOwner(const char *owner)=0;
  29. virtual void setEndpoint(const SocketEndpoint &ep)=0;
  30. virtual void setPort(unsigned)=0; // counterpart of getPort()
  31. virtual void setSessionId(SessionId id)=0;
  32. virtual bool isValidSession()=0; // used before the conversation has started
  33. virtual CDateTime &queryEnqueuedTime()=0; // time at which the item was enqueued
  34. virtual void setEnqueuedTime(const CDateTime &dt)=0;
  35. };
  // Iterator over IJobQueueItem; this is the type returned by
  // CJobQueueContents::getIterator().
  36. typedef IIteratorOf<IJobQueueItem> IJobQueueIterator;
  // WORKUNIT_API decorates the symbols declared below: it expands to
  // DECL_EXPORT when WORKUNIT_EXPORTS is defined and to DECL_IMPORT otherwise.
  // NOTE(review): presumably WORKUNIT_EXPORTS is defined only while building
  // the library that implements these symbols - confirm in the build files.
  37. #ifdef WORKUNIT_EXPORTS
  38. #define WORKUNIT_API DECL_EXPORT
  39. #else
  40. #define WORKUNIT_API DECL_IMPORT
  41. #endif
  // CJobQueueContents: an array of IJobQueueItem used as a 'snapshot' of queue
  // items. It is the destination type of copyItems()/takeItems() and the source
  // type of enqueueItems() on the queue interfaces declared in this header.
  42. class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
  43. { // used as a 'snapshot' of queue items
  44. public:
  45. IJobQueueIterator *getIterator(); // returned iterator is only valid during the lifetime of this CJobQueueContents
  46. };
  // IDynamicPriority: callback that supplies a priority value on demand.
  // It is accepted as the optional `maxp` argument of
  // IJobQueue::acceptConversation(). Unlike the other interfaces in this
  // header it does not extend IInterface.
  47. interface IDynamicPriority
  48. {
  49. virtual int get()=0; // current priority value
  50. };
  // IJobQueueConst: query-only view of a job queue (size, lookup, snapshot and
  // paused/stopped state). IJobQueue extends it with the mutating operations,
  // and IJQSnapshot::getJobQueue() returns this type.
  51. interface IJobQueueConst: extends IInterface
  52. {
  53. virtual unsigned ordinality()=0; // number of items on queue
  54. virtual unsigned waiting()=0; // number currently waiting on dequeue
  55. virtual IJobQueueItem *getItem(unsigned idx)=0; // item at position idx - TODO confirm ownership of the result and behaviour when idx is out of range
  56. virtual IJobQueueItem *getHead()=0; // presumably the first item on the queue - TODO confirm result for an empty queue
  57. virtual IJobQueueItem *getTail()=0; // presumably the last item on the queue - TODO confirm result for an empty queue
  58. virtual IJobQueueItem *find(const char *wuid)=0; // looks an item up by WUID (does not remove it; see IJobQueue::take for removal)
  59. virtual unsigned findRank(const char *wuid)=0; // presumably the position of the item with this WUID - TODO confirm base and not-found value
  60. virtual unsigned copyItems(CJobQueueContents &dest)=0; // takes a snapshot copy of the entire queue (returns number copied)
  61. virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0; // out-params describe the last dequeued item; presumably false when nothing is recorded - TODO confirm
  62. virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0; // copyItems() and getState() in one call
  63. virtual void getState(StringBuffer& state, StringBuffer& stateDetails)=0; // writes the queue state and its details into the two buffers
  64. virtual bool paused()=0; // true if paused
  65. virtual bool paused(StringBuffer& info)=0; // true if paused; info is an out-param (presumably the text given to IJobQueue::pause(info) - confirm)
  66. virtual bool stopped()=0; // true if stopped
  67. virtual bool stopped(StringBuffer& info)=0; // true if stopped; info is an out-param (presumably the text given to IJobQueue::stop(info) - confirm)
  68. };
  // IJobQueue: full job-queue interface. It adds enqueueing, dequeueing, item
  // manipulation, optional transactions, pause/stop/resume control and
  // conversation set-up on top of the query-only IJobQueueConst.
  // Instances are obtained from createJobQueue(name).
  69. interface IJobQueue: extends IJobQueueConst
  70. {
  71. // enqueuing
  72. // the following enqueues all take ownership of the qitem passed
  73. virtual void enqueue(IJobQueueItem *qitem)=0; // NOTE(review): insertion position is not stated here (presumably priority based) - confirm
  74. virtual void enqueueHead(IJobQueueItem *qitem)=0;
  75. virtual void enqueueTail(IJobQueueItem *qitem)=0;
  76. virtual void enqueueBefore(IJobQueueItem *qitem,const char *nextwuid)=0; // positions qitem relative to the item identified by nextwuid
  77. virtual void enqueueAfter(IJobQueueItem *qitem,const char *prevwuid)=0; // positions qitem relative to the item identified by prevwuid
  78. // dequeueing
  79. virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing
  80. // validateitemsessions ensures that all queue items have a running session
  81. virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; // returns NULL once the queue is stopped (see stop()); timeout units are not shown here - presumably ms, confirm
  82. virtual IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE)=0; // as dequeue() with a minimum priority - TODO confirm exact filtering rule
  83. virtual void disconnect()=0; // signals that this client will no longer be dequeueing (optional - done automatically on release)
  84. virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // not quick, as it validates that clients are still running
  85. virtual bool waitStatsChange(unsigned timeout)=0; // presumably blocks until the stats change or the timeout expires - TODO confirm return meaning
  86. virtual void cancelWaitStatsChange()=0; // presumably releases a pending waitStatsChange() - confirm
  87. //manipulation
  88. virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes
  89. virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue
  90. virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue
  91. virtual bool moveBefore(const char *wuid,const char *nextwuid)=0; // move* calls return bool - presumably true when the item was found and moved, confirm
  92. virtual bool moveAfter(const char *wuid,const char *prevwuid)=0;
  93. virtual bool moveToHead(const char *wuid)=0;
  94. virtual bool moveToTail(const char *wuid)=0;
  95. virtual bool remove(const char *wuid)=0; // removes by WUID without returning the item (contrast take())
  96. virtual bool changePriority(const char *wuid,int value)=0;
  97. virtual void clear()=0; // removes all items
  98. // transactions (optional)
  99. virtual void lock()=0;
  100. virtual void unlock(bool rollback=false)=0; // rollback defaults to false; presumably true discards changes made since lock() - confirm
  101. // control:
  102. virtual void pause()=0; // marks queue as paused - any subsequent dequeues block until resumed
  103. virtual void pause(const char *info)=0; // as pause(), with an accompanying info string
  104. virtual void stop()=0; // sets stopped flags - all current and subsequent dequeues return NULL
  105. virtual void stop(const char *info)=0; // as stop(), with an accompanying info string
  106. virtual void resume()=0; // removes paused or stopped flag
  107. virtual void resume(const char *info)=0; // as resume(), with an accompanying info string
  108. // conversations:
  109. virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - takes ownership of item
  110. virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0;
  111. // does dequeue - returns the dequeued queue item through `item`
  112. virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress
  113. virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate
  114. virtual void cancelAcceptConversation()=0; // cancels acceptConversation in progress
  115. virtual const char * queryActiveQueueName()=0; // NOTE(review): together with the 'first sub-queue' remark above this suggests one IJobQueue can span several named queues - confirm
  116. virtual void setActiveQueue(const char *name)=0;
  117. virtual const char *nextQueueName(const char *last)=0; // presumably iterates queue names, given the previously returned one - TODO confirm start/end convention
  118. };
  // IJQSnapshot: gives query-only (IJobQueueConst) access to job queues by name.
  // Instances are obtained from createJQSnapshot().
  119. interface IJQSnapshot : extends IInterface
  120. {
  121. virtual IJobQueueConst *getJobQueue(const char *name)=0; // TODO confirm ownership of the result and the value returned for an unknown name
  122. };
  // Factory functions and workunit-level helpers exported with WORKUNIT_API.
  // NOTE(review): IWorkUnit (used by switchWorkUnitQueue) is not declared in the
  // visible part of this header, so it must be declared before this point by
  // whatever includes this file - confirm.
  123. extern WORKUNIT_API IJQSnapshot *createJQSnapshot();
  124. extern WORKUNIT_API IJobQueueItem *createJobQueueItem(const char *wuid); // new queue item for the given WUID
  125. extern WORKUNIT_API IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb); // builds an item from a serialized buffer
  126. extern WORKUNIT_API IJobQueue *createJobQueue(const char *name); // queue object for the named queue
  127. extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName); // presumably queues the workunit on queueName - TODO confirm return meaning
  128. extern bool WORKUNIT_API runWorkUnit(const char *wuid); // overload without an explicit queue name - TODO confirm how the queue is chosen
  129. extern WORKUNIT_API StringBuffer & getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList); // fills queueList; presumably returns that same buffer - confirm
  130. extern WORKUNIT_API void removeWorkUnitFromAllQueues(const char *wuid);
  131. extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster); // presumably moves wu to the queue of `cluster` - TODO confirm return meaning
  132. #endif