class Fleiss::Worker
Attributes
queues[R]
uuid[R]
wait_time[R]
Public Class Methods
new(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1)
click to toggle source
Init a new worker instance @param [ConnectionPool] disque client connection pool @param [Hash] options @option [Array<String>] :queues queues to watch. Default: [“default”] @option [Integer] :concurrency the number of concurrent pool. Default: 10 @option [Numeric] :wait_time maximum time (in seconds) to wait for jobs when retrieving next batch. Default: 1s.
# File lib/fleiss/worker.rb, line 19 def initialize(queues: [Fleiss::DEFAULT_QUEUE], concurrency: 10, wait_time: 1) @uuid = SecureRandom.uuid @queues = Array(queues) @pool = Fleiss::Executor.new(max_size: concurrency) @wait_time = wait_time end
run(**opts)
click to toggle source
Shortcut for new(**opts).run
# File lib/fleiss/worker.rb, line 9 def self.run(**opts) new(**opts).run end
Public Instance Methods
run()
click to toggle source
Run starts the worker
# File lib/fleiss/worker.rb, line 27 def run log(:info) { "Worker #{uuid} starting - queues: #{queues.inspect}, concurrency: #{@pool.max_size}" } loop do run_cycle sleep @wait_time end rescue SignalException => e log(:info) { "Worker #{uuid} received #{e.message}. Shutting down..." } ensure @pool.shutdown @pool.wait_for_termination end
Private Instance Methods
handle_exception(err, intro)
click to toggle source
# File lib/fleiss/worker.rb, line 92 def handle_exception(err, intro) log(:error) do [ "Worker #{uuid} error on #{intro}:", "#{err.class.name}: #{err.message}", err.backtrace, ].compact.flatten.join("\n") end end
log(severity, &block)
click to toggle source
# File lib/fleiss/worker.rb, line 42 def log(severity, &block) logger = ActiveJob::Base.logger if logger.respond_to?(:tagged) logger.tagged('Fleiss') { logger.send(severity, &block) } else logger.send(severity, &block) end end
perform(job)
click to toggle source
# File lib/fleiss/worker.rb, line 72 def perform(job) thread_id = Thread.current.object_id.to_s(16).reverse owner = "#{uuid}/#{thread_id}" return unless job.start(owner) log(:info) { "Worker #{uuid} execute job ##{job.id} (by thread #{thread_id})" } finished = false begin ActiveJob::Base.execute job.job_data finished = true rescue StandardError finished = true raise ensure finished ? job.finish(owner) : job.reschedule(owner) end rescue StandardError => e handle_exception e, "processing job ##{job.id} (by thread #{thread_id})" end
run_cycle()
click to toggle source
# File lib/fleiss/worker.rb, line 51 def run_cycle return unless @pool.running? limit = @pool.capacity return unless limit.positive? batch = Fleiss.backend .in_queue(queues) .pending .limit(limit) .to_a batch.each do |job| @pool.post do Fleiss.backend.wrap_perform { perform(job) } end end rescue StandardError => e handle_exception e, 'running cycle' end