class Fluent::Agent

Agent is a resource unit who manages emittable plugins

Next step: `fluentd/root_agent.rb` Next step: `fluentd/label.rb`

Attributes

context[R]
error_collector[R]
event_router[R]
filters[R]
log[R]
outputs[R]

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method Fluent::Configurable.new
# File lib/fluent/agent.rb, line 29
def initialize(opts = {})
  super()

  @context = nil
  @outputs = []
  @filters = []
  @started_outputs = []
  @started_filters = []

  @log = Engine.log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

Public Instance Methods

add_filter(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 134
def add_filter(type, pattern, conf)
  log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  filter = Plugin.new_filter(type)
  filter.router = @event_router
  filter.configure(conf)
  @filters << filter
  @event_router.add_rule(pattern, filter)

  filter
end
add_match(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 122
def add_match(type, pattern, conf)
  log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.router = @event_router
  output.configure(conf)
  @outputs << output
  @event_router.add_rule(pattern, output)

  output
end
configure(conf) click to toggle source
Calls superclass method Fluent::Configurable#configure
# File lib/fluent/agent.rb, line 50
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e|
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type'] || e['type']
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end
emit_error_event(tag, time, record, error) click to toggle source

For handling invalid record

# File lib/fluent/agent.rb, line 147
def emit_error_event(tag, time, record, error)
end
flush!() click to toggle source
# File lib/fluent/agent.rb, line 103
def flush!
  flush_recursive(@outputs)
end
flush_recursive(array) click to toggle source
# File lib/fluent/agent.rb, line 107
def flush_recursive(array)
  array.each { |o|
    begin
      if o.is_a?(BufferedOutput)
        o.force_flush
      elsif o.is_a?(MultiOutput)
        flush_recursive(o.outputs)
      end
    rescue => e
      log.debug "error while force flushing", :error_class => e.class, :error => e
      log.debug_backtrace
    end
  }
end
handle_emits_error(tag, es, error) click to toggle source
# File lib/fluent/agent.rb, line 150
def handle_emits_error(tag, es, error)
end
shutdown() click to toggle source
# File lib/fluent/agent.rb, line 77
def shutdown
  @started_filters.map { |f|
    Thread.new do
      begin
        f.shutdown
      rescue => e
        log.warn "unexpected error while shutting down filter plugins", :plugin => f.class, :plugin_id => f.plugin_id, :error_class => e.class, :error => e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }

  # Output plugin as filter emits records at shutdown so emit problem still exist.
  # This problem will be resolved after actual filter mechanizm.
  @started_outputs.map { |o|
    Thread.new do
      begin
        o.shutdown
      rescue => e
        log.warn "unexpected error while shutting down output plugins", :plugin => o.class, :plugin_id => o.plugin_id, :error_class => e.class, :error => e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }
end
start() click to toggle source
# File lib/fluent/agent.rb, line 65
def start
  @outputs.each { |o|
    o.start
    @started_outputs << o
  }

  @filters.each { |f|
    f.start
    @started_filters << f
  }
end