class SimpleWorker::Runner

Runner.run(redis, tasks, opts)

where tasks is an Array of strings and ‘opts’ is a Hash of options:

:namespace    => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
:timeout      => Fixnum max allowed time between events (default: 30 seconds)
:task_timeout => Fixnum max time allowed for a task to take (default: 10 seconds)
:interval     => Fixnum interval at which SimpleWorker checks the status of all tasks (default: 5 seconds)
:notify       =>        Array[AbstractListener] objects implementing the AbstractListener API
:max_retries  => Fixnum number of times expired tasks will be retried (default: 0)

Constants

DEFAULT_OPTIONS

Public Class Methods

new(redis, tasks, opts = {}) click to toggle source
# File lib/simpleworker/runner.rb, line 28
def initialize(redis, tasks, opts = {})
  opts = DEFAULT_OPTIONS.dup.merge(opts)

  @redis      = redis
  @jobid      = SecureRandom.hex(6)
  @namespace  = opts[:namespace]
  @timeout    = opts[:timeout]
  @interval   = opts[:interval]
  max_retries = opts[:max_retries]
  listeners   = Array(opts[:notify])

  STDERR.puts 'WARNING: to prevent a race condition :timeout should be > :task_timeout' if @timeout < opts[:task_timeout]
  load_lua_scripts
  @redis.set(config_key, {'task_timeout' => opts[:task_timeout]}.to_json)
  @redis.rpush(tasks_key, tasks)

  if opts[:log]
    listeners << LoggingListener.new
  end

  @event_server = EventServer.new(redis, namespace, jobid)
  @event_monitor = EventMonitor.new
  listeners << @event_monitor

  @retry_listener = RetryListener.new(redis, max_retries, namespace, jobid)
  listeners << @retry_listener

  listeners.each do |listener|
    add_observer listener
    @event_server.add_observer listener
  end
end
run(redis, tasks, opts = {}) click to toggle source
# File lib/simpleworker/runner.rb, line 61
def self.run(redis, tasks, opts = {})
  new(redis, tasks, opts).run
end

Public Instance Methods

run() click to toggle source
# File lib/simpleworker/runner.rb, line 65
def run
  start
  process
  stop
rescue Interrupt
  fire 'on_interrupted'
  stop
rescue StandardError => e
  fire 'on_interrupted'
  stop
  raise e
end

Private Instance Methods

fire(*args) click to toggle source
# File lib/simpleworker/runner.rb, line 113
def fire(*args)
  changed
  notify_observers *args
end
process() click to toggle source
# File lib/simpleworker/runner.rb, line 84
def process
  remaining_tasks = @event_server.pull_events

  until @event_monitor.done? remaining_tasks
    sleep @interval

    remaining_tasks = @event_server.pull_events

    current_time = Time.now
    if (current_time - @event_monitor.latest_time) > @timeout
      fire 'on_timeout'
      break
    end
  end
end
start() click to toggle source
# File lib/simpleworker/runner.rb, line 80
def start
  fire('on_start', @jobid)
end
stop() click to toggle source
# File lib/simpleworker/runner.rb, line 100
def stop
  fire 'on_stop'

  @redis.multi do
    @redis.del tasks_key
    @redis.del active_tasks_key
    @redis.del log_key
    @redis.del config_key
  end
end