class StatsdServer

Attributes

logger[RW]
stats[RW]

Public Class Methods

new(opts, input_config, output_config) click to toggle source
# File lib/statsdserver.rb, line 20
# Sets up a server instance: a fresh stats store, a logger writing to STDERR,
# and the effective options (built-in defaults overridden by +opts+).
#
# opts          - Hash of option overrides, merged over the defaults below.
# input_config  - stored as-is in @input_config.
# output_config - stored as-is in @output_config.
#
# Raises RuntimeError when :port, :percentile or :flush_interval cannot be
# converted with Float().
def initialize(opts, input_config, output_config)
  @stats = StatsdServer::Stats.new

  @logger = Logger.new(STDERR)
  @logger.progname = File.basename($PROGRAM_NAME)

  defaults = {
    :bind => "127.0.0.1",
    :port => 8125,
    :percentile => 90,
    :flush_interval => 30,
    :prefix => "stats",
    :preserve_counters => "true",
    :timer_names_before_suffix => "true",
  }
  @opts = defaults.merge(opts)
  @input_config = input_config
  @output_config = output_config

  # Numeric options are normalized to Float; anything Float() rejects is
  # reported together with the offending value.
  numeric_keys = [:port, :percentile, :flush_interval]
  numeric_keys.each do |name|
    raw = @opts[name]
    begin
      @opts[name] = Float(raw)
    rescue
      raise "#{name}: #{raw.inspect}: must be a valid number"
    end
  end
end

Public Instance Methods

carbon_update_str() click to toggle source
# File lib/statsdserver.rb, line 164
# Builds the text payload for one flush: one "<name> <value> <timestamp>" line
# per metric, joined with newlines and terminated by a trailing newline.
#
# Timers and counters are removed from @stats as they are collected; gauges
# are copied and left in place. When @opts[:preserve_counters] is the string
# "true", every counter that was just drained is put back into @stats with a
# value of 0 (unless it already has a value again).
#
# Each non-empty timer produces five lines (mean, upper, lower, count and
# upper_<percentile>). @opts[:timer_names_before_suffix] == "true" passes the
# stat name through metric_name together with the timer name; otherwise the
# stat name is appended after metric_name's result.
#
# Counter values are divided by @opts[:flush_interval] before being reported.
#
# Returns the payload String, or nil when there is nothing to report.
def carbon_update_str
  updates = []
  now = Time.now.to_i

  # Drain timers key by key.
  timers = {}
  @stats.timers.keys.each { |k| timers[k] = @stats.timers.delete(k) }

  # Gauges are only read, never deleted.
  gauges = {}
  @stats.gauges.keys.each { |k| gauges[k] = @stats.gauges[k] }

  # Drain counters key by key.
  counters = {}
  @stats.counters.keys.each { |k| counters[k] = @stats.counters.delete(k) }

  if @opts[:preserve_counters] == "true"
    # Keep sending a 0 for counters (even if we don't get updates)
    counters.each_key { |k| @stats.counters[k] ||= 0 }
  end

  names_before_suffix = @opts[:timer_names_before_suffix] == "true"

  timers.each do |key, values|
    next if values.length == 0
    summary = ::StatsdServer::Math.summarize(values, @opts)
    base = "timers.#{key}"

    # [stat name, value] pairs, in the order they are emitted.
    stats_for_timer = [
      ["mean", summary[:mean]],
      ["upper", summary[:max]],
      ["lower", summary[:min]],
      ["count", values.length],
      ["upper_#{@opts[:percentile].to_i}", summary[:max_at_threshold]],
    ]

    stats_for_timer.each do |stat, value|
      name = if names_before_suffix
               metric_name("#{base}.#{stat}")
             else
               metric_name(base) + ".#{stat}"
             end
      updates << [name, value, now].join(" ")
    end
  end # timers.each

  counters.each do |key, value|
    updates << [metric_name(key), value / @opts[:flush_interval], now].join(" ")
  end # counters.each

  gauges.each do |key, value|
    updates << [metric_name(key), value, now].join(" ")
  end # gauges.each

  updates.empty? ? nil : updates.join("\n") + "\n"
end
flush() click to toggle source
# File lib/statsdserver.rb, line 236
# Renders the current stats via carbon_update_str and hands the resulting
# string to every object in @outputs by calling its #send method.
#
# Does nothing when there is no payload. Logs a warning and returns when
# @outputs is nil or empty.
def flush
  payload = carbon_update_str
  return unless payload

  if @outputs.nil? || @outputs.length.zero?
    @logger.warn("no outputs configured, can't flush data")
    return
  end

  @outputs.each { |sink| sink.send(payload) }
end
run() click to toggle source
# File lib/statsdserver.rb, line 48
# Instantiates the configured outputs, opens the configured inputs and starts
# the background flusher thread.
#
# @output_config and @input_config are each iterated as (name, config) pairs.
# An output name is capitalized and looked up as a constant under
# StatsdServer::Output; the resulting class is instantiated with its config.
# Supported input names are "udp" and "zeromq".
#
# An unknown output or input name is logged at fatal level and the process
# exits with EX_DATAERR.
#
# Returns the flusher Thread (also stored in @flusher).
def run
  # initialize outputs
  @outputs = []
  @output_config.each do |output, config|
    const_name = output.capitalize
    # const_get raises NameError for a missing or malformed constant name
    # rather than returning nil, so map that to nil to reach the fatal path.
    klass = begin
      StatsdServer::Output.const_get(const_name)
    rescue NameError
      nil
    end
    if klass.nil?
      @logger.fatal("unknown output #{output.inspect}")
      exit EX_DATAERR
    end
    @outputs << klass.new(config)
  end # @output_config.each

  # start inputs
  @input_config.each do |input, config|
    case input
    when "udp"
      EM.open_datagram_socket(config["bind"], config["port"].to_i,
                              Input::Udp) do |s|
        s.logger = @logger
        s.stats = @stats
      end # EM.open_datagram_socket
    when "zeromq"
      s = Input::ZeroMQ.new
      s.logger = @logger
      s.stats = @stats
      # NOTE(review): the context is kept in a global, presumably so it
      # outlives this method -- confirm before changing.
      $ctx = EM::ZeroMQ::Context.new(1)
      sock = $ctx.socket(ZMQ::PULL, s)
      sock.setsockopt(ZMQ::HWM, 100)
      sock.bind(config["bind"])
    else
      @logger.fatal("unknown input #{input.inspect}")
      exit EX_DATAERR
    end # case input
  end # @input_config.each

  # start flusher
  # Process-wide setting: an exception escaping any thread aborts the process.
  Thread.abort_on_exception = true
  @flusher = Thread.new do
    # Flush once per interval; a failed flush is logged and the loop goes on.
    while sleep(@opts[:flush_interval])
      begin
        flush
      rescue => e
        @logger.warn("trouble flushing: #{e}")
        @logger.debug(e.backtrace.join("\n"))
      end
    end
  end
end

Private Instance Methods

metric_name(name) click to toggle source

# Connects to an AMQP broker described by @output_url and stores the
# connection, channel and exchange in @amqp, @channel and @exchange.
#
# Pieces read from @output_url:
#   user     - "<user>@<vhost>"; the vhost part is optional and defaults to "/"
#   password - broker password
#   host     - broker host
#   port     - broker port, 5672 when absent
#   path     - "/<type>/<name>", where type is "fanout", "direct" or "topic"
#   query    - optional "durable=false" and/or "autodelete=true"
#
# The amqp library is loaded lazily; when it is missing, a fatal message is
# logged and the process exits with status 1.
#
# Raises TypeError when the exchange type in the path is not recognized.
private def setup_amqp
  begin
    require "amqp"
  rescue LoadError
    @logger.fatal("missing amqp ruby module. try gem install amqp")
    exit(1)
  end

  credentials = @output_url.user || ""
  user, vhost = credentials.split("@", 2)

  # The path starts with "/", so the first piece is discarded.
  path_parts = @output_url.path.split("/", 3)
  mqtype = path_parts[1]
  mqname = path_parts[2]

  connection_settings = {
    :host => @output_url.host,
    :port => @output_url.port || 5672,
    :user => user,
    :pass => @output_url.password,
    :vhost => vhost || "/",
  }

  @amqp = AMQP.connect(connection_settings)
  @channel = AMQP::Channel.new(@amqp)

  # Exchanges are durable and not auto-deleted unless the query says otherwise.
  exchange_opts = {
    :durable => true,
    :auto_delete => false,
  }

  query = @output_url.query
  if query
    query.split("&").each do |pair|
      name, value = pair.split("=", 2)
      case [name, value]
      when ["durable", "false"]
        exchange_opts[:durable] = false
      when ["autodelete", "true"]
        exchange_opts[:auto_delete] = true
      end
    end
  end

  # Exchange type from the URL path => channel method that creates it.
  factories = {
    "fanout" => :fanout,
    "direct" => :exchange,
    "topic" => :topic,
  }
  factory = factories[mqtype]
  raise TypeError, "unknown amqp output type #{mqtype}" unless factory

  @exchange = @channel.public_send(factory, mqname, exchange_opts)
end # def setup_amqp
# File lib/statsdserver.rb, line 147
# Decorates a metric name with the configured prefix and suffix.
#
# name - the bare metric name.
#
# @opts[:prefix], when set and non-empty, is prepended followed by ".".
# @opts[:suffix], when set and non-empty, is appended preceded by ".".
#
# Returns the decorated name as a String.
def metric_name(name)
  configured_prefix = @opts[:prefix]
  configured_suffix = @opts[:suffix]

  leading = (configured_prefix && !configured_prefix.empty?) ? configured_prefix + "." : ""
  trailing = (configured_suffix && !configured_suffix.empty?) ? "." + configured_suffix : ""

  [leading, name, trailing].join("")
end