class Anschel::Main

Public Instance Methods

agent() click to toggle source
# File lib/anschel/main.rb, line 39
# Runs the agent: logs a greeting, loads the JSON config named by
# +options.config+, builds the store/stats/filter/output/input components,
# starts the stats endpoint thread plus +num_cpus+ worker threads, installs a
# SIGINT handler, and then joins every thread.
#
# Any exception raised while starting up is logged at fatal level, +bye+ is
# called with level +:error+, and the process exits with status 2.
def agent
  log.info \
    event: 'hello',
    version: VERSION,
    options: options.to_hash,
    num_cpus: num_cpus

  # Declared ahead of the begin block so that the rescue handler and the
  # SIGINT trap below can reference whatever was built before a failure.
  input = filter = output = store = stats = ts = nil

  begin
    # Allow for //-style inline comments in JSON
    raw_config = File.read(options.config).gsub(/^\s*\/\/ .*/, '')

    config = JrJackson::Json.load \
      raw_config, symbolize_keys: true

    setup_log4j config[:log4j]

    store  = Store.new config[:store], log

    stats  = Stats.new config[:stats_interval], log

    filter = Filter.new config[:filter], stats, log

    output = Output.new config[:output], stats, log

    input  = Input.new \
      config[:input], config[:queue_size], stats, log, store[:input]

    stats.create 'event'
    stats.get 'event'

    ts = []

    stats_port = config[:stats_port] || 3345
    ts << stats_endpoint(stats, stats_port, log)

    # One worker per CPU: take a raw event from the input, parse it, run it
    # through the filter and push the result to the output.
    ts += num_cpus.times.map do
      Thread.new do
        loop do
          raw = input.shift.encode 'UTF-8', \
            invalid: :replace, undef: :replace, replace: '?'

          begin
            event = JrJackson::Json.load raw, symbolize_keys: true
          rescue JrJackson::ParseError
            log_event = {
              event: 'main-input-error',
              reason: 'could not parse event',
              remediation: 'skipping'
            }
            # Attach the unparsed payload; the parsed event does not exist
            # when parsing has just failed.
            log_event[:raw_event] = raw if log.debug?
            log.error log_event
            next
          end

          output.push filter.apply(event)
          stats.inc 'event'
        end
      end
    end

  rescue Exception => e
    # Catch-all for startup: log the failure, say goodbye, exit non-zero.
    log.fatal \
      event: 'exception',
      exception: e.inspect,
      class: e.class,
      message: e.message,
      backtrace: e.backtrace
    bye output, input, store, log, :error
    exit 2
  end

  log.info event: 'all-systems-clear'

  trap('SIGINT') do
    bye output, input, store, log
    exit
  end

  # Block until every thread finishes.
  ts.each(&:join)
end
art() click to toggle source
# File lib/anschel/main.rb, line 27
# Prints the ART constant to standard output, wrapped in a leading and a
# trailing newline.
def art
  banner = "\n%s\n" % ART
  $stdout.puts banner
end
version() click to toggle source
# File lib/anschel/main.rb, line 21
# Prints the VERSION constant to standard output.
def version
  $stdout.puts(VERSION)
end

Private Instance Methods

bye(output, input, store, log, level=:info) click to toggle source
# File lib/anschel/main.rb, line 125
# Shuts the pipeline down and logs a goodbye message.
#
# When both +input+ and +output+ are present, the output is stopped first,
# then the input; whatever +input.leftovers+ returns is saved under
# +store[:input]+ and a 'flush-leftovers' warning is logged. The goodbye
# message is always logged, using the log method named by +level+.
def bye(output, input, store, log, level = :info)
  pipeline_built = input && output

  if pipeline_built
    output.stop
    input.stop
    store[:input] = input.leftovers
    log.warn event: 'flush-leftovers'
  end

  log.send(level, event: 'goodbye', version: VERSION)
end
stats_endpoint(stats, stats_port, log) click to toggle source
# File lib/anschel/main.rb, line 135
# Starts a WEBrick-backed Rack application on +stats_port+ in a new thread
# and returns that thread.
#
# Every request is answered with status '200', a JSON content type, and a
# pretty-printed JSON body made of the VERSION merged with +stats.read+.
# A 'serving-metrics' info line is logged once the thread has been created.
def stats_endpoint(stats, stats_port, log)
  status  = '200'
  headers = { 'Content-Type' => 'application/json' }

  app = lambda { |_env|
    snapshot = { version: VERSION }.merge(stats.read)
    [status, headers, [JSON.pretty_generate(snapshot)]]
  }

  server = Thread.new {
    Rack::Handler::WEBrick.run app, Port: stats_port, Logger: log, AccessLog: []
  }

  log.info event: 'serving-metrics', port: stats_port
  server
end