class Lowkiq::Queue::ShardMetrics
Public Class Methods
new(redis_pool)
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 4
# Builds a metrics reader on top of the given connection pool.
#
# @param redis_pool [#with] pool object; it is only stored here and later
#   used through its `with` method
def initialize(redis_pool)
  # Keep the clock as a callable (a Method object for Utils::Timestamp.now)
  # so the lag computation reads the time through `@timestamp.call`.
  @timestamp = Utils::Timestamp.method(:now)
  @redis_pool = redis_pool
end
Public Instance Methods
call(queues)
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 9
# Gathers metrics for every given queue shard using a single Redis pipeline.
#
# @param queues [Enumerable] shard identifiers; each one is passed unchanged
#   to #pipeline as its `id` argument
# @return [Array] one coerced metrics object per entry of `queues`, in the
#   same order as the input
def call(queues)
  replies = @redis_pool.with do |redis|
    # Queue the commands on the object yielded by `pipelined` rather than on
    # the outer client. redis-rb 4.6 deprecated issuing commands on the client
    # inside the block and 5.0 stopped collecting them, which would leave the
    # reply list empty. Older redis-rb versions yield the client itself, so
    # using the block argument works there as well.
    redis.pipelined do |pipe|
      queues.each { |queue| pipeline pipe, queue }
    end
  end

  # Every queue contributes `pipeline_count` replies; regroup them per queue.
  replies.each_slice(pipeline_count).map { |chunk| coerce(chunk) }
end
Private Instance Methods
coerce(result)
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 38
# Wraps the raw pipeline replies for one queue shard in a metrics object.
#
# @param result [Array] the replies belonging to a single shard; only the
#   first element (the reply used for the lag) is read
# @return [OpenStruct] object exposing `lag`
def coerce(result)
  lag_reply = result[0]
  OpenStruct.new(lag: coerce_lag(lag_reply))
end
coerce_lag(res)
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 42
# Converts a "first member with score" reply into a non-negative lag.
#
# @param res [Array] list of [id, score] pairs; only the first pair is used
# @return [Numeric] current timestamp minus the score, or 0.0 when there is
#   no score or the difference is negative
def coerce_lag(res)
  head = res.first
  # Destructuring nil (empty reply) leaves both names nil.
  _member, perform_in = head
  return 0.0 if perform_in.nil?

  elapsed = @timestamp.call - perform_in
  # A score later than the current timestamp is reported as zero lag.
  elapsed < 0.0 ? 0.0 : elapsed
end
pipeline(redis, id)
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 23
# Issues the Redis commands needed for one queue shard.
#
# @param redis [#zrange] object the command is sent to
# @param id [#[]] shard identifier read through `:queue_name` and `:shard`
# @return whatever `redis.zrange` returns
def pipeline(redis, id)
  queue_name = id[:queue_name]
  shard_index = id[:shard]
  queue_keys = Keys.new(queue_name)

  # lag [id, score]
  # Range 0..0 with scores: only the first member of the sorted set is fetched.
  zset_key = queue_keys.ids_scored_by_perform_in_zset(shard_index)
  redis.zrange(zset_key, 0, 0, with_scores: true)
end
pipeline_count()
click to toggle source
# File lib/lowkiq/queue/shard_metrics.rb, line 34
# Number of pipeline replies grouped together per queue shard.
#
# @return [Integer] always 1
def pipeline_count
  1
end