tunny.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. /*
  2. Copyright (c) 2014 Ashley Jeffs
  3. Permission is hereby granted, free of charge, to any person obtaining a copy
  4. of this software and associated documentation files (the "Software"), to deal
  5. in the Software without restriction, including without limitation the rights
  6. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. copies of the Software, and to permit persons to whom the Software is
  8. furnished to do so, subject to the following conditions:
  9. The above copyright notice and this permission notice shall be included in
  10. all copies or substantial portions of the Software.
  11. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  12. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  13. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  14. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  15. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  16. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  17. THE SOFTWARE.
  18. */
  19. // Package tunny implements a simple pool for maintaining independent worker goroutines.
  20. package tunny
  21. import (
  22. "errors"
  23. "expvar"
  24. "reflect"
  25. "strconv"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. )
  // Sentinel errors returned by the Tunny API. Compare against these values to
  // find out why a call failed.
  var (
  	// ErrPoolAlreadyRunning is returned by Open when the pool is already open.
  	ErrPoolAlreadyRunning = errors.New("the pool is already running")

  	// ErrPoolNotRunning is returned by Close and by the SendWork family when
  	// the pool has not been opened.
  	ErrPoolNotRunning = errors.New("the pool is not running")

  	// ErrJobNotFunc is handed back as the job result by a generic pool when the
  	// job payload is not a func().
  	ErrJobNotFunc = errors.New("generic worker not given a func()")

  	// ErrWorkerClosed is returned when a worker channel is found closed while a
  	// job is being dispatched or its result awaited.
  	ErrWorkerClosed = errors.New("worker was closed")

  	// ErrJobTimedOut is returned by SendWorkTimed when the deadline passes
  	// before a result is available.
  	ErrJobTimedOut = errors.New("job request timed out")
  )
  /*
  Worker - The minimal contract every tunny worker must satisfy.
  */
  type Worker interface {
  	// Job is invoked once per job with the job payload. It must produce its
  	// result synchronously, as the return value is what the caller receives.
  	Job(interface{}) interface{}

  	// Ready is consulted after each job to find out whether the worker can take
  	// the next one. The default implementation always returns true. When false
  	// is returned the method is called again every five milliseconds until it
  	// returns true or the pool is closed, so for efficiency an implementation
  	// should block inside Ready until it is actually ready; otherwise a 5ms
  	// latency is introduced between jobs.
  	Ready() bool
  }
  /*
  ExtendedWorker - Optional lifecycle hooks. A worker may implement this interface in
  addition to Worker when it needs more control over its own state.
  */
  type ExtendedWorker interface {
  	// Initialize is called when the pool is opened, before any job is sent to
  	// the worker.
  	Initialize()

  	// Terminate is called when the pool is closed, after all jobs have been
  	// completed.
  	Terminate()
  }
  /*
  Interruptable - Optional hook. A worker may implement this interface so that it can
  drop a job once the client has abandoned it.
  */
  type Interruptable interface {
  	// TunnyInterrupt is called when the job currently being processed has been
  	// abandoned by the client.
  	TunnyInterrupt()
  }
  /*
  defaultWorker - The stock, very basic Worker implementation. It stores a pointer to a
  closure that is assigned at construction, and runs that closure for every job.
  */
  type defaultWorker struct {
  	// job points at the closure to run for each job.
  	job *func(interface{}) interface{}
  }

  // Job runs the stored closure on data and returns whatever the closure returns.
  func (w *defaultWorker) Job(data interface{}) interface{} {
  	run := *w.job
  	return run(data)
  }

  // Ready always reports true; this worker never asks the pool to wait between jobs.
  func (w *defaultWorker) Ready() bool {
  	return true
  }
  82. /*
  83. WorkPool contains the structures and methods required to communicate with your pool, it must
  84. be opened before sending work and closed when all jobs are completed.
  85. You may open and close a pool as many times as you wish, calling close is a blocking call that
  86. guarantees all goroutines are stopped.
  87. */
  88. type WorkPool struct {
  89. workers []*workerWrapper
  90. selects []reflect.SelectCase
  91. statusMutex sync.RWMutex
  92. running uint32
  93. pendingAsyncJobs int32
  94. }
  95. func (pool *WorkPool) isRunning() bool {
  96. return (atomic.LoadUint32(&pool.running) == 1)
  97. }
  98. func (pool *WorkPool) setRunning(running bool) {
  99. if running {
  100. atomic.SwapUint32(&pool.running, 1)
  101. } else {
  102. atomic.SwapUint32(&pool.running, 0)
  103. }
  104. }
  105. /*
  106. Open all channels and launch the background goroutines managed by the pool.
  107. */
  108. func (pool *WorkPool) Open() (*WorkPool, error) {
  109. pool.statusMutex.Lock()
  110. defer pool.statusMutex.Unlock()
  111. if !pool.isRunning() {
  112. pool.selects = make([]reflect.SelectCase, len(pool.workers))
  113. for i, workerWrapper := range pool.workers {
  114. workerWrapper.Open()
  115. pool.selects[i] = reflect.SelectCase{
  116. Dir: reflect.SelectRecv,
  117. Chan: reflect.ValueOf(workerWrapper.readyChan),
  118. }
  119. }
  120. pool.setRunning(true)
  121. return pool, nil
  122. }
  123. return nil, ErrPoolAlreadyRunning
  124. }
  125. /*
  126. Close all channels and goroutines managed by the pool.
  127. */
  128. func (pool *WorkPool) Close() error {
  129. pool.statusMutex.Lock()
  130. defer pool.statusMutex.Unlock()
  131. if pool.isRunning() {
  132. for _, workerWrapper := range pool.workers {
  133. workerWrapper.Close()
  134. }
  135. for _, workerWrapper := range pool.workers {
  136. workerWrapper.Join()
  137. }
  138. pool.setRunning(false)
  139. return nil
  140. }
  141. return ErrPoolNotRunning
  142. }
  143. /*
  144. CreatePool - Creates a pool of workers, and takes a closure argument which is the action
  145. to perform for each job.
  146. */
  147. func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
  148. pool := WorkPool{running: 0}
  149. pool.workers = make([]*workerWrapper, numWorkers)
  150. for i := range pool.workers {
  151. newWorker := workerWrapper{
  152. worker: &(defaultWorker{&job}),
  153. }
  154. pool.workers[i] = &newWorker
  155. }
  156. return &pool
  157. }
  158. /*
  159. CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool of
  160. generic workers you send a closure (func()) which is the job to perform.
  161. */
  162. func CreatePoolGeneric(numWorkers int) *WorkPool {
  163. return CreatePool(numWorkers, func(jobCall interface{}) interface{} {
  164. if method, ok := jobCall.(func()); ok {
  165. method()
  166. return nil
  167. }
  168. return ErrJobNotFunc
  169. })
  170. }
  171. /*
  172. CreateCustomPool - Creates a pool for an array of custom workers. The custom workers
  173. must implement Worker, and may also optionally implement ExtendedWorker and
  174. Interruptable.
  175. */
  176. func CreateCustomPool(customWorkers []Worker) *WorkPool {
  177. pool := WorkPool{running: 0}
  178. pool.workers = make([]*workerWrapper, len(customWorkers))
  179. for i := range pool.workers {
  180. newWorker := workerWrapper{
  181. worker: customWorkers[i],
  182. }
  183. pool.workers[i] = &newWorker
  184. }
  185. return &pool
  186. }
  187. /*
  188. SendWorkTimed - Send a job to a worker and return the result, this is a synchronous
  189. call with a timeout.
  190. */
  191. func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) {
  192. pool.statusMutex.RLock()
  193. defer pool.statusMutex.RUnlock()
  194. if pool.isRunning() {
  195. before := time.Now()
  196. // Create a new time out timer
  197. timeout := time.NewTimer(milliTimeout * time.Millisecond)
  198. defer timeout.Stop()
  199. // Create new selectcase[] and add time out case
  200. selectCases := append(pool.selects[:], reflect.SelectCase{
  201. Dir: reflect.SelectRecv,
  202. Chan: reflect.ValueOf(timeout.C),
  203. })
  204. // Wait for workers, or time out
  205. if chosen, _, ok := reflect.Select(selectCases); ok {
  206. // Check if the selected index is a worker, otherwise we timed out
  207. if chosen < (len(selectCases) - 1) {
  208. pool.workers[chosen].jobChan <- jobData
  209. timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before))
  210. defer timeoutRemain.Stop()
  211. // Wait for response, or time out
  212. select {
  213. case data, open := <-pool.workers[chosen].outputChan:
  214. if !open {
  215. return nil, ErrWorkerClosed
  216. }
  217. return data, nil
  218. case <-timeoutRemain.C:
  219. /* If we time out here we also need to ensure that the output is still
  220. * collected and that the worker can move on. Therefore, we fork the
  221. * waiting process into a new goroutine.
  222. */
  223. go func() {
  224. pool.workers[chosen].Interrupt()
  225. <-pool.workers[chosen].outputChan
  226. }()
  227. return nil, ErrJobTimedOut
  228. }
  229. } else {
  230. return nil, ErrJobTimedOut
  231. }
  232. } else {
  233. // This means the chosen channel was closed
  234. return nil, ErrWorkerClosed
  235. }
  236. } else {
  237. return nil, ErrPoolNotRunning
  238. }
  239. }
  240. /*
  241. SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionally
  242. send the result to a receiving closure. You may set the closure to nil if no
  243. further actions are required.
  244. */
  245. func (pool *WorkPool) SendWorkTimedAsync(
  246. milliTimeout time.Duration,
  247. jobData interface{},
  248. after func(interface{}, error),
  249. ) {
  250. atomic.AddInt32(&pool.pendingAsyncJobs, 1)
  251. go func() {
  252. defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
  253. result, err := pool.SendWorkTimed(milliTimeout, jobData)
  254. if after != nil {
  255. after(result, err)
  256. }
  257. }()
  258. }
  259. /*
  260. SendWork - Send a job to a worker and return the result, this is a synchronous call.
  261. */
  262. func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
  263. pool.statusMutex.RLock()
  264. defer pool.statusMutex.RUnlock()
  265. if pool.isRunning() {
  266. if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
  267. pool.workers[chosen].jobChan <- jobData
  268. result, open := <-pool.workers[chosen].outputChan
  269. if !open {
  270. return nil, ErrWorkerClosed
  271. }
  272. return result, nil
  273. }
  274. return nil, ErrWorkerClosed
  275. }
  276. return nil, ErrPoolNotRunning
  277. }
  278. /*
  279. SendWorkAsync - Send a job to a worker without blocking, and optionally send the
  280. result to a receiving closure. You may set the closure to nil if no further actions
  281. are required.
  282. */
  283. func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) {
  284. atomic.AddInt32(&pool.pendingAsyncJobs, 1)
  285. go func() {
  286. defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
  287. result, err := pool.SendWork(jobData)
  288. if after != nil {
  289. after(result, err)
  290. }
  291. }()
  292. }
  293. /*
  294. NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker
  295. */
  296. func (pool *WorkPool) NumPendingAsyncJobs() int32 {
  297. return atomic.LoadInt32(&pool.pendingAsyncJobs)
  298. }
  299. /*
  300. NumWorkers - Number of workers in the pool
  301. */
  302. func (pool *WorkPool) NumWorkers() int {
  303. return len(pool.workers)
  304. }
  // liveVarAccessor adapts a func() string into a value with a String method, so
  // that the string is computed afresh every time it is asked for.
  type liveVarAccessor func() string

  // String calls the wrapped function and returns its result.
  func (a liveVarAccessor) String() string {
  	current := a()
  	return current
  }
  309. /*
  310. PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars
  311. */
  312. func (pool *WorkPool) PublishExpvarMetrics(poolName string) {
  313. ret := expvar.NewMap(poolName)
  314. asyncJobsFn := func() string {
  315. return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10)
  316. }
  317. numWorkersFn := func() string {
  318. return strconv.FormatInt(int64(pool.NumWorkers()), 10)
  319. }
  320. ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn))
  321. ret.Set("numWorkers", liveVarAccessor(numWorkersFn))
  322. }