| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 | /*Copyright (c) 2014 Ashley JeffsPermission is hereby granted, free of charge, to any person obtaining a copyof this software and associated documentation files (the "Software"), to dealin the Software without restriction, including without limitation the rightsto use, copy, modify, merge, publish, distribute, sublicense, and/or sellcopies of the Software, and to permit persons to whom the Software isfurnished to do so, subject to the following conditions:The above copyright notice and this permission notice shall be included inall copies or substantial portions of the Software.THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS ORIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THEAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHERLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS INTHE SOFTWARE.*/// Package tunny implements a simple pool for maintaining independant worker goroutines.package tunnyimport (	"errors"	"expvar"	"reflect"	"strconv"	"sync"	"sync/atomic"	"time")// Errors that are used throughout the Tunny API.var (	ErrPoolAlreadyRunning = errors.New("the pool is already running")	ErrPoolNotRunning     = errors.New("the pool is not running")	ErrJobNotFunc         = errors.New("generic worker not given a func()")	ErrWorkerClosed       = errors.New("worker was closed")	ErrJobTimedOut        = errors.New("job request timed out"))/*Worker - The basic interface of a tunny worker.*/type Worker interface {	// Called for each job, expects the result to be returned synchronously	Job(interface{}) interface{}	// Called after each job, this indicates whether the worker is ready for the next job.	// The default implementation is to return true always. If false is returned then the	// method is called every five milliseconds until either true is returned or the pool	// is closed. For efficiency you should have this call block until your worker is ready,	// otherwise you introduce a 5ms latency between jobs.	Ready() bool}/*ExtendedWorker - An optional interface that can be implemented if the worker needsmore control over its state.*/type ExtendedWorker interface {	// Called when the pool is opened, this will be called before any jobs are sent.	Initialize()	// Called when the pool is closed, this will be called after all jobs are completed.	Terminate()}/*Interruptable - An optional interface that can be implemented in order to allow theworker to drop jobs when they are abandoned.*/type Interruptable interface {	// Called when the current job has been abandoned by the client.	TunnyInterrupt()}/*Default and very basic implementation of a tunny worker. This worker holds a closure whichis assigned at construction, and this closure is called on each job.*/type defaultWorker struct {	job *func(interface{}) interface{}}func (worker *defaultWorker) Job(data interface{}) interface{} {	return (*worker.job)(data)}func (worker *defaultWorker) Ready() bool {	return true}/*WorkPool contains the structures and methods required to communicate with your pool, it mustbe opened before sending work and closed when all jobs are completed.You may open and close a pool as many times as you wish, calling close is a blocking call thatguarantees all goroutines are stopped.*/type WorkPool struct {	workers          []*workerWrapper	selects          []reflect.SelectCase	statusMutex      sync.RWMutex	running          uint32	pendingAsyncJobs int32}func (pool *WorkPool) isRunning() bool {	return (atomic.LoadUint32(&pool.running) == 1)}func (pool *WorkPool) setRunning(running bool) {	if running {		atomic.SwapUint32(&pool.running, 1)	} else {		atomic.SwapUint32(&pool.running, 0)	}}/*Open all channels and launch the background goroutines managed by the pool.*/func (pool *WorkPool) Open() (*WorkPool, error) {	pool.statusMutex.Lock()	defer pool.statusMutex.Unlock()	if !pool.isRunning() {		pool.selects = make([]reflect.SelectCase, len(pool.workers))		for i, workerWrapper := range pool.workers {			workerWrapper.Open()			pool.selects[i] = reflect.SelectCase{				Dir:  reflect.SelectRecv,				Chan: reflect.ValueOf(workerWrapper.readyChan),			}		}		pool.setRunning(true)		return pool, nil	}	return nil, ErrPoolAlreadyRunning}/*Close all channels and goroutines managed by the pool.*/func (pool *WorkPool) Close() error {	pool.statusMutex.Lock()	defer pool.statusMutex.Unlock()	if pool.isRunning() {		for _, workerWrapper := range pool.workers {			workerWrapper.Close()		}		for _, workerWrapper := range pool.workers {			workerWrapper.Join()		}		pool.setRunning(false)		return nil	}	return ErrPoolNotRunning}/*CreatePool - Creates a pool of workers, and takes a closure argument which is the actionto perform for each job.*/func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {	pool := WorkPool{running: 0}	pool.workers = make([]*workerWrapper, numWorkers)	for i := range pool.workers {		newWorker := workerWrapper{			worker: &(defaultWorker{&job}),		}		pool.workers[i] = &newWorker	}	return &pool}/*CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool ofgeneric workers you send a closure (func()) which is the job to perform.*/func CreatePoolGeneric(numWorkers int) *WorkPool {	return CreatePool(numWorkers, func(jobCall interface{}) interface{} {		if method, ok := jobCall.(func()); ok {			method()			return nil		}		return ErrJobNotFunc	})}/*CreateCustomPool - Creates a pool for an array of custom workers. The custom workersmust implement Worker, and may also optionally implement ExtendedWorker andInterruptable.*/func CreateCustomPool(customWorkers []Worker) *WorkPool {	pool := WorkPool{running: 0}	pool.workers = make([]*workerWrapper, len(customWorkers))	for i := range pool.workers {		newWorker := workerWrapper{			worker: customWorkers[i],		}		pool.workers[i] = &newWorker	}	return &pool}/*SendWorkTimed - Send a job to a worker and return the result, this is a synchronouscall with a timeout.*/func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) {	pool.statusMutex.RLock()	defer pool.statusMutex.RUnlock()	if pool.isRunning() {		before := time.Now()		// Create a new time out timer		timeout := time.NewTimer(milliTimeout * time.Millisecond)		defer timeout.Stop()		// Create new selectcase[] and add time out case		selectCases := append(pool.selects[:], reflect.SelectCase{			Dir:  reflect.SelectRecv,			Chan: reflect.ValueOf(timeout.C),		})		// Wait for workers, or time out		if chosen, _, ok := reflect.Select(selectCases); ok {			// Check if the selected index is a worker, otherwise we timed out			if chosen < (len(selectCases) - 1) {				pool.workers[chosen].jobChan <- jobData				timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before))				defer timeoutRemain.Stop()				// Wait for response, or time out				select {				case data, open := <-pool.workers[chosen].outputChan:					if !open {						return nil, ErrWorkerClosed					}					return data, nil				case <-timeoutRemain.C:					/* If we time out here we also need to ensure that the output is still					 * collected and that the worker can move on. Therefore, we fork the					 * waiting process into a new goroutine.					 */					go func() {						pool.workers[chosen].Interrupt()						<-pool.workers[chosen].outputChan					}()					return nil, ErrJobTimedOut				}			} else {				return nil, ErrJobTimedOut			}		} else {			// This means the chosen channel was closed			return nil, ErrWorkerClosed		}	} else {		return nil, ErrPoolNotRunning	}}/*SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionallysend the result to a receiving closure. You may set the closure to nil if nofurther actions are required.*/func (pool *WorkPool) SendWorkTimedAsync(	milliTimeout time.Duration,	jobData interface{},	after func(interface{}, error),) {	atomic.AddInt32(&pool.pendingAsyncJobs, 1)	go func() {		defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)		result, err := pool.SendWorkTimed(milliTimeout, jobData)		if after != nil {			after(result, err)		}	}()}/*SendWork - Send a job to a worker and return the result, this is a synchronous call.*/func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {	pool.statusMutex.RLock()	defer pool.statusMutex.RUnlock()	if pool.isRunning() {		if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {			pool.workers[chosen].jobChan <- jobData			result, open := <-pool.workers[chosen].outputChan			if !open {				return nil, ErrWorkerClosed			}			return result, nil		}		return nil, ErrWorkerClosed	}	return nil, ErrPoolNotRunning}/*SendWorkAsync - Send a job to a worker without blocking, and optionally send theresult to a receiving closure. You may set the closure to nil if no further actionsare required.*/func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) {	atomic.AddInt32(&pool.pendingAsyncJobs, 1)	go func() {		defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)		result, err := pool.SendWork(jobData)		if after != nil {			after(result, err)		}	}()}/*NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker*/func (pool *WorkPool) NumPendingAsyncJobs() int32 {	return atomic.LoadInt32(&pool.pendingAsyncJobs)}/*NumWorkers - Number of workers in the pool*/func (pool *WorkPool) NumWorkers() int {	return len(pool.workers)}type liveVarAccessor func() stringfunc (a liveVarAccessor) String() string {	return a()}/*PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars*/func (pool *WorkPool) PublishExpvarMetrics(poolName string) {	ret := expvar.NewMap(poolName)	asyncJobsFn := func() string {		return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10)	}	numWorkersFn := func() string {		return strconv.FormatInt(int64(pool.NumWorkers()), 10)	}	ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn))	ret.Set("numWorkers", liveVarAccessor(numWorkersFn))}
 |