class Worker

Public Class Methods

new(opts={}, &block) click to toggle source
# File lib/worker.rb, line 30
def initialize(opts={}, &block)
  @in = Queue.new
  @out = Queue.new
  @block = block
  @ctx = Worker::Ctx.new
  @defers = Queue.new

  @retries = 0
  @opts = opts
  run!
end

Public Instance Methods

join() click to toggle source
# File lib/worker.rb, line 79
def join
  loop do
    break if @defers.size == 0
    sleep 0.1
  end
end
perform(*args) click to toggle source
# File lib/worker.rb, line 42
def perform(*args)
  @in.push args

  ret = @out.pop
  if ret.is_a? Exception
    raise ret
  else
    ret
  end
rescue Exception => ex
  backoff = @opts.dig(:backoff) || 0.1
  backoff_max = @opts.dig(:backoff_max)
  retries_max = @opts.dig(:retry) || 0

  if @retries == retries_max
    @retries = 0
    return if @opts.dig(:raise) == false
    raise ex
  end
  @retries += 1
  sleeping = @retries * backoff
  sleeping = backoff_max if backoff_max && sleeping > backoff_max

  sleep sleeping
  retry
end
perform_async(*args) click to toggle source
# File lib/worker.rb, line 69
def perform_async(*args)
  defer = Defer.new do
    ret = perform(*args)
    @defers.pop
    ret
  end
  @defers.push defer
  defer
end
run!() click to toggle source
# File lib/worker.rb, line 86
def run!
  @thread = Thread.new do
    loop do
      ret = @ctx.instance_exec *@in.pop, &@block
      @out.push ret
    rescue Exception => ex
      @out.push ex
    end
  end
  self
end
stop!() click to toggle source
# File lib/worker.rb, line 98
def stop!
  @thread&.kill
  self
end