/* Copyright (c) 2014 Ashley Jeffs Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ // Package tunny implements a simple pool for maintaining independant worker goroutines. package tunny import ( "errors" "expvar" "reflect" "strconv" "sync" "sync/atomic" "time" ) // Errors that are used throughout the Tunny API. var ( ErrPoolAlreadyRunning = errors.New("the pool is already running") ErrPoolNotRunning = errors.New("the pool is not running") ErrJobNotFunc = errors.New("generic worker not given a func()") ErrWorkerClosed = errors.New("worker was closed") ErrJobTimedOut = errors.New("job request timed out") ) /* Worker - The basic interface of a tunny worker. */ type Worker interface { // Called for each job, expects the result to be returned synchronously Job(interface{}) interface{} // Called after each job, this indicates whether the worker is ready for the next job. // The default implementation is to return true always. If false is returned then the // method is called every five milliseconds until either true is returned or the pool // is closed. For efficiency you should have this call block until your worker is ready, // otherwise you introduce a 5ms latency between jobs. Ready() bool } /* ExtendedWorker - An optional interface that can be implemented if the worker needs more control over its state. */ type ExtendedWorker interface { // Called when the pool is opened, this will be called before any jobs are sent. Initialize() // Called when the pool is closed, this will be called after all jobs are completed. Terminate() } /* Interruptable - An optional interface that can be implemented in order to allow the worker to drop jobs when they are abandoned. */ type Interruptable interface { // Called when the current job has been abandoned by the client. TunnyInterrupt() } /* Default and very basic implementation of a tunny worker. This worker holds a closure which is assigned at construction, and this closure is called on each job. */ type defaultWorker struct { job *func(interface{}) interface{} } func (worker *defaultWorker) Job(data interface{}) interface{} { return (*worker.job)(data) } func (worker *defaultWorker) Ready() bool { return true } /* WorkPool contains the structures and methods required to communicate with your pool, it must be opened before sending work and closed when all jobs are completed. You may open and close a pool as many times as you wish, calling close is a blocking call that guarantees all goroutines are stopped. */ type WorkPool struct { workers []*workerWrapper selects []reflect.SelectCase statusMutex sync.RWMutex running uint32 pendingAsyncJobs int32 } func (pool *WorkPool) isRunning() bool { return (atomic.LoadUint32(&pool.running) == 1) } func (pool *WorkPool) setRunning(running bool) { if running { atomic.SwapUint32(&pool.running, 1) } else { atomic.SwapUint32(&pool.running, 0) } } /* Open all channels and launch the background goroutines managed by the pool. */ func (pool *WorkPool) Open() (*WorkPool, error) { pool.statusMutex.Lock() defer pool.statusMutex.Unlock() if !pool.isRunning() { pool.selects = make([]reflect.SelectCase, len(pool.workers)) for i, workerWrapper := range pool.workers { workerWrapper.Open() pool.selects[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(workerWrapper.readyChan), } } pool.setRunning(true) return pool, nil } return nil, ErrPoolAlreadyRunning } /* Close all channels and goroutines managed by the pool. */ func (pool *WorkPool) Close() error { pool.statusMutex.Lock() defer pool.statusMutex.Unlock() if pool.isRunning() { for _, workerWrapper := range pool.workers { workerWrapper.Close() } for _, workerWrapper := range pool.workers { workerWrapper.Join() } pool.setRunning(false) return nil } return ErrPoolNotRunning } /* CreatePool - Creates a pool of workers, and takes a closure argument which is the action to perform for each job. */ func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool { pool := WorkPool{running: 0} pool.workers = make([]*workerWrapper, numWorkers) for i := range pool.workers { newWorker := workerWrapper{ worker: &(defaultWorker{&job}), } pool.workers[i] = &newWorker } return &pool } /* CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool of generic workers you send a closure (func()) which is the job to perform. */ func CreatePoolGeneric(numWorkers int) *WorkPool { return CreatePool(numWorkers, func(jobCall interface{}) interface{} { if method, ok := jobCall.(func()); ok { method() return nil } return ErrJobNotFunc }) } /* CreateCustomPool - Creates a pool for an array of custom workers. The custom workers must implement Worker, and may also optionally implement ExtendedWorker and Interruptable. */ func CreateCustomPool(customWorkers []Worker) *WorkPool { pool := WorkPool{running: 0} pool.workers = make([]*workerWrapper, len(customWorkers)) for i := range pool.workers { newWorker := workerWrapper{ worker: customWorkers[i], } pool.workers[i] = &newWorker } return &pool } /* SendWorkTimed - Send a job to a worker and return the result, this is a synchronous call with a timeout. */ func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) { pool.statusMutex.RLock() defer pool.statusMutex.RUnlock() if pool.isRunning() { before := time.Now() // Create a new time out timer timeout := time.NewTimer(milliTimeout * time.Millisecond) defer timeout.Stop() // Create new selectcase[] and add time out case selectCases := append(pool.selects[:], reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timeout.C), }) // Wait for workers, or time out if chosen, _, ok := reflect.Select(selectCases); ok { // Check if the selected index is a worker, otherwise we timed out if chosen < (len(selectCases) - 1) { pool.workers[chosen].jobChan <- jobData timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before)) defer timeoutRemain.Stop() // Wait for response, or time out select { case data, open := <-pool.workers[chosen].outputChan: if !open { return nil, ErrWorkerClosed } return data, nil case <-timeoutRemain.C: /* If we time out here we also need to ensure that the output is still * collected and that the worker can move on. Therefore, we fork the * waiting process into a new goroutine. */ go func() { pool.workers[chosen].Interrupt() <-pool.workers[chosen].outputChan }() return nil, ErrJobTimedOut } } else { return nil, ErrJobTimedOut } } else { // This means the chosen channel was closed return nil, ErrWorkerClosed } } else { return nil, ErrPoolNotRunning } } /* SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionally send the result to a receiving closure. You may set the closure to nil if no further actions are required. */ func (pool *WorkPool) SendWorkTimedAsync( milliTimeout time.Duration, jobData interface{}, after func(interface{}, error), ) { atomic.AddInt32(&pool.pendingAsyncJobs, 1) go func() { defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) result, err := pool.SendWorkTimed(milliTimeout, jobData) if after != nil { after(result, err) } }() } /* SendWork - Send a job to a worker and return the result, this is a synchronous call. */ func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) { pool.statusMutex.RLock() defer pool.statusMutex.RUnlock() if pool.isRunning() { if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 { pool.workers[chosen].jobChan <- jobData result, open := <-pool.workers[chosen].outputChan if !open { return nil, ErrWorkerClosed } return result, nil } return nil, ErrWorkerClosed } return nil, ErrPoolNotRunning } /* SendWorkAsync - Send a job to a worker without blocking, and optionally send the result to a receiving closure. You may set the closure to nil if no further actions are required. */ func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) { atomic.AddInt32(&pool.pendingAsyncJobs, 1) go func() { defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) result, err := pool.SendWork(jobData) if after != nil { after(result, err) } }() } /* NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker */ func (pool *WorkPool) NumPendingAsyncJobs() int32 { return atomic.LoadInt32(&pool.pendingAsyncJobs) } /* NumWorkers - Number of workers in the pool */ func (pool *WorkPool) NumWorkers() int { return len(pool.workers) } type liveVarAccessor func() string func (a liveVarAccessor) String() string { return a() } /* PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars */ func (pool *WorkPool) PublishExpvarMetrics(poolName string) { ret := expvar.NewMap(poolName) asyncJobsFn := func() string { return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10) } numWorkersFn := func() string { return strconv.FormatInt(int64(pool.NumWorkers()), 10) } ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn)) ret.Set("numWorkers", liveVarAccessor(numWorkersFn)) }