class Dynamiq::Queue
Constants
- REDIS_RPOPZADD
Attributes
name[R]
path[R]
Public Class Methods
all()
click to toggle source
# File lib/dynamiq/queue.rb, line 112
# Builds one Queue per member of the Redis set 'queues'.
#
# Returns an Array of Queue objects ordered by sorted queue name.
def self.all
  names = Sidekiq.redis { |conn| conn.smembers 'queues' }
  names.sort.map { |queue_name| Queue.new(queue_name) }
end
dynamic()
click to toggle source
# File lib/dynamiq/queue.rb, line 99
# Builds one Queue per member of the Redis set +dynamic_queues+.
#
# Returns an Array of Queue objects ordered by sorted queue name.
def self.dynamic
  names = Sidekiq.redis { |conn| conn.smembers :dynamic_queues }
  names.sort.map { |queue_name| Queue.new(queue_name) }
end
fifo()
click to toggle source
# File lib/dynamiq/queue.rb, line 105
# Builds one Queue per name that is in the Redis set +queues+ but not in
# the Redis set +dynamic_queues+.
#
# Returns an Array of Queue objects ordered by sorted queue name.
def self.fifo
  every_name = Sidekiq.redis { |conn| conn.smembers :queues }
  dynamic_names = Sidekiq.redis { |conn| conn.smembers :dynamic_queues }

  fifo_names = every_name - dynamic_names
  fifo_names.sort.map { |queue_name| Queue.new(queue_name) }
end
new(name = 'default')
click to toggle source
# File lib/dynamiq/queue.rb, line 12
# Stores the queue name and derives the Redis key kept in +path+.
#
# name - the queue name (defaults to 'default').
#
# The key is "dynamic_queue:<name>" when dynamic? is truthy and
# "queue:<name>" otherwise.
def initialize(name = 'default')
  @name = name

  # dynamic? reads +name+, so @name has to be assigned before this line.
  prefix = dynamic? ? :dynamic_queue : :queue
  @path = [prefix, name].join(':')
end
Public Instance Methods
==(object)
click to toggle source
# File lib/dynamiq/queue.rb, line 95
# Loose equality: true for any object that responds to +name+ and whose
# name equals this queue's @name.
#
# object - the object to compare against.
#
# Returns a falsy value when +object+ does not respond to +name+.
def ==(object)
  object.respond_to?(:name) && @name == object.name
end
clear()
click to toggle source
# File lib/dynamiq/queue.rb, line 65
# Deletes the queue's Redis key and removes its name from the index set
# (+dynamic_queues+ for dynamic queues, +queues+ otherwise) inside a single
# MULTI block.
#
# Returns whatever the +multi+ call returns.
def clear
  index_list = dynamic? ? :dynamic_queues : :queues

  Sidekiq.redis do |redis|
    # Queue the commands on the object yielded by +multi+ instead of on the
    # outer connection: newer redis-rb releases require this, and older
    # releases yield the connection itself, so both keep working.
    redis.multi do |transaction|
      transaction.del path
      transaction.srem index_list, name
    end
  end
end
Also aliased as: 💣
dynamic?()
click to toggle source
# File lib/dynamiq/queue.rb, line 17
# Reports whether this queue's name is a member of the Redis set
# +dynamic_queues+.
#
# The lookup result is cached per instance. The cache is guarded with
# +defined?+ rather than <tt>||=</tt> so that a falsy answer is remembered
# as well, instead of causing a Redis round trip on every call.
#
# Returns the value +sismember+ produced on the first call.
def dynamic?
  return @dynamic if defined?(@dynamic)

  @dynamic = Sidekiq.redis { |redis| redis.sismember :dynamic_queues, name }
end
each(&block)
click to toggle source
# File lib/dynamiq/queue.rb, line 37
# Walks the queue in pages of 50 entries, yielding a Job built from each
# entry and this queue's name.
#
# block - called once per Job.
#
# Returns an Enumerator when no block is given, so a blockless call no
# longer fails with NoMethodError after Redis has already been queried.
def each(&block)
  return enum_for(:each) unless block

  initial_size = size
  deleted_size = 0
  page = 0
  page_size = 50

  loop do
    # Shift the window back by however much the queue has shrunk since the
    # iteration started.
    start = page * page_size - deleted_size
    entries = fetch_jobs start, page_size
    break if entries.empty?

    page += 1
    entries.each { |entry| block.call Job.new(*entry, @name) }
    deleted_size = initial_size - size
  end
end
eql?(obj)
click to toggle source
# File lib/dynamiq/queue.rb, line 85
# Strict equality: compares names with +eql?+.
#
# obj - the object to compare against.
#
# Returns false for any object that does not respond to +name+.
def eql?(obj)
  obj.respond_to?(:name) ? name.eql?(obj.name) : false
end
hash()
click to toggle source
# File lib/dynamiq/queue.rb, line 91
# Hash code taken from the queue name, matching the name-based comparison
# done by eql?.
def hash
  name.hash
end
latency()
click to toggle source
# File lib/dynamiq/queue.rb, line 58
# Seconds elapsed since the entry at index -1 of the queue was enqueued.
#
# The entry is read with ZRANGE for dynamic queues and LRANGE otherwise.
#
# Returns 0 when the queue has no entry, and zero latency when the payload
# carries no 'enqueued_at' value (previously that raised TypeError).
def latency
  command = dynamic? ? :zrange : :lrange
  entry = Sidekiq.redis { |redis| redis.public_send(command, path, -1, -1) }.first
  return 0 unless entry

  now = Time.now.to_f
  enqueued_at = Sidekiq.load_json(entry)['enqueued_at'] || now
  now - enqueued_at
end
paused?()
click to toggle source
# File lib/dynamiq/queue.rb, line 21
# Always false: this implementation never reports a queue as paused.
def paused?
  false
end
pop()
click to toggle source
# File lib/dynamiq/queue.rb, line 33
# Pops one raw payload via pop_job and wraps it in a Job bound to this
# queue.
#
# Returns the Job, or the falsy value from pop_job when nothing was popped.
def pop
  payload = pop_job
  payload && Job.new(self, payload)
end
requeue(message)
click to toggle source
# File lib/dynamiq/queue.rb, line 25
# Puts a message back on the queue.
#
# message - the payload to requeue.
#
# Dynamic queues hand the message to Dynamiq::Client#push_message; other
# queues RPUSH it onto the key in +path+.
def requeue(message)
  return Dynamiq::Client.new.push_message(message) if dynamic?

  Sidekiq.redis { |conn| conn.rpush(path, message) }
end
size()
click to toggle source
# File lib/dynamiq/queue.rb, line 77
# Number of entries stored under +path+.
#
# Uses ZCOUNT over the full score range for dynamic queues and LLEN for
# all other queues.
def size
  return Sidekiq.redis { |conn| conn.llen(path) } unless dynamic?

  Sidekiq.redis { |conn| conn.zcount(path, '-inf', '+inf') }
end
Private Instance Methods
fetch_jobs(start, page_size)
click to toggle source
# File lib/dynamiq/queue.rb, line 120
# Reads one page of raw entries from the queue's Redis key.
#
# start     - zero-based offset of the first entry to read.
# page_size - maximum number of entries to read.
#
# Dynamic queues are read with ZRANGEBYSCORE over the full score range
# (scores included); other queues are read with LRANGE.
#
# The key is taken from +path+. The previous code read @rname, which is
# never assigned, so Redis was queried with a nil key.
def fetch_jobs(start, page_size)
  Sidekiq.redis do |redis|
    if dynamic?
      redis.zrangebyscore path, '-inf', '+inf', limit: [start, page_size], with_scores: true
    else
      # LRANGE bounds are inclusive, hence the -1.
      redis.lrange path, start, start + page_size - 1
    end
  end
end
pop_job()
click to toggle source
# File lib/dynamiq/queue.rb, line 131
# Pops one raw payload from the queue.
#
# Non-dynamic queues RPOP from +path+. Dynamic queues run the
# REDIS_RPOPZADD script against +path+; a non-empty reply is re-encoded as
# JSON with the reply's last element added as a float "score".
#
# Returns nil when the script reply is empty.
def pop_job
  return Sidekiq.redis { |conn| conn.rpop(path) } unless dynamic?

  reply = Sidekiq.redis { |conn| conn.eval(REDIS_RPOPZADD, [path]) }
  return nil if reply.empty?

  payload = reply.first
  score = reply.last
  JSON.parse(payload).merge(score: score.to_f).to_json
end