module HireFire::Macro::Sidekiq::JobQueueSize
@!visibility private
Constants
- SERVER_SIDE_SCRIPT
- SERVER_SIDE_SCRIPT_SHA
Public Instance Methods
call(*queues, server: false, **options)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 160
# Entry point: loads the Sidekiq API, normalizes the given queue names and
# delegates to either the server-side or the client-side lookup.
#
# @param queues [Array] queue names, passed through +normalize_queues+ with
#   +allow_empty: true+
# @param server [Boolean] when truthy +server_lookup+ is used, otherwise
#   +client_lookup+
# @param options [Hash] keyword options forwarded unchanged to the chosen lookup
# @return the value returned by the selected lookup
def call(*queues, server: false, **options)
  # Loaded lazily so the Sidekiq API is only required when the macro runs.
  require "sidekiq/api"

  normalized = normalize_queues(queues, allow_empty: true)
  return server_lookup(normalized, **options) if server

  client_lookup(normalized, **options)
end
Private Instance Methods
client_lookup(queues, skip_retries: false, skip_scheduled: false, skip_working: false, max_scheduled: nil)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 174
# Adds up the enqueued size plus, unless skipped, the scheduled, retry and
# working sizes for the given queues.
#
# @param queues [Array] queue names handed to each of the size helpers
# @param skip_retries [Boolean] leave out +retry_size+ when truthy
# @param skip_scheduled [Boolean] leave out +scheduled_size+ when truthy
# @param skip_working [Boolean] leave out +working_size+ when truthy
# @param max_scheduled [Integer, nil] passed as the second argument to
#   +scheduled_size+
# @return [Integer] sum of the included sizes
def client_lookup(queues, skip_retries: false, skip_scheduled: false, skip_working: false, max_scheduled: nil)
  # Helpers are invoked in the same order as before: enqueued, scheduled,
  # retries, working.
  parts = [enqueued_size(queues)]
  parts << scheduled_size(queues, max_scheduled) unless skip_scheduled
  parts << retry_size(queues) unless skip_retries
  parts << working_size(queues) unless skip_working
  parts.sum
end
count_with_redis(connection, *args)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 350
# Runs the server-side script through +evalsha+ on a connection that exposes
# the +evalsha+ / +script+ methods. If Redis answers with a NOSCRIPT error the
# script is loaded and the call is repeated, but only once, so a persistent
# NOSCRIPT response surfaces as an error instead of looping forever.
#
# @param connection [#evalsha, #script] Redis connection
# @param args [Array] values passed to the script as +argv+
# @return the value returned by +evalsha+
# @raise [Redis::CommandError] for any non-NOSCRIPT command error, or when
#   NOSCRIPT is still reported after the script has been loaded
def count_with_redis(connection, *args)
  script_loaded = false

  begin
    connection.evalsha(SERVER_SIDE_SCRIPT_SHA, argv: args)
  rescue Redis::CommandError => e
    # Only a first NOSCRIPT failure is recoverable; everything else propagates.
    raise if script_loaded || !e.message.include?("NOSCRIPT")

    connection.script(:load, SERVER_SIDE_SCRIPT)
    script_loaded = true
    retry
  end
end
count_with_redis_client(connection, *args)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 361
# Runs the server-side script through a +call+-style connection using the
# "evalsha" command with zero keys. If Redis answers with a NOSCRIPT error the
# script is loaded and the call is repeated, but only once, so a persistent
# NOSCRIPT response surfaces as an error instead of looping forever.
#
# @param connection [#call] Redis connection accepting raw commands
# @param args [Array] values appended to the command after the key count
# @return the value returned by the "evalsha" command
# @raise [RedisClient::CommandError] for any non-NOSCRIPT command error, or
#   when NOSCRIPT is still reported after the script has been loaded
def count_with_redis_client(connection, *args)
  script_loaded = false

  begin
    connection.call("evalsha", SERVER_SIDE_SCRIPT_SHA, 0, *args)
  rescue RedisClient::CommandError => e
    # Only a first NOSCRIPT failure is recoverable; everything else propagates.
    raise if script_loaded || !e.message.include?("NOSCRIPT")

    connection.call("script", "load", SERVER_SIDE_SCRIPT)
    script_loaded = true
    retry
  end
end
enqueued_size(queues)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 182
# Sums the list lengths of the "queue:<name>" keys for the given queues,
# issuing all LLEN calls in a single pipeline.
#
# @param queues [Array] queue names; when empty, +registered_queues+ is used
# @return [Integer] total of the pipelined LLEN results
def enqueued_size(queues)
  names = queues.empty? ? registered_queues : queues

  lengths = ::Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      names.each { |queue_name| pipeline.llen("queue:#{queue_name}") }
    end
  end

  lengths.sum
end
retry_size(queues)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 206
# Counts jobs in the Sidekiq retry set whose +at+ time is not later than now
# and whose queue matches the given queues.
#
# @param queues [Array] queue names; an empty list matches every queue
# @return [Integer] number of matching retry jobs
def retry_size(queues)
  count = 0
  cutoff = Time.now

  find_each_in_set(::Sidekiq::RetrySet.new) do |job|
    # Iteration stops at the first job whose time lies after the cutoff.
    break if job.at > cutoff

    count += 1 if queues.empty? || queues.include?(job["queue"])
  end

  count
end
scheduled_size(queues, max = nil)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 192
# Counts jobs in the Sidekiq scheduled set whose +at+ time is not later than
# now and whose queue matches the given queues, optionally capped at +max+.
#
# @param queues [Array] queue names; an empty list matches every queue
# @param max [Integer, nil] when set, iteration stops once the count has
#   reached this value
# @return [Integer] number of matching scheduled jobs
def scheduled_size(queues, max = nil)
  count = 0
  cutoff = Time.now

  find_each_in_set(::Sidekiq::ScheduledSet.new) do |job|
    # Iteration stops at the first job whose time lies after the cutoff ...
    break if job.at > cutoff
    # ... or as soon as the optional cap has been reached.
    break if max && count >= max

    count += 1 if queues.empty? || queues.include?(job["queue"])
  end

  count
end
server_lookup(queues, skip_scheduled: false, skip_retries: false, skip_working: false, max_scheduled: 0)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 333
# Performs the lookup with the server-side script, dispatching to the helper
# that matches the type of the connection yielded by +Sidekiq.redis+.
#
# The script arguments are, in order: current epoch seconds, +max_scheduled+,
# then the three skip flags encoded as 1/0, followed by the queue names.
#
# @param queues [Array] queue names appended to the script arguments
# @param skip_scheduled [Boolean] encoded as 1 when truthy, else 0
# @param skip_retries [Boolean] encoded as 1 when truthy, else 0
# @param skip_working [Boolean] encoded as 1 when truthy, else 0
# @param max_scheduled [Integer] passed to the script unchanged
# @return the value returned by the selected count helper
# @raise [RuntimeError] when the connection is neither a Sidekiq
#   +CompatClient+ nor a +Redis+ instance
def server_lookup(queues, skip_scheduled: false, skip_retries: false, skip_working: false, max_scheduled: 0)
  ::Sidekiq.redis do |connection|
    # The flags are encoded into fresh values instead of overwriting the
    # keyword arguments: if this block is yielded more than once, an already
    # encoded 0 would be truthy and turn into 1 on the next pass.
    script_args = [
      Time.now.to_i,
      max_scheduled,
      skip_scheduled ? 1 : 0,
      skip_retries ? 1 : 0,
      skip_working ? 1 : 0,
      *queues
    ]

    if defined?(::Sidekiq::RedisClientAdapter::CompatClient) && connection.is_a?(::Sidekiq::RedisClientAdapter::CompatClient)
      count_with_redis_client(connection, *script_args)
    elsif defined?(::Redis) && connection.is_a?(::Redis)
      count_with_redis(connection, *script_args)
    else
      raise "Unsupported Redis connection type: #{connection.class}"
    end
  end
end
working_size(queues)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 221
# Counts the entries reported by +Sidekiq::Workers+ whose queue matches the
# given queues and whose +run_at+ is not later than now.
#
# @param queues [Array] queue names; an empty list matches every queue
# @return [Integer] number of matching work entries
def working_size(queues)
  current_time = Time.now
  current_epoch = current_time.to_i

  ::Sidekiq::Workers.new.count do |_key, _tid, work|
    # Sidekiq < 7.2.1 yields a Hash; Sidekiq >= 7.2.1 yields an object with
    # #queue and #run_at readers.
    legacy = work.is_a?(Hash)

    next false unless queues.empty? || queues.include?(legacy ? work["queue"] : work.queue)

    if legacy
      work["run_at"] <= current_epoch
    else
      work.run_at <= current_time
    end
  end
end