class Flapjack::CLI::Receiver
Public Class Methods
new(global_options, options)
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 15 def initialize(global_options, options) @global_options = global_options @options = options if @global_options[:'force-utf8'] Encoding.default_external = 'UTF-8' Encoding.default_internal = 'UTF-8' end @config = Flapjack::Configuration.new @config.load(global_options[:config]) @config_env = @config.all if @config_env.nil? || @config_env.empty? unless 'mirror'.eql?(@options[:type]) exit_now! "No config data found in '#{global_options[:config]}'" end end unless 'mirror'.eql?(@options[:type]) Flapjack::RedisProxy.config = @config.for_redis Zermelo.redis = Flapjack.redis end @redis_options = @config.for_redis end
Public Instance Methods
json()
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 53 def json json_feeder(:from => @options[:from]) end
mirror()
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 57 def mirror if (@options[:dest].nil? || @options[:dest].strip.empty?) && @redis_options.nil? exit_now! "No destination redis URL passed, and none configured" end mirror_receive(:source => @options[:source], :dest => @options[:dest] || @redis_options, :include => @options[:include], :all => @options[:all], :follow => @options[:follow], :last => @options[:last], :time => @options[:time]) end
start()
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 42 def start puts "#{@options[:type]}-receiver starting..." begin main(:fifo => @options[:fifo], :type => @options[:type]) rescue Exception => e p e.message puts e.backtrace.join("\n") end puts " done." end
Private Instance Methods
json_feeder(opts = {})
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 179 def json_feeder(opts = {}) require 'json/stream' input = if opts[:from] File.open(opts[:from]) # Explodes if file does not exist. elsif !'java'.eql?(RUBY_PLATFORM) && STDIN.tty? # tty check isn't working under JRuby, assume STDIN is OK to use # https://github.com/jruby/jruby/issues/1332 exit_now! "No file provided, and STDIN is from terminal! Exiting..." else STDIN end # Sit and churn through the input stream until a valid JSON blob has been assembled. # This handles both the case of a process sending a single JSON and then exiting # (eg. cat foo.json | bin/flapjack receiver json) *and* a longer-running process spitting # out events (eg. /usr/bin/slow-event-feed | bin/flapjack receiver json) # # @data is a stack, but @stack is used by the Parser class parser = JSON::Stream::Parser.new do start_document do @data = [] @keys = [] @result = nil end end_document { # interfering with json-stream's "one object per stream" model, # but it errors without this @state = :start_document } start_object do @data.push({}) end end_object do node = @data.pop if @data.size > 0 top = @data.last case top when Hash top[@keys.pop] = node when Array top << node end else errors = Flapjack::Data::Event.validation_errors_for_hash(node) if errors.empty? Flapjack::Data::Event.push('events', node) puts "Enqueued event data, #{node.inspect}" else puts "Invalid event data received, #{errors.join(', ')} #{node.inspect}" end end end start_array do @data.push([]) end end_array do node = @data.pop if @data.size > 0 top = @data.last case top when Hash top[@keys.pop] = node when Array top << node end end end key do |k| @keys << k end value do |v| top = @data.last case top when Hash top[@keys.pop] = v when Array top << v else @data << v end end end while data = input.read(4096) parser << data end puts "Done." end
main(opts)
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 170 def main(opts) fifo = opts[:fifo] while true process_input(:fifo => fifo, :type => opts[:type]) puts "Whoops with the fifo, restarting main loop in 10 seconds" sleep 10 end end
mirror_get_archive_keys_stats(opts = {})
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 379 def mirror_get_archive_keys_stats(opts = {}) source_redis = opts[:source] source_redis.smembers("known_events_archive_keys").sort.collect do |eak| {:name => eak, :size => source_redis.llen(eak)} end end
mirror_receive(opts)
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 278 def mirror_receive(opts) unless opts[:follow] || opts[:all] exit_now! "one or both of --follow or --all is required" end include_re = nil unless opts[:include].nil? || opts[:include].strip.empty? begin include_re = Regexp.new(opts[:include].strip) rescue RegexpError exit_now! "could not parse include Regexp: #{opts[:include].strip}" end end source_addr = opts[:source] source_redis = Redis.new(:url => source_addr, :driver => :hiredis) dest_addr = opts[:dest] dest_redis = case dest_addr when Hash Redis.new(dest_addr.merge(:driver => :hiredis)) when String Redis.new(:url => dest_addr, :driver => :hiredis) else exit_now! "could not understand destination Redis config" end Flapjack::RedisProxy.config = dest_redis Zermelo.redis = Flapjack.redis archives = mirror_get_archive_keys_stats(:source => source_redis) raise "found no archives!" if archives.empty? puts "found archives: #{archives.inspect}" # each archive bucket is a redis list that is written # with brpoplpush, that is newest items are added to the left (head) # of the list, so oldest events are to be found at the tail of the list. # # the index of these archives, in the 'archives' array, also stores the # redis key names for each bucket in oldest to newest events_sent = 0 case when opts[:all] archive_idx = 0 cursor = -1 when opts[:last], opts[:time] raise "Sorry, unimplemented" else # wait for the next event to be archived, so point the cursor at a non-existant # slot in the list, the one before the 0'th archive_idx = archives.size - 1 cursor = -1 - archives[-1][:size] end archive_key = archives[archive_idx][:name] puts archive_key loop do event_json = source_redis.lindex(archive_key, cursor) if event_json event, errors = Flapjack::Data::Event.parse_and_validate(event_json) if !errors.nil? && !errors.empty? Flapjack.logger.error { error_str = errors.nil? ? '' : errors.join(', ') "Invalid event data received, #{error_str} #{event.inspect}" } elsif (include_re.nil? || (include_re === "#{event['entity']}:#{event['check']}")) Flapjack::Data::Event.add(event) events_sent += 1 print "#{events_sent} " if events_sent % 1000 == 0 end cursor -= 1 next end archives = mirror_get_archive_keys_stats(:source => source_redis).select {|a| a[:size] > 0 } if archives.empty? sleep 1 next end archive_idx = archives.index {|a| a[:name] == archive_key } archive_idx = archive_idx.nil? ? 0 : (archive_idx + 1) if archives[archive_idx] archive_key = archives[archive_idx][:name] puts archive_key cursor = -1 else break unless opts[:follow] sleep 1 end end end
process_input(opts)
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 77 def process_input(opts) config_rec = case opts[:type] when /nagios/ @config_env['nagios-receiver'] || {} when /nsca/ @config_env['nsca-receiver'] || {} else raise "Unknown receiver type" end opt_fifo = (opts[:fifo] || config_rec['fifo'] || '/var/cache/nagios3/event_stream.fifo') unless File.exist?(opt_fifo) raise "No fifo (named pipe) file found at #{opt_fifo}" end unless File.pipe?(opt_fifo) raise "The file at #{opt_fifo} is not a named pipe, try using mkfifo to make one" end unless File.readable?(opt_fifo) raise "The fifo (named pipe) at #{opt_fifo} is unreadable" end fifo = File.new(opt_fifo) begin while line = fifo.gets skip unless line split_line = line.split("\t") object_type, timestamp, entity, check, state, check_time, check_latency, check_output, check_perfdata, check_long_output = [nil] * 10 case opts[:type] when /nagios/ object_type, timestamp, entity, check, state, check_time, check_latency, check_output, check_perfdata, check_long_output = split_line case when split_line.length < 9 puts "ERROR - rejecting this line as it doesn't split into at least 9 tab separated strings: [#{line}]" next when timestamp !~ /^\d+$/ puts "ERROR - rejecting this line as second string doesn't look like a timestamp: [#{line}]" next when (object_type != '[HOSTPERFDATA]') && (object_type != '[SERVICEPERFDATA]') puts "ERROR - rejecting this line as first string is neither '[HOSTPERFDATA]' nor '[SERVICEPERFDATA]': [#{line}]" next end when /nsca/ timestamp, passivecheck = split_line split_passive = passivecheck.split(";") timestamp = timestamp.delete('[]') check_long_output = '' object_type, entity, check, state, check_output = split_passive case when (split_line.length < 2 || split_passive.length < 5) puts "ERROR - rejecting this line; illegal format: [#{line}]" next when (timestamp !~ /^\d+$/) puts "ERROR - rejecting this line; timestamp look like a timestamp: [#{line}]" next when (object_type != 'PROCESS_SERVICE_CHECK_RESULT') puts "ERROR - rejecting this line; identifier 'PROCESS_SERVICE_CHECK_RESULT' is missing: [#{line}]" next end end puts "#{object_type}, #{timestamp}, #{entity}, #{check}, #{state}, #{check_output}, #{check_long_output}" state = 'ok' if state.downcase == 'up' state = 'critical' if state.downcase == 'down' details = check_long_output ? check_long_output.gsub(/\\n/, "\n") : nil event = { 'entity' => entity, 'check' => check, 'type' => 'service', 'state' => state, 'summary' => check_output, 'details' => details, 'perfdata' => check_perfdata, 'time' => timestamp, } Flapjack::Data::Event.push('events', event) end rescue Redis::CannotConnectError puts "Error, unable to to connect to the redis server (#{$!})" end end
redis()
click to toggle source
# File lib/flapjack/cli/receiver.rb, line 73 def redis @redis ||= Redis.new(@redis_options) end