class Fluent::RouterOutput

Constants

MIN_REAPER_INTERVAL
ROUTER_STATE_FILE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_router.rb, line 29
def initialize
  super
  @outputs = {}
  @last_used_time = {}
  @last_reaper_run = Time.now
  @semaphore = Mutex.new
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_router.rb, line 37
def configure(conf)
  super
  log "Starting the out_router plugin"
  @inactivity_timeout = conf['inactivity_timeout'].to_i
  @output_config = conf.elements.select { |e| e.name == 'config' }.first
  mark_used @output_config
  filename = File.join(File.dirname(Fluent::DEFAULT_CONFIG_PATH),
                       ROUTER_STATE_FILE)
  @state_serializer = ConsistentArrayStorage.new(filename)
  output_keys = @state_serializer.load

  # Restarts all output plugins for each stored key after a crash
  # or a stop.
  # This way we ensure every needed buffered output plugin, like
  # the s3 plugin, will start after a crash and flush its buffer
  # files to s3.
  log "Reading file " + filename +
    " and found output keys: " + output_keys.to_json
  output_keys.each do |key|
    start_output(key)
  end
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_router.rb, line 60
def emit(tag, es, chain)
  # This is the router logic. Splitting tag on character '.'
  # uses the value after '.' as the router key.
  key = tag.split(/\./)[1]
  out = @outputs[key]
  unless out
    out = start_output(key)
  end
  out.emit tag, es, chain
  now = Time.now
  @last_used_time[key] = now
  # Removed call to reaper because of two issues:
  # https://github.com/campanja/fluent-output-router/issues/3
  # https://github.com/campanja/fluent-output-router/issues/4
  #run_reaper(now)
end

Private Instance Methods

log(string) click to toggle source
# File lib/fluent/plugin/out_router.rb, line 128
def log(string)
  $log.info "out_router: " + string
end
mark_used(conf) click to toggle source
# File lib/fluent/plugin/out_router.rb, line 109
def mark_used(conf)
  conf.used = conf.keys
  conf.elements.each { |e| mark_used e }
end
run_reaper(now) click to toggle source

This is the reaper that is used to avoids resource leaks It will stop any active output plugin that does not get any input The code in run_reaper is synchroned with a mutex to avoid any concurrency problems if the input plugin is multithreaded for example: the scribe input plugin is multitreaded by default

# File lib/fluent/plugin/out_router.rb, line 137
def run_reaper(now)
  if now - @last_reaper_run > MIN_REAPER_INTERVAL then
    @last_reaper_run = now
    @semaphore.synchronize do
      @last_used_time.each {|key, stored_time|
        if now - stored_time > @inactivity_timeout then
          stop_output(key)
        end }
    end
  end
end
start_output(key) click to toggle source

start_output is synchroned with a mutex to avoid any concurrency problems if the input plugin is multithreaded for example: the scribe input plugin is multitreaded by default

# File lib/fluent/plugin/out_router.rb, line 82
def start_output(key)
  @semaphore.synchronize do
    unless @outputs[key]
      log "Starting a new output of type " + @output_config['type'] +
        " for key #{key}"
      @outputs[key] = Fluent::Plugin.new_output @output_config['type']
      @last_used_time[key] = Time.now
      config = @output_config.clone
      template! config, :key => key
      out = @outputs[key]
      out.configure config
      out.start
      @state_serializer.store(@outputs.keys)
    end
    @outputs[key]
  end
end
stop_output(key) click to toggle source
# File lib/fluent/plugin/out_router.rb, line 100
def stop_output(key)
  log "Stopping output of type " + @output_config['type'] +
    " for key #{key} due to inactivity (no data comming in)"
  out = @outputs.delete(key)
  @last_used_time.delete(key)
  out.shutdown
  @state_serializer.store(@outputs.keys)
end
template!(conf, keys) click to toggle source
# File lib/fluent/plugin/out_router.rb, line 114
def template!(conf, keys)
  conf.each do |k,v|
    case v
    when Hash
      conf[k] = template v, keys
    when String
      conf[k] = ERB.new(v).result(binding)
    else
      $log.error "Template error #{v.inspect}"
      $log.error_backtrace
    end
  end
end