class Resque::Worker
A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low-level failures.
It also ensures workers are always listening to signals from you, their master, and can react accordingly.
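The fork-per-job model described above can be illustrated with plain Ruby. This is a minimal sketch rather than Resque's implementation: `run_in_child` is a hypothetical helper, and it assumes a platform where `Kernel#fork` is available.

```ruby
# Hypothetical helper: run a block in a forked child and report how it exited.
# Assumes a platform that supports fork(2).
def run_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(yield.to_s) # send the block's result back to the parent
    writer.close
    exit!(0)                 # leave the child without running at_exit handlers
  end
  writer.close
  output = reader.read       # returns once the child closes its end of the pipe
  reader.close
  Process.waitpid(pid)       # reap the child so it does not linger as a zombie
  { output: output, success: $?.success? }
end

counter = 0
result = run_in_child { counter += 1; "job ran, counter=#{counter}" }
puts result[:output]
puts "parent counter is still #{counter}"
```

The child works on its own copy of the process memory, so whatever a job allocates or mutates disappears when the child exits, while the parent stays free to handle signals.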
Constants
- WILDCARDS
Attributes
- graceful_term: Should TERM kill workers gracefully (vs. immediately)? Makes SIGTERM work like SIGQUIT.
- run_at_exit_hooks: When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`.
- term_child: Decides whether to use the new_kill_child logic.
Public Class Methods
Returns an array of all worker objects.
# File lib/resque/worker.rb, line 71
def self.all
  data_store.worker_ids.map { |id| find(id, :skip_exists => true) }.compact
end
# File lib/resque/worker.rb, line 487
def self.all_heartbeats
  data_store.all_heartbeats
end
Returns a list of workers that have sent a heartbeat in the past, but which already expired (does NOT include workers that have never sent a heartbeat at all).
# File lib/resque/worker.rb, line 493
def self.all_workers_with_expired_heartbeats
  # Use `Worker.all_heartbeats` instead of `Worker.all`
  # to prune workers which haven't been registered but have set a heartbeat.
  # https://github.com/resque/resque/pull/1751
  heartbeats = Worker.all_heartbeats
  now = data_store.server_time

  heartbeats.select do |id, heartbeat|
    if heartbeat
      seconds_since_heartbeat = (now - Time.parse(heartbeat)).to_i
      seconds_since_heartbeat > Resque.prune_interval
    else
      false
    end
  end.each_key.map do |id|
    # skip_exists must be true to include not registered workers
    find(id, :skip_exists => true)
  end
end
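The expiry check can be tried without Redis. The sketch below is an illustration under stated assumptions: `expired_worker_ids` is a hypothetical helper, the hash stands in for `data_store.all_heartbeats`, and `PRUNE_INTERVAL` is an assumed value standing in for `Resque.prune_interval`.

```ruby
require 'time'

# Assumed value for illustration; stands in for Resque.prune_interval.
PRUNE_INTERVAL = 300 # seconds

# Hypothetical helper: ids whose last heartbeat is older than the interval.
# Workers with no recorded heartbeat (nil) are deliberately left out.
def expired_worker_ids(heartbeats, now, prune_interval = PRUNE_INTERVAL)
  heartbeats.select do |_id, heartbeat|
    next false unless heartbeat
    (now - Time.parse(heartbeat)).to_i > prune_interval
  end.keys
end

now = Time.parse("2024-01-01T12:00:00Z")
heartbeats = {
  "web1:100:default" => "2024-01-01T11:59:30Z", # 30 seconds old
  "web1:200:mail"    => "2024-01-01T11:00:00Z", # one hour old
  "web2:300:low"     => nil                     # never sent a heartbeat
}
p expired_worker_ids(heartbeats, now)
```

Only the worker with the hour-old heartbeat is selected; the one with no heartbeat at all is skipped, matching the note in the method description.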
Alias of `find`
# File lib/resque/worker.rb, line 119
def self.attach(worker_id)
  find(worker_id)
end
# File lib/resque/worker.rb, line 33
def self.data_store
  self.redis
end
Given a string worker id, return a boolean indicating whether the worker exists
# File lib/resque/worker.rb, line 125
def self.exists?(worker_id)
  data_store.worker_exists?(worker_id)
end
Returns a single worker object. Accepts a string id.
# File lib/resque/worker.rb, line 102
def self.find(worker_id, options = {})
  skip_exists = options[:skip_exists]

  if skip_exists || exists?(worker_id)
    host, pid, queues_raw = worker_id.split(':', 3)
    queues = queues_raw.split(',')
    worker = new(*queues)
    worker.hostname = host
    worker.to_s = worker_id
    worker.pid = pid.to_i
    worker
  else
    nil
  end
end
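The id format that `find` takes apart is `hostname:pid:queues`. A small standalone sketch of the same split follows; `parse_worker_id` is a hypothetical helper introduced only for illustration.

```ruby
# Hypothetical helper mirroring the split performed in Worker.find.
def parse_worker_id(worker_id)
  # Limit of 3 keeps any further colons inside the queue list intact.
  host, pid, queues_raw = worker_id.split(':', 3)
  { host: host, pid: pid.to_i, queues: queues_raw.split(',') }
end

p parse_worker_id("app01:4242:critical,default")
p parse_worker_id("app01:4242:ns:high,low")
```

Because the split is limited to three parts, a queue name that itself contains a colon (the second call) stays in one piece.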
# File lib/resque/worker.rb, line 19
def self.kill_all_heartbeat_threads
  @@all_heartbeat_threads.each(&:kill).each(&:join)
  @@all_heartbeat_threads = []
end
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
Workers should have `#prepare` called after they are initialized if you are running work on the worker.
# File lib/resque/worker.rb, line 143
def initialize(*queues)
  @shutdown = nil
  @paused = nil
  @before_first_fork_hook_ran = false

  @heartbeat_thread = nil
  @heartbeat_thread_signal = nil

  @last_state = :idle

  verbose_value = ENV['LOGGING'] || ENV['VERBOSE']
  self.verbose = verbose_value if verbose_value
  self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE']
  self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f
  self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
  self.term_child = ENV['TERM_CHILD']
  self.graceful_term = ENV['GRACEFUL_TERM']
  self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']

  self.queues = queues
end
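The way queue order turns into priority can be seen with an in-memory stand-in. This is only a sketch: the `QUEUES` hash and the `reserve_from` helper are hypothetical and replace the Redis-backed queues a real worker uses.

```ruby
# In-memory stand-in for Redis-backed queues (an assumption for illustration).
QUEUES = {
  "critical" => ["c1"],
  "default"  => ["d1", "d2"]
}

# Hypothetical helper: check queues in the order given, take the first job found.
def reserve_from(queue_names, queues = QUEUES)
  queue_names.each do |name|
    job = queues.fetch(name, []).shift
    return [name, job] if job
  end
  nil
end

processed = []
processed << reserve_from(%w[critical default]) # critical is checked first
processed << reserve_from(%w[critical default]) # critical is empty, so default
QUEUES["critical"] << "c2"                      # a new high-priority job arrives
processed << reserve_from(%w[critical default]) # critical wins again
p processed
```

Every reservation starts again from the first queue, so a job arriving on an earlier queue is picked up before the remaining jobs on later queues.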
# File lib/resque/worker.rb, line 29
def self.redis
  Resque.redis
end
Returns an array of all worker objects currently processing jobs.
# File lib/resque/worker.rb, line 77
def self.working
  names = all
  return [] unless names.any?

  reportedly_working = {}

  begin
    reportedly_working = data_store.workers_map(names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    names.each do |name|
      value = data_store.get_worker_payload(name)
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end

  reportedly_working.keys.map do |key|
    worker = find(key.sub("worker:", ''), :skip_exists => true)
    worker.job = worker.decode(reportedly_working[key])
    worker
  end.compact
end
Public Instance Methods
Is this worker the same as another worker?
# File lib/resque/worker.rb, line 795
def ==(other)
  to_s == other.to_s
end
# File lib/resque/worker.rb, line 559
def child_already_exited?
  Process.waitpid(@child, Process::WNOHANG)
end
Given a string, returns a Ruby object.
# File lib/resque/worker.rb, line 44
def decode(object)
  Resque.decode(object)
end
Called when we are done working - clears our `working_on` state and tells Redis we processed a job.
# File lib/resque/worker.rb, line 719
def done_working
  data_store.worker_done_working(self) do
    processed!
  end
end
Enables GC Optimizations if you're running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
# File lib/resque/worker.rb, line 377
def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end
Given a Ruby object, returns a string suitable for storage in a queue.
# File lib/resque/worker.rb, line 39
def encode(object)
  Resque.encode(object)
end
How many failed jobs has this worker seen? Returns an int.
# File lib/resque/worker.rb, line 745
def failed
  Stat["failed:#{self}"]
end
Tells Redis we've failed a job.
# File lib/resque/worker.rb, line 750
def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end
# File lib/resque/worker.rb, line 783
def fork_per_job?
  return @fork_per_job if defined?(@fork_per_job)
  @fork_per_job = ENV["FORK_PER_JOB"] != 'false' && Kernel.respond_to?(:fork)
end
# File lib/resque/worker.rb, line 211
def glob_match(list, pattern)
  list.select do |queue|
    File.fnmatch?(pattern, queue)
  end.sort
end
# File lib/resque/worker.rb, line 475
def heartbeat
  data_store.heartbeat(self)
end
# File lib/resque/worker.rb, line 483
def heartbeat!(time = data_store.server_time)
  data_store.heartbeat!(self, time)
end
chomp'd hostname of this worker's machine
# File lib/resque/worker.rb, line 811
def hostname
  @hostname ||= Socket.gethostname
end
Boolean - true if idle, false if not
# File lib/resque/worker.rb, line 779
def idle?
  state == :idle
end
# File lib/resque/worker.rb, line 799
def inspect
  "#<Worker #{to_s}>"
end
Returns a hash explaining the Job we're currently processing, if any.
# File lib/resque/worker.rb, line 766
def job(reload = true)
  @job = nil if reload
  @job ||= decode(data_store.get_worker_payload(self)) || {}
end
# File lib/resque/worker.rb, line 666
def kill_background_threads
  if @heartbeat_thread
    @heartbeat_thread_signal.signal
    @heartbeat_thread.join
  end
end
Kills the forked child immediately, without remorse. The job it is processing will not be completed.
# File lib/resque/worker.rb, line 463
def kill_child
  if @child
    log_with_severity :debug, "Killing child at #{@child}"
    if `ps -o pid,state -p #{@child}`
      Process.kill("KILL", @child) rescue nil
    else
      log_with_severity :debug, "Child #{@child} not found, restarting."
      shutdown
    end
  end
end
Find Resque worker pids on Linux and OS X.
# File lib/resque/worker.rb, line 841
def linux_worker_pids
  `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque:\sStarting|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
# File lib/resque/worker.rb, line 869
def log(message)
  info(message)
end
# File lib/resque/worker.rb, line 873
def log!(message)
  debug(message)
end
Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Sends the child a TERM signal, waits <term_timeout> seconds, and then sends a KILL signal if it has not quit. If pre_shutdown_timeout has been set to a positive number, the child is allowed that many seconds to exit before the aforementioned TERM and KILL are sent.
# File lib/resque/worker.rb, line 534
def new_kill_child
  if @child
    unless child_already_exited?
      if pre_shutdown_timeout && pre_shutdown_timeout > 0.0
        log_with_severity :debug, "Waiting #{pre_shutdown_timeout.to_f}s for child process to exit"
        return if wait_for_child_exit(pre_shutdown_timeout)
      end

      log_with_severity :debug, "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)

      if wait_for_child_exit(term_timeout)
        return
      else
        log_with_severity :debug, "Sending KILL signal to child #{@child}"
        Process.kill("KILL", @child)
      end
    else
      log_with_severity :debug, "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  log_with_severity :error, "Child #{@child} already quit and reaped."
end
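The TERM-then-KILL escalation can be reproduced outside Resque. The following is a rough sketch, assuming a platform with `fork`; `wait_for_exit` and `stop_child` are hypothetical helpers, not part of the Resque API.

```ruby
# Hypothetical helper: poll with WNOHANG until the child exits or time runs out.
def wait_for_exit(pid, timeout)
  (timeout * 10).round.times do
    sleep(0.1)
    return true if Process.waitpid(pid, Process::WNOHANG)
  end
  false
end

# Hypothetical helper: ask politely with TERM, then force the issue with KILL.
def stop_child(pid, term_timeout)
  Process.kill("TERM", pid)
  return :terminated if wait_for_exit(pid, term_timeout)
  Process.kill("KILL", pid)
  Process.waitpid(pid) # reap the killed child
  :killed
end

polite = fork { sleep } # default TERM handling ends the process

ready_r, ready_w = IO.pipe
stubborn = fork do
  trap("TERM", "IGNORE") # this child refuses to stop on TERM
  ready_w.write("x")
  ready_w.close
  loop { sleep 1 }
end
ready_w.close
ready_r.read(1) # wait until the stubborn child has installed its trap

p stop_child(polite, 1.0)
p stop_child(stubborn, 0.5)
```

The pipe exists only to make the example deterministic: it guarantees that the second child is already ignoring TERM before the parent sends it.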
Stop processing jobs after the current one has completed (if we're currently running one).
# File lib/resque/worker.rb, line 578
def pause_processing
  log_with_severity :info, "USR2 received; pausing job processing"
  run_hook :before_pause, self
  @paused = true
end
Are we paused?
# File lib/resque/worker.rb, line 572
def paused?
  @paused
end
Processes a given job in the child.
# File lib/resque/worker.rb, line 307
def perform(job)
  begin
    if fork_per_job?
      reconnect
      run_hook :after_fork, job
    end
    job.perform
  rescue Object => e
    report_failed_job(job,e)
  else
    log_with_severity :info, "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end
Returns Integer PID of running worker
# File lib/resque/worker.rb, line 816
def pid
  @pid ||= Process.pid
end
Daemonizes the worker if ENV['BACKGROUND'] is set and writes the process id to ENV['PIDFILE'] if set. Should only be called once per worker.
# File lib/resque/worker.rb, line 168
def prepare
  if ENV['BACKGROUND']
    Process.daemon(true)
  end

  if ENV['PIDFILE']
    File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
  end

  self.reconnect if ENV['BACKGROUND']
end
DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.
# File lib/resque/worker.rb, line 280
def process(job = nil, &block)
  return unless job ||= reserve

  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end
How many jobs has this worker processed? Returns an int.
# File lib/resque/worker.rb, line 734
def processed
  Stat["processed:#{self}"]
end
Tell Redis we've processed a job.
# File lib/resque/worker.rb, line 739
def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end
Given a string, sets the procline ($0) and logs. Procline is always in the format of:
RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
# File lib/resque/worker.rb, line 864
def procline(string)
  $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
  log_with_severity :debug, $0
end
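To see what the resulting process title looks like, the string can be built without touching `$0`. In this sketch `procline_for` is a hypothetical helper, and the version string "2.6.0" is an arbitrary placeholder standing in for `Resque::Version`.

```ruby
# Hypothetical helper: builds the procline string without assigning to $0.
# `version` stands in for Resque::Version, `prefix` for RESQUE_PROCLINE_PREFIX.
def procline_for(string, version:, prefix: nil)
  "#{prefix}resque-#{version}: #{string}"
end

puts procline_for("Waiting for critical,default", version: "2.6.0")
puts procline_for("Paused", version: "2.6.0", prefix: "myapp-")
```

An unset prefix interpolates as an empty string, which is why the format degrades cleanly to `resque-VERSION: STRING`.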
Looks for any workers which should be running on this server and, if they're not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine whether the state in Redis is stale and clean it up a bit.
# File lib/resque/worker.rb, line 601
def prune_dead_workers
  return unless data_store.acquire_pruning_dead_worker_lock(self, Resque.heartbeat_interval)

  all_workers = Worker.all

  known_workers = worker_pids
  all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
  all_workers_with_expired_heartbeats.each do |worker|
    # If the worker hasn't sent a heartbeat, remove it from the registry.
    #
    # If the worker hasn't ever sent a heartbeat, we won't remove it since
    # the first heartbeat is sent before the worker is registred it means
    # that this is a worker that doesn't support heartbeats, e.g., another
    # client library or an older version of Resque. We won't touch these.
    log_with_severity :info, "Pruning dead worker: #{worker}"

    job_class = worker.job(false)['payload']['class'] rescue nil
    worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s, job_class))
  end

  all_workers.each do |worker|
    if all_workers_with_expired_heartbeats.include?(worker)
      next
    end

    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even
      # written in different language.
      next
    end

    next unless host == hostname
    next if known_workers.include?(pid)

    log_with_severity :debug, "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
# File lib/resque/worker.rb, line 202
def queues
  if @has_dynamic_queues
    current_queues = Resque.queues
    @queues.map { |queue| glob_match(current_queues, queue) }.flatten.uniq
  else
    @queues
  end
end
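The wildcard expansion relies on `File.fnmatch?` and can be tried on a plain array. In the sketch below `glob_match` repeats the logic of the method of the same name, while `expand_queues` is a hypothetical helper and `current` stands in for `Resque.queues`.

```ruby
# Same selection logic as Worker#glob_match, written as a standalone method.
def glob_match(list, pattern)
  list.select { |queue| File.fnmatch?(pattern, queue) }.sort
end

# Hypothetical helper combining the steps of the dynamic branch of Worker#queues.
def expand_queues(patterns, current_queues)
  patterns.map { |pattern| glob_match(current_queues, pattern) }.flatten.uniq
end

current = %w[reports critical mail_low mail_high]
p expand_queues(%w[*], current)
p expand_queues(%w[critical mail_*], current)
p expand_queues(%w[reports *], current)
```

Each pattern expands to an alphabetically sorted group, and the groups keep the order of the patterns, so listing a queue before `*` (as in the last call) keeps it at the front while the remaining queues follow in alphabetical order.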
# File lib/resque/worker.rb, line 182
def queues=(queues)
  queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
  @queues = queues.map { |queue| queue.to_s.strip }
  @has_dynamic_queues = WILDCARDS.any? {|char| @queues.join.include?(char) }
  validate_queues
end
Reconnects to Redis to avoid sharing a connection with the parent. Retries up to 3 times with increasing delay before giving up.
# File lib/resque/worker.rb, line 343
def reconnect
  tries = 0
  begin
    data_store.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log_with_severity :error, "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log_with_severity :error, "Error reconnecting to Redis; quitting"
      raise
    end
  end
end
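The retry pattern used here (sleep 1, 2, then 3 seconds, then give up) works for any flaky operation. The sketch below is a generic illustration: `with_retries` and its `sleeper` parameter are hypothetical, and `IOError` merely stands in for a connection error.

```ruby
# Hypothetical helper: retry a block up to `max_retries` times, waiting
# 1, 2, 3, ... seconds between attempts. `sleeper` is injectable so that
# the example below runs without real delays.
def with_retries(max_retries: 3, error: StandardError, sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    yield
  rescue error
    if (tries += 1) <= max_retries
      sleeper.call(tries)
      retry
    else
      raise
    end
  end
end

delays = []
attempts = 0
result = with_retries(sleeper: ->(s) { delays << s }) do
  attempts += 1
  raise IOError, "connection refused" if attempts < 3
  :connected
end
p result
p delays
```

With three retries allowed, the block runs at most four times; the fourth failure is re-raised to the caller.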
# File lib/resque/worker.rb, line 24
def redis
  Resque.redis
end
Registers the various signal handlers a worker responds to.
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
USR1: Kill the forked child immediately, continue processing jobs.
USR2: Don't process any new jobs.
CONT: Start processing jobs again after a USR2.
# File lib/resque/worker.rb, line 391
def register_signal_handlers
  trap('TERM') { graceful_term ? shutdown : shutdown! }
  trap('INT') { shutdown! }

  begin
    trap('QUIT') { shutdown }
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log_with_severity :debug, "Registered signals"
end
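The pause/resume pair (USR2 and CONT) can be demonstrated in a single process. This is a minimal sketch, assuming a Unix-like platform where both signals can be trapped; `wait_until` is a hypothetical helper and the `paused` flag stands in for the worker's internal state.

```ruby
paused = false
trap("USR2") { paused = true }  # like pause_processing
trap("CONT") { paused = false } # like unpause_processing

# Hypothetical helper: handlers run asynchronously, so poll for a short while.
def wait_until(timeout = 2.0)
  deadline = Time.now + timeout
  until yield
    return false if Time.now > deadline
    sleep 0.01
  end
  true
end

Process.kill("USR2", Process.pid)
puts "paused after USR2: #{wait_until { paused }}"

Process.kill("CONT", Process.pid)
puts "resumed after CONT: #{wait_until { !paused }}"
```

The handlers only flip a flag. Keeping trap blocks this small and letting the main loop act on the flag is a common way to avoid doing real work inside a signal handler.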
Registers ourself as a worker. Useful when entering the worker lifecycle on startup.
# File lib/resque/worker.rb, line 647
def register_worker
  data_store.register_worker(self)
end
# File lib/resque/worker.rb, line 479
def remove_heartbeat
  data_store.remove_heartbeat(self)
end
Reports the exception and marks the job as failed
# File lib/resque/worker.rb, line 291
def report_failed_job(job,exception)
  log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"
  begin
    job.fail(exception)
  rescue Object => exception
    log_with_severity :error, "Received exception when reporting failure: #{exception.inspect}"
  end
  begin
    failed!
  rescue Object => exception
    log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
  end
end
Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.
# File lib/resque/worker.rb, line 325
def reserve
  queues.each do |queue|
    log_with_severity :debug, "Checking #{queue}"
    if job = Resque.reserve(queue)
      log_with_severity :debug, "Found job on #{queue}"
      return job
    end
  end

  nil
rescue Exception => e
  log_with_severity :error, "Error reserving job: #{e.inspect}"
  log_with_severity :error, e.backtrace.join("\n")
  raise e
end
Runs a named hook, passing along any arguments.
# File lib/resque/worker.rb, line 652
def run_hook(name, *args)
  hooks = Resque.send(name)
  return if hooks.empty?
  return if name == :before_first_fork && @before_first_fork_hook_ran
  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log_with_severity :info, msg

  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
    @before_first_fork_hook_ran = true if name == :before_first_fork
  end
end
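The dispatch logic can be sketched with a plain hash of callables. `HOOKS` below is a hypothetical registry standing in for the hook accessors on `Resque`, and this simplified `run_hook` leaves out the logging and the `before_first_fork` bookkeeping.

```ruby
# Hypothetical registry: hook name => list of callables.
HOOKS = Hash.new { |hash, name| hash[name] = [] }

# Simplified dispatch: call every hook registered under `name`,
# passing the arguments along when there are any.
def run_hook(name, *args)
  hooks = HOOKS[name]
  return if hooks.empty?
  hooks.each { |hook| args.any? ? hook.call(*args) : hook.call }
end

log = []
HOOKS[:before_fork] << ->(job) { log << "before_fork #{job}" }
HOOKS[:worker_exit] << -> { log << "worker_exit" }

run_hook :before_fork, "job-1"
run_hook :worker_exit
run_hook :queue_empty # nothing registered under this name, so nothing runs
p log
```

One detail of the `args.any?` check: it is false when every argument is `nil` or `false`, in which case the hook is called with no arguments at all.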
Schedule this worker for shutdown. Will finish processing the current job.
# File lib/resque/worker.rb, line 432
def shutdown
  log_with_severity :info, 'Exiting...'
  @shutdown = true
end
Kill the child and shutdown immediately. If not forking, abort this process.
# File lib/resque/worker.rb, line 439
def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end
Should this worker shutdown as soon as current job is finished?
# File lib/resque/worker.rb, line 457
def shutdown?
  @shutdown
end
Find Resque worker pids on Solaris.
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File lib/resque/worker.rb, line 851
def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
    if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
      real_pid
    end
  end.compact
end
# File lib/resque/worker.rb, line 513
def start_heartbeat
  remove_heartbeat

  @heartbeat_thread_signal = Resque::ThreadSignal.new

  @heartbeat_thread = Thread.new do
    loop do
      heartbeat!
      signaled = @heartbeat_thread_signal.wait_for_signal(Resque.heartbeat_interval)
      break if signaled
    end
  end

  @@all_heartbeat_threads << @heartbeat_thread
end
What time did this worker start? Returns an instance of `Time`
# File lib/resque/worker.rb, line 756
def started
  data_store.worker_start_time(self)
end
Tell Redis we've started
# File lib/resque/worker.rb, line 761
def started!
  data_store.worker_started(self)
end
Runs all the methods needed when a worker begins its lifecycle.
# File lib/resque/worker.rb, line 360
def startup
  $0 = "resque: Starting"

  enable_gc_optimizations
  register_signal_handlers
  start_heartbeat
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end
Returns a symbol representing the current worker state, which can be either :working or :idle
# File lib/resque/worker.rb, line 790
def state
  data_store.get_worker_payload(self) ? :working : :idle
end
# File lib/resque/worker.rb, line 725
def state_change
  current_state = state

  if current_state != @last_state
    run_hook :queue_empty if current_state == :idle
    @last_state = current_state
  end
end
The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
# File lib/resque/worker.rb, line 805
def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end
Start processing jobs again after a pause
# File lib/resque/worker.rb, line 585
def unpause_processing
  log_with_severity :info, "CONT received; resuming job processing"
  @paused = false
  run_hook :after_pause, self
end
# File lib/resque/worker.rb, line 411
def unregister_signal_handlers
  trap('TERM') do
    trap('TERM') do
      # Ignore subsequent term signals
    end

    raise TermException.new("SIGTERM")
  end

  trap('INT', 'DEFAULT')

  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
  end
end
Unregisters ourself as a worker. Useful when shutting down.
# File lib/resque/worker.rb, line 674
def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    begin
      job.fail(exception || DirtyExit.new("Job still being processed"))
    rescue RuntimeError => e
      log_with_severity :error, e.message
    end
  end

  kill_background_threads

  data_store.unregister_worker(self) do
    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
rescue Exception => exception_while_unregistering
  message = exception_while_unregistering.message
  if exception
    message += "\nOriginal Exception (#{exception.class}): #{exception.message}"
    message += "\n #{exception.backtrace.join(" \n")}" if exception.backtrace
  end
  fail(exception_while_unregistering.class,
       message,
       exception_while_unregistering.backtrace)
end
A worker must be given a queue, otherwise it won't know what to do with itself.
You probably never need to call this.
# File lib/resque/worker.rb, line 193
def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end
# File lib/resque/worker.rb, line 880
def verbose=(value);
  if value && !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
    Resque.logger.level = Logger::INFO
  elsif !value
    Resque.logger.formatter = QuietFormatter.new
  end

  @verbose = value
end
# File lib/resque/worker.rb, line 891
def very_verbose=(value)
  if value
    Resque.logger.formatter = VeryVerboseFormatter.new
    Resque.logger.level = Logger::DEBUG
  elsif !value && verbose
    Resque.logger.formatter = VerboseFormatter.new
    Resque.logger.level = Logger::INFO
  else
    Resque.logger.formatter = QuietFormatter.new
  end

  @very_verbose = value
end
# File lib/resque/worker.rb, line 563
def wait_for_child_exit(timeout)
  (timeout * 10).round.times do |i|
    sleep(0.1)
    return true if child_already_exited?
  end
  false
end
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File lib/resque/worker.rb, line 834
def windows_worker_pids
  tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
  tasklist_output.split($/).select { |line| line =~ /^PID:/ }.collect { |line| line.gsub(/PID:\s+/, '') }
end
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker's life cycle:
- Startup: Signals are registered, dead workers are pruned, and this worker is registered.
- Work loop: Jobs are pulled from a queue and processed.
- Teardown: This worker is unregistered.
Can be passed a float representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.
# File lib/resque/worker.rb, line 233
def work(interval = 5.0, &block)
  interval = Float(interval)
  startup

  loop do
    break if shutdown?

    unless work_one_job(&block)
      state_change
      break if interval.zero?
      log_with_severity :debug, "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
  run_hook :worker_exit
rescue Exception => exception
  return if exception.class == SystemExit && !@child && run_at_exit_hooks
  log_with_severity :error, "Failed to start worker : #{exception.inspect}"
  unregister_worker(exception)
  run_hook :worker_exit
end
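The shape of the life cycle (startup, work loop, teardown) can be modelled in a few lines. The class below is a hypothetical, in-memory sketch: `TinyWorker` is not part of Resque, and it takes jobs from an array instead of Redis.

```ruby
# Hypothetical in-memory model of the startup / work loop / teardown shape.
class TinyWorker
  attr_reader :events

  def initialize(jobs)
    @jobs = jobs
    @events = []
    @shutdown = false
  end

  def shutdown
    @shutdown = true
  end

  def work(interval = 5.0)
    interval = Float(interval)
    @events << :startup
    loop do
      break if @shutdown
      if (job = @jobs.shift)
        @events << [:processed, job]
        yield job if block_given? # the block sees each job once it is done
      else
        break if interval.zero?   # nothing to do and no polling requested
        sleep interval
      end
    end
    @events << :teardown
  end
end

worker = TinyWorker.new(%w[a b])
worker.work(0) { |job| puts "finished #{job}" }
p worker.events
```

Passing an interval of 0 makes the loop stop as soon as no job is found, which is convenient in tests; any positive interval keeps the worker polling until it is asked to shut down.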
# File lib/resque/worker.rb, line 258
def work_one_job(job = nil, &block)
  return false if paused?
  return false unless job ||= reserve

  working_on job
  procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"

  log_with_severity :info, "got: #{job.inspect}"
  job.worker = self

  if fork_per_job?
    perform_with_fork(job, &block)
  else
    perform(job, &block)
  end

  done_working
  true
end
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File lib/resque/worker.rb, line 822
def worker_pids
  if RUBY_PLATFORM =~ /solaris/
    solaris_worker_pids
  elsif RUBY_PLATFORM =~ /mingw32/
    windows_worker_pids
  else
    linux_worker_pids
  end
end
Boolean - true if working, false if not
# File lib/resque/worker.rb, line 774
def working?
  state == :working
end
Given a job, tells Redis we're working on it. Useful for seeing what workers are doing and when.
# File lib/resque/worker.rb, line 708
def working_on(job)
  data = encode \
    :queue => job.queue,
    :run_at => Time.now.utc.iso8601,
    :payload => job.payload
  data_store.set_worker_payload(self,data)
  state_change
end
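The record stored for a busy worker has three keys: the queue, the start time, and the job payload. The sketch below builds such a record with the standard `json` library; `worker_payload` is a hypothetical helper, and treating `encode` as plain JSON serialization is an assumption made for this illustration.

```ruby
require 'json'
require 'time'

# Hypothetical helper building the same three keys that working_on stores.
# JSON is assumed here as the serialization format.
def worker_payload(queue, payload, now = Time.now)
  JSON.generate({ queue: queue, run_at: now.utc.iso8601, payload: payload })
end

data = worker_payload("default",
                      { "class" => "SendEmail", "args" => [42] },
                      Time.utc(2024, 1, 1, 12, 0, 0))
puts data
p JSON.parse(data)["payload"]["class"]
```

Reading the record back yields a hash with string keys, which matches how `prune_dead_workers` looks up `['payload']['class']` on a worker's job.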
Private Instance Methods
# File lib/resque/worker.rb, line 935
def log_with_severity(severity, message)
  Logging.log(severity, message)
end
# File lib/resque/worker.rb, line 907
def perform_with_fork(job, &block)
  run_hook :before_fork, job

  begin
    @child = fork do
      unregister_signal_handlers if term_child
      perform(job, &block)
      exit! unless run_at_exit_hooks
    end
  rescue NotImplementedError
    @fork_per_job = false
    perform(job, &block)
    return
  end

  srand # Reseeding
  procline "Forked #{@child} at #{Time.now.to_i}"

  begin
    Process.waitpid(@child)
  rescue SystemCallError
    nil
  end

  job.fail(DirtyExit.new("Child process received unhandled signal #{$?}", $?)) if $?.signaled?
  @child = nil
end