class Sidekiq::CloudWatchMetrics::Publisher

Constants

INTERVAL

Public Class Methods

new(client: Aws::CloudWatch::Client.new, namespace: "Sidekiq", additional_dimensions: {}) click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 41
def initialize(client: Aws::CloudWatch::Client.new, namespace: "Sidekiq", additional_dimensions: {})
  @client = client
  @namespace = namespace
  @additional_dimensions = additional_dimensions.map { |k, v| {name: k.to_s, value: v.to_s} }
end

Public Instance Methods

publish() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 76
def publish
  now = Time.now
  stats = Sidekiq::Stats.new
  processes = Sidekiq::ProcessSet.new.to_enum(:each).to_a
  queues = stats.queues

  metrics = [
    {
      metric_name: "ProcessedJobs",
      timestamp: now,
      value: stats.processed,
      unit: "Count",
    },
    {
      metric_name: "FailedJobs",
      timestamp: now,
      value: stats.failed,
      unit: "Count",
    },
    {
      metric_name: "EnqueuedJobs",
      timestamp: now,
      value: stats.enqueued,
      unit: "Count",
    },
    {
      metric_name: "ScheduledJobs",
      timestamp: now,
      value: stats.scheduled_size,
      unit: "Count",
    },
    {
      metric_name: "RetryJobs",
      timestamp: now,
      value: stats.retry_size,
      unit: "Count",
    },
    {
      metric_name: "DeadJobs",
      timestamp: now,
      value: stats.dead_size,
      unit: "Count",
    },
    {
      metric_name: "Workers",
      timestamp: now,
      value: stats.workers_size,
      unit: "Count",
    },
    {
      metric_name: "Processes",
      timestamp: now,
      value: stats.processes_size,
      unit: "Count",
    },
    {
      metric_name: "DefaultQueueLatency",
      timestamp: now,
      value: stats.default_queue_latency,
      unit: "Seconds",
    },
    {
      metric_name: "Capacity",
      timestamp: now,
      value: calculate_capacity(processes),
      unit: "Count",
    },
    {
      metric_name: "Utilization",
      timestamp: now,
      value: calculate_utilization(processes) * 100.0,
      unit: "Percent",
    },
  ]

  processes.each do |process|
    metrics << {
      metric_name: "Utilization",
      dimensions: [{name: "Hostname", value: process["hostname"]}],
      timestamp: now,
      value: process["busy"] / process["concurrency"].to_f * 100.0,
      unit: "Percent",
    }
  end

  queues.each do |(queue_name, queue_size)|
    metrics << {
      metric_name: "QueueSize",
      dimensions: [{name: "QueueName", value: queue_name}],
      timestamp: now,
      value: queue_size,
      unit: "Count",
    }

    queue_latency = Sidekiq::Queue.new(queue_name).latency

    metrics << {
      metric_name: "QueueLatency",
      dimensions: [{name: "QueueName", value: queue_name}],
      timestamp: now,
      value: queue_latency,
      unit: "Seconds",
    }
  end

  unless @additional_dimensions.empty?
    metrics = metrics.each do |metric|
      metric[:dimensions] = (metric[:dimensions] || []) + @additional_dimensions
    end
  end

  # We can only put 20 metrics at a time
  metrics.each_slice(20) do |some_metrics|
    @client.put_metric_data(
      namespace: @namespace,
      metric_data: some_metrics,
    )
  end
end
quiet() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 210
def quiet
  logger.info { "Quieting Sidekiq CloudWatch Metrics Publisher" }
  @stop = true
end
run() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 58
def run
  logger.info { "Started Sidekiq CloudWatch Metrics Publisher" }

  # Publish stats every INTERVAL seconds, sleeping as required between runs
  now = Time.now.to_f
  tick = now
  until @stop
    logger.info { "Publishing Sidekiq CloudWatch Metrics" }
    publish

    now = Time.now.to_f
    tick = [tick + INTERVAL, now].max
    sleep(tick - now) if tick > now
  end

  logger.info { "Stopped Sidekiq CloudWatch Metrics Publisher" }
end
running?() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 54
def running?
  !@thread.nil? && @thread.alive?
end
start() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 47
def start
  logger.info { "Starting Sidekiq CloudWatch Metrics Publisher" }

  @done = false
  @thread = safe_thread("cloudwatch metrics publisher", &method(:run))
end
stop() click to toggle source
# File lib/sidekiq/cloudwatchmetrics.rb, line 215
def stop
  logger.info { "Stopping Sidekiq CloudWatch Metrics Publisher" }
  @stop = true
  @thread.wakeup
  @thread.join
rescue ThreadError
  # Don't raise if thread is already dead.
  nil
end

Private Instance Methods

calculate_capacity(processes) click to toggle source

Returns the total number of workers across all processes

# File lib/sidekiq/cloudwatchmetrics.rb, line 197
        def calculate_capacity(processes)
  processes.map do |process|
    process["concurrency"]
  end.sum
end
calculate_utilization(processes) click to toggle source

Returns busy / concurrency averaged across processes (for scaling)

# File lib/sidekiq/cloudwatchmetrics.rb, line 204
        def calculate_utilization(processes)
  processes.map do |process|
    process["busy"] / process["concurrency"].to_f
  end.sum / processes.size.to_f
end