class Jobster::Worker

Public Class Methods

new(queues = nil) click to toggle source
# File lib/jobster/worker.rb, line 4
# Builds a worker and records which queues it should consume from.
#
# @param queues [Array, nil] queue names to join when #work runs. When nil,
#   the class-level +queues+ list is used, with +[:main]+ as the last resort.
def initialize(queues = nil)
  # Keep the original program name so set_process_name can decorate it later.
  @process_name = $0
  @running_jobs = []
  @active_queues = {}
  configured = queues || self.class.queues
  @initial_queues = configured || [:main]
  set_process_name
end

Private Class Methods

queues() click to toggle source
# File lib/jobster/worker.rb, line 137
# Default queue list for workers of this class.
#
# The list is memoized in a class-level instance variable and starts out as
# +[:main]+.
#
# @return [Array<Symbol>] the memoized queue list
def self.queues
  return @queues if @queues

  @queues = [:main]
end

Public Instance Methods

perform_job(class_name, params = {}, id = nil) click to toggle source
# File lib/jobster/worker.rb, line 57
# Instantiates the named job class and runs it, logging progress and
# reporting failures.
#
# Job::Abort is logged at info level only. Any other StandardError is logged
# with its backtrace and passed to every handler in
# Jobster.config.worker_error_handlers. The :after_job callbacks and the
# "Finished" log line run in every case.
#
# @param class_name [String] constant name of the job class, resolved with
#   Object.const_get
# @param params [Hash] passed to the job's constructor
# @param id [String, nil] job identifier used as the log prefix; an
#   8-character random one is generated when omitted
# @return [Object] the result of the job's +perform+, or the value of the
#   rescue branch when the job raised
def perform_job(class_name, params = {}, id = nil)
  id ||= SecureRandom.uuid[0, 8]
  # Monotonic clock: the reported duration cannot go negative or jump when the
  # wall clock is adjusted while the job is running.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  exception = nil
  # Stays nil when the class cannot be resolved or instantiated; callbacks and
  # error handlers receive nil in that case.
  job = nil
  logger.info "[#{id}] Started processing \e[34m#{class_name}\e[0m job"
  begin
    # NOTE(review): class_name is resolved to an arbitrary constant and
    # instantiated -- confirm that only trusted producers can supply it.
    job = Object.const_get(class_name).new(id, params)
    run_callbacks :before_job, job
    job.perform
  rescue Job::Abort => e
    exception = e
    logger.info "[#{id}] Job aborted (#{e.message})"
  rescue => e
    exception = e
    logger.warn "[#{id}] \e[31m#{e.class}: #{e.message}\e[0m"
    Array(e.backtrace).each do |line|
      logger.warn "[#{id}]    " + line
    end
    Jobster.config.worker_error_handlers.each { |handler| handler.call(e, job) }
  ensure
    run_callbacks :after_job, job, exception
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    logger.info "[#{id}] Finished processing \e[34m#{class_name}\e[0m job in #{elapsed}s"
  end
end
set_process_name() click to toggle source
# File lib/jobster/worker.rb, line 12
# Rewrites the process title ($0) to show the worker's current state.
#
# The title is the program name captured in @process_name, followed by
# " [exiting]" once @exit is set, followed by either "(idle)" or the list of
# running job ids.
#
# @return [String] the title that was assigned
def set_process_name
  title = @process_name.to_s
  title = "#{title} [exiting]" if @exit
  status = @running_jobs.empty? ? "idle" : "running #{@running_jobs.join(', ')}"
  $0 = "#{title} (#{status})"
end
work() click to toggle source
# File lib/jobster/worker.rb, line 22
# Starts consuming from the configured queues, then blocks in a polling loop
# until a shutdown is requested.
#
# INT and TERM only set @exit and refresh the process title; this loop acts
# on the flag:
# * no running jobs: exit straight away (:immediate);
# * jobs still running: poll every 5 seconds, for at most 300 polls, then
#   exit anyway (:timeout).
# The process always exits with status 0.
def work
  logger.info "Jobster worker started (#{Jobster.config.worker_threads} thread(s))"
  run_callbacks :after_start

  Jobster.delay_queue # Touch the delay queue up front so that it is declared

  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) { @exit = true; set_process_name }
  end

  Jobster.channel.prefetch(Jobster.config.worker_threads)
  @initial_queues.uniq.each { |name| join_queue(name) }

  waits = 0
  loop do
    # Nothing to do until a signal arrives; consumers do the actual work.
    unless @exit
      sleep 1
      next
    end

    if @running_jobs.empty?
      logger.info "Exiting immediately because no jobs running"
      run_callbacks :before_quit, :immediate
      exit 0
    end

    if waits >= 300
      logger.info "Job did not finish in a timely manner. Exiting"
      run_callbacks :before_quit, :timeout
      exit 0
    end

    # Only announce the wait once, on the first poll.
    logger.info "Exit requested but job is running. Waiting for job to finish." if waits.zero?
    sleep 5
    waits += 1
  end
end

Private Instance Methods

join_queue(queue) click to toggle source
# File lib/jobster/worker.rb, line 105
# Subscribes this worker to +queue+ unless it is already subscribed.
#
# Each delivered message is passed to receive_job and is acknowledged
# afterwards whether or not receive_job raised. The resulting consumer is
# remembered in @active_queues, and the :before_queue_join / :after_queue_join
# callbacks are run around the subscription.
#
# @param queue [Object] queue name, used as the key in @active_queues
def join_queue(queue)
  return logger.info("Attempted to join queue #{queue} but already joined.") if @active_queues[queue]

  run_callbacks :before_queue_join, queue
  subscription = Jobster.queue(queue).subscribe(manual_ack: true) do |delivery, properties, payload|
    begin
      receive_job(properties, payload)
    ensure
      # Always ack, even when the job failed, so the message is not redelivered.
      Jobster.channel.ack(delivery.delivery_tag)
    end
  end
  @active_queues[queue] = subscription
  run_callbacks :after_queue_join, queue, subscription
  logger.info "Joined \e[32m#{queue}\e[0m queue"
end
leave_queue(queue) click to toggle source
# File lib/jobster/worker.rb, line 123
# Cancels the consumer registered for +queue+ and forgets it.
#
# Logs a notice and does nothing else when the worker has no consumer for
# that queue.
#
# @param queue [Object] queue name, as used as the key in @active_queues
def leave_queue(queue)
  subscription = @active_queues[queue]
  return logger.info("Not joined #{queue} so cannot leave") unless subscription

  subscription.cancel
  @active_queues.delete(queue)
  logger.info "Left \e[32m#{queue}\e[0m queue"
end
logger() click to toggle source
# File lib/jobster/worker.rb, line 133
# Logger used for all worker output, read from the Jobster configuration on
# every call.
def logger
  settings = Jobster.config
  settings.logger
end
receive_job(properties, body) click to toggle source
# File lib/jobster/worker.rb, line 84
# Handles one raw queue message: parses it, runs the job it describes and
# keeps the running-jobs bookkeeping and process title up to date.
#
# The body is expected to be a JSON object with a 'class_name' and,
# optionally, 'id' and 'params'. Anything else (unparseable text, JSON that is
# not an object, an object without 'class_name') is ignored without raising.
#
# If an exit has been requested and no jobs remain once this one is done, the
# :before_quit callbacks are run with :job_completed and the process exits
# with status 0.
#
# @param properties [Object] message properties; not used by this method
# @param body [String] raw message payload
def receive_job(properties, body)
  # Set only once the job has been registered in @running_jobs, so the ensure
  # clause removes exactly what this call added.
  job_id = nil
  begin
    # Parsing stays best-effort: a malformed payload is dropped, not raised.
    message = begin
      JSON.parse(body)
    rescue StandardError
      nil
    end

    if message.is_a?(Hash) && message['class_name']
      # Always track the job under a real id. Pushing a nil id would leave an
      # entry in @running_jobs that is never removed, so the worker would
      # never look idle again.
      job_id = message['id'] || SecureRandom.uuid[0, 8]
      Thread.current[:job_id] = job_id
      @running_jobs << job_id
      set_process_name
      perform_job(message['class_name'], message['params'] || {}, job_id)
    end
  ensure
    Thread.current[:job_id] = nil
    @running_jobs.delete(job_id) if job_id
    set_process_name
    if @exit && @running_jobs.empty?
      logger.info "Exiting because all jobs have finished."
      run_callbacks :before_quit, :job_completed
      exit 0
    end
  end
end
run_callbacks(event, *args) click to toggle source
# File lib/jobster/worker.rb, line 141
# Invokes every callback registered for +event+ in
# Jobster.config.worker_callbacks, in order, passing +args+ through.
#
# @param event [Symbol] key into the worker_callbacks table
# @param args [Array] arguments forwarded to each callback
# @return [Array, nil] the callback list, or nil when none is registered
def run_callbacks(event, *args)
  handlers = Jobster.config.worker_callbacks[event]
  return unless handlers

  handlers.each { |handler| handler.call(*args) }
end