module HireFire::Macro::Sidekiq::JobQueueLatency

@!visibility private

Public Instance Methods

call(*queues, skip_retries: false, skip_scheduled: false) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 105
def call(*queues, skip_retries: false, skip_scheduled: false)
  require "sidekiq/api"

  queues = normalize_queues(queues, allow_empty: true)
  latencies = []
  latencies << enqueued_latency(queues)
  latencies << set_latency(::Sidekiq::RetrySet.new, queues) unless skip_retries
  latencies << set_latency(::Sidekiq::ScheduledSet.new, queues) unless skip_scheduled
  latencies.max
end

Private Instance Methods

enqueued_latency(queues) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 118
def enqueued_latency(queues)
  queues = registered_queues if queues.empty?

  oldest_jobs = ::Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      queues.each do |queue|
        pipeline.lindex("queue:#{queue}", -1)
      end
    end
  end

  max_latencies = oldest_jobs.map do |job_payload|
    job = job_payload ? JSON.parse(job_payload) : {}
    job["enqueued_at"] ? Time.now.to_f - job["enqueued_at"] : 0.0
  end

  max_latencies.max || 0.0
end
set_latency(set, queues) click to toggle source
# File lib/hirefire/macro/sidekiq.rb, line 137
def set_latency(set, queues)
  max_latency = 0.0
  now = Time.now

  find_each_in_set(set) do |job|
    if job.at > now
      break
    elsif queues.empty? || queues.include?(job.queue)
      max_latency = now - job.at
      break
    end
  end

  max_latency
end