class Sensu::Extension::InfluxDB
Public Instance Methods
buffer_too_big?(handler)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 191
# Tells whether a handler's buffer has reached its configured capacity.
#
# @param handler [Hash] handler state; reads the "buffer" and "buffer_size" entries
# @return [Boolean] true once the number of buffered points is >= "buffer_size"
def buffer_too_big?(handler)
  queued = handler["buffer"].length
  capacity = handler["buffer_size"]
  queued >= capacity
end
buffer_too_old?(handler)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 186
# Tells whether a handler's buffer has gone unflushed for too long.
#
# @param handler [Hash] handler state; reads "buffer_flushed" (epoch seconds
#   of the last flush) and "buffer_max_age" (seconds)
# @return [Boolean] true once the time since the last flush is >= "buffer_max_age"
def buffer_too_old?(handler)
  now = Time.now.to_i
  elapsed = now - handler["buffer_flushed"]
  elapsed >= handler["buffer_max_age"]
end
create_config(name, defaults)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 32
# Builds and registers the InfluxDB handler called +name+.
#
# Merges +defaults+ with settings[name], validates the result, prepares a
# Net::HTTP client for the InfluxDB /write endpoint and stores it, together
# with an empty point buffer, in @handlers[name].
#
# @param name [String] settings key of the handler to configure
# @param defaults [Hash] fallback values; entries in settings[name] win
# @return [Hash] the merged configuration
# @raise [ArgumentError] when settings holds no entry for +name+
def create_config(name, defaults)
  raise ArgumentError, "no configuration for #{name} provided. exiting..." if settings[name].nil?

  config = defaults.merge(settings[name])
  # Keep the password out of the log output.
  loggable_config = config.key?(:password) ? config.merge(:password => "[REDACTED]") : config
  @logger.debug("Config for #{name} created: #{loggable_config}")
  validate_config(name, config)

  hostname = config[:hostname]
  port = config[:port]
  database = config[:database]
  ssl = config[:ssl]
  username = config[:username]
  password = config[:password]
  buffer_size = config[:buffer_size]
  buffer_max_age = config[:buffer_max_age]
  retention_policy = config[:retention_policy]
  # Certificate verification stays on unless the config explicitly sets :ssl_verify.
  ssl_verify = config.fetch(:ssl_verify, true)

  protocol = ssl ? "https" : "http"
  rp_queryparam = retention_policy.nil? ? "" : "&rp=#{retention_policy}"
  # Credentials are only sent when both parts are configured.
  auth_queryparam = (username.nil? || password.nil?) ? "" : "&u=#{username}&p=#{password}"

  # The credential-free URL is kept separately so it can be logged safely.
  base_url = "#{protocol}://#{hostname}:#{port}/write?db=#{database}&precision=#{config[:precision]}#{rp_queryparam}"
  uri = URI("#{base_url}#{auth_queryparam}")

  http = Net::HTTP.new(uri.host, uri.port)
  if ssl
    # NOTE(review): this pins the connection to TLS 1.0 - confirm the target
    # server still accepts it before relying on SSL mode.
    http.ssl_version = :TLSv1
    http.use_ssl = true
    http.verify_mode = ssl_verify ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
    http.ca_file = config[:ssl_ca_file]
  end

  @handlers ||= {}
  @handlers[name] = {
    "http" => http,
    "uri" => uri,
    "buffer" => [],
    "buffer_flushed" => Time.now.to_i,
    "buffer_size" => buffer_size,
    "buffer_max_age" => buffer_max_age,
    "proxy_mode" => config[:proxy_mode]
  }

  @logger.info("#{name}: successfully initialized handler: hostname: #{hostname}, port: #{port}, database: #{database}, uri: #{base_url}, username: #{username}, buffer_size: #{buffer_size}, buffer_max_age: #{buffer_max_age}")
  config
end
description()
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 17
# One-line summary of what the extension does.
#
# @return [String]
def description
  'Transforms and sends metrics to InfluxDB'
end
flush_buffer(handler)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 180
# Sends the handler's buffered points and then starts a fresh buffer.
#
# The buffer is replaced and the flush time recorded only after
# send_to_influxdb has returned.
#
# @param handler [Hash] handler state; "buffer" and "buffer_flushed" are rewritten
# @return [Integer] the epoch-seconds timestamp stored in "buffer_flushed"
def flush_buffer(handler)
  send_to_influxdb(handler)
  handler.store("buffer", [])
  handler.store("buffer_flushed", Time.now.to_i)
end
is_number?(input)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 207
# Tells whether +input+ can be converted with Kernel#Integer.
#
# @param input [Object] value to check (run passes the timestamp token here)
# @return [Boolean] true when Integer(input) succeeds, false when it raises
def is_number?(input)
  Integer(input)
  true
rescue StandardError
  false
end
name()
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 13 def name @@extension_name end
post_init()
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 82
# Creates the main handler and then any additional handlers.
#
# The main handler is built from @@default_config. Each name listed under
# :additional_handlers in the extension's settings gets its own handler, with
# the main handler's merged configuration as its defaults.
def post_init
  main_config = create_config(@@extension_name, @@default_config)
  return unless settings[name].key?(:additional_handlers)

  settings[name][:additional_handlers].each do |handler_name|
    create_config(handler_name, main_config)
  end
end
run(event) { |'ok', 0| ... }
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 89
# Buffers the metric points carried by a Sensu event for delivery to InfluxDB.
#
# The event is parsed from JSON and a handler is chosen: the first entry of
# check.handlers that matches a configured handler, otherwise the default one.
# A buffer that is too old or too big is flushed before new points are added.
#
# Outside proxy mode each output line is read as "measurement value timestamp"
# and rewritten to "measurement<tags> value=<value> <timestamp>", where <tags>
# is whatever create_tags renders from the client and check tags. Lines whose
# timestamp is not an integer are skipped. A measurement of the form
# "name.eventtags.k1.v1.k2.v2" contributes the extra tags k1=v1, k2=v2 to that
# one point only. In proxy mode lines are buffered unchanged.
#
# @param event [String] JSON-encoded Sensu event
# @yield [String, Integer] 'ok', 0 on success; 'error', 2 when handling raised
def run(event)
  @logger.debug("event: #{event}")
  event = JSON.parse(event)
  check = event["check"]

  # Start with the default handler; a check may name an additional one.
  handler = @handlers[@@extension_name]
  unless check["handlers"].nil?
    additional = check["handlers"].find { |candidate| @handlers.has_key?(candidate) }
    unless additional.nil?
      @logger.debug("found additional handler: #{additional}")
      handler = @handlers[additional]
    end
  end

  flush_buffer(handler) if buffer_too_old?(handler) || buffer_too_big?(handler)

  proxy_mode = handler["proxy_mode"]
  unless proxy_mode
    client_tags = event["client"]["tags"] || {}
    check_tags = check["tags"] || {}
    base_tag_hash = client_tags.merge(check_tags)
    base_tags = create_tags(base_tag_hash)
  end

  check["output"].split(/\r\n|\n/).each do |point|
    unless proxy_mode
      measurement, field_value, timestamp = point.split(/\s+/)

      unless is_number?(timestamp)
        @logger.debug("invalid timestamp, skipping line in event #{event}")
        next
      end

      # Tags are resolved per point so that event tags found on one line do
      # not carry over to the lines that follow it.
      point_tags = base_tags

      # Only the full ".eventtags." separator marks embedded tags; a
      # measurement that merely contains the word is left as it is.
      if measurement.include?('.eventtags.')
        measurement, tagstub = measurement.split('.eventtags.', 2)
        event_tags = {}
        tagstub.split('.').each_slice(2) do |key, value|
          event_tags[key] = value
        end
        point_tags = create_tags(base_tag_hash.merge(event_tags))
      end

      point = "#{measurement}#{point_tags} value=#{field_value} #{timestamp}"
    end

    handler["buffer"].push(point)
    @logger.debug("#{@@extension_name}: stored point in buffer (#{handler['buffer'].length}/#{handler['buffer_size']})")
  end

  yield 'ok', 0
rescue => e
  @logger.error("#{@@extension_name}: unable to handle event #{event} - #{e}")
  yield 'error', 2
end
send_to_influxdb(handler)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 170
# POSTs the handler's buffered points to its InfluxDB write endpoint.
#
# Points are joined with newlines into a single request body. Nothing is sent
# when the buffer is empty. A response outside the 2xx range is reported at
# error level, since flush_buffer discards the buffer after this call.
#
# @param handler [Hash] handler state; reads "buffer", "uri" and "http"
# @return [void]
def send_to_influxdb(handler)
  # Nothing buffered: skip the round trip instead of POSTing an empty body.
  return if handler["buffer"].empty?

  payload = handler["buffer"].join("\n")
  uri = handler["uri"]
  request = Net::HTTP::Post.new(uri.request_uri)
  request.body = payload

  # create_config may put "&p=<password>" into the URI; mask it before logging.
  loggable_uri = uri.to_s.sub(/([?&]p=)[^&]*/, '\1[REDACTED]')
  @logger.debug("#{@@extension_name}: writing payload #{payload} to endpoint #{loggable_uri}")

  response = handler["http"].request(request)
  @logger.debug("#{@@extension_name}: influxdb http response code = #{response.code}, body = #{response.body}")

  unless response.is_a?(Net::HTTPSuccess)
    @logger.error("#{@@extension_name}: influxdb write failed, http response code = #{response.code}, body = #{response.body}")
  end
end
validate_config(name, config)
click to toggle source
# File lib/sensu/extensions/influxdb.rb, line 195
# Checks that a handler configuration carries the settings it cannot do without.
#
# @param name [String] settings key of the handler, used in error messages
# @param config [Hash, nil] merged handler configuration
# @raise [ArgumentError] when +config+ is nil, or when "hostname" or
#   "database" is absent from it
def validate_config(name, config)
  raise ArgumentError, "no configuration for #{name} provided. exiting..." if config.nil?

  %w[hostname database].each do |required_setting|
    # create_config reads these settings through symbol keys; the string form
    # is accepted as well so either style of hash validates.
    next if config.key?(required_setting.to_sym) || config.key?(required_setting)

    raise ArgumentError, "required setting #{required_setting} not provided to extension. this should be provided as json element with key #{name}. exiting..."
  end
end