class Fluent::Plugin::SidekiqMetricInput
Public Instance Methods
clear_redis()
click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 135
# Forgets the memoized Redis client held in @redis.
#
# The instance variable is only reset to nil; nothing is done to the client
# object itself.
#
# @return [nil]
def clear_redis
  @redis = nil
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 34
# Runs the superclass configuration, then replaces @connect_opts with a copy
# whose keys are all Symbols.
#
# @param conf the configuration object handed to the superclass
# @return [Hash] the symbol-keyed connection options
def configure(conf)
  super

  symbolized = {}
  @connect_opts.each { |key, value| symbolized[key.to_sym] = value }
  @connect_opts = symbolized
end
fetch_queue_lengths()
click to toggle source
Adapted from the sidekiq gem (lib/sidekiq/api.rb).
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 99
# Reads the length of every configured queue that is registered in Redis.
#
# Only names found both in @queue_names and in the Redis 'queues' set are
# reported.
#
# @return [Hash{String => Object}] "<queue>_length" => LLEN reply; empty when
#   @queue_names is empty or none of its entries is a registered queue
def fetch_queue_lengths
  return {} if @queue_names.empty?

  queues = redis.smembers('queues'.freeze) & @queue_names
  return {} if queues.empty?

  # Issue the commands on the object yielded by #pipelined rather than on the
  # outer client, so the replies come back as the block's result.
  lengths = redis.pipelined do |pipeline|
    queues.each { |queue| pipeline.llen("queue:#{queue}") }
  end

  # Replies arrive in the order the commands were queued, i.e. queue order.
  queues.zip(lengths).each_with_object({}) do |(queue, length), memo|
    memo["#{queue}_length"] = length
  end
end
fetch_stats()
click to toggle source
Adapted from the sidekiq gem (lib/sidekiq/api.rb).
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 52
# Collects Sidekiq-wide statistics from Redis using two pipelined batches.
#
# The first batch reads the counters, set sizes, the last entry of
# 'queue:default' and the members of the 'processes' and 'queues' sets. The
# second batch reads the 'busy' field of every process and the length of every
# queue.
#
# @return [Hash{Symbol => Object}] keys :processed, :failed, :scheduled_size,
#   :retry_size, :dead_size, :processes_size, :default_queue_latency,
#   :workers_size and :enqueued
def fetch_stats
  # Commands are issued on the yielded pipeline object so that their replies
  # are returned, in order, as the result of the block.
  first_batch = redis.pipelined do |pipeline|
    pipeline.get('stat:processed'.freeze)
    pipeline.get('stat:failed'.freeze)
    pipeline.zcard('schedule'.freeze)
    pipeline.zcard('retry'.freeze)
    pipeline.zcard('dead'.freeze)
    pipeline.scard('processes'.freeze)
    pipeline.lrange('queue:default'.freeze, -1, -1)
    pipeline.smembers('processes'.freeze)
    pipeline.smembers('queues'.freeze)
  end

  processed, failed, scheduled_size, retry_size, dead_size,
    processes_size, default_tail, processes, queues = first_batch

  second_batch = redis.pipelined do |pipeline|
    processes.each { |key| pipeline.hget(key, 'busy'.freeze) }
    queues.each { |queue| pipeline.llen("queue:#{queue}") }
  end

  # The first processes.size replies are busy counts; the rest are queue sizes.
  busy_counts = second_batch.take(processes.size)
  queue_sizes = second_batch.drop(processes.size)

  {
    processed: processed.to_i,
    failed: failed.to_i,
    scheduled_size: scheduled_size,
    retry_size: retry_size,
    dead_size: dead_size,
    processes_size: processes_size,
    default_queue_latency: default_queue_latency_for(default_tail.first),
    workers_size: busy_counts.inject(0) { |total, count| total + count.to_i },
    enqueued: queue_sizes.inject(0) { |total, size| total + size.to_i }
  }
end

# Computes how long ago the given raw job payload was enqueued.
#
# @param entry [String, nil] serialized job, or nil when the queue is empty
# @return [Numeric] 0 for a nil entry; otherwise the current time minus the
#   payload's 'enqueued_at', which is 0.0 when the payload cannot be parsed,
#   is not a Hash, or has no 'enqueued_at'
private def default_queue_latency_for(entry)
  return 0 unless entry

  job = begin
    Oj.load(entry)
  rescue StandardError
    {}
  end

  now = Time.now.to_f
  enqueued_at = job['enqueued_at'.freeze] if job.is_a?(Hash)
  now - (enqueued_at || now)
end
redis()
click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 121
# Returns the Redis client, building and memoizing it in @redis on first use.
#
# The client is created from @redis_url and @connect_opts and authenticated
# with @password when one is set. When @namespace is set, the client is
# wrapped in a Redis::Namespace.
#
# @return [Redis, Redis::Namespace]
def redis
  @redis ||= begin
    connection = Redis.new(url: @redis_url, **@connect_opts)
    connection.auth(@password) if @password

    @namespace ? Redis::Namespace.new(@namespace, redis: connection) : connection
  end
end
run()
click to toggle source
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 44
# Gathers one sample of Sidekiq metrics and emits it as a single event.
#
# The record is the result of #fetch_stats merged with #fetch_queue_lengths,
# emitted under @tag with the current Fluent::EventTime.
#
# Redis errors are logged instead of propagated, and the memoized client is
# dropped so the next run builds a new one.
# NOTE(review): this relies on Fluentd's timer helper detaching a timer whose
# callback raises, which would otherwise stop collection for good — confirm
# against the Fluentd version in use.
#
# @return the result of router.emit, or nil when a Redis error was handled
def run
  record = fetch_stats.merge(fetch_queue_lengths)
  router.emit(@tag, Fluent::EventTime.now, record)
rescue Redis::BaseError => e
  log.error 'failed to collect sidekiq metrics', error: e
  clear_redis
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sidekiq_metric.rb, line 39
# Starts the plugin: after the superclass start, registers #run as the
# callback of the :sidekiq_metric_timer timer, using @fetch_interval as the
# interval, and keeps the returned timer in @timer.
#
# @return the value returned by timer_execute
def start
  super

  callback = method(:run)
  @timer = timer_execute(:sidekiq_metric_timer, @fetch_interval, &callback)
end