class QueueingRabbit::Worker

Attributes

concurrency[R]
jobs[R]
mutex_pool[R]

Public Class Methods

new(jobs, concurrency = nil) click to toggle source
# File lib/queueing_rabbit/worker.rb, line 12
# Builds a worker for the given list of job names.
#
# jobs        - collection of job names; every entry is converted to a
#               string and stripped, and blank entries are discarded.
# concurrency - size passed to MutexPool.new; when nil (or false) the number
#               of remaining job names is used instead.
#
# After storing its state the worker switches stdout/stderr to sync mode,
# validates that at least one job is left and resolves the names to constants.
def initialize(jobs, concurrency = nil)
  job_names = jobs.map { |name| name.to_s.strip }
  @jobs = job_names.reject(&:empty?)
  @concurrency = concurrency || @jobs.count
  @mutex_pool = ::MutexPool.new(@concurrency)

  sync_stdio
  validate_jobs
  constantize_jobs
end

Public Instance Methods

invoke_job(job, payload, metadata) click to toggle source
# File lib/queueing_rabbit/worker.rb, line 92
# Performs one job for a single delivered message.
#
# job      - the job to run. If it responds to +perform+ it is called as
#            job.perform(payload, metadata); otherwise, if it is a class
#            descending from QueueingRabbit::AbstractJob, an instance is
#            built with (payload, metadata) and +perform+ is called on it.
# payload  - passed through to the job untouched.
# metadata - passed through to the job untouched.
#
# Any StandardError raised while performing is not propagated: it is reported
# through the :consumer_error event and logged.
def invoke_job(job, payload, metadata)
  info "performing job #{job}"

  if job.respond_to?(:perform)
    job.perform(payload, metadata)
  elsif job.is_a?(Class) && job <= QueueingRabbit::AbstractJob
    # The is_a?(Class) guard keeps non-class objects out of the `<=`
    # comparison, which would otherwise raise and be reported as an
    # "unexpected error" instead of reaching the branch below.
    job.new(payload, metadata).perform
  else
    error "don't know how to perform job #{job}"
  end
rescue => e
  QueueingRabbit.trigger_event(:consumer_error, e)
  error "unexpected error #{e.class} occurred: #{e.message}"
  debug e
end
pid() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 61
# Returns the process id of the Ruby process this worker runs in.
def pid
  ::Process.pid
end
pidfile_exists?() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 57
# Tells whether a pidfile has been configured and is present on disk.
#
# Returns nil when no pidfile path is set, otherwise true/false depending on
# whether the path exists.
def pidfile_exists?
  # File.exists? was deprecated and then removed in Ruby 3.2; File.exist? is
  # the supported spelling and behaves identically.
  @pidfile && File.exist?(@pidfile)
end
read_pidfile() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 53
# Reads the pid stored in the pidfile.
#
# Returns the file content converted with String#to_i, or nil when
# pidfile_exists? is falsy.
def read_pidfile
  return unless pidfile_exists?

  File.read(@pidfile).to_i
end
remove_pidfile() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 49
# Deletes the pidfile from disk.
#
# Does nothing (and returns nil) when pidfile_exists? is falsy; otherwise
# returns whatever the file deletion call returns.
def remove_pidfile
  return unless pidfile_exists?

  File.unlink(@pidfile)
end
stop(connection = QueueingRabbit.connection, graceful = false) click to toggle source
# File lib/queueing_rabbit/worker.rb, line 69
# Schedules a shutdown of the worker through connection.next_tick.
#
# connection - object responding to next_tick and close; defaults to
#              QueueingRabbit.connection.
# graceful   - when true, try to lock @mutex_pool (bounded by
#              QueueingRabbit.jobs_wait_timeout) and fire :consuming_done
#              before closing; defaults to false.
#
# Regardless of how the scheduled block ends, connection.close is called and
# the pidfile is removed from inside the block handed to it.
def stop(connection = QueueingRabbit.connection, graceful = false)
  connection.next_tick do
    begin
      # Mark the worker as stopped before anything else, so working? is
      # falsy even if the graceful wait below fails.
      @working = false
      if graceful
        # Bounded wait on the mutex pool; presumably this lets in-flight
        # jobs finish -- TODO confirm against MutexPool#lock.
        Timeout.timeout(QueueingRabbit.jobs_wait_timeout) { @mutex_pool.lock }
        QueueingRabbit.trigger_event(:consuming_done)
        info "gracefully shutting down the worker #{self}"
      end
    rescue Timeout::Error
      # The wait exceeded the configured timeout: log it and fall through to
      # the ensure clause, which still closes the connection.
      error "a timeout (> #{QueueingRabbit.jobs_wait_timeout}s) when trying to gracefully shut down the worker " \
            "#{self}"
    rescue => e
      error "a #{e.class} error occurred when trying to shut down the worker #{self}"
      debug e
    ensure
      # Always close the connection; the pidfile is removed in the block
      # given to close.
      connection.close do
        remove_pidfile
      end
    end
  end
end
to_s() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 65
# Human-readable description of the worker: its pid, the comma-joined job
# list and the configured concurrency.
def to_s
  job_list = jobs.join(',')
  "PID=#{pid}, JOBS=#{job_list} CONCURRENCY=#{@concurrency}"
end
use_pidfile(filename) click to toggle source
# File lib/queueing_rabbit/worker.rb, line 43
# Registers +filename+ as this worker's pidfile and writes the current pid
# into it.
#
# cleanup_pidfile runs before the file is opened for writing, so it sees the
# previous content of the file (if any).
def use_pidfile(filename)
  @pidfile = filename
  cleanup_pidfile

  File.open(@pidfile, 'w') do |file|
    file << pid
  end
end
work() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 26
# Starts consuming for every configured job unless the worker is already
# working.
#
# Fires :worker_ready before the jobs are set up and :consuming_started
# afterwards.
def work
  return if working?

  @working = true
  QueueingRabbit.trigger_event(:worker_ready)

  jobs.each do |job|
    run_job(QueueingRabbit.connection, job)
  end

  QueueingRabbit.trigger_event(:consuming_started)
end
work!() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 35
# Installs the signal handlers, logs the start-up and runs #work inside
# QueueingRabbit.begin_worker_loop. Does nothing when already working.
def work!
  return if working?

  trap_signals
  info "starting a new queueing_rabbit worker #{self}"

  QueueingRabbit.begin_worker_loop do
    work
  end
end
working?() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 22
# Returns the current value of the @working flag; nil while the flag has
# never been assigned.
def working?
  @working if defined?(@working)
end

Private Instance Methods

cleanup_pidfile() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 150
# Checks whether the configured pidfile belongs to a process that is still
# alive.
#
# Returns without doing anything when read_pidfile yields nil. When the
# recorded pid cannot be found, logs that the pidfile is abandoned and
# returns.
#
# Raises WorkerError when the recorded pid belongs to an existing process.
def cleanup_pidfile
  pid_in_file = read_pidfile
  return unless pid_in_file

  # read_pidfile converts the content with to_i, so an empty or garbled file
  # reads as 0. Process.getpgid(0) queries the *current* process and always
  # succeeds, which would wrongly report the pidfile as in use. Only
  # positive pids identify another process.
  alive = pid_in_file > 0 && begin
    Process.getpgid(pid_in_file)
    true
  rescue Errno::ESRCH
    false
  end

  unless alive
    info "found abandoned pidfile: #{@pidfile}. Can be safely overwritten."
    return
  end

  fatal "failed to use the pidfile #{@pidfile}. It is already " \
        "in use by a process with pid=#{pid_in_file}."
  raise WorkerError, 'The pidfile is already in use.'
end
constantize_jobs() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 117
# Replaces the job names held in @jobs with the constants they name.
#
# Raises JobNotFoundError (after logging a fatal message) for the first name
# that cannot be resolved; @jobs is left untouched in that case.
def constantize_jobs
  @jobs = @jobs.map do |job_name|
    begin
      Kernel.const_get(job_name)
    rescue NameError
      fatal "job #{job_name} doesn't exist."
      raise JobNotFoundError, "Job #{job_name} doesn't exist."
    end
  end
end
run_job(conn, job) click to toggle source
# File lib/queueing_rabbit/worker.rb, line 128
# Subscribes +job+ to its queue on +conn+.
#
# QueueingRabbit.follow_job_requirements yields three values; only the third
# (the queue) is used here. Each message delivered by conn.listen_queue is
# handed to invoke_job inside @mutex_pool.synchronize.
def run_job(conn, job)
  QueueingRabbit.follow_job_requirements(job) do |_first, _second, queue|
    options = job.listening_options

    conn.listen_queue(queue, options) do |payload, metadata|
      @mutex_pool.synchronize { invoke_job(job, payload, metadata) }
    end
  end
end
sync_stdio() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 138
# Switches both standard output streams to synchronous (unbuffered) mode.
def sync_stdio
  $stdout.sync = $stderr.sync = true
end
trap_signals() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 143
# Installs handlers for QUIT, TERM and INT that stop the worker.
#
# QUIT asks for a graceful stop; TERM and INT call stop with its default
# (non-graceful) mode. The connection is looked up once, when the handlers
# are installed.
def trap_signals
  conn = QueueingRabbit.connection
  graceful_stop = proc { stop(conn, true) }
  immediate_stop = proc { stop(conn) }

  Signal.trap('QUIT', &graceful_stop)
  Signal.trap('TERM', &immediate_stop)
  Signal.trap('INT', &immediate_stop)
end
validate_jobs() click to toggle source
# File lib/queueing_rabbit/worker.rb, line 110
# Ensures there is at least one job to work on.
#
# Raises JobNotPresentError (after logging a fatal message) when @jobs is nil
# or empty; returns nil otherwise.
def validate_jobs
  no_jobs = @jobs.nil? || @jobs.empty?
  return unless no_jobs

  fatal "no jobs specified to work on."
  raise JobNotPresentError, "No jobs specified to work on."
end