| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 | 
							- /*
 
- Copyright (c) 2014 Ashley Jeffs
 
- Permission is hereby granted, free of charge, to any person obtaining a copy
 
- of this software and associated documentation files (the "Software"), to deal
 
- in the Software without restriction, including without limitation the rights
 
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 
- copies of the Software, and to permit persons to whom the Software is
 
- furnished to do so, subject to the following conditions:
 
- The above copyright notice and this permission notice shall be included in
 
- all copies or substantial portions of the Software.
 
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 
- THE SOFTWARE.
 
- */
 
- // Package tunny implements a simple pool for maintaining independant worker goroutines.
 
- package tunny
 
- import (
 
- 	"errors"
 
- 	"expvar"
 
- 	"reflect"
 
- 	"strconv"
 
- 	"sync"
 
- 	"sync/atomic"
 
- 	"time"
 
- )
 
- // Errors that are used throughout the Tunny API.
 
- var (
 
- 	ErrPoolAlreadyRunning = errors.New("the pool is already running")
 
- 	ErrPoolNotRunning     = errors.New("the pool is not running")
 
- 	ErrJobNotFunc         = errors.New("generic worker not given a func()")
 
- 	ErrWorkerClosed       = errors.New("worker was closed")
 
- 	ErrJobTimedOut        = errors.New("job request timed out")
 
- )
 
- /*
 
- Worker - The basic interface of a tunny worker.
 
- */
 
- type Worker interface {
 
- 	// Called for each job, expects the result to be returned synchronously
 
- 	Job(interface{}) interface{}
 
- 	// Called after each job, this indicates whether the worker is ready for the next job.
 
- 	// The default implementation is to return true always. If false is returned then the
 
- 	// method is called every five milliseconds until either true is returned or the pool
 
- 	// is closed. For efficiency you should have this call block until your worker is ready,
 
- 	// otherwise you introduce a 5ms latency between jobs.
 
- 	Ready() bool
 
- }
 
- /*
 
- ExtendedWorker - An optional interface that can be implemented if the worker needs
 
- more control over its state.
 
- */
 
- type ExtendedWorker interface {
 
- 	// Called when the pool is opened, this will be called before any jobs are sent.
 
- 	Initialize()
 
- 	// Called when the pool is closed, this will be called after all jobs are completed.
 
- 	Terminate()
 
- }
 
- /*
 
- Interruptable - An optional interface that can be implemented in order to allow the
 
- worker to drop jobs when they are abandoned.
 
- */
 
- type Interruptable interface {
 
- 	// Called when the current job has been abandoned by the client.
 
- 	TunnyInterrupt()
 
- }
 
- /*
 
- Default and very basic implementation of a tunny worker. This worker holds a closure which
 
- is assigned at construction, and this closure is called on each job.
 
- */
 
- type defaultWorker struct {
 
- 	job *func(interface{}) interface{}
 
- }
 
- func (worker *defaultWorker) Job(data interface{}) interface{} {
 
- 	return (*worker.job)(data)
 
- }
 
- func (worker *defaultWorker) Ready() bool {
 
- 	return true
 
- }
 
- /*
 
- WorkPool contains the structures and methods required to communicate with your pool, it must
 
- be opened before sending work and closed when all jobs are completed.
 
- You may open and close a pool as many times as you wish, calling close is a blocking call that
 
- guarantees all goroutines are stopped.
 
- */
 
- type WorkPool struct {
 
- 	workers          []*workerWrapper
 
- 	selects          []reflect.SelectCase
 
- 	statusMutex      sync.RWMutex
 
- 	running          uint32
 
- 	pendingAsyncJobs int32
 
- }
 
- func (pool *WorkPool) isRunning() bool {
 
- 	return (atomic.LoadUint32(&pool.running) == 1)
 
- }
 
- func (pool *WorkPool) setRunning(running bool) {
 
- 	if running {
 
- 		atomic.SwapUint32(&pool.running, 1)
 
- 	} else {
 
- 		atomic.SwapUint32(&pool.running, 0)
 
- 	}
 
- }
 
- /*
 
- Open all channels and launch the background goroutines managed by the pool.
 
- */
 
- func (pool *WorkPool) Open() (*WorkPool, error) {
 
- 	pool.statusMutex.Lock()
 
- 	defer pool.statusMutex.Unlock()
 
- 	if !pool.isRunning() {
 
- 		pool.selects = make([]reflect.SelectCase, len(pool.workers))
 
- 		for i, workerWrapper := range pool.workers {
 
- 			workerWrapper.Open()
 
- 			pool.selects[i] = reflect.SelectCase{
 
- 				Dir:  reflect.SelectRecv,
 
- 				Chan: reflect.ValueOf(workerWrapper.readyChan),
 
- 			}
 
- 		}
 
- 		pool.setRunning(true)
 
- 		return pool, nil
 
- 	}
 
- 	return nil, ErrPoolAlreadyRunning
 
- }
 
- /*
 
- Close all channels and goroutines managed by the pool.
 
- */
 
- func (pool *WorkPool) Close() error {
 
- 	pool.statusMutex.Lock()
 
- 	defer pool.statusMutex.Unlock()
 
- 	if pool.isRunning() {
 
- 		for _, workerWrapper := range pool.workers {
 
- 			workerWrapper.Close()
 
- 		}
 
- 		for _, workerWrapper := range pool.workers {
 
- 			workerWrapper.Join()
 
- 		}
 
- 		pool.setRunning(false)
 
- 		return nil
 
- 	}
 
- 	return ErrPoolNotRunning
 
- }
 
- /*
 
- CreatePool - Creates a pool of workers, and takes a closure argument which is the action
 
- to perform for each job.
 
- */
 
- func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
 
- 	pool := WorkPool{running: 0}
 
- 	pool.workers = make([]*workerWrapper, numWorkers)
 
- 	for i := range pool.workers {
 
- 		newWorker := workerWrapper{
 
- 			worker: &(defaultWorker{&job}),
 
- 		}
 
- 		pool.workers[i] = &newWorker
 
- 	}
 
- 	return &pool
 
- }
 
- /*
 
- CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool of
 
- generic workers you send a closure (func()) which is the job to perform.
 
- */
 
- func CreatePoolGeneric(numWorkers int) *WorkPool {
 
- 	return CreatePool(numWorkers, func(jobCall interface{}) interface{} {
 
- 		if method, ok := jobCall.(func()); ok {
 
- 			method()
 
- 			return nil
 
- 		}
 
- 		return ErrJobNotFunc
 
- 	})
 
- }
 
- /*
 
- CreateCustomPool - Creates a pool for an array of custom workers. The custom workers
 
- must implement Worker, and may also optionally implement ExtendedWorker and
 
- Interruptable.
 
- */
 
- func CreateCustomPool(customWorkers []Worker) *WorkPool {
 
- 	pool := WorkPool{running: 0}
 
- 	pool.workers = make([]*workerWrapper, len(customWorkers))
 
- 	for i := range pool.workers {
 
- 		newWorker := workerWrapper{
 
- 			worker: customWorkers[i],
 
- 		}
 
- 		pool.workers[i] = &newWorker
 
- 	}
 
- 	return &pool
 
- }
 
- /*
 
- SendWorkTimed - Send a job to a worker and return the result, this is a synchronous
 
- call with a timeout.
 
- */
 
- func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) {
 
- 	pool.statusMutex.RLock()
 
- 	defer pool.statusMutex.RUnlock()
 
- 	if pool.isRunning() {
 
- 		before := time.Now()
 
- 		// Create a new time out timer
 
- 		timeout := time.NewTimer(milliTimeout * time.Millisecond)
 
- 		defer timeout.Stop()
 
- 		// Create new selectcase[] and add time out case
 
- 		selectCases := append(pool.selects[:], reflect.SelectCase{
 
- 			Dir:  reflect.SelectRecv,
 
- 			Chan: reflect.ValueOf(timeout.C),
 
- 		})
 
- 		// Wait for workers, or time out
 
- 		if chosen, _, ok := reflect.Select(selectCases); ok {
 
- 			// Check if the selected index is a worker, otherwise we timed out
 
- 			if chosen < (len(selectCases) - 1) {
 
- 				pool.workers[chosen].jobChan <- jobData
 
- 				timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before))
 
- 				defer timeoutRemain.Stop()
 
- 				// Wait for response, or time out
 
- 				select {
 
- 				case data, open := <-pool.workers[chosen].outputChan:
 
- 					if !open {
 
- 						return nil, ErrWorkerClosed
 
- 					}
 
- 					return data, nil
 
- 				case <-timeoutRemain.C:
 
- 					/* If we time out here we also need to ensure that the output is still
 
- 					 * collected and that the worker can move on. Therefore, we fork the
 
- 					 * waiting process into a new goroutine.
 
- 					 */
 
- 					go func() {
 
- 						pool.workers[chosen].Interrupt()
 
- 						<-pool.workers[chosen].outputChan
 
- 					}()
 
- 					return nil, ErrJobTimedOut
 
- 				}
 
- 			} else {
 
- 				return nil, ErrJobTimedOut
 
- 			}
 
- 		} else {
 
- 			// This means the chosen channel was closed
 
- 			return nil, ErrWorkerClosed
 
- 		}
 
- 	} else {
 
- 		return nil, ErrPoolNotRunning
 
- 	}
 
- }
 
- /*
 
- SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionally
 
- send the result to a receiving closure. You may set the closure to nil if no
 
- further actions are required.
 
- */
 
- func (pool *WorkPool) SendWorkTimedAsync(
 
- 	milliTimeout time.Duration,
 
- 	jobData interface{},
 
- 	after func(interface{}, error),
 
- ) {
 
- 	atomic.AddInt32(&pool.pendingAsyncJobs, 1)
 
- 	go func() {
 
- 		defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
 
- 		result, err := pool.SendWorkTimed(milliTimeout, jobData)
 
- 		if after != nil {
 
- 			after(result, err)
 
- 		}
 
- 	}()
 
- }
 
- /*
 
- SendWork - Send a job to a worker and return the result, this is a synchronous call.
 
- */
 
- func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
 
- 	pool.statusMutex.RLock()
 
- 	defer pool.statusMutex.RUnlock()
 
- 	if pool.isRunning() {
 
- 		if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
 
- 			pool.workers[chosen].jobChan <- jobData
 
- 			result, open := <-pool.workers[chosen].outputChan
 
- 			if !open {
 
- 				return nil, ErrWorkerClosed
 
- 			}
 
- 			return result, nil
 
- 		}
 
- 		return nil, ErrWorkerClosed
 
- 	}
 
- 	return nil, ErrPoolNotRunning
 
- }
 
- /*
 
- SendWorkAsync - Send a job to a worker without blocking, and optionally send the
 
- result to a receiving closure. You may set the closure to nil if no further actions
 
- are required.
 
- */
 
- func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) {
 
- 	atomic.AddInt32(&pool.pendingAsyncJobs, 1)
 
- 	go func() {
 
- 		defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
 
- 		result, err := pool.SendWork(jobData)
 
- 		if after != nil {
 
- 			after(result, err)
 
- 		}
 
- 	}()
 
- }
 
- /*
 
- NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker
 
- */
 
- func (pool *WorkPool) NumPendingAsyncJobs() int32 {
 
- 	return atomic.LoadInt32(&pool.pendingAsyncJobs)
 
- }
 
- /*
 
- NumWorkers - Number of workers in the pool
 
- */
 
- func (pool *WorkPool) NumWorkers() int {
 
- 	return len(pool.workers)
 
- }
 
- type liveVarAccessor func() string
 
- func (a liveVarAccessor) String() string {
 
- 	return a()
 
- }
 
- /*
 
- PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars
 
- */
 
- func (pool *WorkPool) PublishExpvarMetrics(poolName string) {
 
- 	ret := expvar.NewMap(poolName)
 
- 	asyncJobsFn := func() string {
 
- 		return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10)
 
- 	}
 
- 	numWorkersFn := func() string {
 
- 		return strconv.FormatInt(int64(pool.NumWorkers()), 10)
 
- 	}
 
- 	ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn))
 
- 	ret.Set("numWorkers", liveVarAccessor(numWorkersFn))
 
- }
 
 
  |