class Sidekiq::Scheduled::Enq
Constants
- LUA_ZPOPBYSCORE
Public Class Methods
new()
click to toggle source
# File lib/sidekiq/scheduled.rb, line 20 def initialize @done = false @lua_zpopbyscore_sha = nil end
Public Instance Methods
enqueue_jobs(sorted_sets = SETS)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 25 def enqueue_jobs(sorted_sets = SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get next item in the queue with score (time to execute) <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s])) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end
terminate()
click to toggle source
# File lib/sidekiq/scheduled.rb, line 42 def terminate @done = true end
Private Instance Methods
zpopbyscore(conn, keys: nil, argv: nil)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 48 def zpopbyscore(conn, keys: nil, argv: nil) if @lua_zpopbyscore_sha.nil? raw_conn = conn.respond_to?(:redis) ? conn.redis : conn @lua_zpopbyscore_sha = raw_conn.script(:load, LUA_ZPOPBYSCORE) end conn.evalsha(@lua_zpopbyscore_sha, keys, argv) rescue RedisConnection.adapter::CommandError => e raise unless e.message.start_with?("NOSCRIPT") @lua_zpopbyscore_sha = nil retry end