class Sidekiq::JobSet
Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead. Sidekiq
Pro and Enterprise add additional sorted sets which do not contain job data, e.g. Batches.
Public Instance Methods
# File lib/sidekiq/api.rb, line 706 def each initial_size = @_size offset_size = 0 page = -1 page_size = 50 loop do range_start = page * page_size + offset_size range_end = range_start + page_size - 1 elements = Sidekiq.redis { |conn| conn.zrange name, range_start, range_end, "withscores" } break if elements.empty? page -= 1 elements.reverse_each do |element, score| yield SortedEntry.new(self, score, element) end offset_size = initial_size - @_size end end
Fetch jobs that match a given time or Range. Job
ID is an optional second argument.
@param score [Time,Range] a specific timestamp or range @param jid [String, optional] find a specific JID within the score @return [Array<SortedEntry>] any results found, can be empty
# File lib/sidekiq/api.rb, line 734 def fetch(score, jid = nil) begin_score, end_score = if score.is_a?(Range) [score.first, score.last] else [score, score] end elements = Sidekiq.redis { |conn| conn.zrange(name, begin_score, end_score, "BYSCORE", "withscores") } elements.each_with_object([]) do |element, result| data, job_score = element entry = SortedEntry.new(self, job_score, data) result << entry if jid.nil? || entry.jid == jid end end
Find the job with the given JID within this sorted set. *This is a slow O(n) operation*. Do not use for app logic.
@param jid [String] the job identifier @return [SortedEntry] the record or nil
# File lib/sidekiq/api.rb, line 759 def find_job(jid) Sidekiq.redis do |conn| conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score| job = Sidekiq.load_json(entry) matched = job["jid"] == jid return SortedEntry.new(self, score, entry) if matched end end nil end
Move all jobs from this Set to the Dead Set. See DeadSet#kill
# File lib/sidekiq/api.rb, line 693 def kill_all(notify_failure: false, ex: nil) ds = DeadSet.new opts = {notify_failure: notify_failure, ex: ex, trim: false} begin pop_each do |msg, _| ds.kill(msg, opts) end ensure ds.trim end end
# File lib/sidekiq/api.rb, line 671 def pop_each Sidekiq.redis do |c| size.times do data, score = c.zpopmin(name, 1)&.first break unless data yield data, score end end end
# File lib/sidekiq/api.rb, line 681 def retry_all c = Sidekiq::Client.new pop_each do |msg, _| job = Sidekiq.load_json(msg) # Manual retries should not count against the retry limit. job["retry_count"] -= 1 if job["retry_count"] c.push(job) end end
Add a job with the associated timestamp to this set. @param timestamp [Time] the score for the job @param job [Hash] the job data
# File lib/sidekiq/api.rb, line 665 def schedule(timestamp, job) Sidekiq.redis do |conn| conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job)) end end