class Quiq::Scheduler
Constants
- SCHEDULER_KEY
Public Class Methods
enqueue_at(job, scheduled_at)
click to toggle source
# File lib/quiq/scheduler.rb, line 31 def self.enqueue_at(job, scheduled_at) Quiq.redis.zadd(SCHEDULER_KEY, scheduled_at, job) end
Public Instance Methods
start()
click to toggle source
# File lib/quiq/scheduler.rb, line 11 def start # Set the process name Process.setproctitle('quiq scheduler') Async do loop do sleep 0.2 # TODO: use ZRANGEBYSCORE instead to batch enqueuing job, scheduled_at = Quiq.redis.zrange( SCHEDULER_KEY, 0, 0, with_scores: true ) enqueue(job) if job && scheduled_at.to_f <= Time.now.to_f end ensure Quiq.redis.close end end
Private Instance Methods
enqueue(job)
click to toggle source
Push the job in its queue and remove from scheduler_queue
# File lib/quiq/scheduler.rb, line 38 def enqueue(job) begin payload = JSON.parse(job) rescue JSON::ParserError => e Quiq.logger.warn("Invalid format: #{e}") Queue.send_to_dlq(job) end # TODO: wrap those 2 calls in a transaction Queue.push(payload['queue_name'], job) Quiq.redis.zrem(SCHEDULER_KEY, job) end