class Dynamiq::Queue

Constants

REDIS_RPOPZADD

Attributes

name[R]
path[R]

Public Class Methods

all() click to toggle source
# File lib/dynamiq/queue.rb, line 112
def self.all
  Sidekiq.redis { |redis| redis.smembers 'queues' }
    .sort
    .map { |queue| Queue.new queue }
end
dynamic() click to toggle source
# File lib/dynamiq/queue.rb, line 99
def self.dynamic
  Sidekiq.redis { |redis| redis.smembers :dynamic_queues }
    .sort
    .map { |queue| Queue.new queue }
end
fifo() click to toggle source
# File lib/dynamiq/queue.rb, line 105
def self.fifo
  queues = Sidekiq.redis { |redis| redis.smembers :queues }
  dynamic = Sidekiq.redis { |redis| redis.smembers :dynamic_queues }
  (queues - dynamic).sort.map { |queue| Queue.new queue }
end
new(name = 'default') click to toggle source
# File lib/dynamiq/queue.rb, line 12
def initialize(name = 'default')
  @name = name
  @path = [ (dynamic? ? :dynamic_queue : :queue), name].join(':')
end

Public Instance Methods

==(object) click to toggle source
# File lib/dynamiq/queue.rb, line 95
def ==(object)
  object.respond_to?(:name) and @name == object.name
end
clear() click to toggle source
# File lib/dynamiq/queue.rb, line 65
def clear
  index_list = dynamic? ? :dynamic_queues : :queues

  Sidekiq.redis do |redis|
    redis.multi do
      redis.del path
      redis.srem index_list, name
    end
  end
end
Also aliased as: 💣
dynamic?() click to toggle source
# File lib/dynamiq/queue.rb, line 17
def dynamic?
  @dynamic ||= Sidekiq.redis { |redis| redis.sismember :dynamic_queues, name }
end
each(&block) click to toggle source
# File lib/dynamiq/queue.rb, line 37
def each(&block)
  initial_size = size
  deleted_size = 0
  page = 0
  page_size = 50

  loop do
    start = page * page_size - deleted_size
    entries = fetch_jobs start, page_size

    break if entries.empty?

    page += 1
    entries.each do |entry|
      block.call Job.new(*entry, @name)
    end

    deleted_size = initial_size - size
  end
end
eql?(obj) click to toggle source
# File lib/dynamiq/queue.rb, line 85
def eql?(obj)
  return false unless obj.respond_to? :name

  name.eql? obj.name
end
hash() click to toggle source
# File lib/dynamiq/queue.rb, line 91
def hash
  name.hash
end
latency() click to toggle source
# File lib/dynamiq/queue.rb, line 58
def latency
  method = dynamic? ? :zrange : :lrange
  entry = Sidekiq.redis { |redis| redis.send(method, path, -1, -1) }.first
  return 0 unless entry
  Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
end
paused?() click to toggle source
# File lib/dynamiq/queue.rb, line 21
def paused?
  false
end
pop() click to toggle source
# File lib/dynamiq/queue.rb, line 33
def pop
  job = pop_job and Job.new self, job
end
requeue(message) click to toggle source
# File lib/dynamiq/queue.rb, line 25
def requeue(message)
  if dynamic?
    Dynamiq::Client.new.push_message message
  else
    Sidekiq.redis { |redis| redis.rpush path, message }
  end
end
size() click to toggle source
# File lib/dynamiq/queue.rb, line 77
def size
  if dynamic?
    Sidekiq.redis { |redis| redis.zcount path, '-inf', '+inf' }
  else
    Sidekiq.redis { |redis| redis.llen path }
  end
end
💣()
Alias for: clear

Private Instance Methods

fetch_jobs(start, page_size) click to toggle source
# File lib/dynamiq/queue.rb, line 120
def fetch_jobs(start, page_size)
  Sidekiq.redis do |redis|
    if dynamic?
      limit = [start, page_size]
      redis.zrangebyscore @rname, '-inf', '+inf', limit: limit, with_scores: true
    else
      redis.lrange @rname, start, (start + page_size - 1)
    end
  end
end
pop_job() click to toggle source
# File lib/dynamiq/queue.rb, line 131
def pop_job
  if dynamic?
    job = Sidekiq.redis { |redis| redis.eval REDIS_RPOPZADD, [ path ] }
    return nil if job.empty?
    JSON.parse(job.first).merge(score: job.last.to_f).to_json
  else
    Sidekiq.redis { |redis| redis.rpop path }
  end
end