class Fluent::RouterOutput
Constants
- MIN_REAPER_INTERVAL
- ROUTER_STATE_FILE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_router.rb, line 29 def initialize super @outputs = {} @last_used_time = {} @last_reaper_run = Time.now @semaphore = Mutex.new end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_router.rb, line 37 def configure(conf) super log "Starting the out_router plugin" @inactivity_timeout = conf['inactivity_timeout'].to_i @output_config = conf.elements.select { |e| e.name == 'config' }.first mark_used @output_config filename = File.join(File.dirname(Fluent::DEFAULT_CONFIG_PATH), ROUTER_STATE_FILE) @state_serializer = ConsistentArrayStorage.new(filename) output_keys = @state_serializer.load # Restarts all output plugins for each stored key after a crash # or a stop. # This way we ensure every needed buffered output plugin, like # the s3 plugin, will start after a crash and flush its buffer # files to s3. log "Reading file " + filename + " and found output keys: " + output_keys.to_json output_keys.each do |key| start_output(key) end end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_router.rb, line 60 def emit(tag, es, chain) # This is the router logic. Splitting tag on character '.' # uses the value after '.' as the router key. key = tag.split(/\./)[1] out = @outputs[key] unless out out = start_output(key) end out.emit tag, es, chain now = Time.now @last_used_time[key] = now # Removed call to reaper because of two issues: # https://github.com/campanja/fluent-output-router/issues/3 # https://github.com/campanja/fluent-output-router/issues/4 #run_reaper(now) end
Private Instance Methods
log(string)
click to toggle source
# File lib/fluent/plugin/out_router.rb, line 128 def log(string) $log.info "out_router: " + string end
mark_used(conf)
click to toggle source
# File lib/fluent/plugin/out_router.rb, line 109 def mark_used(conf) conf.used = conf.keys conf.elements.each { |e| mark_used e } end
run_reaper(now)
click to toggle source
This is the reaper that is used to avoids resource leaks It will stop any active output plugin that does not get any input The code in run_reaper
is synchroned with a mutex to avoid any concurrency problems if the input plugin is multithreaded for example: the scribe input plugin is multitreaded by default
# File lib/fluent/plugin/out_router.rb, line 137 def run_reaper(now) if now - @last_reaper_run > MIN_REAPER_INTERVAL then @last_reaper_run = now @semaphore.synchronize do @last_used_time.each {|key, stored_time| if now - stored_time > @inactivity_timeout then stop_output(key) end } end end end
start_output(key)
click to toggle source
start_output
is synchroned with a mutex to avoid any concurrency problems if the input plugin is multithreaded for example: the scribe input plugin is multitreaded by default
# File lib/fluent/plugin/out_router.rb, line 82 def start_output(key) @semaphore.synchronize do unless @outputs[key] log "Starting a new output of type " + @output_config['type'] + " for key #{key}" @outputs[key] = Fluent::Plugin.new_output @output_config['type'] @last_used_time[key] = Time.now config = @output_config.clone template! config, :key => key out = @outputs[key] out.configure config out.start @state_serializer.store(@outputs.keys) end @outputs[key] end end
stop_output(key)
click to toggle source
# File lib/fluent/plugin/out_router.rb, line 100 def stop_output(key) log "Stopping output of type " + @output_config['type'] + " for key #{key} due to inactivity (no data comming in)" out = @outputs.delete(key) @last_used_time.delete(key) out.shutdown @state_serializer.store(@outputs.keys) end
template!(conf, keys)
click to toggle source
# File lib/fluent/plugin/out_router.rb, line 114 def template!(conf, keys) conf.each do |k,v| case v when Hash conf[k] = template v, keys when String conf[k] = ERB.new(v).result(binding) else $log.error "Template error #{v.inspect}" $log.error_backtrace end end end