class Disque::ActiveJobWorker

Attributes

disque_client[R]
disque_count[R]
disque_queues[R]
disque_timeout[R]

Public Class Methods

new(client: nil , queues: nil, count: 1, timeout: nil) click to toggle source
# File lib/disque/active_job_worker.rb, line 15
def initialize(client: nil , queues: nil, count: 1, timeout: nil)
  @disque_client = client || Disque.new(
    ENV.fetch('DISQUE_NODES', 'localhost:7711'),
    auth: ENV.fetch('DISQUE_AUTH', nil),
    cycle: ENV.fetch('DISQUE_CYCLE', '20000').to_i
  )

  @disque_queues = case
                   when queues.is_a?(Array)
                     queues
                   when queues.is_a?(String)
                     queues.split(',')
                   when queues.nil?
                    ENV.fetch('DISQUE_QUEUES', 'default').split(',')
                   else
                    raise 'Invalid Disque Queues'
                   end


  @disque_count = count
  @disque_timeout = timeout

  self
end
run() click to toggle source
# File lib/disque/active_job_worker.rb, line 11
def self.run
  new.run
end

Public Instance Methods

run() click to toggle source
# File lib/disque/active_job_worker.rb, line 40
def run
  loop do
    disque_client.fetch(
      from: disque_queues,
      timeout: disque_timeout,
      count: disque_count
    ) do |serialized_job, _|
      ActiveJob::Base.deserialize(
        ActiveSupport::JSON.decode(serialized_job)
      ).perform_now
    end
  end
end