class SimpleWorker::Runner
Runner.run(redis, tasks, opts)
where tasks is an Array of strings and ‘opts’ is a Hash of options:
- :namespace => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
- :timeout => Fixnum max allowed time between events (default: 30 seconds)
- :task_timeout => Fixnum max time allowed for a task to take (default: 10 seconds)
- :interval => Fixnum interval at which SimpleWorker checks the status of all tasks (default: 5 seconds)
- :notify => Array[AbstractListener] objects implementing the AbstractListener API
- :max_retries => Fixnum number of times expired tasks will be retried (default: 0)
Constants
- DEFAULT_OPTIONS
Public Class Methods
new(redis, tasks, opts = {})
click to toggle source
# File lib/simpleworker/runner.rb, line 28
#
# Prepares a job: generates a job id, stores the task timeout and the task
# list in redis, and registers every listener as an observer of both this
# runner and its EventServer.
#
# redis - client used for all job state; set and rpush are called on it here.
# tasks - list of tasks pushed onto the tasks key with rpush.
# opts  - Hash of options merged over DEFAULT_OPTIONS. Keys read here:
#         :namespace, :timeout, :task_timeout, :interval, :max_retries,
#         :notify and :log.
def initialize(redis, tasks, opts = {})
  # Hash#merge already returns a new Hash, so DEFAULT_OPTIONS is left untouched.
  opts = DEFAULT_OPTIONS.merge(opts)

  @redis = redis
  @jobid = SecureRandom.hex(6)
  @namespace = opts[:namespace]
  @timeout = opts[:timeout]
  @interval = opts[:interval]
  max_retries = opts[:max_retries]

  # Kernel#Array returns the very same object when handed an Array, so take a
  # copy: the internal listeners appended below must not end up in the
  # caller's :notify array (or in a shared default value).
  listeners = Array(opts[:notify]).dup

  STDERR.puts 'WARNING: to prevent a race condition :timeout should be > :task_timeout' if @timeout < opts[:task_timeout]

  load_lua_scripts
  @redis.set(config_key, {'task_timeout' => opts[:task_timeout]}.to_json)
  @redis.rpush(tasks_key, tasks)

  listeners << LoggingListener.new if opts[:log]

  @event_server = EventServer.new(redis, namespace, jobid)
  @event_monitor = EventMonitor.new
  listeners << @event_monitor

  @retry_listener = RetryListener.new(redis, max_retries, namespace, jobid)
  listeners << @retry_listener

  # Each listener observes events fired by the runner itself as well as the
  # events published by the event server.
  listeners.each do |listener|
    add_observer listener
    @event_server.add_observer listener
  end
end
run(redis, tasks, opts = {})
click to toggle source
# File lib/simpleworker/runner.rb, line 61
#
# Convenience entry point: builds a runner from the given arguments and runs
# it straight away, returning whatever the instance-level #run returns.
def self.run(redis, tasks, opts = {})
  runner = new(redis, tasks, opts)
  runner.run
end
Public Instance Methods
run()
click to toggle source
# File lib/simpleworker/runner.rb, line 65
#
# Drives one job: start, process, then stop.
#
# If an Interrupt or a StandardError escapes, 'on_interrupted' is fired and
# stop is still called. An Interrupt is swallowed after that; a StandardError
# is raised again for the caller.
def run
  start
  process
  stop
rescue Interrupt, StandardError => error
  fire('on_interrupted')
  outcome = stop
  # Only the StandardError case propagates; an Interrupt ends the run quietly.
  raise error unless error.is_a?(Interrupt)

  outcome
end
Private Instance Methods
fire(*args)
click to toggle source
# File lib/simpleworker/runner.rb, line 113
#
# Marks this object as changed and forwards the given arguments to
# notify_observers.
#
# args - values passed through unchanged to notify_observers.
def fire(*args)
  changed
  # Parenthesised so the splat is not parsed as an ambiguous `*` operator.
  notify_observers(*args)
end
process()
click to toggle source
# File lib/simpleworker/runner.rb, line 84
#
# Polls the event server every @interval seconds until the event monitor
# reports the job as done. Gives up early, firing 'on_timeout', when the time
# since the monitor's latest_time exceeds @timeout.
def process
  pending = @event_server.pull_events

  until @event_monitor.done?(pending)
    sleep(@interval)
    pending = @event_server.pull_events

    elapsed = Time.now - @event_monitor.latest_time
    next unless elapsed > @timeout

    fire('on_timeout')
    break
  end
end
start()
click to toggle source
# File lib/simpleworker/runner.rb, line 80
#
# Announces the beginning of the job by firing 'on_start' together with the
# job id.
def start
  fire 'on_start', @jobid
end
stop()
click to toggle source
# File lib/simpleworker/runner.rb, line 100
#
# Fires 'on_stop' and then deletes the tasks, active-tasks, log and config
# keys inside a single MULTI block.
#
# Returns whatever @redis.multi returns.
def stop
  fire 'on_stop'

  # Commands go to the object yielded by multi rather than to @redis itself.
  # NOTE(review): assumes @redis is a redis-rb compatible client. redis-rb 4.6
  # deprecated calling the client inside the block and 5.0 dropped support for
  # it, while older releases yield the client, so this form works on both.
  @redis.multi do |transaction|
    transaction.del tasks_key
    transaction.del active_tasks_key
    transaction.del log_key
    transaction.del config_key
  end
end