class ResqueAdmin::Worker

A ResqueAdmin Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.

Attributes

fork_per_job[W]
graceful_term[RW]

Should TERM kill workers gracefully (vs. immediately)? When set, SIGTERM works like SIGQUIT.

hostname[W]
job[W]
pid[W]
pre_shutdown_timeout[RW]
run_at_exit_hooks[RW]

When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`

term_child[RW]

decide whether to use new_kill_child logic

term_child_signal[RW]
term_timeout[RW]
to_s[W]

Public Class Methods

all() click to toggle source

Returns an array of all worker objects.

# File lib/resque_admin/worker.rb, line 71
# Builds a worker object for every id the data store reports. `find` is
# called with :skip_exists so no per-worker existence check is made; nil
# results are dropped by `compact`.
def self.all
  data_store.worker_ids.map { |id| find(id, :skip_exists => true) }.compact
end
all_heartbeats() click to toggle source
# File lib/resque_admin/worker.rb, line 475
# Delegates to the data store's `all_heartbeats`. Callers in this file index
# the result by worker id string.
def self.all_heartbeats
  data_store.all_heartbeats
end
all_workers_with_expired_heartbeats() click to toggle source

Returns a list of workers that have sent a heartbeat in the past, but which already expired (does NOT include workers that have never sent a heartbeat at all).

# File lib/resque_admin/worker.rb, line 481
# Selects the registered workers whose last recorded heartbeat is more than
# ResqueAdmin.prune_interval seconds older than the data store's server time.
# Workers with no recorded heartbeat are never selected.
def self.all_workers_with_expired_heartbeats
  registered = Worker.all
  last_beats = Worker.all_heartbeats
  server_now = data_store.server_time

  registered.select do |candidate|
    last_beat = last_beats[candidate.to_s]
    next false unless last_beat

    (server_now - Time.parse(last_beat)).to_i > ResqueAdmin.prune_interval
  end
end
attach(worker_id) click to toggle source

Alias of `find`

# File lib/resque_admin/worker.rb, line 119
# Alias of `find`: looks the worker up by id without passing any options.
def self.attach(worker_id)
  find(worker_id)
end
data_store() click to toggle source
# File lib/resque_admin/worker.rb, line 33
# Class-level data store accessor; simply returns `redis`.
def self.data_store
  self.redis
end
exists?(worker_id) click to toggle source

Given a string worker id, return a boolean indicating whether the worker exists

# File lib/resque_admin/worker.rb, line 125
# Asks the data store whether a worker with the given string id exists.
def self.exists?(worker_id)
  data_store.worker_exists?(worker_id)
end
find(worker_id, options = {}) click to toggle source

Returns a single worker object. Accepts a string id.

# File lib/resque_admin/worker.rb, line 102
# Returns a single worker object for a string id of the form
# "host:pid:queue1,queue2", or nil when the id is not registered.
#
# worker_id - String worker id.
# options   - Hash; :skip_exists => true bypasses the `exists?` lookup.
#
# The id is split into at most three fields so that queue names which
# themselves contain ':' are kept intact instead of being truncated at the
# first colon.
def self.find(worker_id, options = {})
  skip_exists = options[:skip_exists]
  return nil unless skip_exists || exists?(worker_id)

  host, pid, queues_raw = worker_id.split(':', 3)
  queues = queues_raw.split(',')
  worker = new(*queues)
  worker.hostname = host
  worker.to_s = worker_id
  worker.pid = pid.to_i
  worker
end
kill_all_heartbeat_threads() click to toggle source
# File lib/resque_admin/worker.rb, line 19
# Kills every thread recorded in the class-level @@all_heartbeat_threads
# list, joins each of them, then resets the list to empty.
def self.kill_all_heartbeat_threads
  @@all_heartbeat_threads.each(&:kill).each(&:join)
  @@all_heartbeat_threads = []
end
new(*queues) click to toggle source

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.

Workers should have `#prepare` called after they are initialized if you are running work on the worker.

# File lib/resque_admin/worker.rb, line 143
# Sets up a worker for the given queue names and reads its tuning knobs
# from the environment.
#
# queues - zero or more queue names; when empty, `queues=` falls back to
#          ENV["QUEUES"] / ENV['QUEUE'].
def initialize(*queues)
  @shutdown = nil
  @paused = nil
  @before_first_fork_hook_ran = false

  # LOGGING takes precedence over VERBOSE when both are set.
  verbose_value = ENV['LOGGING'] || ENV['VERBOSE']
  self.verbose = verbose_value if verbose_value
  self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE']
  # Timeouts are floats; they default to 0.0 and 4.0 when the variables are unset.
  self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f
  self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
  # These flags are stored as the raw ENV strings (nil when unset), so any
  # value that is set -- including "false" or "0" -- is truthy.
  self.term_child = ENV['TERM_CHILD']
  self.graceful_term = ENV['GRACEFUL_TERM']
  self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']

  self.queues = queues
end
redis() click to toggle source
# File lib/resque_admin/worker.rb, line 29
# Class-level accessor for the connection held by ResqueAdmin.redis.
def self.redis
  ResqueAdmin.redis
end
working() click to toggle source

Returns an array of all worker objects currently processing jobs.

# File lib/resque_admin/worker.rb, line 77
# Returns worker objects for every registered worker that currently has a
# non-empty payload stored in the data store, with `job` set to the decoded
# payload.
def self.working
  names = all
  return [] unless names.any?

  reportedly_working = {}

  begin
    # Bulk lookup; entries with a nil or empty payload are dropped.
    reportedly_working = data_store.workers_map(names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    # Fallback when the bulk lookup raises CannotDistribute: fetch each
    # worker's payload one at a time.
    names.each do |name|
      value = data_store.get_worker_payload(name)
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end

  reportedly_working.keys.map do |key|
    # Strip a "worker:" prefix from the key (if present) to get the worker id.
    worker = find(key.sub("worker:", ''), :skip_exists => true)
    worker.job = worker.decode(reportedly_working[key])
    worker
  end.compact
end

Public Instance Methods

==(other) click to toggle source

Is this worker the same as another worker?

# File lib/resque_admin/worker.rb, line 767
# Two workers are considered equal when their string representations match.
def ==(other)
  to_s == other.to_s
end
child_already_exited?() click to toggle source
# File lib/resque_admin/worker.rb, line 545
# Non-blocking check on the forked child. With WNOHANG, waitpid returns nil
# while the child is still running and the child's pid once it has exited,
# so the result is truthy only after the child is gone.
def child_already_exited?
  Process.waitpid(@child, Process::WNOHANG)
end
data_store()
Alias for: redis
decode(object) click to toggle source

Given a string, returns a Ruby object.

# File lib/resque_admin/worker.rb, line 44
# Delegates to ResqueAdmin.decode.
def decode(object)
  ResqueAdmin.decode(object)
end
done_working() click to toggle source

Called when we are done working - clears our `working_on` state and tells Redis we processed a job.

# File lib/resque_admin/worker.rb, line 699
# Tells the data store this worker is done working; the block passed to it
# bumps the processed counters via `processed!`.
def done_working
  data_store.worker_done_working(self) do
    processed!
  end
end
enable_gc_optimizations() click to toggle source

Enables GC Optimizations if you're running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow

# File lib/resque_admin/worker.rb, line 365
# Turns on copy-on-write friendly GC when the running interpreter exposes
# GC.copy_on_write_friendly=; does nothing otherwise.
def enable_gc_optimizations
  return unless GC.respond_to?(:copy_on_write_friendly=)

  GC.copy_on_write_friendly = true
end
encode(object) click to toggle source

Given a Ruby object, returns a string suitable for storage in a queue.

# File lib/resque_admin/worker.rb, line 39
# Delegates to ResqueAdmin.encode.
def encode(object)
  ResqueAdmin.encode(object)
end
failed() click to toggle source

How many failed jobs has this worker seen? Returns an int.

# File lib/resque_admin/worker.rb, line 717
# Reads this worker's "failed:<worker id>" counter from Stat.
def failed
  Stat["failed:#{self}"]
end
failed!() click to toggle source

Tells Redis we've failed a job.

# File lib/resque_admin/worker.rb, line 722
# Pushes onto both the global "failed" stat and this worker's
# "failed:<worker id>" stat.
def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end
fork_per_job?() click to toggle source
# File lib/resque_admin/worker.rb, line 755
# Whether each job should run in a forked child. The answer is computed once
# and memoized: forking is used unless ENV["FORK_PER_JOB"] is the string
# 'false' or Kernel does not respond to :fork.
def fork_per_job?
  unless defined?(@fork_per_job)
    forking_requested = ENV["FORK_PER_JOB"] != 'false'
    @fork_per_job = forking_requested && Kernel.respond_to?(:fork)
  end
  @fork_per_job
end
glob_match(pattern) click to toggle source
# File lib/resque_admin/worker.rb, line 202
# Returns, sorted, the names from ResqueAdmin.queues that match the given
# File.fnmatch? glob pattern.
def glob_match(pattern)
  matching = ResqueAdmin.queues.select { |name| File.fnmatch?(pattern, name) }
  matching.sort
end
heartbeat() click to toggle source
# File lib/resque_admin/worker.rb, line 463
# Delegates to the data store's `heartbeat` for this worker.
def heartbeat
  data_store.heartbeat(self)
end
heartbeat!(time = data_store.server_time) click to toggle source
# File lib/resque_admin/worker.rb, line 471
# Delegates to the data store's `heartbeat!` for this worker; `time` defaults
# to the data store's server time.
def heartbeat!(time = data_store.server_time)
  data_store.heartbeat!(self, time)
end
hostname() click to toggle source

Hostname of this worker's machine, taken from Socket.gethostname and memoized.

# File lib/resque_admin/worker.rb, line 783
# Returns the hostname assigned via `hostname=` if any, otherwise memoizes
# Socket.gethostname.
def hostname
  @hostname ||= Socket.gethostname
end
id()
Alias for: to_s
idle?() click to toggle source

Boolean - true if idle, false if not

# File lib/resque_admin/worker.rb, line 751
# True when `state` reports :idle.
def idle?
  state == :idle
end
inspect() click to toggle source
# File lib/resque_admin/worker.rb, line 771
# Short debugging representation built from the worker's string id.
def inspect
  "#<Worker #{to_s}>"
end
job(reload = true) click to toggle source

Returns a hash explaining the Job we're currently processing, if any.

# File lib/resque_admin/worker.rb, line 738
# Returns the decoded payload this worker is currently processing, or an
# empty hash when the decoded payload is nil or false.
#
# reload - when true (the default) the cached value is discarded and the
#          payload is fetched from the data store again.
def job(reload = true)
  @job = nil if reload
  return @job if @job

  @job = decode(data_store.get_worker_payload(self)) || {}
end
Also aliased as: processing
kill_background_threads() click to toggle source
# File lib/resque_admin/worker.rb, line 647
# Stops the heartbeat thread, if one was started: signals it through
# @heartbeat_thread_signal and then joins it.
def kill_background_threads
  return unless @heartbeat_thread

  @heartbeat_thread_signal.signal
  @heartbeat_thread.join
end
kill_child() click to toggle source

Kills the forked child immediately, without remorse. The job it is processing will not be completed.

# File lib/resque_admin/worker.rb, line 451
# Kills the forked child immediately with SIGKILL. The job it is processing
# will not be completed. Does nothing when there is no child.
#
# A missing child is detected from the Errno::ESRCH that Process.kill raises
# for a non-existent pid. (The previous check tested the String returned by
# a backtick `ps` call, which is always truthy, so the "not found" branch
# could never run.) In that case the worker is scheduled for shutdown. Any
# other error from the kill is ignored, as before.
def kill_child
  return unless @child

  log_with_severity :debug, "Killing child at #{@child}"
  begin
    Process.kill("KILL", @child)
  rescue Errno::ESRCH
    log_with_severity :debug, "Child #{@child} not found, restarting."
    shutdown
  rescue StandardError
    nil
  end
end
linux_worker_pids() click to toggle source

Find ResqueAdmin worker pids on Linux and OS X.

# File lib/resque_admin/worker.rb, line 813
# Find ResqueAdmin worker pids on Linux and OS X.
#
# Returns an Array of String pids taken from the first column of `ps`.
#
# The grep accepts both the `resque_admin` process titles this class sets
# ("resque_admin: Starting", "resque_admin-VERSION: ...", and command lines
# containing "resque_admin:work") and the plain `resque` forms the pattern
# matched before. Without the `(_admin)?` alternative none of this class's
# own process titles match. "resque_admin-web" is still excluded.
def linux_worker_pids
  `ps -A -o pid,command | grep -E "[r]esque(_admin)?:work|[r]esque(_admin)?:\sStarting|[r]esque(_admin)?-[0-9]" | grep -v "resque_admin-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
log(message) click to toggle source
# File lib/resque_admin/worker.rb, line 841
# Forwards the message to the `info` helper (defined outside this view).
def log(message)
  info(message)
end
log!(message) click to toggle source
# File lib/resque_admin/worker.rb, line 845
# Forwards the message to the `debug` helper (defined outside this view).
def log!(message)
  debug(message)
end
new_kill_child() click to toggle source

Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Sends the child a TERM signal, waits <term_timeout> seconds, and then sends a KILL signal if it has not quit. If pre_shutdown_timeout has been set to a positive number, the child is allowed that many seconds to exit before the aforementioned TERM and KILL are sent.

# File lib/resque_admin/worker.rb, line 520
# Stops the forked child in stages: optional grace period, then TERM, then
# KILL if it is still running after term_timeout. Does nothing when there is
# no child.
def new_kill_child
  if @child
    unless child_already_exited?
      # Optional grace period before any signal is sent.
      if pre_shutdown_timeout && pre_shutdown_timeout > 0.0
        log_with_severity :debug, "Waiting #{pre_shutdown_timeout.to_f}s for child process to exit"
        return if wait_for_child_exit(pre_shutdown_timeout)
      end

      log_with_severity :debug, "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)

      # Escalate to KILL only if the child outlives term_timeout.
      if wait_for_child_exit(term_timeout)
        return
      else
        log_with_severity :debug, "Sending KILL signal to child #{@child}"
        Process.kill("KILL", @child)
      end
    else
      log_with_severity :debug, "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  # Raised by waitpid/kill when the child no longer exists.
  log_with_severity :error, "Child #{@child} already quit and reaped."
end
pause_processing() click to toggle source

Stop processing jobs after the current one has completed (if we're currently running one).

# File lib/resque_admin/worker.rb, line 564
# Logs the pause, runs the :before_pause hooks with this worker, then sets
# the paused flag.
def pause_processing
  log_with_severity :info, "USR2 received; pausing job processing"
  run_hook :before_pause, self
  @paused = true
end
paused?() click to toggle source

are we paused?

# File lib/resque_admin/worker.rb, line 558
# Returns the paused flag (nil until pause_processing/unpause_processing
# has been called).
def paused?
  @paused
end
perform(job) { |job| ... } click to toggle source

Processes a given job in the child.

# File lib/resque_admin/worker.rb, line 295
# Runs a single job.
#
# When forking per job, first reconnects and runs the :after_fork hooks.
# Anything raised while doing so or while performing the job is handed to
# report_failed_job; on success a "done" line is logged. The optional block
# is always yielded the job afterwards, whether it succeeded or failed.
def perform(job)
  if fork_per_job?
    reconnect
    run_hook :after_fork, job
  end
  job.perform
rescue Object => failure
  report_failed_job(job,failure)
else
  log_with_severity :info, "done: #{job.inspect}"
ensure
  yield job if block_given?
end
pid() click to toggle source

Returns Integer PID of running worker

# File lib/resque_admin/worker.rb, line 788
# Returns the pid assigned via `pid=` if any, otherwise memoizes the current
# process id.
def pid
  @pid ||= Process.pid
end
prepare() click to toggle source

Daemonizes the worker if ENV['BACKGROUND'] is set and writes the process id to the file named by ENV['PIDFILE'] if set. Should only be called once per worker.

# File lib/resque_admin/worker.rb, line 163
# Optionally daemonizes the process and writes a pidfile, driven by the
# BACKGROUND and PIDFILE environment variables.
def prepare
  if ENV['BACKGROUND']
    # The `true` argument is Process.daemon's nochdir flag: the working
    # directory is left unchanged.
    Process.daemon(true)
  end

  if ENV['PIDFILE']
    File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
  end

  # Reconnect after daemonizing.
  self.reconnect if ENV['BACKGROUND']
end
process(job = nil, &block) click to toggle source

DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.

# File lib/resque_admin/worker.rb, line 268
# DEPRECATED. Processes the given job, or reserves one when none is given.
# Note that the `ensure` clause calls done_working even when no job could be
# reserved and the method returns early.
def process(job = nil, &block)
  return unless job ||= reserve

  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end
processed() click to toggle source

How many jobs has this worker processed? Returns an int.

# File lib/resque_admin/worker.rb, line 706
# Reads this worker's "processed:<worker id>" counter from Stat.
def processed
  Stat["processed:#{self}"]
end
processed!() click to toggle source

Tell Redis we've processed a job.

# File lib/resque_admin/worker.rb, line 711
# Pushes onto both the global "processed" stat and this worker's
# "processed:<worker id>" stat.
def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end
processing(reload = true)
Alias for: job
procline(string) click to toggle source

Given a string, sets the procline ($0) and logs. Procline is always in the format of:

RESQUE_PROCLINE_PREFIXresque_admin-VERSION: STRING
# File lib/resque_admin/worker.rb, line 836
# Sets the process title ($0) to
# "<RESQUE_PROCLINE_PREFIX>resque_admin-<version>: <string>" and logs it at
# debug level. The prefix is empty when the environment variable is unset.
def procline(string)
  $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque_admin-#{ResqueAdmin::Version}: #{string}"
  log_with_severity :debug, $0
end
prune_dead_workers() click to toggle source

Looks for any workers which should be running on this server and, if they're not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the ResqueAdmin workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.

# File lib/resque_admin/worker.rb, line 587
# Unregisters workers that appear to be dead. Two independent checks are
# applied to every registered worker:
#
#   1. its heartbeat has expired (any host), or
#   2. it claims to run on this host, with the same queue set as this worker
#      (or this worker listens on "*"), but its pid is not among the worker
#      pids currently found on this machine.
def prune_dead_workers
  all_workers = Worker.all

  # Only gather pids and heartbeats when there is something to inspect; both
  # locals stay nil otherwise, and the loop below does not run.
  unless all_workers.empty?
    known_workers = worker_pids
    all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
  end

  all_workers.each do |worker|
    # If the worker hasn't sent a heartbeat, remove it from the registry.
    #
    # If the worker hasn't ever sent a heartbeat, we won't remove it since
    # the first heartbeat is sent before the worker is registered it means
    # that this is a worker that doesn't support heartbeats, e.g., another
    # client library or an older version of ResqueAdmin. We won't touch these.
    if all_workers_with_expired_heartbeats.include?(worker)
      log_with_severity :info, "Pruning dead worker: #{worker}"
      worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
      next
    end

    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even
      # written in different language.
      next
    end

    # Only workers on this host can be checked against the local pid list;
    # pids are compared as strings.
    next unless host == hostname
    next if known_workers.include?(pid)

    log_with_severity :debug, "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
queues() click to toggle source

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.

# File lib/resque_admin/worker.rb, line 197
# Returns the queue names to poll. When the configured names contained no
# glob characters the precomputed static list is returned as is; otherwise
# every configured pattern is expanded through glob_match on each call and
# the results are flattened and de-duplicated.
def queues
  @static_queues || @queues.map { |pattern| glob_match(pattern) }.flatten.uniq
end
queues=(queues) click to toggle source
# File lib/resque_admin/worker.rb, line 175
# Assigns the queue names this worker listens on.
#
# queues - list of queue names; when empty, the comma-separated value of
#          ENV["QUEUES"] (or ENV['QUEUE']) is used instead.
#
# Names are stringified and stripped. If none of them contains a glob
# character, a de-duplicated static list is cached in @static_queues.
# Finishes by calling validate_queues.
def queues=(queues)
  if queues.empty?
    queues = (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',')
  end
  @queues = queues.map { |name| name.to_s.strip }

  glob_chars = ['*', '?', '{', '}', '[', ']']
  if glob_chars.none? { |char| @queues.join.include?(char) }
    @static_queues = @queues.flatten.uniq
  end

  validate_queues
end
reconnect() click to toggle source

Reconnect to Redis to avoid sharing a connection with the parent, retry up to 3 times with increasing delay before giving up.

# File lib/resque_admin/worker.rb, line 331
# Reconnects the data store. On Redis::BaseConnectionError the attempt is
# retried up to 3 times, sleeping 1, 2 and then 3 seconds between attempts;
# after that the error is logged and re-raised.
def reconnect
  tries = 0
  begin
    data_store.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log_with_severity :error, "Error reconnecting to Redis; retrying"
      # Back off a little longer on each retry.
      sleep(tries)
      retry
    else
      log_with_severity :error, "Error reconnecting to Redis; quitting"
      raise
    end
  end
end
redis() click to toggle source
# File lib/resque_admin/worker.rb, line 24
# Instance-level accessor for the connection held by ResqueAdmin.redis.
def redis
  ResqueAdmin.redis
end
Also aliased as: data_store
register_signal_handlers() click to toggle source

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.

USR1: Kill the forked child immediately, continue processing jobs.

USR2: Don't process any new jobs.

CONT: Start processing jobs again after a USR2.

# File lib/resque_admin/worker.rb, line 379
# Installs the worker's signal handlers.
def register_signal_handlers
  # With graceful_term set, TERM behaves like QUIT (finish the current job).
  trap('TERM') { graceful_term ? shutdown : shutdown!  }
  trap('INT')  { shutdown!  }

  begin
    trap('QUIT') { shutdown   }
    # term_child selects the staged TERM-then-KILL strategy for USR1.
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    # trap raises ArgumentError for signals it does not accept.
    log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log_with_severity :debug, "Registered signals"
end
register_worker() click to toggle source

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.

# File lib/resque_admin/worker.rb, line 629
# Delegates to the data store's `register_worker` for this worker.
def register_worker
  data_store.register_worker(self)
end
remove_heartbeat() click to toggle source
# File lib/resque_admin/worker.rb, line 467
# Delegates to the data store's `remove_heartbeat` for this worker.
def remove_heartbeat
  data_store.remove_heartbeat(self)
end
report_failed_job(job,exception) click to toggle source

Reports the exception and marks the job as failed

# File lib/resque_admin/worker.rb, line 279
# Logs the failure, marks the job as failed and bumps the failed counters.
#
# Both follow-up steps are best effort: anything raised by `job.fail` or by
# `failed!` is logged at error level and otherwise swallowed, so a failure
# while reporting never propagates to the caller.
def report_failed_job(job,exception)
  log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"

  begin
    job.fail(exception)
  rescue Object => reporting_error
    log_with_severity :error, "Received exception when reporting failure: #{reporting_error.inspect}"
  end

  begin
    failed!
  rescue Object => counter_error
    log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{counter_error.inspect}"
  end
end
reserve() click to toggle source

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.

# File lib/resque_admin/worker.rb, line 313
# Walks the queues in order and returns the first job ResqueAdmin.reserve
# hands back, or nil when every queue came up empty.
#
# Any exception raised along the way is logged (message and backtrace) at
# error level and then re-raised.
def reserve
  queues.each do |queue_name|
    log_with_severity :debug, "Checking #{queue_name}"
    found = ResqueAdmin.reserve(queue_name)
    next unless found

    log_with_severity :debug, "Found job on #{queue_name}"
    return found
  end

  nil
rescue Exception => error
  log_with_severity :error, "Error reserving job: #{error.inspect}"
  log_with_severity :error, error.backtrace.join("\n")
  raise error
end
run_hook(name, *args) click to toggle source

Runs a named hook, passing along any arguments.

# File lib/resque_admin/worker.rb, line 634
# Runs every hook ResqueAdmin returns for `name`, forwarding `args`.
#
# Returns early when no hooks are configured, and also for
# :before_first_fork once those hooks have already run on this worker.
# Note that `args.any?` is false when every argument is nil or false; in
# that case the hooks are called without arguments.
def run_hook(name, *args)
  hooks = ResqueAdmin.send(name)
  return unless hooks

  first_fork = name == :before_first_fork
  return if first_fork && @before_first_fork_hook_ran

  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log_with_severity :info, msg

  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
    @before_first_fork_hook_ran = true if first_fork
  end
end
shutdown() click to toggle source

Schedule this worker for shutdown. Will finish processing the current job.

# File lib/resque_admin/worker.rb, line 420
# Logs the exit and sets the shutdown flag.
def shutdown
  log_with_severity :info, 'Exiting...'
  @shutdown = true
end
shutdown!() click to toggle source

Kill the child and shutdown immediately. If not forking, abort this process.

# File lib/resque_admin/worker.rb, line 427
# Flags the worker for shutdown and stops the job in progress right away.
#
# With term_child set: a forking worker stops its child via new_kill_child;
# a non-forking worker raises TermException in the current process.
# Without term_child the child is killed via kill_child.
def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end
shutdown?() click to toggle source

Should this worker shutdown as soon as current job is finished?

# File lib/resque_admin/worker.rb, line 445
# Returns the shutdown flag (nil until `shutdown` has been called).
def shutdown?
  @shutdown
end
solaris_worker_pids() click to toggle source

Find ResqueAdmin worker pids on Solaris.

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque_admin/worker.rb, line 823
# Collects String pids of ruby processes whose argument list, as reported by
# `pargs -a`, carries this ResqueAdmin version's process title.
def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque_admin-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque_admin-web"`
    # Keep the pid only when the text between the first and second ':' of
    # the pargs line is exactly " resque_admin-<version>".
    if pargs_command.split(':')[1] == " resque_admin-#{ResqueAdmin::Version}"
      real_pid
    end
  end.compact
end
start_heartbeat() click to toggle source
# File lib/resque_admin/worker.rb, line 499
# Starts a background thread that records a heartbeat for this worker every
# ResqueAdmin.heartbeat_interval until it is signalled to stop.
def start_heartbeat
  # Clear any previously stored heartbeat for this worker first.
  remove_heartbeat

  @heartbeat_thread_signal = ResqueAdmin::ThreadSignal.new

  @heartbeat_thread = Thread.new do
    loop do
      heartbeat!
      # Wait for up to one interval; a truthy result means a stop was
      # requested (see kill_background_threads).
      signaled = @heartbeat_thread_signal.wait_for_signal(ResqueAdmin.heartbeat_interval)
      break if signaled
    end
  end

  # Record the thread in the class-level list used by
  # kill_all_heartbeat_threads.
  @@all_heartbeat_threads << @heartbeat_thread
end
started() click to toggle source

What time did this worker start? Returns an instance of `Time`

# File lib/resque_admin/worker.rb, line 728
# Delegates to the data store's `worker_start_time` for this worker.
def started
  data_store.worker_start_time(self)
end
started!() click to toggle source

Tell Redis we've started

# File lib/resque_admin/worker.rb, line 733
# Delegates to the data store's `worker_started` for this worker.
def started!
  data_store.worker_started(self)
end
startup() click to toggle source

Runs all the methods needed when a worker begins its lifecycle.

# File lib/resque_admin/worker.rb, line 348
# Runs the start-of-life steps in order: sets the process title, enables GC
# optimizations, installs signal handlers, starts the heartbeat, prunes dead
# workers, runs the :before_first_fork hooks and registers this worker.
def startup
  $0 = "resque_admin: Starting"

  enable_gc_optimizations
  register_signal_handlers
  start_heartbeat
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque_admin:work > resque_admin.log` and
  # get output from the child in there.
  $stdout.sync = true
end
state() click to toggle source

Returns a symbol representing the current worker state, which can be either :working or :idle

# File lib/resque_admin/worker.rb, line 762
# :working when the data store returns a truthy payload for this worker,
# :idle otherwise.
def state
  data_store.get_worker_payload(self) ? :working : :idle
end
to_s() click to toggle source

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.

# File lib/resque_admin/worker.rb, line 777
# Returns the id assigned via `to_s=` if any, otherwise memoizes
# "hostname:pid:queue1,queue2" built from the configured queue names.
def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end
Also aliased as: id
unpause_processing() click to toggle source

Start processing jobs again after a pause

# File lib/resque_admin/worker.rb, line 571
# Logs the resume, clears the paused flag, then runs the :after_pause hooks
# with this worker.
def unpause_processing
  log_with_severity :info, "CONT received; resuming job processing"
  @paused = false
  run_hook :after_pause, self
end
unregister_signal_handlers() click to toggle source
# File lib/resque_admin/worker.rb, line 399
# Replaces the worker's signal handlers; called in the forked child when
# term_child is set. The first TERM raises TermException, later TERMs are
# ignored, and INT/QUIT/USR1/USR2 go back to their default handlers.
def unregister_signal_handlers
  trap('TERM') do
    trap('TERM') do
      # Ignore subsequent term signals
    end

    raise TermException.new("SIGTERM")
  end

  trap('INT', 'DEFAULT')

  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
    # trap raised ArgumentError for one of these signals; nothing to reset.
  end
end
unregister_worker(exception = nil) click to toggle source

Unregisters ourself as a worker. Useful when shutting down.

# File lib/resque_admin/worker.rb, line 655
# Unregisters this worker from the data store, failing any job it was still
# processing.
#
# exception - optional exception to record on the in-flight job instead of
#             the default DirtyExit; it is also appended to the message of
#             any error raised while unregistering.
#
# Raises an exception of the same class as whatever went wrong while
# unregistering, with the original exception's class, message and backtrace
# appended to the message (each backtrace line on its own line, indented by
# two spaces).
def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    begin
      job.fail(exception || DirtyExit.new("Job still being processed"))
    rescue RuntimeError => e
      log_with_severity :error, e.message
    end
  end

  kill_background_threads

  data_store.unregister_worker(self) do
    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
rescue Exception => exception_while_unregistering
  message = exception_while_unregistering.message
  if exception
    message += "\nOriginal Exception (#{exception.class}): #{exception.message}"
    # Join with "\n  " so every backtrace line is indented, not just the first.
    message += "\n  #{exception.backtrace.join("\n  ")}" if exception.backtrace
  end
  fail(exception_while_unregistering.class,
       message,
       exception_while_unregistering.backtrace)
end
validate_queues() click to toggle source

A worker must be given a queue, otherwise it won't know what to do with itself.

You probably never need to call this.

# File lib/resque_admin/worker.rb, line 188
# Raises NoQueueError when no queue names have been configured (nil or empty
# list); returns nil otherwise.
def validate_queues
  return unless @queues.nil? || @queues.empty?

  raise NoQueueError.new("Please give each worker at least one queue.")
end
verbose() click to toggle source
# File lib/resque_admin/worker.rb, line 850
# Returns the value last assigned through `verbose=`.
def verbose
  @verbose
end
verbose=(value) click to toggle source
# File lib/resque_admin/worker.rb, line 858
# Stores the verbose flag and adjusts the shared ResqueAdmin logger:
#
#   * falsy value           -> QuietFormatter (level untouched)
#   * truthy, not very_verbose -> VerboseFormatter at INFO level
#   * truthy, very_verbose  -> logger left as is
def verbose=(value)
  if !value
    ResqueAdmin.logger.formatter = QuietFormatter.new
  elsif !very_verbose
    ResqueAdmin.logger.formatter = VerboseFormatter.new
    ResqueAdmin.logger.level = Logger::INFO
  end

  @verbose = value
end
very_verbose() click to toggle source
# File lib/resque_admin/worker.rb, line 854
# Returns the value last assigned through `very_verbose=`.
def very_verbose
  @very_verbose
end
very_verbose=(value) click to toggle source
# File lib/resque_admin/worker.rb, line 869
# Stores the very-verbose flag and picks the logger setup that matches:
# truthy -> VeryVerboseFormatter at DEBUG; otherwise VerboseFormatter at INFO
# when `verbose` is set, or QuietFormatter (level untouched) when it is not.
def very_verbose=(value)
  if value
    ResqueAdmin.logger.formatter = VeryVerboseFormatter.new
    ResqueAdmin.logger.level = Logger::DEBUG
  elsif verbose
    ResqueAdmin.logger.formatter = VerboseFormatter.new
    ResqueAdmin.logger.level = Logger::INFO
  else
    ResqueAdmin.logger.formatter = QuietFormatter.new
  end

  @very_verbose = value
end
wait_for_child_exit(timeout) click to toggle source
# File lib/resque_admin/worker.rb, line 549
# Polls every 0.1s, for roughly `timeout` seconds, until the child has
# exited. Returns true as soon as it has, false when the polls run out.
def wait_for_child_exit(timeout)
  polls = (timeout * 10).round
  polls.times do
    sleep(0.1)
    return true if child_already_exited?
  end
  false
end
windows_worker_pids() click to toggle source

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque_admin/worker.rb, line 806
# Lists the pids of all ruby.exe processes reported by `tasklist`, as
# Strings. The command output is transcoded from the locale charmap to UTF-8
# before the "PID:" lines are picked out.
def windows_worker_pids
  raw_listing = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`
  listing = raw_listing.encode("UTF-8", Encoding.locale_charmap)
  pid_lines = listing.split($/).select { |line| line =~ /^PID:/ }
  pid_lines.collect { |line| line.gsub(/PID:\s+/, '') }
end
work(interval = 5.0, &block) click to toggle source

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker's life cycle:

  1. Startup: Signals are registered, dead workers are pruned, and this worker is registered.

  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed a float representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.

# File lib/resque_admin/worker.rb, line 224
# Main loop: starts the worker up, processes jobs until shutdown is
# requested, then unregisters the worker.
#
# interval - polling interval in seconds, coerced with Float(); when it is
#            zero the loop exits as soon as no job could be worked.
# block    - forwarded to work_one_job.
def work(interval = 5.0, &block)
  interval = Float(interval)
  startup

  loop do
    break if shutdown?

    unless work_one_job(&block)
      break if interval.zero?
      log_with_severity :debug, "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
rescue Exception => exception
  # A SystemExit with no child recorded and run_at_exit_hooks set is not
  # treated as a failure: return without unregistering.
  return if exception.class == SystemExit && !@child && run_at_exit_hooks
  log_with_severity :error, "Failed to start worker : #{exception.inspect}"
  unregister_worker(exception)
end
work_one_job(job = nil, &block) click to toggle source
# File lib/resque_admin/worker.rb, line 246
# Works a single job, reserving one when none is given.
#
# Returns false when the worker is paused or no job is available, true once
# a job has been worked.
def work_one_job(job = nil, &block)
  return false if paused?
  return false unless job ||= reserve

  # Record what we are working on and reflect it in the process title.
  working_on job
  procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"

  log_with_severity :info, "got: #{job.inspect}"
  job.worker = self

  if fork_per_job?
    perform_with_fork(job, &block)
  else
    perform(job, &block)
  end

  done_working
  true
end
worker_pids() click to toggle source

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque_admin/worker.rb, line 794
# Picks the platform-specific pid finder from RUBY_PLATFORM: Solaris,
# Windows (mingw32), or the Linux/OS X default.
def worker_pids
  case RUBY_PLATFORM
  when /solaris/ then solaris_worker_pids
  when /mingw32/ then windows_worker_pids
  else linux_worker_pids
  end
end
working?() click to toggle source

Boolean - true if working, false if not

# File lib/resque_admin/worker.rb, line 746
# True when `state` reports :working.
def working?
  state == :working
end
working_on(job) click to toggle source

Given a job, tells Redis we're working on it. Useful for seeing what workers are doing and when.

# File lib/resque_admin/worker.rb, line 689
# Records in the data store what this worker is processing: the job's queue,
# its payload, and the current UTC time as an ISO 8601 string, encoded via
# `encode`.
def working_on(job)
  details = {
    :queue   => job.queue,
    :run_at  => Time.now.utc.iso8601,
    :payload => job.payload
  }
  encoded = encode(details)
  data_store.set_worker_payload(self, encoded)
end

Private Instance Methods

log_with_severity(severity, message) click to toggle source
# File lib/resque_admin/worker.rb, line 913
# Forwards the severity and message to Logging.log.
def log_with_severity(severity, message)
  Logging.log(severity, message)
end
perform_with_fork(job, &block) click to toggle source
# File lib/resque_admin/worker.rb, line 885
# Runs the job in a forked child and waits for it to finish.
#
# Runs the :before_fork hooks first. If fork raises NotImplementedError,
# forking is switched off for this worker and the job is performed in the
# current process instead. When the child was terminated by a signal, the
# job is failed with a DirtyExit naming that signal.
def perform_with_fork(job, &block)
  run_hook :before_fork, job

  begin
    @child = fork do
      unregister_signal_handlers if term_child
      perform(job, &block)
      # exit! skips at_exit handlers; plain block exit runs them.
      exit! unless run_at_exit_hooks
    end
  rescue NotImplementedError
    @fork_per_job = false
    perform(job, &block)
    return
  end

  srand # Reseeding
  procline "Forked #{@child} at #{Time.now.to_i}"

  begin
    Process.waitpid(@child)
  rescue SystemCallError
    # The child may already have been reaped elsewhere (for example by
    # child_already_exited?); $? then still holds the last collected status.
    nil
  end

  # For a signaled process the terminating signal is `termsig`; `stopsig`
  # is nil unless the process is stopped, which left the signal number out
  # of the message.
  job.fail(DirtyExit.new("Child process received unhandled signal #{$?.termsig}", $?)) if $?.signaled?
  @child = nil
end