class Resqued::Worker
Models a worker process.
Constants
- DEFAULT_WORKER_FACTORY
Attributes
pid[R]
Public: The pid of the worker process.
queues[R]
Private.
Public Class Methods
new(options)
click to toggle source
# File lib/resqued/worker.rb, line 24 def initialize(options) @queues = options.fetch(:queues) @config = options.fetch(:config) @interval = options[:interval] @backoff = Backoff.new @worker_factory = options.fetch(:worker_factory, DEFAULT_WORKER_FACTORY) @pids = [] end
Public Instance Methods
backing_off_for()
click to toggle source
Public: The amount of time we need to wait before starting a new worker.
# File lib/resqued/worker.rb, line 83 def backing_off_for @pid ? nil : @backoff.how_long? end
finished!(process_status)
click to toggle source
Public: The old worker process finished!
# File lib/resqued/worker.rb, line 64 def finished!(process_status) summary = "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed})" if process_status.nil? && !@self_started log :debug, "#{summary} I am no longer blocked." @pid = nil @backoff.died unless @killed elsif !process_status.nil? && @self_started alive_time_sec = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time @config.after_exit(WorkerSummary.new(alive_time_sec: alive_time_sec, process_status: process_status)) log :debug, "#{summary} I exited: #{process_status}" @pid = nil @backoff.died unless @killed else log :debug, "#{summary} Reports of my death are highly exaggerated (#{process_status.inspect})" end end
idle?()
click to toggle source
Public: True if there is no worker process mapped to this object.
# File lib/resqued/worker.rb, line 40 def idle? pid.nil? end
kill(signal)
click to toggle source
Public: Shut this worker down.
# File lib/resqued/worker.rb, line 117 def kill(signal) Process.kill(signal.to_s, pid) if pid && @self_started @killed = true rescue Errno::ESRCH => e log "Can't kill #{pid}: #{e}" end
queue_key()
click to toggle source
Public: A string that compares if this worker is equivalent to a worker in another Resqued::Listener
.
# File lib/resqued/worker.rb, line 50 def queue_key Digest::SHA256.hexdigest(queues.sort.join(";")) end
running_here?()
click to toggle source
Public: True if this worker is running in this process.
# File lib/resqued/worker.rb, line 45 def running_here? !idle? && @self_started end
try_start()
click to toggle source
Public: Start a job, if there’s one waiting in one of my queues.
# File lib/resqued/worker.rb, line 88 def try_start return if @backoff.wait? @backoff.started @self_started = true @killed = false @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @pid = fork @pids << @pid # still in the listener log "Forked worker #{@pid}" else # In case we get a signal before resque is ready for it. Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, "DEFAULT") } # Continue ignoring SIGHUP, though. trap(:HUP) {} # If we get a QUIT during boot, just spin back down. trap(:QUIT) { exit! 0 } $0 = "STARTING RESQUE FOR #{queues.join(',')}" resque_worker = @worker_factory.call(queues) @config.after_fork(resque_worker) resque_worker.work(@interval || 5) exit 0 end end
wait_for(pid)
click to toggle source
Public: Claim this worker for another listener’s worker.
# File lib/resqued/worker.rb, line 55 def wait_for(pid) raise "Already running #{@pid} (can't wait for #{pid})" if @pid @self_started = false @pids << pid @pid = pid end