class Quiq::Job

Public Class Methods

new(raw, queue)
# File lib/quiq/job.rb, line 7
# Builds a job from a raw message and the queue it belongs to.
#
# @param raw the raw message; `run` parses it as JSON
# @param queue the owning queue; `run` reads its `processing` attribute
def initialize(raw, queue)
  @raw, @queue = raw, queue
end

Public Instance Methods

run()
# File lib/quiq/job.rb, line 12
# Executes the job inside an `Async` block.
#
# The raw message is parsed as JSON, the class named under the 'job_class'
# key is looked up and instantiated, and `perform` is called on it with the
# 'arguments' entry splatted. Any failure is handed to `send_to_dlq`, and the
# raw message is removed from the processing list in every case.
#
# @return whatever the `Async` call returns
def run
  Async do
    # Decode the raw message held in @raw
    job = JSON.parse(@raw)

    # Resolve the job class named by the payload, together with its arguments.
    # NOTE(review): the class name comes straight from the payload — confirm
    # that only trusted producers can enqueue messages.
    job_class = Object.const_get(job['job_class'])
    arguments = job['arguments']

    # Execute the task
    job_class.new.perform(*arguments)
  rescue JSON::ParserError => error
    # The message could not be decoded, so forward the raw message itself
    Quiq.logger.warn("Invalid format: #{error}")
    send_to_dlq(@raw, error)
  rescue StandardError => error
    # `job` is still nil here when parsing failed with a non-parser error
    Quiq.logger.debug("Sending message to DLQ: #{error}")
    send_to_dlq(job, error)
  ensure
    # Success or failure, drop the message from the processing list
    Queue.delete(@queue.processing, @raw)
  end
end

Private Instance Methods

send_to_dlq(payload, exception)
# File lib/quiq/job.rb, line 39
# Forwards a failed message to the dead-letter queue via `Queue.send_to_dlq`.
#
# A Hash payload is annotated in place with 'error' and 'backtrace' entries
# and serialized as JSON; any other payload causes the untouched raw message
# (@raw) to be sent instead.
#
# @param payload the parsed message, or anything else when parsing failed
# @param exception the error that caused the failure
# @return the result of `Queue.send_to_dlq`
def send_to_dlq(payload, exception)
  # Without a parsed Hash there is nothing to annotate, so send the raw message
  return Queue.send_to_dlq(@raw) unless payload.is_a?(Hash)

  payload['error'] = exception.to_s
  payload['backtrace'] = exception.backtrace
  Queue.send_to_dlq(JSON.dump(payload))
end