class Resqued::Config::Worker

A config handler that builds workers.

No worker processes are spawned by this class.

Public Class Methods

new(options = {}) click to toggle source

Public.

# File lib/resqued/config/worker.rb, line 11
def initialize(options = {})
  options = options.dup
  @worker_class = options.delete(:worker_class) || Resqued::Worker
  @worker_options = options
  @workers = []
end

Public Instance Methods

queue(*queues) click to toggle source

DSL: Define a queue for the worker_pool to work from.

queue 'one'
queue '*'
queue 'four-a', 'four-b', :percent => 10
queue 'five', :count => 5
# File lib/resqued/config/worker.rb, line 53
def queue(*queues)
  options = queues.last.is_a?(Hash) ? queues.pop : {}
  concurrency =
    case options
    when Hash
      if percent = options[:percent]
        percent * 0.01
      elsif count = options[:count]
        count
      else
        1.0
      end
    else
      1.0
    end
  queues.each { |queue| @pool_queues[queue] = concurrency }
end
worker(*queues) click to toggle source

DSL: Create a worker for the exact queues listed.

worker 'one', :interval => 1
# File lib/resqued/config/worker.rb, line 21
def worker(*queues)
  options = queues.last.is_a?(Hash) ? queues.pop.dup : {}
  queues = queues.flatten
  queues = ["*"] if queues.empty?
  queues = queues.shuffle if options.delete(:shuffle_queues)
  @workers << @worker_class.new(options.merge(@worker_options).merge(queues: queues))
end
worker_factory(&block) click to toggle source

DSL: Define a factory Proc used to create Resque::Workers. The factory Proc receives a list of queues as an argument.

worker_factory { |queues| Resque::Worker.new(*queues) }
# File lib/resqued/config/worker.rb, line 43
def worker_factory(&block)
  @worker_options.merge!(worker_factory: block)
end
worker_pool(count, *queues) click to toggle source

DSL: Set up a pool of workers. Define queues for the members of the pool with ‘queue`.

worker_pool 20, :interval => 1
# File lib/resqued/config/worker.rb, line 32
def worker_pool(count, *queues)
  @pool_size = count
  @pool_options = queues.last.is_a?(Hash) ? queues.pop : {}
  @pool_queues = {}
  queues.each { |q| queue q }
end

Private Instance Methods

_fixed_concurrency_queues() click to toggle source

Internal: Like @queues but with concrete fixed concurrency values. All percentage based concurrency values are converted to fixnum total number of workers that queue should run on.

# File lib/resqued/config/worker.rb, line 101
def _fixed_concurrency_queues
  @pool_queues.map { |name, concurrency| [name, _translate_concurrency_value(concurrency)] }
end
_translate_concurrency_value(value) click to toggle source

Internal: Convert a queue worker concurrency value to a fixed number of workers. This supports values that are fixed numbers as well as percentage values (between 0.0 and 1.0). The value may also be nil, in which case the maximum worker_processes value is returned.

# File lib/resqued/config/worker.rb, line 109
def _translate_concurrency_value(value)
  if value.nil?
    @pool_size
  elsif value.is_a?(1.class)
    value < @pool_size ? value : @pool_size
  elsif value.is_a?(Float) && value >= 0.0 && value <= 1.0
    [(@pool_size * value).to_i, 1].max
  else
    raise TypeError, "Unknown concurrency value: #{value.inspect}"
  end
end
build_pool_workers!() click to toggle source

Internal: Build the pool workers.

Build an array of Worker objects with queue lists configured based on the concurrency values established and the total number of workers.

# File lib/resqued/config/worker.rb, line 82
def build_pool_workers!
  return unless @pool_size

  queues = _fixed_concurrency_queues
  1.upto(@pool_size) do |worker_num|
    queue_names = queues
                  .select { |_name, concurrency| concurrency >= worker_num }
                  .map { |name, _concurrency| name }
    if queue_names.any?
      worker(queue_names, @pool_options)
    else
      worker("*", @pool_options)
    end
  end
end
results() click to toggle source
# File lib/resqued/config/worker.rb, line 73
def results
  build_pool_workers!
  @workers
end