/*
Copyright (c) 2014 Ashley Jeffs

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

// Package tunny implements a simple pool for maintaining independent worker goroutines.
package tunny

import (
    "errors"
    "expvar"
    "reflect"
    "strconv"
    "sync"
    "sync/atomic"
    "time"
)

// Errors that are used throughout the Tunny API.
var (
    ErrPoolAlreadyRunning = errors.New("the pool is already running")
    ErrPoolNotRunning     = errors.New("the pool is not running")
    ErrJobNotFunc         = errors.New("generic worker not given a func()")
    ErrWorkerClosed       = errors.New("worker was closed")
    ErrJobTimedOut        = errors.New("job request timed out")
)

/*
Worker - The basic interface of a tunny worker.
*/
type Worker interface {
    // Called for each job; the result is expected to be returned synchronously.
    Job(interface{}) interface{}

    // Called after each job; indicates whether the worker is ready for the next one.
    // The default implementation simply always returns true. If false is returned,
    // the method is polled every five milliseconds until either true is returned or
    // the pool is closed. For efficiency this call should block until the worker is
    // ready, otherwise a latency of up to 5ms is introduced between jobs.
    Ready() bool
}
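
// The following is an illustrative sketch of a custom Worker; the type
// myWorker and its behaviour are hypothetical and not part of this package:
//
//    type myWorker struct {
//        prefix string
//    }
//
//    func (w *myWorker) Job(data interface{}) interface{} {
//        return w.prefix + ": " + fmt.Sprint(data)
//    }
//
//    func (w *myWorker) Ready() bool {
//        return true
//    }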

/*
ExtendedWorker - An optional interface that can be implemented if the worker needs
more control over its state.
*/
type ExtendedWorker interface {
    // Called when the pool is opened; this will be called before any jobs are sent.
    Initialize()

    // Called when the pool is closed; this will be called after all jobs are completed.
    Terminate()
}

/*
Interruptable - An optional interface that can be implemented in order to allow the
worker to drop jobs when they are abandoned.
*/
type Interruptable interface {
    // Called when the current job has been abandoned by the client.
    TunnyInterrupt()
}
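
// An illustrative sketch of a worker that also implements these optional
// interfaces (the dbWorker type, its field and its helper calls are
// hypothetical; it would additionally need Job and Ready to satisfy Worker):
//
//    type dbWorker struct {
//        conn *sql.DB
//    }
//
//    func (w *dbWorker) Initialize()     { /* open w.conn */ }
//    func (w *dbWorker) Terminate()      { /* close w.conn */ }
//    func (w *dbWorker) TunnyInterrupt() { /* cancel the in-flight query */ }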

/*
Default and very basic implementation of a tunny worker. This worker holds a closure
which is assigned at construction, and the closure is called on each job.
*/
type defaultWorker struct {
    job *func(interface{}) interface{}
}

func (worker *defaultWorker) Job(data interface{}) interface{} {
    return (*worker.job)(data)
}

func (worker *defaultWorker) Ready() bool {
    return true
}

/*
WorkPool contains the structures and methods required to communicate with your pool.
It must be opened before sending work and closed when all jobs are completed.

You may open and close a pool as many times as you wish; Close is a blocking call
that guarantees all goroutines have stopped.
*/
type WorkPool struct {
    workers          []*workerWrapper
    selects          []reflect.SelectCase
    statusMutex      sync.RWMutex
    running          uint32
    pendingAsyncJobs int32
}

// isRunning reports whether the pool is currently open.
func (pool *WorkPool) isRunning() bool {
    return (atomic.LoadUint32(&pool.running) == 1)
}

// setRunning atomically sets the running flag.
func (pool *WorkPool) setRunning(running bool) {
    if running {
        atomic.SwapUint32(&pool.running, 1)
    } else {
        atomic.SwapUint32(&pool.running, 0)
    }
}

/*
Open all channels and launch the background goroutines managed by the pool.
*/
func (pool *WorkPool) Open() (*WorkPool, error) {
    pool.statusMutex.Lock()
    defer pool.statusMutex.Unlock()

    if !pool.isRunning() {
        pool.selects = make([]reflect.SelectCase, len(pool.workers))

        for i, workerWrapper := range pool.workers {
            workerWrapper.Open()
            pool.selects[i] = reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(workerWrapper.readyChan),
            }
        }

        pool.setRunning(true)
        return pool, nil
    }
    return nil, ErrPoolAlreadyRunning
}

/*
Close all channels and goroutines managed by the pool.
*/
func (pool *WorkPool) Close() error {
    pool.statusMutex.Lock()
    defer pool.statusMutex.Unlock()

    if pool.isRunning() {
        for _, workerWrapper := range pool.workers {
            workerWrapper.Close()
        }
        for _, workerWrapper := range pool.workers {
            workerWrapper.Join()
        }
        pool.setRunning(false)
        return nil
    }
    return ErrPoolNotRunning
}
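
// Illustrative open/close lifecycle (variable names are hypothetical), shown
// only to make the intended call order explicit:
//
//    pool, err := tunny.CreatePoolGeneric(4).Open()
//    if err != nil {
//        panic(err) // e.g. ErrPoolAlreadyRunning
//    }
//    defer pool.Close()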

/*
CreatePool - Creates a pool of workers, and takes a closure argument which is the
action to perform for each job.
*/
func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
    pool := WorkPool{running: 0}

    pool.workers = make([]*workerWrapper, numWorkers)
    for i := range pool.workers {
        newWorker := workerWrapper{
            worker: &(defaultWorker{&job}),
        }
        pool.workers[i] = &newWorker
    }

    return &pool
}
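
// A minimal usage sketch for CreatePool (the doubling closure is illustrative):
//
//    pool, err := tunny.CreatePool(runtime.NumCPU(), func(in interface{}) interface{} {
//        n, _ := in.(int)
//        return n * 2
//    }).Open()
//    if err != nil {
//        panic(err)
//    }
//    defer pool.Close()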

/*
CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool of
generic workers you send a closure (func()) which is the job to perform.
*/
func CreatePoolGeneric(numWorkers int) *WorkPool {
    return CreatePool(numWorkers, func(jobCall interface{}) interface{} {
        if method, ok := jobCall.(func()); ok {
            method()
            return nil
        }
        return ErrJobNotFunc
    })
}
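
// Illustrative use of a generic pool (doSomething is a hypothetical function):
//
//    pool, _ := tunny.CreatePoolGeneric(4).Open()
//    defer pool.Close()
//
//    _, err := pool.SendWork(func() {
//        doSomething()
//    })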

/*
CreateCustomPool - Creates a pool around a slice of custom workers. The custom workers
must implement Worker, and may also optionally implement ExtendedWorker and
Interruptable.
*/
func CreateCustomPool(customWorkers []Worker) *WorkPool {
    pool := WorkPool{running: 0}

    pool.workers = make([]*workerWrapper, len(customWorkers))
    for i := range pool.workers {
        newWorker := workerWrapper{
            worker: customWorkers[i],
        }
        pool.workers[i] = &newWorker
    }

    return &pool
}
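
// Illustrative construction with custom workers (myWorker is the hypothetical
// type sketched alongside the Worker interface above):
//
//    workers := []tunny.Worker{&myWorker{}, &myWorker{}, &myWorker{}}
//    pool, _ := tunny.CreateCustomPool(workers).Open()
//    defer pool.Close()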

/*
SendWorkTimed - Send a job to a worker and return the result. This is a synchronous
call with a timeout, where milliTimeout is interpreted as a number of milliseconds.
*/
func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) {
    pool.statusMutex.RLock()
    defer pool.statusMutex.RUnlock()

    if pool.isRunning() {
        before := time.Now()

        // Create a new timeout timer.
        timeout := time.NewTimer(milliTimeout * time.Millisecond)
        defer timeout.Stop()

        // Build a new list of select cases with the timeout case appended. Since
        // pool.selects was allocated with len == cap, append copies into a fresh
        // slice rather than mutating the shared one.
        selectCases := append(pool.selects[:], reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(timeout.C),
        })

        // Wait for a worker to become ready, or time out.
        if chosen, _, ok := reflect.Select(selectCases); ok {
            // Check whether the selected index is a worker, otherwise we timed out.
            if chosen < (len(selectCases) - 1) {
                pool.workers[chosen].jobChan <- jobData

                timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before))
                defer timeoutRemain.Stop()

                // Wait for the response, or time out.
                select {
                case data, open := <-pool.workers[chosen].outputChan:
                    if !open {
                        return nil, ErrWorkerClosed
                    }
                    return data, nil
                case <-timeoutRemain.C:
                    /* If we time out here we also need to ensure that the output is still
                     * collected and that the worker can move on. Therefore, we fork the
                     * waiting process into a new goroutine.
                     */
                    go func() {
                        pool.workers[chosen].Interrupt()
                        <-pool.workers[chosen].outputChan
                    }()
                    return nil, ErrJobTimedOut
                }
            } else {
                return nil, ErrJobTimedOut
            }
        } else {
            // This means the chosen channel was closed.
            return nil, ErrWorkerClosed
        }
    } else {
        return nil, ErrPoolNotRunning
    }
}
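
// Illustrative call with a 500ms budget (jobData is hypothetical); the untyped
// constant converts to time.Duration and is scaled to milliseconds internally:
//
//    result, err := pool.SendWorkTimed(500, jobData)
//    if err == tunny.ErrJobTimedOut {
//        // no worker was free in time, or the job itself ran too long
//    }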

/*
SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionally
send the result to a receiving closure. You may set the closure to nil if no
further actions are required.
*/
func (pool *WorkPool) SendWorkTimedAsync(
    milliTimeout time.Duration,
    jobData interface{},
    after func(interface{}, error),
) {
    atomic.AddInt32(&pool.pendingAsyncJobs, 1)
    go func() {
        defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
        result, err := pool.SendWorkTimed(milliTimeout, jobData)
        if after != nil {
            after(result, err)
        }
    }()
}
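
// Illustrative fire-and-forget call with a callback (jobData is hypothetical):
//
//    pool.SendWorkTimedAsync(200, jobData, func(result interface{}, err error) {
//        // invoked from a separate goroutine once the job finishes or times out
//    })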

/*
SendWork - Send a job to a worker and return the result. This is a synchronous call.
*/
func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
    pool.statusMutex.RLock()
    defer pool.statusMutex.RUnlock()

    if pool.isRunning() {
        if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
            pool.workers[chosen].jobChan <- jobData
            result, open := <-pool.workers[chosen].outputChan

            if !open {
                return nil, ErrWorkerClosed
            }
            return result, nil
        }
        return nil, ErrWorkerClosed
    }
    return nil, ErrPoolNotRunning
}
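
// Illustrative synchronous call against the doubling pool sketched above:
//
//    result, err := pool.SendWork(21)
//    if err == nil {
//        fmt.Println(result) // 42
//    }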

/*
SendWorkAsync - Send a job to a worker without blocking, and optionally send the
result to a receiving closure. You may set the closure to nil if no further actions
are required.
*/
func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) {
    atomic.AddInt32(&pool.pendingAsyncJobs, 1)
    go func() {
        defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
        result, err := pool.SendWork(jobData)
        if after != nil {
            after(result, err)
        }
    }()
}
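
// Illustrative async dispatch that waits for completion with a sync.WaitGroup
// (jobData and the callback body are hypothetical):
//
//    var wg sync.WaitGroup
//    wg.Add(1)
//    pool.SendWorkAsync(jobData, func(result interface{}, err error) {
//        defer wg.Done()
//        // consume result or err here
//    })
//    wg.Wait()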

/*
NumPendingAsyncJobs - Get the current count of async jobs, either in flight or
waiting for a worker.
*/
func (pool *WorkPool) NumPendingAsyncJobs() int32 {
    return atomic.LoadInt32(&pool.pendingAsyncJobs)
}

/*
NumWorkers - Number of workers in the pool.
*/
func (pool *WorkPool) NumWorkers() int {
    return len(pool.workers)
}

// liveVarAccessor adapts a func() string into an expvar.Var.
type liveVarAccessor func() string

func (a liveVarAccessor) String() string {
    return a()
}

/*
PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs counters
to expvar under the given pool name.
*/
func (pool *WorkPool) PublishExpvarMetrics(poolName string) {
    ret := expvar.NewMap(poolName)
    asyncJobsFn := func() string {
        return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10)
    }
    numWorkersFn := func() string {
        return strconv.FormatInt(int64(pool.NumWorkers()), 10)
    }
    ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn))
    ret.Set("numWorkers", liveVarAccessor(numWorkersFn))
}
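
// Illustrative metrics export ("myPool" is a hypothetical name); once published,
// the counters appear in the standard expvar output, e.g. via the /debug/vars
// handler when expvar's HTTP endpoint is enabled:
//
//    pool.PublishExpvarMetrics("myPool")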