class LogStash::Agent

Collect logs, ship them out.

Attributes

config[R]
filters[R]
inputs[R]
outputs[R]

Public Class Methods

new(config) click to toggle source
# File lib/logstash/agent.rb, line 17
def initialize(config)
  log_to(STDERR)

  @config = config
  @outputs = []
  @inputs = []
  @filters = []
  # Config should have:
  # - list of logs to monitor
  #   - log config
  # - where to ship to
end

Public Instance Methods

log_to(target) click to toggle source
# File lib/logstash/agent.rb, line 31
def log_to(target)
  @logger = LogStash::Logger.new(target)
end
register() click to toggle source

Register any event handlers with EventMachine Technically, this agent could listen for anything (files, sockets, amqp, stomp, etc).

# File lib/logstash/agent.rb, line 39
def register
  # TODO(sissel): warn when no inputs and no outputs are defined.
  # TODO(sissel): Refactor this madness into a config lib
  
  if (["inputs", "outputs"] & @config.keys).length == 0
    $stderr.puts "No inputs or no outputs configured. This probably isn't what you want."
  end

  # Register input and output stuff
  inputs = @config["inputs"]
  inputs.each do |value|
    # If 'url' is an array, then inputs is a hash and the key is the type
    if inputs.is_a?(Hash)
      type, urls = value
    else
      raise "config error, no type for url #{urls.inspect}"
    end

    # url could be a string or an array.
    urls = [urls] if !urls.is_a?(Array)

    urls.each do |url|
      @logger.debug("Using input #{url} of type #{type}")
      input = LogStash::Inputs.from_url(url, type) { |event| receive(event) }
      input.logger = @logger
      input.register
      @inputs << input
    end
  end # each input

  if @config.include?("filters")
    filters = @config["filters"]
    filters.collect { |x| x.to_a[0] }.each do |filter|
      name, value = filter
      @logger.debug("Using filter #{name} => #{value.inspect}")
      filter = LogStash::Filters.from_name(name, value)
      filter.logger = @logger
      filter.register
      @filters << filter
    end # each filter
  end # if we have filters

  @config["outputs"].each do |url|
    @logger.debug("Using output #{url}")
    output = LogStash::Outputs.from_url(url)
    output.logger = @logger
    output.register
    @outputs << output
  end # each output

  # Register any signal handlers
  register_signal_handler
end
register_signal_handler() click to toggle source
# File lib/logstash/agent.rb, line 137
def register_signal_handler
  @sigchannel = EventMachine::Channel.new
  Signal.trap("USR1") do
    @sigchannel.push(:USR1)
  end

  Signal.trap("INT") do
    @sigchannel.push(:INT)
  end

  @sigchannel.subscribe do |msg|
    case msg
    when :USR1
      counts = Hash.new { |h,k| h[k] = 0 }
      ObjectSpace.each_object do |obj|
        counts[obj.class] += 1
      end

      @logger.info("SIGUSR1 received. Dumping state")
      @logger.info("#{self.class.name} config")
      @logger.info(["  Inputs:", @inputs])
      @logger.info(["  Filters:", @filters])
      @logger.info(["  Outputs:", @outputs])

      @logger.info("Dumping counts of objects by class")
      counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value|
        @logger.info("Class: [#{value}] #{key}")
      end
    when :INT
      @logger.warn("SIGINT received. Shutting down.")
      EventMachine::stop_event_loop
      # TODO(sissel): Should have input/output/filter register shutdown
      # hooks.
    end # case msg
  end # @sigchannel.subscribe
end
run() { || ... } click to toggle source
# File lib/logstash/agent.rb, line 94
def run(&block)
  EventMachine.run do
    self.register
    yield if block_given?
  end # EventMachine.run
end
stop() click to toggle source
# File lib/logstash/agent.rb, line 102
def stop
  # TODO(sissel): Stop inputs, fluch outputs, wait for finish,
  # then stop the event loop
  EventMachine.stop_event_loop

  # EventMachine has no default way to indicate a 'stopping' state.
  $EVENTMACHINE_STOPPING = true
end

Protected Instance Methods

filter(event) click to toggle source
# File lib/logstash/agent.rb, line 112
def filter(event)
  @filters.each do |f|
    f.filter(event)
    break if event.cancelled?
  end
end
output(event) click to toggle source
# File lib/logstash/agent.rb, line 120
def output(event)
  @outputs.each do |o|
    o.receive(event)
  end # each output
end
receive(event) click to toggle source

Process a message

# File lib/logstash/agent.rb, line 128
def receive(event)
  filter(event)

  if !event.cancelled?
    output(event)
  end
end