class Quiq::Scheduler

Constants

SCHEDULER_KEY

Public Class Methods

enqueue_at(job, scheduled_at) click to toggle source
# File lib/quiq/scheduler.rb, line 31
# Register +job+ in the sorted set stored under SCHEDULER_KEY, using
# +scheduled_at+ as its score. Returns whatever the redis client's
# zadd call returns.
def self.enqueue_at(job, scheduled_at)
  redis = Quiq.redis
  redis.zadd(SCHEDULER_KEY, scheduled_at, job)
end

Public Instance Methods

start() click to toggle source
# File lib/quiq/scheduler.rb, line 11
# Run the scheduler loop: name the process, then poll the sorted set under
# SCHEDULER_KEY every 0.2 seconds inside an Async task and hand the head
# entry to #enqueue once its score is not later than the current time.
# The redis connection is closed when the task block exits.
def start
  # Set the process name
  Process.setproctitle('quiq scheduler')

  Async do
    loop do
      sleep 0.2

      # TODO: use ZRANGEBYSCORE instead to batch enqueuing
      head = Quiq.redis.zrange(SCHEDULER_KEY, 0, 0, with_scores: true)
      job, scheduled_at = head

      # Nothing scheduled at all.
      next unless job
      # Head entry is not due yet.
      next unless scheduled_at.to_f <= Time.now.to_f

      enqueue(job)
    end
  ensure
    Quiq.redis.close
  end
end

Private Instance Methods

enqueue(job) click to toggle source

Push the job onto its queue and remove it from the scheduler queue.

# File lib/quiq/scheduler.rb, line 38
# Move a job from the scheduler sorted set to its destination queue.
#
# +job+ is the raw serialized payload as stored under SCHEDULER_KEY. It is
# parsed as JSON only to read its 'queue_name' key; the raw string is what
# gets pushed.
#
# When the payload is not valid JSON, a warning is logged, the raw payload
# is handed to Queue.send_to_dlq, the entry is removed from the scheduler
# set, and nothing is pushed to a regular queue.
def enqueue(job)
  begin
    payload = JSON.parse(job)
  rescue JSON::ParserError => e
    Quiq.logger.warn("Invalid format: #{e}")
    Queue.send_to_dlq(job)
    # Remove the unparsable entry and stop here: without this, payload is
    # nil below (NoMethodError) and the entry stays in the scheduler set.
    Quiq.redis.zrem(SCHEDULER_KEY, job)
    return
  end

  # TODO: wrap those 2 calls in a transaction
  Queue.push(payload['queue_name'], job)
  Quiq.redis.zrem(SCHEDULER_KEY, job)
end