class Resqued::Worker

Models a worker process.

Constants

DEFAULT_WORKER_FACTORY

Attributes

pid[R]

Public: The pid of the worker process.

queues[R]

Private.

Public Class Methods

new(options) click to toggle source
# File lib/resqued/worker.rb, line 24
# Public: Build a worker that has no process attached yet.
#
# options - Hash of settings:
#           :queues         - required; stored and exposed via #queues.
#           :config         - required; receives the after_fork/after_exit hooks.
#           :interval       - optional; passed to the resque worker's #work
#                             (5 is used there when this is nil).
#           :worker_factory - optional callable that builds the resque worker;
#                             defaults to DEFAULT_WORKER_FACTORY.
#
# Raises KeyError if :queues or :config is missing.
def initialize(options)
  # Required settings.
  @queues = options.fetch(:queues)
  @config = options.fetch(:config)

  # Optional settings.
  @interval = options[:interval]
  @worker_factory = options.fetch(:worker_factory) { DEFAULT_WORKER_FACTORY }

  # Bookkeeping: restart backoff and every pid ever attached to this worker.
  @backoff = Backoff.new
  @pids = []
end

Public Instance Methods

backing_off_for() click to toggle source

Public: The amount of time we need to wait before starting a new worker.

# File lib/resqued/worker.rb, line 83
# Public: How long to wait before starting a new worker.
#
# Returns nil while a process is attached (@pid is set); otherwise returns
# whatever the backoff reports via how_long?.
def backing_off_for
  @backoff.how_long? unless @pid
end
finished!(process_status) click to toggle source

Public: The old worker process finished!

# File lib/resqued/worker.rb, line 64
# Public: Handle a report that the process attached to this worker is done.
#
# process_status - nil when the report concerns a process this object did not
#                  start itself; otherwise the exit status of a process it did.
#
# A report is only accepted when it matches how the process was attached: a
# nil status for a process claimed via wait_for, or a non-nil status for one
# forked by try_start. Anything else is logged and ignored.
#
# On an accepted report the pid is cleared and, unless the worker was killed
# on purpose, the backoff is told the worker died.
def finished!(process_status)
  summary = "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed})"
  status_reported = !process_status.nil?
  started_here = @self_started ? true : false

  # The report does not fit how this process was attached, so ignore it.
  if status_reported != started_here
    return log(:debug, "#{summary} Reports of my death are highly exaggerated (#{process_status.inspect})")
  end

  if started_here
    # Our own child exited: report how long it lived to the after_exit hook.
    alive_time_sec = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time
    exit_summary = WorkerSummary.new(alive_time_sec: alive_time_sec, process_status: process_status)
    @config.after_exit(exit_summary)

    log :debug, "#{summary} I exited: #{process_status}"
  else
    log :debug, "#{summary} I am no longer blocked."
  end

  @pid = nil
  @backoff.died unless @killed
end
idle?() click to toggle source

Public: True if there is no worker process mapped to this object.

# File lib/resqued/worker.rb, line 40
# Public: Whether this object currently has no worker process attached.
#
# Returns true when pid is nil, false otherwise.
def idle?
  pid.nil?
end
kill(signal) click to toggle source

Public: Shut this worker down.

# File lib/resqued/worker.rb, line 117
# Public: Ask this worker to shut down.
#
# signal - Signal to send; converted with to_s before being passed to
#          Process.kill.
#
# The signal is only sent when a pid is attached and the process was started
# by this object. The worker is marked as killed either way, except when the
# process no longer exists (Errno::ESRCH), which is logged instead.
def kill(signal)
  owns_process = pid && @self_started
  Process.kill(signal.to_s, pid) if owns_process
  @killed = true
rescue Errno::ESRCH => error
  log "Can't kill #{pid}: #{error}"
end
queue_key() click to toggle source

Public: A string used to check whether this worker is equivalent to a worker in another Resqued::Listener.

# File lib/resqued/worker.rb, line 50
# Public: A digest identifying this worker's set of queues.
#
# The queues are sorted before hashing, so two workers with the same queues
# in a different order produce the same key.
#
# Returns the SHA-256 hex digest String of the sorted queues joined by ";".
def queue_key
  canonical_queues = queues.sort.join(";")
  Digest::SHA256.hexdigest(canonical_queues)
end
running_here?() click to toggle source

Public: True if this worker is running in this process.

# File lib/resqued/worker.rb, line 45
# Public: Whether a worker process is attached and was started by this object.
#
# Returns false when idle; otherwise returns the value of @self_started.
def running_here?
  return false if idle?

  @self_started
end
try_start() click to toggle source

Public: Try to start a worker process for my queues; nothing is started while the backoff says to wait.

# File lib/resqued/worker.rb, line 88
# Public: Fork a worker process for this object's queues, unless the backoff
# says we still have to wait.
#
# In the forking (parent) process the child's pid is stored in @pid and
# appended to @pids. In the child, a worker is built with @worker_factory,
# handed to the config's after_fork hook, and told to work; once work returns
# the child exits with status 0.
#
# Returns nil without forking when @backoff.wait? is true.
def try_start
  return if @backoff.wait?

  # Tell the backoff a start is happening and reset the per-process flags
  # that finished! and kill consult.
  @backoff.started
  @self_started = true
  @killed = false
  # Monotonic timestamp; finished! subtracts it to compute the alive time.
  @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # fork returns the child's pid in the parent and nil in the child.
  if @pid = fork
    @pids << @pid
    # still in the listener
    log "Forked worker #{@pid}"
  else
    # In case we get a signal before resque is ready for it.
    Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, "DEFAULT") }
    # Continue ignoring SIGHUP, though.
    trap(:HUP) {}
    # If we get a QUIT during boot, just spin back down.
    trap(:QUIT) { exit! 0 }

    # Rename the process so the queues are visible in the process title.
    $0 = "STARTING RESQUE FOR #{queues.join(',')}"
    resque_worker = @worker_factory.call(queues)
    @config.after_fork(resque_worker)
    # Fall back to an interval of 5 when none was configured.
    resque_worker.work(@interval || 5)
    exit 0
  end
end
wait_for(pid) click to toggle source

Public: Claim this worker for another listener’s worker.

# File lib/resqued/worker.rb, line 55
# Public: Attach this worker to a process that this object did not start.
#
# pid - The pid to wait on; it becomes the current pid and is appended to the
#       pid history.
#
# Returns the pid.
# Raises RuntimeError if a process is already attached.
def wait_for(pid)
  if @pid
    raise "Already running #{@pid} (can't wait for #{pid})"
  end

  # Mark the process as foreign so kill and finished! treat it accordingly.
  @self_started = false
  @pids.push(pid)
  @pid = pid
end