class Lowkiq::Schedulers::Lag

Public Class Methods

new(wait, metrics) click to toggle source
# File lib/lowkiq/schedulers/lag.rb, line 4
def initialize(wait, metrics)
  @metrics = metrics
  @wait = wait
end

Public Instance Methods

build_job(shard_handlers) click to toggle source
# File lib/lowkiq/schedulers/lag.rb, line 9
def build_job(shard_handlers)
  Proc.new do
    identifiers = shard_handlers.map { |sh| { queue_name: sh.queue_name, shard: sh.shard_index } }
    metrics = @metrics.call identifiers
    shard_handler, _lag =
                   shard_handlers.zip(metrics.map(&:lag))
                     .select { |(_, lag)| lag > 0.0 }
                     .max_by { |(_, lag)| lag }

    if shard_handler
      shard_handler.process
    else
      @wait.call
    end
  end
end