jisem.hpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef __JISEM__
  14. #define __JISEM__
  15. #include "jiface.hpp"
  16. #include "jsem.hpp"
  17. #include "jmutex.hpp"
  18. #include "jexcept.hpp"
  19. #include "jthread.hpp"
  20. class jlib_thrown_decl InterruptedSemaphoreException : implements IException, public CInterface
  21. {
  22. public:
  23. IMPLEMENT_IINTERFACE;
  24. virtual int errorCode() const { return 0; }
  25. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg.append("Semaphore interrupted"); }
  26. virtual MessageAudience errorAudience() const { return MSGAUD_programmer; }
  27. };
  28. class jlib_decl InterruptableSemaphore : public Semaphore
  29. {
  30. private:
  31. Owned<IException> error;
  32. CriticalSection crit;
  33. public:
  34. InterruptableSemaphore(unsigned _initialCount = 0U) : Semaphore(_initialCount) {}
  35. void interrupt(IException *_error = NULL, unsigned count=1)
  36. {
  37. CriticalBlock b(crit);
  38. if (error)
  39. ::Release(_error);
  40. else
  41. {
  42. if (!_error)
  43. _error = new InterruptedSemaphoreException;
  44. error.setown(_error);
  45. signal(count);
  46. }
  47. }
  48. void wait()
  49. {
  50. Semaphore::wait();
  51. CriticalBlock b(crit);
  52. if (error)
  53. {
  54. throw error.getClear();
  55. }
  56. }
  57. bool wait(unsigned timeout)
  58. {
  59. bool ret = Semaphore::wait(timeout);
  60. CriticalBlock b(crit);
  61. if (error)
  62. {
  63. throw error.getClear();
  64. }
  65. return ret;
  66. }
  67. void reinit(unsigned _initialCount = 0U)
  68. {
  69. CriticalBlock b(crit);
  70. error.clear();
  71. Semaphore::reinit(_initialCount);
  72. }
  73. };
  74. class jlib_decl TokenBucket : public CInterface
  75. {
  76. SpinLock crit; // MORE: I suspect this should be a critical section
  77. Semaphore tokens;
  78. unsigned tokensAvailable;
  79. unsigned maxBucketSize;
  80. unsigned tokensPerPeriod;
  81. unsigned period;
  82. unsigned then;
  83. inline void tokenUsed()
  84. {
  85. SpinBlock b(crit);
  86. assertex(tokensAvailable);
  87. tokensAvailable--;
  88. }
  89. void refill(unsigned tokensToAdd)
  90. {
  91. if (tokensAvailable + tokensToAdd > maxBucketSize)
  92. {
  93. if (maxBucketSize > tokensAvailable)
  94. tokensToAdd = maxBucketSize - tokensAvailable;
  95. else
  96. tokensToAdd = 0;
  97. }
  98. if (tokensToAdd)
  99. {
  100. tokensAvailable += tokensToAdd;
  101. tokens.signal(tokensToAdd);
  102. }
  103. }
  104. public:
  105. TokenBucket(unsigned _tokensPerPeriod, unsigned _period, unsigned _maxBucketSize)
  106. : tokens(_maxBucketSize), maxBucketSize(_maxBucketSize), tokensPerPeriod(_tokensPerPeriod), period(_period)
  107. {
  108. tokensAvailable = _maxBucketSize;
  109. then = msTick();
  110. }
  111. ~TokenBucket()
  112. {
  113. }
  114. void wait(unsigned tokensNeeded)
  115. {
  116. while (tokensNeeded)
  117. {
  118. unsigned timeout;
  119. {
  120. SpinBlock b(crit);
  121. unsigned now = msTick();
  122. unsigned elapsed = now - then;
  123. if (elapsed >= period)
  124. {
  125. refill(tokensPerPeriod * (elapsed/period));
  126. timeout = (elapsed % period);
  127. then = now - timeout;
  128. }
  129. else
  130. timeout = elapsed;
  131. }
  132. if (tokens.wait(period-timeout))
  133. {
  134. tokenUsed();
  135. tokensNeeded--;
  136. }
  137. }
  138. }
  139. };
  140. #endif