class Daemonic::Pool
Constants
- STOP_SIGNAL
Attributes
producer[R]
Public Class Methods
new(producer)
click to toggle source
# File lib/daemonic/pool.rb, line 18 def initialize(producer) @producer = producer @jobs = SizedQueue.new(producer.queue_size) @threads = producer.concurrency.times.map {|worker_num| Thread.new do dispatch(worker_num) end } end
Public Instance Methods
enqueue(job)
click to toggle source
# File lib/daemonic/pool.rb, line 28 def enqueue(job) logger.debug { "Enqueueing #{job.inspect}" } @jobs.push(job) end
Also aliased as: <<
stop()
click to toggle source
# File lib/daemonic/pool.rb, line 34 def stop @threads.size.times do enqueue(STOP_SIGNAL) end @threads.each(&:join) end
Private Instance Methods
dispatch(worker_num)
click to toggle source
# File lib/daemonic/pool.rb, line 43 def dispatch(worker_num) logger.debug { "T#{worker_num}: Starting" } loop do job = @jobs.pop if STOP_SIGNAL.equal?(job) logger.debug { "T#{worker_num}: Received stop signal, terminating." } break end begin logger.debug { "T#{worker_num}: Consuming #{job.inspect}" } worker.consume(job) Thread.pass rescue Object => error if error.is_a?(SystemExit) # allow app to exit logger.warn { "T#{worker_num}: Received SystemExit, shutting down" } producer.stop else logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" } logger.info { error.backtrace.join("\n") } end Thread.pass end end logger.debug { "T#{worker_num}: Stopped" } end
logger()
click to toggle source
# File lib/daemonic/pool.rb, line 73 def logger producer.logger end
worker()
click to toggle source
# File lib/daemonic/pool.rb, line 69 def worker producer.worker end