module Resque::Plugins::State
Resque::Plugins::State
is a module your jobs will include. It provides helper methods for updating the status/etc from within an instance as well as class methods for creating and queuing the jobs.
All you have to do to get this functionality is include Resque::Plugins::State
and then implement a <tt>perform<tt> method.
For example
class ExampleJob include Resque::Plugins::State def perform num = options['num'] i = 0 while i < num i += 1 at(i, num) end completed("Finished!") end end
This job would iterate num times updating the status as it goes. At the end we update the status telling anyone listening to this job that its complete.
Constants
- STATUSES
- STATUS_COMPLETED
- STATUS_FAILED
- STATUS_KILLED
- STATUS_PAUSED
- STATUS_QUEUED
- STATUS_REVERTED
- STATUS_REVERTING
- STATUS_WAITING
- STATUS_WORKING
- VERSION
Attributes
Public Class Methods
# File lib/resque/plugins/state.rb, line 65 def self.included(base) base.extend(ClassMethods) end
Create a new instance with uuid
and options
# File lib/resque/plugins/state.rb, line 164 def initialize(uuid, options = {}) @reverting = false @uuid = uuid @options = options @logger = Resque.logger end
Public Instance Methods
set the status of the job for the current itteration. num
and total
are passed to the status as well as any messages. This will kill the job if it has been added to the kill list with Resque::Plugins::State::Hash.kill()
# File lib/resque/plugins/state.rb, line 247 def at(num, total, *messages) if total.to_f <= 0.0 raise(NotANumber, "Called at() with total=#{total} which is not a number") end tick({ 'num' => num, 'total' => total }, *messages) end
set the status to 'completed' passing along any addional messages
# File lib/resque/plugins/state.rb, line 290 def completed(*messages) job_status({ 'status' => STATUS_COMPLETED, 'message' => "Completed at #{Time.now}" }, *messages) @logger.info("Job #{@uuid}: #{messages.join(' ')}") end
set the status to 'failed' passing along any additional messages
# File lib/resque/plugins/state.rb, line 278 def failed(*messages) job_status({ 'status' => STATUS_FAILED }, *messages) @logger.error("Job #{@uuid}: #{messages.join(' ')}") end
kill the current job, setting the status to 'killed' and raising Killed
# File lib/resque/plugins/state.rb, line 300 def kill! messages = ["Killed at #{Time.now}"] job_status('status' => STATUS_KILLED, 'message' => messages[0]) @logger.error("Job #{@uuid}: #{messages.join(' ')}") raise Killed end
lock against a provided or automatic key to prevent duplicate jobs
# File lib/resque/plugins/state.rb, line 344 def lock!(key = nil) lock = Digest::SHA1.hexdigest @options.to_json lock = key if key if locked?(lock) messages = ["Waiting at #{Time.now} due to existing job"] job_status('status' => STATUS_WAITING, 'message' => messages[0]) while locked?(lock) kill! if should_kill? pause! if should_pause? sleep 10 end else Resque::Plugins::State::Hash.lock(lock) end end
Checks against the lock list if this specific job instance should wait before starting
# File lib/resque/plugins/state.rb, line 239 def locked?(key) Resque::Plugins::State::Hash.locked?(key) end
# File lib/resque/plugins/state.rb, line 215 def name "#{self.class.name}(#{options.inspect unless options.empty?})" end
pause the current job, setting the status to 'paused' and sleeping 10 seconds
# File lib/resque/plugins/state.rb, line 329 def pause!(pause_text = nil) Resque::Plugins::State::Hash.pause(uuid) messages = ["Paused at #{Time.now} #{pause_text}"] job_status('status' => STATUS_PAUSED, 'message' => messages[0]) raise Killed if @testing # Don't loop or complete during testing @logger.info("Job #{@uuid}: #{messages.join(' ')}") while should_pause? kill! if should_kill? revert! if should_revert? sleep 10 end end
revert the current job, setting the status to 'reverting' and raising Revert
# File lib/resque/plugins/state.rb, line 310 def revert! if respond_to?(:on_revert) messages = ["Reverting at #{Time.now}"] Resque::Plugins::State::Hash.unpause(uuid) if should_pause? @reverting = true job_status('status' => STATUS_REVERTING, 'message' => messages[0]) @logger.info("Job #{@uuid}: #{messages.join(' ')}") raise Revert else @logger.error("Job #{@uuid}: Attempted revert on job with no revert"\ " support") Resque::Plugins::State::Hash.no_revert(@uuid) pause!('This job does not support revert functionality') end end
set the status to 'reverted' passing along any additional messages
# File lib/resque/plugins/state.rb, line 284 def reverted(*messages) job_status({ 'status' => STATUS_REVERTED }, *messages) @logger.error("Job #{@uuid}: #{messages.join(' ')}") end
Run by the Resque::Worker when processing this job. It wraps the perform
method ensuring that the final status of the job is set regardless of error. If an error occurs within the job's work, it will set the status as failed and re-raise the error.
# File lib/resque/plugins/state.rb, line 175 def safe_perform! job_status('status' => STATUS_WORKING) messages = ['Job starting'] @logger.info("#{@uuid}: #{messages.join(' ')}") perform if status && status.failed? on_failure(status.message) if respond_to?(:on_failure) return elsif status && !status.completed? completed end on_success if respond_to?(:on_success) rescue Killed Resque::Plugins::State::Hash.killed(uuid) on_killed if respond_to?(:on_killed) rescue Revert Resque::Plugins::State::Hash.revert(uuid) on_revert messages = ["Reverted at #{Time.now}"] job_status('status' => STATUS_REVERTED, 'message' => messages[0]) rescue => e messages = ["Failed at #{Time.now}: #{e.message}"] @logger.error("Job #{@uuid}: #{messages.join(' ')}") failed("The task failed because of an error: #{e}") raise e unless respond_to?(:on_failure) on_failure(e) end
Checks against the kill list if this specific job instance should be killed on the next iteration
# File lib/resque/plugins/state.rb, line 221 def should_kill? Resque::Plugins::State::Hash.should_kill?(uuid) end
Checks against the pause list if this specific job instance should be paused on the next iteration
# File lib/resque/plugins/state.rb, line 227 def should_pause? Resque::Plugins::State::Hash.should_pause?(uuid) end
Checks against the revert list if this specific job instance should be paused on the next iteration
# File lib/resque/plugins/state.rb, line 233 def should_revert? Resque::Plugins::State::Hash.should_revert?(uuid) end
get the Resque::Plugins::State::Hash
object for the current uuid
# File lib/resque/plugins/state.rb, line 211 def status Resque::Plugins::State::Hash.get(uuid) end
Set the jobs status. Can take an array of strings or hashes that are merged (in order) into a final status hash.
# File lib/resque/plugins/state.rb, line 206 def status=(new_status) Resque::Plugins::State::Hash.set(uuid, *new_status) end
sets the status of the job for the current itteration. You should use the at
method if you have actual numbers to track the iteration count. This will kill or pause the job if it has been added to either list with Resque::Plugins::State::Hash.pause()
or Resque::Plugins::State::Hash.kill()
respectively
# File lib/resque/plugins/state.rb, line 263 def tick(*messages) kill! if should_kill? if should_pause? pause! elsif should_revert? return revert! unless @reverting job_status({ 'status' => STATUS_REVERTING }, *messages) @logger.info("Job #{@uuid}: #{messages.join(' ')}") else job_status({ 'status' => STATUS_WORKING }, *messages) @logger.info("Job #{@uuid}: #{messages.join(' ')}") end end
unlock the provided or automatic key at the end of a job
# File lib/resque/plugins/state.rb, line 362 def unlock!(key = nil) lock = Digest::SHA1.hexdigest @options.to_json lock = key if key Resque::Plugins::State::Hash.unlock(lock) end
Private Instance Methods
# File lib/resque/plugins/state.rb, line 370 def job_status(*args) self.status = [status, { 'name' => name }, args].flatten end