class Daemonic::Pool

Constants

STOP_SIGNAL

Attributes

producer[R]

Public Class Methods

new(producer) click to toggle source
# File lib/daemonic/pool.rb, line 18
def initialize(producer)
  @producer = producer
  @jobs    = SizedQueue.new(producer.queue_size)
  @threads = producer.concurrency.times.map {|worker_num|
    Thread.new do
      dispatch(worker_num)
    end
  }
end

Public Instance Methods

<<(job)
Alias for: enqueue
enqueue(job) click to toggle source
# File lib/daemonic/pool.rb, line 28
def enqueue(job)
  logger.debug { "Enqueueing #{job.inspect}" }
  @jobs.push(job)
end
Also aliased as: <<
stop() click to toggle source
# File lib/daemonic/pool.rb, line 34
def stop
  @threads.size.times do
    enqueue(STOP_SIGNAL)
  end
  @threads.each(&:join)
end

Private Instance Methods

dispatch(worker_num) click to toggle source
# File lib/daemonic/pool.rb, line 43
def dispatch(worker_num)
  logger.debug { "T#{worker_num}: Starting" }
  loop do
    job = @jobs.pop
    if STOP_SIGNAL.equal?(job)
      logger.debug { "T#{worker_num}: Received stop signal, terminating." }
      break
    end
    begin
      logger.debug { "T#{worker_num}: Consuming #{job.inspect}" }
      worker.consume(job)
      Thread.pass
    rescue Object => error
      if error.is_a?(SystemExit) # allow app to exit
        logger.warn { "T#{worker_num}: Received SystemExit, shutting down" }
        producer.stop
      else
        logger.warn { "T#{worker_num}: #{error.class} while processing #{job}: #{error}" }
        logger.info { error.backtrace.join("\n") }
      end
      Thread.pass
    end
  end
  logger.debug { "T#{worker_num}: Stopped" }
end
logger() click to toggle source
# File lib/daemonic/pool.rb, line 73
def logger
  producer.logger
end
worker() click to toggle source
# File lib/daemonic/pool.rb, line 69
def worker
  producer.worker
end