class Sidekiq::Scheduled::Enq
Constants
- LUA_ZPOPBYSCORE
Public Class Methods
new(container)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 22
#
# Builds an enqueuer bound to the given configuration container.
#
# @param container the configuration object; it is stored as +@config+ and
#   handed to +Sidekiq::Client.new+ as its +config:+ argument.
def initialize(container)
  @config = container
  @client = Sidekiq::Client.new(config: container)
  # Flipped to true by #terminate; #enqueue_jobs checks it before every pop.
  @done = false
  # SHA of the server-side Lua script; nil until #zpopbyscore loads it.
  @lua_zpopbyscore_sha = nil
end
Public Instance Methods
enqueue_jobs(sorted_sets = SETS)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 29
#
# Moves every due job from the given sorted sets onto its work queue.
#
# @param sorted_sets [Array<String>] names of the Redis sorted sets to drain;
#   defaults to +SETS+.
# @return whatever the +redis+ block helper returns for the block below.
def enqueue_jobs(sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next item in the queue with score (time to execute) <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      #
      # The loop stops as soon as #terminate has set @done or the pop yields
      # nothing (nil/false) for this set.
      while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
        @client.push(Sidekiq.load_json(job))
        logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end
terminate()
click to toggle source
# File lib/sidekiq/scheduled.rb, line 46
#
# Requests shutdown by setting +@done+; the pop loop in #enqueue_jobs tests
# this flag before fetching each job, so it stops at the next iteration.
#
# @return [true]
def terminate
  @done = true
end
Private Instance Methods
zpopbyscore(conn, keys: nil, argv: nil)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 52
#
# Runs the +LUA_ZPOPBYSCORE+ script through EVALSHA, loading the script on
# first use and caching its SHA in +@lua_zpopbyscore_sha+.
#
# If the server answers NOSCRIPT the cached SHA is dropped and the script is
# reloaded and retried. The retry is bounded: after a few consecutive NOSCRIPT
# replies the error is re-raised instead of looping forever against a server
# that never retains the script.
#
# @param conn connection object responding to +script+ and +call+
# @param keys [Array<String>] Redis keys passed to the script
# @param argv [Array<String>, nil] extra script arguments
# @return the value returned by the EVALSHA call
# @raise [RedisClient::CommandError] for any non-NOSCRIPT command error, or
#   for NOSCRIPT once the retry budget is exhausted
def zpopbyscore(conn, keys: nil, argv: nil)
  max_noscript_retries = 3
  noscript_retries = 0

  begin
    @lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil?
    conn.call("EVALSHA", @lua_zpopbyscore_sha, keys.size, *keys, *argv)
  rescue RedisClient::CommandError => e
    raise unless e.message.start_with?("NOSCRIPT")

    # Forget the stale SHA so this call (or the next one) reloads the script.
    @lua_zpopbyscore_sha = nil
    noscript_retries += 1
    raise if noscript_retries > max_noscript_retries

    retry
  end
end