class QueueingRabbit::Worker
Attributes
concurrency[R]
jobs[R]
mutex_pool[R]
Public Class Methods
new(jobs, concurrency = nil)
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 12
# Builds a worker for the given job names.
#
# jobs        - a list of job names; each entry is converted with +to_s+ and
#               stripped, and blank entries are discarded. +nil+ or a single
#               name is accepted and treated as an empty / one-element list.
# concurrency - size handed to MutexPool.new; defaults to the number of
#               non-blank job names.
#
# Raises JobNotPresentError (from validate_jobs) when no job name remains and
# JobNotFoundError (from constantize_jobs) when a name cannot be resolved.
def initialize(jobs, concurrency = nil)
  # Array() maps nil to [], so a missing job list reaches validate_jobs and
  # its dedicated error instead of failing here with NoMethodError.
  @jobs = Array(jobs).map { |job| job.to_s.strip }.reject(&:empty?)
  @concurrency = concurrency || @jobs.count
  @mutex_pool = ::MutexPool.new(@concurrency)

  sync_stdio
  validate_jobs
  constantize_jobs
end
Public Instance Methods
invoke_job(job, payload, metadata)
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 92
# Runs a single job with the given payload and metadata.
#
# job      - the job to run. If it responds to +perform+ it is called as
#            job.perform(payload, metadata); otherwise, if it is
#            QueueingRabbit::AbstractJob or a subclass, an instance is built
#            with (payload, metadata) and its +perform+ is called.
# payload  - passed through to the job unchanged.
# metadata - passed through to the job unchanged.
#
# Any StandardError raised while dispatching or performing is not re-raised:
# the :consumer_error event is triggered with the exception and the failure
# is logged.
def invoke_job(job, payload, metadata)
  info "performing job #{job}"

  if job.respond_to?(:perform)
    job.perform(payload, metadata)
  elsif job <= QueueingRabbit::AbstractJob
    job.new(payload, metadata).perform
  else
    error "don't know how to perform job #{job}"
  end
rescue => e
  QueueingRabbit.trigger_event(:consumer_error, e)
  error "unexpected error #{e.class} occurred: #{e.message}"
  debug e
end
pid()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 61
# Returns the id of the current process, as reported by Process.pid.
def pid
  Process.pid
end
pidfile_exists?()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 57
# Tells whether a pidfile has been configured and is present on disk.
#
# Returns a falsy value when @pidfile is unset or the path does not exist,
# and true otherwise.
def pidfile_exists?
  # File.exist? replaces the deprecated File.exists?, which no longer
  # exists on Ruby 3.2 and later.
  @pidfile && File.exist?(@pidfile)
end
read_pidfile()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 53
# Reads the pidfile and converts its content to an Integer with String#to_i.
#
# Returns nil when pidfile_exists? is falsy.
def read_pidfile
  return unless pidfile_exists?

  File.read(@pidfile).to_i
end
remove_pidfile()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 49
# Deletes the pidfile from disk. Does nothing when pidfile_exists? is falsy.
def remove_pidfile
  return unless pidfile_exists?

  File.delete(@pidfile)
end
stop(connection = QueueingRabbit.connection, graceful = false)
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 69
# Stops the worker and closes the connection.
#
# connection - the connection to shut down; defaults to
#              QueueingRabbit.connection.
# graceful   - when true, tries to lock @mutex_pool within
#              QueueingRabbit.jobs_wait_timeout before closing, then triggers
#              the :consuming_done event.
#
# All of the work is handed to connection.next_tick. Failures during shutdown
# are logged rather than raised, and the connection is closed in every case;
# the block passed to connection.close removes the pidfile.
def stop(connection = QueueingRabbit.connection, graceful = false)
  connection.next_tick do
    begin
      @working = false

      if graceful
        Timeout.timeout(QueueingRabbit.jobs_wait_timeout) { @mutex_pool.lock }
        QueueingRabbit.trigger_event(:consuming_done)
        info "gracefully shutting down the worker #{self}"
      end
    rescue Timeout::Error
      error "a timeout (> #{QueueingRabbit.jobs_wait_timeout}s) when trying to " \
            "gracefully shut down the worker #{self}"
    rescue => failure
      error "a #{failure.class} error occurred when trying to shut down the worker #{self}"
      debug failure
    ensure
      connection.close { remove_pidfile }
    end
  end
end
to_s()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 65
# Describes the worker in one line: its pid, the comma-joined job list and
# the configured concurrency.
def to_s
  "PID=#{pid}, JOBS=#{jobs.join(',')} CONCURRENCY=#{@concurrency}"
end
use_pidfile(filename)
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 43
# Registers +filename+ as this worker's pidfile and writes the current pid
# into it.
#
# cleanup_pidfile runs before the file is opened for writing, so an error it
# raises leaves the existing file untouched.
def use_pidfile(filename)
  @pidfile = filename
  cleanup_pidfile

  File.open(@pidfile, 'w') do |file|
    file << pid
  end
end
work()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 26
# Starts consuming for every configured job. Returns immediately when the
# worker is already working.
#
# Triggers :worker_ready before the jobs are set up and :consuming_started
# once run_job has been called for each of them.
def work
  return if working?

  @working = true
  QueueingRabbit.trigger_event(:worker_ready)

  jobs.each do |job|
    run_job(QueueingRabbit.connection, job)
  end

  QueueingRabbit.trigger_event(:consuming_started)
end
work!()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 35
# Installs the signal handlers, logs the start-up and calls #work inside
# QueueingRabbit.begin_worker_loop. Returns immediately when the worker is
# already working.
def work!
  return if working?

  trap_signals
  info "starting a new queueing_rabbit worker #{self}"

  QueueingRabbit.begin_worker_loop do
    work
  end
end
working?()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 22
# Returns the current value of @working: nil until it is first assigned,
# true after #work has started, false once #stop has run.
def working?
  @working
end
Private Instance Methods
cleanup_pidfile()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 150
# Checks whether the configured pidfile still belongs to a running process.
#
# Returns without doing anything when read_pidfile yields nil. When the
# recorded pid has no matching process, the file is reported as abandoned and
# the method returns normally.
#
# Raises WorkerError when a process with the recorded pid exists.
def cleanup_pidfile
  pid_in_file = read_pidfile
  return unless pid_in_file

  # read_pidfile uses String#to_i, which yields 0 for an empty or garbled
  # file, and Process.getpgid(0) inspects the *current* process. A
  # non-positive value therefore cannot name an owner and is treated like a
  # pid with no process behind it.
  raise Errno::ESRCH unless pid_in_file > 0

  # Raises Errno::ESRCH when no process with this pid exists.
  Process.getpgid(pid_in_file)

  fatal "failed to use the pidfile #{@pidfile}. It is already " \
        "in use by a process with pid=#{pid_in_file}."
  raise WorkerError, 'The pidfile is already in use.'
rescue Errno::ESRCH
  info "found abandoned pidfile: #{@pidfile}. Can be safely overwritten."
end
constantize_jobs()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 117
# Replaces every job name in @jobs with the constant it names, looked up
# through Kernel.const_get.
#
# Raises JobNotFoundError (after logging a fatal message) for the first name
# whose lookup fails with NameError.
def constantize_jobs
  @jobs = @jobs.map do |job_name|
    begin
      Kernel.const_get(job_name)
    rescue NameError
      fatal "job #{job_name} doesn't exist."
      raise JobNotFoundError, "Job #{job_name} doesn't exist."
    end
  end
end
run_job(conn, job)
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 128
# Subscribes +job+ to its queue on +conn+.
#
# The queue is the third value yielded by
# QueueingRabbit.follow_job_requirements; the first two are ignored here.
# Each delivered (payload, metadata) pair is passed to invoke_job inside
# @mutex_pool.synchronize.
def run_job(conn, job)
  QueueingRabbit.follow_job_requirements(job) do |_first, _second, queue|
    conn.listen_queue(queue, job.listening_options) do |payload, metadata|
      @mutex_pool.synchronize { invoke_job(job, payload, metadata) }
    end
  end
end
sync_stdio()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 138
# Switches $stdout and then $stderr to synchronous mode (sync = true).
def sync_stdio
  $stderr.sync = $stdout.sync = true
end
trap_signals()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 143
# Installs the shutdown signal handlers.
#
# QUIT requests a graceful stop (stop is called with graceful = true); TERM
# and INT call stop with its default, non-graceful behaviour. All handlers
# use the connection that QueueingRabbit.connection returned when this method
# ran.
def trap_signals
  conn = QueueingRabbit.connection

  Signal.trap('QUIT') { stop(conn, true) }
  Signal.trap('TERM') { stop(conn) }
  Signal.trap('INT')  { stop(conn) }
end
validate_jobs()
click to toggle source
# File lib/queueing_rabbit/worker.rb, line 110
# Ensures there is at least one job to work on.
#
# Raises JobNotPresentError (after logging a fatal message) when @jobs is nil
# or empty; returns nil otherwise.
def validate_jobs
  return unless @jobs.nil? || @jobs.empty?

  fatal "no jobs specified to work on."
  raise JobNotPresentError, "No jobs specified to work on."
end