module Resque::Plugins::State

Resque::Plugins::State is a module your jobs will include. It provides helper methods for updating the status/etc from within an instance as well as class methods for creating and queuing the jobs.

All you have to do to get this functionality is include Resque::Plugins::State and then implement a <tt>perform<tt> method.

For example

class ExampleJob
  include Resque::Plugins::State

  def perform
    num = options['num']
    i = 0
    while i < num
      i += 1
      at(i, num)
    end
    completed("Finished!")
  end

end

This job would iterate num times updating the status as it goes. At the end we update the status telling anyone listening to this job that its complete.

Constants

STATUSES
STATUS_COMPLETED
STATUS_FAILED
STATUS_KILLED
STATUS_PAUSED
STATUS_QUEUED
STATUS_REVERTED
STATUS_REVERTING
STATUS_WAITING
STATUS_WORKING
VERSION

Attributes

options[R]
uuid[R]

Public Class Methods

included(base) click to toggle source
# File lib/resque/plugins/state.rb, line 65
def self.included(base)
  base.extend(ClassMethods)
end
new(uuid, options = {}) click to toggle source

Create a new instance with uuid and options

# File lib/resque/plugins/state.rb, line 164
def initialize(uuid, options = {})
  @reverting = false
  @uuid      = uuid
  @options   = options
  @logger    = Resque.logger
end

Public Instance Methods

at(num, total, *messages) click to toggle source

set the status of the job for the current itteration. num and total are passed to the status as well as any messages. This will kill the job if it has been added to the kill list with Resque::Plugins::State::Hash.kill()

# File lib/resque/plugins/state.rb, line 247
def at(num, total, *messages)
  if total.to_f <= 0.0
    raise(NotANumber,
          "Called at() with total=#{total} which is not a number")
  end
  tick({
         'num' => num,
         'total' => total
       }, *messages)
end
completed(*messages) click to toggle source

set the status to 'completed' passing along any addional messages

# File lib/resque/plugins/state.rb, line 290
def completed(*messages)
  job_status({
               'status' => STATUS_COMPLETED,
               'message' => "Completed at #{Time.now}"
             }, *messages)
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
end
failed(*messages) click to toggle source

set the status to 'failed' passing along any additional messages

# File lib/resque/plugins/state.rb, line 278
def failed(*messages)
  job_status({ 'status' => STATUS_FAILED }, *messages)
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
end
kill!() click to toggle source

kill the current job, setting the status to 'killed' and raising Killed

# File lib/resque/plugins/state.rb, line 300
def kill!
  messages = ["Killed at #{Time.now}"]
  job_status('status' => STATUS_KILLED,
             'message' => messages[0])
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
  raise Killed
end
lock!(key = nil) click to toggle source

lock against a provided or automatic key to prevent duplicate jobs

# File lib/resque/plugins/state.rb, line 344
def lock!(key = nil)
  lock = Digest::SHA1.hexdigest @options.to_json
  lock = key if key
  if locked?(lock)
    messages = ["Waiting at #{Time.now} due to existing job"]
    job_status('status' => STATUS_WAITING,
               'message' => messages[0])
    while locked?(lock)
      kill! if should_kill?
      pause! if should_pause?
      sleep 10
    end
  else
    Resque::Plugins::State::Hash.lock(lock)
  end
end
locked?(key) click to toggle source

Checks against the lock list if this specific job instance should wait before starting

# File lib/resque/plugins/state.rb, line 239
def locked?(key)
  Resque::Plugins::State::Hash.locked?(key)
end
name() click to toggle source
# File lib/resque/plugins/state.rb, line 215
def name
  "#{self.class.name}(#{options.inspect unless options.empty?})"
end
pause!(pause_text = nil) click to toggle source

pause the current job, setting the status to 'paused' and sleeping 10 seconds

# File lib/resque/plugins/state.rb, line 329
def pause!(pause_text = nil)
  Resque::Plugins::State::Hash.pause(uuid)
  messages = ["Paused at #{Time.now} #{pause_text}"]
  job_status('status' => STATUS_PAUSED,
             'message' => messages[0])
  raise Killed if @testing # Don't loop or complete during testing
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  while should_pause?
    kill! if should_kill?
    revert! if should_revert?
    sleep 10
  end
end
revert!() click to toggle source

revert the current job, setting the status to 'reverting' and raising Revert

# File lib/resque/plugins/state.rb, line 310
def revert!
  if respond_to?(:on_revert)
    messages = ["Reverting at #{Time.now}"]
    Resque::Plugins::State::Hash.unpause(uuid) if should_pause?
    @reverting = true
    job_status('status' => STATUS_REVERTING,
               'message' => messages[0])
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
    raise Revert
  else
    @logger.error("Job #{@uuid}: Attempted revert on job with no revert"\
                  " support")
    Resque::Plugins::State::Hash.no_revert(@uuid)
    pause!('This job does not support revert functionality')
  end
end
reverted(*messages) click to toggle source

set the status to 'reverted' passing along any additional messages

# File lib/resque/plugins/state.rb, line 284
def reverted(*messages)
  job_status({ 'status' => STATUS_REVERTED }, *messages)
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
end
safe_perform!() click to toggle source

Run by the Resque::Worker when processing this job. It wraps the perform method ensuring that the final status of the job is set regardless of error. If an error occurs within the job's work, it will set the status as failed and re-raise the error.

# File lib/resque/plugins/state.rb, line 175
def safe_perform!
  job_status('status' => STATUS_WORKING)
  messages = ['Job starting']
  @logger.info("#{@uuid}: #{messages.join(' ')}")
  perform
  if status && status.failed?
    on_failure(status.message) if respond_to?(:on_failure)
    return
  elsif status && !status.completed?
    completed
  end
  on_success if respond_to?(:on_success)
rescue Killed
  Resque::Plugins::State::Hash.killed(uuid)
  on_killed if respond_to?(:on_killed)
rescue Revert
  Resque::Plugins::State::Hash.revert(uuid)
  on_revert
  messages = ["Reverted at #{Time.now}"]
  job_status('status' => STATUS_REVERTED,
             'message' => messages[0])
rescue => e
  messages = ["Failed at #{Time.now}: #{e.message}"]
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
  failed("The task failed because of an error: #{e}")
  raise e unless respond_to?(:on_failure)
  on_failure(e)
end
should_kill?() click to toggle source

Checks against the kill list if this specific job instance should be killed on the next iteration

# File lib/resque/plugins/state.rb, line 221
def should_kill?
  Resque::Plugins::State::Hash.should_kill?(uuid)
end
should_pause?() click to toggle source

Checks against the pause list if this specific job instance should be paused on the next iteration

# File lib/resque/plugins/state.rb, line 227
def should_pause?
  Resque::Plugins::State::Hash.should_pause?(uuid)
end
should_revert?() click to toggle source

Checks against the revert list if this specific job instance should be paused on the next iteration

# File lib/resque/plugins/state.rb, line 233
def should_revert?
  Resque::Plugins::State::Hash.should_revert?(uuid)
end
status() click to toggle source

get the Resque::Plugins::State::Hash object for the current uuid

# File lib/resque/plugins/state.rb, line 211
def status
  Resque::Plugins::State::Hash.get(uuid)
end
status=(new_status) click to toggle source

Set the jobs status. Can take an array of strings or hashes that are merged (in order) into a final status hash.

# File lib/resque/plugins/state.rb, line 206
def status=(new_status)
  Resque::Plugins::State::Hash.set(uuid, *new_status)
end
tick(*messages) click to toggle source

sets the status of the job for the current itteration. You should use the at method if you have actual numbers to track the iteration count. This will kill or pause the job if it has been added to either list with Resque::Plugins::State::Hash.pause() or Resque::Plugins::State::Hash.kill() respectively

# File lib/resque/plugins/state.rb, line 263
def tick(*messages)
  kill! if should_kill?
  if should_pause?
    pause!
  elsif should_revert?
    return revert! unless @reverting
    job_status({ 'status' => STATUS_REVERTING }, *messages)
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  else
    job_status({ 'status' => STATUS_WORKING }, *messages)
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  end
end
unlock!(key = nil) click to toggle source

unlock the provided or automatic key at the end of a job

# File lib/resque/plugins/state.rb, line 362
def unlock!(key = nil)
  lock = Digest::SHA1.hexdigest @options.to_json
  lock = key if key
  Resque::Plugins::State::Hash.unlock(lock)
end

Private Instance Methods

job_status(*args) click to toggle source
# File lib/resque/plugins/state.rb, line 370
def job_status(*args)
  self.status = [status, { 'name' => name }, args].flatten
end