class Munin2Graphite::Scheduler

This class holds the main scheduler of the system, it will perform the applicacion loops

Attributes

scheduler[RW]

Public Class Methods

new(config) click to toggle source
# File lib/munin2graphite/scheduler.rb, line 28
def initialize(config)
  @config = config
end

Public Instance Methods

carbon=(socket) click to toggle source
# File lib/munin2graphite/scheduler.rb, line 32
def carbon=(socket)
  @carbon = Carbon.new(socket)
end
category_from_config(config) click to toggle source
# File lib/munin2graphite/scheduler.rb, line 36
def category_from_config(config)
  config.each_line do |configline|
    if configline =~ /^graph_category ([\w\-_\.]+)$/
      return configline.split[1]
    end
  end
  raise "CategoryNotFound in #{config}"
end
metric_loop(worker) click to toggle source
# File lib/munin2graphite/scheduler.rb, line 212
def metric_loop(worker)
  config = @config.config_for_worker worker
  retries = 3
  begin
    obtain_metrics(worker)
  rescue => e
    config.log.error("Exception found: (#{e.to_s})")
    e.backtrace.each { |line| config.log.error(line) }
    sleep 1
    retries -= 1
    config.log.error("Retrying")
    retry unless retries < 0
    config.log.error("Exitting, exception not solved")
    exit(1)
  end
end
munin_config(reload = false) click to toggle source
# File lib/munin2graphite/scheduler.rb, line 60
def munin_config(reload = false)
  return @munin_config if @munin_config && !reload
  @munin_config = {}
  @config.log.info("Obtaining metrics configuration")
  @munin_config[:workers] = []

  semaphore = Mutex.new
  threads = []

  workers.each do |worker|
    threads << Thread.new do
      current_config = {}
      config = @config.config_for_worker(worker)
      begin
        munin_worker  = Munin::Node.new(config["munin_hostname"],config["munin_port"])
        nodes = config["munin_nodes"] ? config["munin_nodes"].split(",") : munin_worker.nodes
      rescue Exception => e
        config.log.error("Error when trying to connect to munin-node on #{config["munin_hostname"]}:#{config["munin_port"]}")
        config.log.error("This node will be skipped")
        Thread.current.exit
      end

      current_config[:nodes] = {}
      semaphore_nodes = Mutex.new
      threads_nodes = []
      nodes.each do |node|
        threads_nodes << Thread.new do
          munin  = Munin::Node.new(config["munin_hostname"],config["munin_port"])
          metrics = munin.list(node)
          config.log.info("Config for node #{worker}::#{node}")
          semaphore_nodes.synchronize do
            current_config[:nodes][node] = { :metrics => {} }
          end
          metrics.each do |metric|
            begin
              raw_config = munin.config(metric,true)[metric]
              category = category_from_config(raw_config)
              # We prepend the worker name to the graph title for clarity
              if config["graph_legend_prepend"] == "true"
                nodename = config["graphite_name_schema"] == "worker" ? worker : node
                if raw_config.match("graph_title ")
                  raw_config.gsub!("graph_title ","graph_itle #{nodename} ")
                else
                  raw_config << "\ngraph_title #{nodename}"
                end
              end
              semaphore_nodes.synchronize do
                current_config[:nodes][node][:metrics][metric] = {
                  :config => munin.config(metric)[metric],
                  :raw_config => raw_config,
                  :category => category
                }
              end
            rescue Exception
              config.log.error("Error when trying to obtain graph conf for #{worker}::#{node}::#{metric} Ignored")
            end
          end
        end
      end
      threads_nodes.each { |i| i.join }
      #       @config.log.debug(current_config.inspect)
      semaphore.synchronize do
        @munin_config[worker] = current_config
        @munin_config[:workers] << worker
      end
      munin_worker.disconnect
      config.log.info("Config for #{worker} obtained")
    end
  end
  threads.each { |i| i.join }
  @munin_config
end
obtain_graphs() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 190
def obtain_graphs
  munin_config(true)
  munin_config[:workers].each do |worker|
    time = Time.now
    config = @config.config_for_worker worker
    @config.log.info("Begin : Sending Graph Information to Graphite for worker #{worker}")
    Graphite::Base.set_connection(config["graphite_endpoint"])
    Graphite::Base.authenticate(config["graphite_user"],config["graphite_password"])
    munin_config[worker][:nodes].keys.each do |node|
      @config.log.info("Graphs for #{node}")
      munin_config[worker][:nodes][node][:metrics].each do |metric,value|
        @config.log.info("Configuring #{metric}")
        munin_graph = MuninGraph.graph_for value[:raw_config]
        munin_graph.config = config.merge("metric" => "#{metric}","hostname" => node.split(".").first)
        @config.log.debug("Saving graph #{metric}")
        munin_graph.to_graphite.save!
      end
    end
    config.log.info("End : Sending Graph Information to Graphite for worker #{worker}, elapsed time (#{Time.now - time}s)")
  end
end
obtain_metrics(worker = "global") click to toggle source

This is the loop of the metrics scheduling

# File lib/munin2graphite/scheduler.rb, line 140
def obtain_metrics(worker = "global")
  my_munin_config = munin_config.dup
  time = Time.now
  config = @config.config_for_worker(worker)
  config.log.info("Worker #{worker}")
  metric_base = [config["graphite_user"], config["graphite_prefix"]].reject{|i| i== ""}.compact.join(".")

  my_munin_config[worker][:nodes].each do |node,node_info|
    node_name = metric_base + "." + node.split(".").first
    config.log.debug("Doing #{node_name}")
    values = {}
    config.log.debug("Asking for: #{node}")
    metric_time = Time.now
    metrics = node_info[:metrics].keys
    config.log.debug("Metrics " + metrics.join(","))
    metrics_threads = []
    categories = {}
    metrics.each do |metric|
      begin
        local_munin  = Munin::Node.new(config["munin_hostname"],config["munin_port"])
        values[metric] =  local_munin.fetch metric
        local_munin.disconnect
      rescue Exception => e
        @config.log.error("There was a problem when getting the metric #{metric} for #{node} , Ignored")
        @config.log.error(e.message)
        @config.log.error(e.backtrace.inspect)
      end
    end
    config.log.debug(values.inspect)
    config.log.info("Done with: #{node} (#{Time.now - metric_time} s)")
    carbon = @carbon || Carbon.new(config["carbon_hostname"],config["carbon_port"])
    string_to_send = ""
    values.each do |metric,results|
      category = node_info[:metrics][metric][:category]
      results.each do |k,v|
        v.each do |c_metric,c_value|
          name = "#{node_name}.#{category}.#{metric}.#{c_metric}".gsub("-","_")
          string_to_send += "#{name} #{c_value} #{Time.now.to_i}\n" if c_value != "U"
        end
      end
    end
    @config.log.debug(string_to_send)
    send_time = Time.now
    carbon.send(string_to_send)
    carbon.flush
    carbon.close
  end if my_munin_config[worker]
  @config.log.info("End getting metrics for worker #{worker}, elapsed time (#{Time.now - time}s)")
end
start() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 242
def start
  @config.log.info("Scheduler started")
  obtain_graphs
  @scheduler = Rufus::Scheduler.start_new
  workers.each do |worker|
    config = @config.config_for_worker worker
    config.log.info("Scheduling worker #{worker} every  #{config["scheduler_metrics_period"]} ")
    metric_loop(worker)
    @scheduler.every config["scheduler_metrics_period"] do
      metric_loop(worker)
    end
  end
end
start1r_graphs() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 237
def start1r_graphs
  @config.log.info("One-run started: graphs")
  obtain_graphs
end
start1r_metrics() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 229
def start1r_metrics
  @config.log.info("One-run started: metrics")
  workers.each do |worker|
    config = @config.config_for_worker worker
    metric_loop(worker)
  end
end
workers() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 133
def workers
  workers_from_consul_config!
  @workers ||= (@config.workers.empty? ?  ["global"] : @config.workers )
end
workers_from_consul_config!() click to toggle source
# File lib/munin2graphite/scheduler.rb, line 45
def workers_from_consul_config!
  return unless @config['consul_url']
  Diplomat.configure do |c|
    c.url = @config['consul_url']
  end
  Diplomat::Service.get(@config['consul_service'], :all).each do |i|
    @config.config.params['workers'] ||= []
    @config.config.params['workers'] << i['Node']
    @config.config.params[i['Node']] =  {
      'munin_hostname' => i['Address'],
      'munin_port'     => i['ServicePort'],
    }
  end
end