class Lowkiq::Queue::ShardMetrics

Public Class Methods

new(redis_pool) click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 4
def initialize(redis_pool)
  @redis_pool = redis_pool
  @timestamp = Utils::Timestamp.method(:now)
end

Public Instance Methods

call(queues) click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 9
def call(queues)
  result = @redis_pool.with do |redis|
    redis.pipelined do
      queues.each { |queue| pipeline redis, queue }
    end
  end

  result.each_slice(pipeline_count).map do |res|
    coerce(res)
  end
end

Private Instance Methods

coerce(result) click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 38
def coerce(result)
  OpenStruct.new lag: coerce_lag(result[0])
end
coerce_lag(res) click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 42
def coerce_lag(res)
  _id, score = res.first
  return 0.0 if score.nil?
  lag = @timestamp.call - score
  return 0.0 if lag < 0.0
  lag
end
pipeline(redis, id) click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 23
def pipeline(redis, id)
  name = id[:queue_name]
  shard = id[:shard]

  keys = Keys.new name

  # lag [id, score]
  redis.zrange keys.ids_scored_by_perform_in_zset(shard),
               0, 0, with_scores: true
end
pipeline_count() click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 34
def pipeline_count
  1
end