class Flapjack::CLI::Receiver

Public Class Methods

new(global_options, options) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 15
def initialize(global_options, options)
  @global_options = global_options
  @options = options

  if @global_options[:'force-utf8']
    Encoding.default_external = 'UTF-8'
    Encoding.default_internal = 'UTF-8'
  end

  @config = Flapjack::Configuration.new
  @config.load(global_options[:config])
  @config_env = @config.all

  if @config_env.nil? || @config_env.empty?
    unless 'mirror'.eql?(@options[:type])
      exit_now! "No config data found in '#{global_options[:config]}'"
    end
  end

  unless 'mirror'.eql?(@options[:type])
    Flapjack::RedisProxy.config = @config.for_redis
    Zermelo.redis = Flapjack.redis
  end

  @redis_options = @config.for_redis
end

Public Instance Methods

json() click to toggle source
# File lib/flapjack/cli/receiver.rb, line 53
def json
  json_feeder(:from => @options[:from])
end
mirror() click to toggle source
# File lib/flapjack/cli/receiver.rb, line 57
def mirror
  if (@options[:dest].nil? || @options[:dest].strip.empty?) &&
    @redis_options.nil?

    exit_now! "No destination redis URL passed, and none configured"
  end

  mirror_receive(:source => @options[:source],
    :dest => @options[:dest] || @redis_options,
    :include => @options[:include], :all => @options[:all],
    :follow => @options[:follow], :last => @options[:last],
    :time => @options[:time])
end
start() click to toggle source
# File lib/flapjack/cli/receiver.rb, line 42
def start
  puts "#{@options[:type]}-receiver starting..."
  begin
    main(:fifo => @options[:fifo], :type => @options[:type])
  rescue Exception => e
    p e.message
    puts e.backtrace.join("\n")
  end
  puts " done."
end

Private Instance Methods

json_feeder(opts = {}) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 179
def json_feeder(opts = {})
  require 'json/stream'

  input = if opts[:from]
    File.open(opts[:from]) # Explodes if file does not exist.
  elsif !'java'.eql?(RUBY_PLATFORM) && STDIN.tty?
    # tty check isn't working under JRuby, assume STDIN is OK to use
    # https://github.com/jruby/jruby/issues/1332
    exit_now! "No file provided, and STDIN is from terminal! Exiting..."
  else
    STDIN
  end

  # Sit and churn through the input stream until a valid JSON blob has been assembled.
  # This handles both the case of a process sending a single JSON and then exiting
  # (eg. cat foo.json | bin/flapjack receiver json) *and* a longer-running process spitting
  # out events (eg. /usr/bin/slow-event-feed | bin/flapjack receiver json)
  #
  # @data is a stack, but @stack is used by the Parser class
  parser = JSON::Stream::Parser.new do
    start_document do
      @data = []
      @keys = []
      @result = nil
    end

    end_document {
      # interfering with json-stream's "one object per stream" model,
      # but it errors without this
      @state = :start_document
    }

    start_object do
      @data.push({})
    end

    end_object do
      node = @data.pop

      if @data.size > 0
        top = @data.last
        case top
        when Hash
          top[@keys.pop] = node
        when Array
          top << node
        end
      else
        errors = Flapjack::Data::Event.validation_errors_for_hash(node)
        if errors.empty?
          Flapjack::Data::Event.push('events', node)
          puts "Enqueued event data, #{node.inspect}"
        else
          puts "Invalid event data received, #{errors.join(', ')} #{node.inspect}"
        end
      end
    end

    start_array do
      @data.push([])
    end

    end_array do
      node = @data.pop
      if @data.size > 0
        top = @data.last
        case top
        when Hash
          top[@keys.pop] = node
        when Array
          top << node
        end
      end
    end

    key do |k|
      @keys << k
    end

    value do |v|
      top = @data.last
      case top
      when Hash
        top[@keys.pop] = v
      when Array
        top << v
      else
        @data << v
      end
    end
  end

  while data = input.read(4096)
    parser << data
  end

  puts "Done."
end
main(opts) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 170
def main(opts)
  fifo = opts[:fifo]
  while true
    process_input(:fifo => fifo, :type => opts[:type])
    puts "Whoops with the fifo, restarting main loop in 10 seconds"
    sleep 10
  end
end
mirror_get_archive_keys_stats(opts = {}) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 379
def mirror_get_archive_keys_stats(opts = {})
  source_redis = opts[:source]
  source_redis.smembers("known_events_archive_keys").sort.collect do |eak|
    {:name => eak, :size => source_redis.llen(eak)}
  end
end
mirror_receive(opts) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 278
def mirror_receive(opts)
  unless opts[:follow] || opts[:all]
    exit_now! "one or both of --follow or --all is required"
  end

  include_re = nil
  unless opts[:include].nil? || opts[:include].strip.empty?
    begin
      include_re = Regexp.new(opts[:include].strip)
    rescue RegexpError
      exit_now! "could not parse include Regexp: #{opts[:include].strip}"
    end
  end

  source_addr = opts[:source]
  source_redis = Redis.new(:url => source_addr, :driver => :hiredis)

  dest_addr  = opts[:dest]
  dest_redis = case dest_addr
  when Hash
    Redis.new(dest_addr.merge(:driver => :hiredis))
  when String
    Redis.new(:url => dest_addr, :driver => :hiredis)
  else
    exit_now! "could not understand destination Redis config"
  end

  Flapjack::RedisProxy.config = dest_redis
  Zermelo.redis = Flapjack.redis

  archives = mirror_get_archive_keys_stats(:source => source_redis)
  raise "found no archives!" if archives.empty?

  puts "found archives: #{archives.inspect}"

  # each archive bucket is a redis list that is written
  # with brpoplpush, that is newest items are added to the left (head)
  # of the list, so oldest events are to be found at the tail of the list.
  #
  # the index of these archives, in the 'archives' array, also stores the
  # redis key names for each bucket in oldest to newest
  events_sent = 0
  case
  when opts[:all]
    archive_idx = 0
    cursor      = -1
  when opts[:last], opts[:time]
    raise "Sorry, unimplemented"
  else
    # wait for the next event to be archived, so point the cursor at a non-existant
    # slot in the list, the one before the 0'th
    archive_idx = archives.size - 1
    cursor      = -1 - archives[-1][:size]
  end

  archive_key = archives[archive_idx][:name]
  puts archive_key

  loop do
    event_json = source_redis.lindex(archive_key, cursor)
    if event_json
      event, errors = Flapjack::Data::Event.parse_and_validate(event_json)

      if !errors.nil? && !errors.empty?
        Flapjack.logger.error {
          error_str = errors.nil? ? '' : errors.join(', ')
          "Invalid event data received, #{error_str} #{event.inspect}"
        }
      elsif (include_re.nil? ||
        (include_re === "#{event['entity']}:#{event['check']}"))

        Flapjack::Data::Event.add(event)
        events_sent += 1
        print "#{events_sent} " if events_sent % 1000 == 0
      end
      cursor -= 1
      next
    end

    archives = mirror_get_archive_keys_stats(:source => source_redis).select {|a|
      a[:size] > 0
    }

    if archives.empty?
      sleep 1
      next
    end

    archive_idx = archives.index {|a| a[:name] == archive_key }
    archive_idx = archive_idx.nil? ? 0 : (archive_idx + 1)
    if archives[archive_idx]
      archive_key = archives[archive_idx][:name]
      puts archive_key
      cursor = -1
    else
      break unless opts[:follow]
      sleep 1
    end
  end
end
process_input(opts) click to toggle source
# File lib/flapjack/cli/receiver.rb, line 77
def process_input(opts)
  config_rec = case opts[:type]
  when /nagios/
    @config_env['nagios-receiver'] || {}
  when /nsca/
    @config_env['nsca-receiver'] || {}
  else
    raise "Unknown receiver type"
  end

  opt_fifo = (opts[:fifo] || config_rec['fifo'] || '/var/cache/nagios3/event_stream.fifo')
  unless File.exist?(opt_fifo)
    raise "No fifo (named pipe) file found at #{opt_fifo}"
  end
  unless File.pipe?(opt_fifo)
    raise "The file at #{opt_fifo} is not a named pipe, try using mkfifo to make one"
  end
  unless File.readable?(opt_fifo)
    raise "The fifo (named pipe) at #{opt_fifo} is unreadable"
  end

  fifo  = File.new(opt_fifo)
  begin
    while line = fifo.gets
      skip unless line
      split_line = line.split("\t")

      object_type, timestamp, entity, check, state, check_time,
        check_latency, check_output, check_perfdata, check_long_output =
         [nil] * 10

      case opts[:type]
      when /nagios/
        object_type, timestamp, entity, check, state, check_time,
          check_latency, check_output, check_perfdata, check_long_output = split_line

        case
        when split_line.length < 9
          puts "ERROR - rejecting this line as it doesn't split into at least 9 tab separated strings: [#{line}]"
          next
        when timestamp !~ /^\d+$/
          puts "ERROR - rejecting this line as second string doesn't look like a timestamp: [#{line}]"
          next
        when (object_type != '[HOSTPERFDATA]') && (object_type != '[SERVICEPERFDATA]')
          puts "ERROR - rejecting this line as first string is neither '[HOSTPERFDATA]' nor '[SERVICEPERFDATA]': [#{line}]"
          next
        end

      when /nsca/

        timestamp, passivecheck = split_line
        split_passive = passivecheck.split(";")
        timestamp = timestamp.delete('[]')

        check_long_output = ''
        object_type, entity, check, state, check_output = split_passive

        case
        when (split_line.length < 2 || split_passive.length < 5)
          puts "ERROR - rejecting this line; illegal format: [#{line}]"
          next
        when (timestamp !~ /^\d+$/)
          puts "ERROR - rejecting this line; timestamp look like a timestamp: [#{line}]"
          next
        when (object_type != 'PROCESS_SERVICE_CHECK_RESULT')
          puts "ERROR - rejecting this line; identifier 'PROCESS_SERVICE_CHECK_RESULT' is missing: [#{line}]"
          next
        end

      end

      puts "#{object_type}, #{timestamp}, #{entity}, #{check}, #{state}, #{check_output}, #{check_long_output}"

      state = 'ok'       if state.downcase == 'up'
      state = 'critical' if state.downcase == 'down'
      details = check_long_output ? check_long_output.gsub(/\\n/, "\n") : nil
      event = {
        'entity'    => entity,
        'check'     => check,
        'type'      => 'service',
        'state'     => state,
        'summary'   => check_output,
        'details'   => details,
        'perfdata'  => check_perfdata,
        'time'      => timestamp,
      }
      Flapjack::Data::Event.push('events', event)
    end
  rescue Redis::CannotConnectError
    puts "Error, unable to to connect to the redis server (#{$!})"
  end
end
redis() click to toggle source
# File lib/flapjack/cli/receiver.rb, line 73
def redis
  @redis ||= Redis.new(@redis_options)
end