class Disque::ActiveJobWorker
Attributes
disque_client[R]
disque_count[R]
disque_queues[R]
disque_timeout[R]
Public Class Methods
new(client: nil, queues: nil, count: 1, timeout: nil)
click to toggle source
# File lib/disque/active_job_worker.rb, line 15
#
# Builds a worker and stores its client, queue list, fetch count and timeout.
#
# client  - object used to fetch jobs. When nil, a Disque client is built
#           from ENV: DISQUE_NODES (default 'localhost:7711'), DISQUE_AUTH
#           (default nil) and DISQUE_CYCLE (default '20000', converted with
#           to_i).
# queues  - Array of queue names (stored as given), a comma-separated String,
#           or nil to read DISQUE_QUEUES from ENV (default 'default').
# count   - stored as disque_count and later passed to the client's fetch.
# timeout - stored as disque_timeout and later passed to the client's fetch.
#
# Raises RuntimeError ('Invalid Disque Queues') when queues is neither an
# Array, a String nor nil.
def initialize(client: nil, queues: nil, count: 1, timeout: nil)
  @disque_client = client || Disque.new(
    ENV.fetch('DISQUE_NODES', 'localhost:7711'),
    auth: ENV.fetch('DISQUE_AUTH', nil),
    cycle: ENV.fetch('DISQUE_CYCLE', '20000').to_i
  )

  @disque_queues =
    case queues
    when Array
      queues
    when String, nil
      # Comma-separated lists usually come from hand-written configuration,
      # so tolerate "a, b" and "a,,b": trim each name and drop blank entries
      # instead of producing queue names such as " b" or "".
      (queues || ENV.fetch('DISQUE_QUEUES', 'default'))
        .split(',')
        .map(&:strip)
        .reject(&:empty?)
    else
      raise 'Invalid Disque Queues'
    end

  @disque_count = count
  @disque_timeout = timeout
end
run()
click to toggle source
# File lib/disque/active_job_worker.rb, line 11
#
# Convenience entry point: builds a worker with the default constructor
# arguments and starts its run loop.
def self.run
  worker = new
  worker.run
end
Public Instance Methods
run()
click to toggle source
# File lib/disque/active_job_worker.rb, line 40
#
# Repeatedly asks disque_client for jobs from disque_queues, passing along
# disque_timeout and disque_count, and performs each fetched job inline.
# The loop has no exit condition of its own; it ends only when an exception
# propagates out of it.
def run
  loop do
    disque_client.fetch(from: disque_queues, timeout: disque_timeout, count: disque_count) do |payload, _ignored|
      # Each payload is a JSON-serialized job: decode it, rebuild the job
      # object, then run it synchronously.
      job_data = ActiveSupport::JSON.decode(payload)
      job = ActiveJob::Base.deserialize(job_data)
      job.perform_now
    end
  end
end