module HireFire::Macro::Sidekiq::JobQueueSize

@!visibility private

Constants

SERVER_SIDE_SCRIPT
SERVER_SIDE_SCRIPT_SHA

Public Instance Methods

call(*queues, server: false, **options) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 160
def call(*queues, server: false, **options)
  require "sidekiq/api"

  queues = normalize_queues(queues, allow_empty: true)

  if server
    server_lookup(queues, **options)
  else
    client_lookup(queues, **options)
  end
end

Private Instance Methods

client_lookup(queues, skip_retries: false, skip_scheduled: false, skip_working: false, max_scheduled: nil) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 174
def client_lookup(queues, skip_retries: false, skip_scheduled: false, skip_working: false, max_scheduled: nil)
  size = enqueued_size(queues)
  size += scheduled_size(queues, max_scheduled) unless skip_scheduled
  size += retry_size(queues) unless skip_retries
  size += working_size(queues) unless skip_working
  size
end
count_with_redis(connection, *args) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 350
def count_with_redis(connection, *args)
  connection.evalsha(SERVER_SIDE_SCRIPT_SHA, argv: args)
rescue Redis::CommandError => e
  if e.message.include?("NOSCRIPT")
    connection.script(:load, SERVER_SIDE_SCRIPT)
    retry
  else
    raise
  end
end
count_with_redis_client(connection, *args) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 361
def count_with_redis_client(connection, *args)
  connection.call("evalsha", SERVER_SIDE_SCRIPT_SHA, 0, *args)
rescue RedisClient::CommandError => e
  if e.message.include?("NOSCRIPT")
    connection.call("script", "load", SERVER_SIDE_SCRIPT)
    retry
  else
    raise
  end
end
enqueued_size(queues) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 182
def enqueued_size(queues)
  queues = registered_queues if queues.empty?

  ::Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      queues.each { |name| pipeline.llen("queue:#{name}") }
    end
  end.sum
end
retry_size(queues) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 206
def retry_size(queues)
  size = 0
  now = Time.now

  find_each_in_set(::Sidekiq::RetrySet.new) do |job|
    if job.at > now
      break
    elsif queues.empty? || queues.include?(job["queue"])
      size += 1
    end
  end

  size
end
scheduled_size(queues, max = nil) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 192
def scheduled_size(queues, max = nil)
  size, now = 0, Time.now

  find_each_in_set(::Sidekiq::ScheduledSet.new) do |job|
    if job.at > now || max && size >= max
      break
    elsif queues.empty? || queues.include?(job["queue"])
      size += 1
    end
  end

  size
end
server_lookup(queues, skip_scheduled: false, skip_retries: false, skip_working: false, max_scheduled: 0) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 333
def server_lookup(queues, skip_scheduled: false, skip_retries: false, skip_working: false, max_scheduled: 0)
  ::Sidekiq.redis do |connection|
    now = Time.now.to_i
    skip_scheduled = skip_scheduled ? 1 : 0
    skip_retries = skip_retries ? 1 : 0
    skip_working = skip_working ? 1 : 0

    if defined?(::Sidekiq::RedisClientAdapter::CompatClient) && connection.is_a?(::Sidekiq::RedisClientAdapter::CompatClient)
      count_with_redis_client(connection, now, max_scheduled, skip_scheduled, skip_retries, skip_working, *queues)
    elsif defined?(::Redis) && connection.is_a?(::Redis)
      count_with_redis(connection, now, max_scheduled, skip_scheduled, skip_retries, skip_working, *queues)
    else
      raise "Unsupported Redis connection type: #{connection.class}"
    end
  end
end
working_size(queues) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 221
def working_size(queues)
  now = Time.now
  now_as_i = now.to_i

  ::Sidekiq::Workers.new.count do |key, tid, job|
    if job.is_a?(Hash) # Sidekiq < 7.2.1
      (queues.empty? || queues.include?(job["queue"])) && job["run_at"] <= now_as_i
    else # Sidekiq >= 7.2.1
      (queues.empty? || queues.include?(job.queue)) && job.run_at <= now
    end
  end
end