module HireFire::Macro::Sidekiq::JobQueueLatency
@!visibility private
Public Instance Methods
call(*queues, skip_retries: false, skip_scheduled: false)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 105 def call(*queues, skip_retries: false, skip_scheduled: false) require "sidekiq/api" queues = normalize_queues(queues, allow_empty: true) latencies = [] latencies << enqueued_latency(queues) latencies << set_latency(::Sidekiq::RetrySet.new, queues) unless skip_retries latencies << set_latency(::Sidekiq::ScheduledSet.new, queues) unless skip_scheduled latencies.max end
Private Instance Methods
enqueued_latency(queues)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 118 def enqueued_latency(queues) queues = registered_queues if queues.empty? oldest_jobs = ::Sidekiq.redis do |conn| conn.pipelined do |pipeline| queues.each do |queue| pipeline.lindex("queue:#{queue}", -1) end end end max_latencies = oldest_jobs.map do |job_payload| job = job_payload ? JSON.parse(job_payload) : {} job["enqueued_at"] ? Time.now.to_f - job["enqueued_at"] : 0.0 end max_latencies.max || 0.0 end
set_latency(set, queues)
click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 137 def set_latency(set, queues) max_latency = 0.0 now = Time.now find_each_in_set(set) do |job| if job.at > now break elsif queues.empty? || queues.include?(job.queue) max_latency = now - job.at break end end max_latency end