class Fluent::Plugin::BeatsInput
Constants
- DEFAULT_PARSER
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_beats.rb, line 56 def configure(conf) compat_parameters_convert(conf, :parser) super if !@tag && !@metadata_as_tag raise Fluent::ConfigError, "'tag' or 'metadata_as_tag' parameter is required on beats input" end @port += fluentd_worker_id @time_parser = time_parser_create(format: '%Y-%m-%dT%H:%M:%S.%N%z') @parser_config = conf.elements('parse').first if @parser_config @parser = parser_create end @connections = [] end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_beats.rb, line 52 def multi_workers_ready? true end
run()
click to toggle source
# File lib/fluent/plugin/in_beats.rb, line 94 def run until @lumberjack.closed? conn = @lumberjack.accept next if conn.nil? if @max_connections @connections.reject! { |c| c.closed? } if @connections.size >= @max_connections conn.close # close for retry on beats side sleep 1 next end @connections << conn end @thread_pool.post { begin conn.run { |map| tag = @metadata_as_tag ? map['@metadata']['beat'] : @tag if map.has_key?('message') && @parser_config message = map.delete('message') @parser.parse(message) { |time, record| record['@timestamp'] = map['@timestamp'] map.each { |k, v| record[k] = v } router.emit(tag, time, record) } next end router.emit(tag, @time_parser.parse(map['@timestamp']), map) } rescue => e log.error "unexpected error", :error => e.to_s log.error_backtrace end } end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_beats.rb, line 87 def shutdown @lumberjack.close rescue nil @thread_pool.shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_beats.rb, line 75 def start super @lumberjack = Lumberjack::Beats::Server.new( :address => @bind, :port => @port, :ssl => @use_ssl, :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key, :ssl_key_passphrase => @ssl_key_passphrase, :ssl_version => @ssl_version, :ssl_ciphers => @ssl_ciphers) # Lumberjack::Beats::Server depends on normal accept so we need to launch thread for each connection. # TODO: Re-implement Beats Server with Cool.io for resource control @thread_pool = Concurrent::CachedThreadPool.new(:idletime => 15) # idletime setting is based on logstash beats input thread_create(:in_beats_runner, &method(:run)) end