class Fleiss::Worker

Attributes

queues[R]
uuid[R]
wait_time[R]

Public Class Methods

new(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1) click to toggle source

Initializes a new worker instance. @param [Hash] options @option [Array<String>] :queues queues to watch. Default: [Fleiss::DEFAULT_QUEUE] @option [Integer] :concurrency maximum size of the executor pool that runs jobs. Default: 10 @option [Numeric] :wait_time time (in seconds) to sleep between polling cycles. Default: 1.

# File lib/fleiss/worker.rb, line 19
# Builds a worker with a freshly generated UUID and its own executor pool.
#
# @param queues [Array<String>, String] queue name(s) to watch; coerced with Array()
# @param concurrency [Integer] handed to Fleiss::Executor as its max_size
# @param wait_time [Numeric] stored for the sleep between polling cycles
def initialize(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1)
  @queues    = Array(queues)
  @uuid      = SecureRandom.uuid
  @pool      = Fleiss::Executor.new(max_size: concurrency)
  @wait_time = wait_time
end
run(**opts) click to toggle source

Shortcut for new(**opts).run

# File lib/fleiss/worker.rb, line 9
# Convenience entry point: builds a worker from +opts+ and starts it.
#
# @param opts [Hash] keyword options forwarded unchanged to .new
# @return whatever the instance-level #run returns
def self.run(**opts)
  worker = new(**opts)
  worker.run
end

Public Instance Methods

run() click to toggle source

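Starts the worker: runs polling cycles in a loop until a signal is received, then shuts the pool down and waits for it to terminate.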

# File lib/fleiss/worker.rb, line 27
# Starts the worker. Logs a start-up line, then repeats "one polling cycle,
# then sleep @wait_time seconds" until a SignalException is raised.
# Whatever way the loop ends, the pool is shut down and awaited before
# this method returns or the exception propagates.
def run
  log(:info) { "Worker #{uuid} starting - queues: #{queues.inspect}, concurrency: #{@pool.max_size}" }
  loop do
    run_cycle
    sleep(@wait_time)
  end
rescue SignalException => signal
  # A signal is the expected way to stop; log it instead of re-raising.
  log(:info) { "Worker #{uuid} received #{signal.message}. Shutting down..." }
ensure
  @pool.shutdown
  @pool.wait_for_termination
end

Private Instance Methods

handle_exception(err, intro) click to toggle source
# File lib/fleiss/worker.rb, line 92
# Logs +err+ at :error level as one multi-line message: a header naming this
# worker and +intro+, then the exception class and message, then each
# backtrace line (none when the exception carries no backtrace).
#
# @param err [Exception] the error to report
# @param intro [String] short description of what was being done
def handle_exception(err, intro)
  log(:error) do
    header = "Worker #{uuid} error on #{intro}:"
    detail = "#{err.class.name}: #{err.message}"
    # Splatting a nil backtrace contributes nothing to the list.
    [header, detail, *err.backtrace].join("\n")
  end
end
log(severity, &block) click to toggle source
# File lib/fleiss/worker.rb, line 42
# Sends a block-built message to the ActiveJob logger, wrapped in a 'Fleiss'
# tag when the logger responds to #tagged.
#
# Does nothing when no logger is configured. Rails documents setting
# config.active_job.logger to nil as the way to disable Active Job logging;
# without this guard the severity call on nil raises NoMethodError.
#
# @param severity [Symbol] logger method to invoke, e.g. :info or :error
# @param block [Proc] builds the message; handed to the logger as-is
# @return the logger call's result, or nil when there is no logger
def log(severity, &block)
  logger = ActiveJob::Base.logger
  return unless logger

  if logger.respond_to?(:tagged)
    logger.tagged('Fleiss') { logger.public_send(severity, &block) }
  else
    logger.public_send(severity, &block)
  end
end
perform(job) click to toggle source
# File lib/fleiss/worker.rb, line 72
# Claims +job+ for this worker/thread and executes it through ActiveJob.
#
# Returns early when job.start(owner) is falsy. Otherwise the job is marked
# finished both when execution completes and when it raises a StandardError;
# it is rescheduled only when execution is cut short by an exception that is
# not a StandardError, which is left to propagate. StandardErrors raised
# anywhere in this method are reported via handle_exception, not re-raised.
def perform(job)
  thread_id = Thread.current.object_id.to_s(16).reverse
  owner     = "#{uuid}/#{thread_id}"
  return unless job.start(owner)

  log(:info) { "Worker #{uuid} execute job ##{job.id} (by thread #{thread_id})" }

  # Stays false only if execution is aborted by a non-StandardError.
  completed = false
  begin
    ActiveJob::Base.execute(job.job_data)
    completed = true
  rescue StandardError
    # A failed job still counts as completed; the error is reported below.
    completed = true
    raise
  ensure
    if completed
      job.finish(owner)
    else
      job.reschedule(owner)
    end
  end
rescue StandardError => e
  handle_exception(e, "processing job ##{job.id} (by thread #{thread_id})")
end
run_cycle() click to toggle source
# File lib/fleiss/worker.rb, line 51
# Runs one polling cycle: when the pool is running and reports positive
# capacity, loads up to that many pending jobs from the watched queues and
# posts each one to the pool, where it is performed inside
# Fleiss.backend.wrap_perform. Any StandardError raised here is reported
# through handle_exception rather than propagated.
def run_cycle
  return unless @pool.running?

  free_slots = @pool.capacity
  return unless free_slots.positive?

  # Never fetch more jobs than the pool can take right now.
  scope = Fleiss.backend.in_queue(queues).pending.limit(free_slots)
  scope.to_a.each do |job|
    @pool.post { Fleiss.backend.wrap_perform { perform(job) } }
  end
rescue StandardError => e
  handle_exception(e, 'running cycle')
end