class Fluent::Plugin::SidekiqMetricInput

Public Instance Methods

clear_redis() click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 135
# Drops the memoized Redis client by resetting +@redis+ to nil.
#
# @return [nil]
def clear_redis
  instance_variable_set(:@redis, nil)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 34
# Runs the superclass configuration, then replaces +@connect_opts+ with a
# copy whose keys are symbols.
#
# @param conf the configuration object forwarded to the superclass
# @return [Hash] the symbol-keyed connection options
def configure(conf)
  super

  symbolized = {}
  @connect_opts.each { |key, value| symbolized[key.to_sym] = value }
  @connect_opts = symbolized
end
fetch_queue_lengths() click to toggle source

Adapted from the sidekiq gem (lib/sidekiq/api.rb).

# File lib/fluent/plugin/in_sidekiq_metric.rb, line 99
# Fetches the current length of every configured queue that is registered
# in Redis.
#
# Only names present in both the Redis +queues+ set and +@queue_names+ are
# reported.
#
# @return [Hash{String => Integer}] "<queue>_length" => list length; empty
#   when no queue names are configured or none of them is registered
def fetch_queue_lengths
  return {} if @queue_names.empty?

  queues = redis.smembers('queues'.freeze) & @queue_names
  return {} if queues.empty?

  # Commands must be issued on the object yielded by #pipelined. Calling
  # them on the outer client inside the block is deprecated since
  # redis-rb 4.6 and no longer queues them in redis-rb 5, which would leave
  # the reply array empty. Older clients yield the client itself, so the
  # block-argument form works there as well.
  lengths = redis.pipelined do |pipeline|
    queues.each { |queue| pipeline.llen("queue:#{queue}") }
  end

  # Replies come back in the order the commands were queued.
  queues.zip(lengths).each_with_object({}) do |(queue, length), memo|
    memo["#{queue}_length"] = length
  end
end
fetch_stats() click to toggle source

Adapted from the sidekiq gem (lib/sidekiq/api.rb).

# File lib/fluent/plugin/in_sidekiq_metric.rb, line 52
# Collects Sidekiq-wide counters from Redis in two pipelined round trips.
#
# The first pipeline reads the global counters plus the process and queue
# sets; the second reads the +busy+ field of every process and the length
# of every queue.
#
# @return [Hash{Symbol => Numeric}] with keys :processed, :failed,
#   :scheduled_size, :retry_size, :dead_size, :processes_size,
#   :default_queue_latency, :workers_size and :enqueued
def fetch_stats
  # Commands are issued on the object yielded by #pipelined. Calling them
  # on the outer client inside the block is deprecated since redis-rb 4.6
  # and no longer queues them in redis-rb 5. Older clients yield the client
  # itself, so this form works there as well.
  pipe1_res = redis.pipelined do |pipeline|
    pipeline.get('stat:processed'.freeze)
    pipeline.get('stat:failed'.freeze)
    pipeline.zcard('schedule'.freeze)
    pipeline.zcard('retry'.freeze)
    pipeline.zcard('dead'.freeze)
    pipeline.scard('processes'.freeze)
    pipeline.lrange('queue:default'.freeze, -1, -1)
    pipeline.smembers('processes'.freeze)
    pipeline.smembers('queues'.freeze)
  end

  process_keys = pipe1_res[7]
  queue_names  = pipe1_res[8]

  pipe2_res = redis.pipelined do |pipeline|
    process_keys.each { |key| pipeline.hget(key, 'busy'.freeze) }
    queue_names.each { |queue| pipeline.llen("queue:#{queue}") }
  end

  # The first process_keys.size replies are the per-process busy counts;
  # the remainder are the per-queue lengths.
  s = process_keys.size
  workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
  enqueued     = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)

  default_queue_latency =
    if (entry = pipe1_res[6].first)
      # Best effort: an unparsable payload is treated as a job without an
      # enqueue time, which yields a latency of zero.
      job =
        begin
          Oj.load(entry)
        rescue StandardError
          {}
        end
      now = Time.now.to_f
      thence = job['enqueued_at'.freeze] || now
      now - thence
    else
      0
    end

  {
    processed:             pipe1_res[0].to_i,
    failed:                pipe1_res[1].to_i,
    scheduled_size:        pipe1_res[2],
    retry_size:            pipe1_res[3],
    dead_size:             pipe1_res[4],
    processes_size:        pipe1_res[5],

    default_queue_latency: default_queue_latency,
    workers_size:          workers_size,
    enqueued:              enqueued
  }
end
redis() click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 121
# Returns the memoized Redis client, building it on first use.
#
# The connection is created from +@redis_url+ and +@connect_opts+ and is
# authenticated when +@password+ is set. When +@namespace+ is set the
# connection is wrapped in a Redis::Namespace.
#
# @return [Redis, Redis::Namespace]
def redis
  return @redis if @redis

  connection = Redis.new(url: @redis_url, **@connect_opts)
  connection.auth(@password) if @password

  @redis = @namespace ? Redis::Namespace.new(@namespace, redis: connection) : connection
end
run() click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 44
# Gathers one metrics record and emits it under +@tag+.
#
# The record is the global stats merged with the per-queue lengths. It is
# built before the event time is taken.
def run
  record = fetch_stats.merge(fetch_queue_lengths)
  router.emit(@tag, Fluent::EventTime.now, record)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 39
# Runs the superclass start hook, then registers a timer named
# :sidekiq_metric_timer that invokes #run with +@fetch_interval+ as its
# interval. The value returned by +timer_execute+ is kept in +@timer+.
def start
  super

  callback = method(:run)
  @timer = timer_execute(:sidekiq_metric_timer, @fetch_interval, &callback)
end