class Fluent::Logger::FluentLogger
Constants
- BUFFER_LIMIT
- RECONNECT_WAIT
- RECONNECT_WAIT_INCR_RATE
- RECONNECT_WAIT_MAX
- RECONNECT_WAIT_MAX_COUNT
Attributes
limit[RW]
log_reconnect_error_threshold[RW]
logger[RW]
Public Class Methods
new(tag_prefix = nil, *args)
click to toggle source
Calls superclass method
# File lib/fluent/logger/fluent_logger.rb, line 64
#
# Builds a logger and makes a first connection attempt.
#
# tag_prefix:: optional prefix; stored and later prepended to event tags.
# args::       either a single options Hash, or the legacy positional form
#              (host, port) where host is a String or Symbol.
#
# Option keys read here: :host, :port, :socket_path, :nanosecond_precision,
# :use_nonblock, :tls_options, :buffer_limit, :log_reconnect_error_threshold,
# :buffer_overflow_handler, :logger, :debug, :wait_writeable.
#
# A failed initial connection does not raise: the error is recorded and
# logged. #close is registered to run at process exit.
def initialize(tag_prefix = nil, *args)
  super()

  options = {
    :host => 'localhost',
    :port => 24224,
    :use_nonblock => false
  }

  legacy_host, legacy_port = args
  case legacy_host
  when Hash
    options.update(legacy_host)
  when String, Symbol
    # backward compatible: new(tag_prefix, host, port)
    options[:host] = legacy_host
    options[:port] = legacy_port if legacy_port
  end

  @tag_prefix = tag_prefix
  @host = options[:host]
  @port = options[:port]
  @socket_path = options[:socket_path]
  @nanosecond_precision = options[:nanosecond_precision]
  @use_nonblock = options[:use_nonblock]
  @tls_options = options[:tls_options]

  @factory = MessagePack::Factory.new
  @factory.register_type(EventTime::TYPE, EventTime) if @nanosecond_precision
  @packer = @factory.packer

  @mon = Monitor.new
  @pending = nil
  @connect_error_history = []

  @limit = options[:buffer_limit] || BUFFER_LIMIT
  @log_reconnect_error_threshold = options[:log_reconnect_error_threshold] || RECONNECT_WAIT_MAX_COUNT
  @buffer_overflow_handler = options[:buffer_overflow_handler]

  # Use the caller's logger when given; otherwise log to STDERR at DEBUG or INFO.
  @logger = options[:logger]
  unless @logger
    @logger = ::Logger.new(STDERR)
    @logger.level = options[:debug] ? ::Logger::DEBUG : ::Logger::INFO
  end

  # Defaults to true; only an explicitly supplied key overrides it.
  @wait_writeable = options.fetch(:wait_writeable, true)

  @last_error = {}

  begin
    connect!
  rescue => e
    set_last_error(e)
    @logger.error "Failed to connect fluentd: #{$!}"
    @logger.error "Connection will be retried."
  end

  at_exit { close }
end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 147
#
# Makes one last attempt to send any pending data, then closes the
# connection and resets the pending buffer. If the final send fails, the
# error is recorded and logged, and the pending data is passed to the
# buffer overflow handler.
#
# Returns self.
def close
  @mon.synchronize do
    begin
      send_data(@pending) if @pending
    rescue => e
      set_last_error(e)
      @logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
      call_buffer_overflow_handler(@pending)
    end

    @con.close if connect?
    @con = nil
    @pending = nil
  end

  self
end
connect?()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 165
#
# Reports whether a connection object exists and is not closed.
# Returns the falsy @con itself (normally nil) when there is no connection,
# otherwise true/false depending on the socket's closed? state.
def connect?
  socket = @con
  return socket unless socket

  !socket.closed?
end
connection_string()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 204
#
# Human-readable description of the endpoint: the socket path when one is
# configured, otherwise "host:port".
def connection_string
  return "#{@socket_path}" if @socket_path

  "#{@host}:#{@port}"
end
create_socket!()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 169
#
# Opens the transport and stores it in @con.
#
# * With @socket_path set, a UNIXSocket is opened (no TLS on this path).
# * Otherwise a TCPSocket to @host:@port is opened and, when @tls_options is
#   set, wrapped in an OpenSSL::SSL::SSLSocket and the handshake is run.
#
# @tls_options keys read here: :insecure, :use_default_ca, :ca, :cert, :key,
# :key_passphrase, :ciphers (and :version via set_tls_version).
#
# Returns the connected socket. Raises whatever the socket, file or OpenSSL
# calls raise. On failure the partially set-up TCP socket is closed and @con
# is left untouched, so connect? never reports a half-initialised
# (non-TLS or not-yet-handshaken) socket as usable.
def create_socket!
  if @socket_path
    @con = UNIXSocket.new(@socket_path)
    return @con
  end

  socket = TCPSocket.new(@host, @port)
  begin
    if @tls_options
      context = OpenSSL::SSL::SSLContext.new
      if @tls_options[:insecure]
        context.verify_mode = OpenSSL::SSL::VERIFY_NONE
      else
        context.set_params({})
        context.verify_mode = OpenSSL::SSL::VERIFY_PEER
        cert_store = OpenSSL::X509::Store.new
        cert_store.set_default_paths if @tls_options[:use_default_ca]
        cert_store.add_file(@tls_options[:ca]) if @tls_options[:ca]

        if @tls_options[:cert]
          context.cert = OpenSSL::X509::Certificate.new(File.read(@tls_options[:cert]))
        end
        if @tls_options[:key]
          context.key = OpenSSL::PKey::read(File.read(@tls_options[:key]), @tls_options[:key_passphrase])
        end
        context.ciphers = @tls_options[:ciphers] || "ALL:!aNULL:!eNULL:!SSLv2".freeze
        context.cert_store = cert_store
      end
      set_tls_version(context)

      tls_socket = OpenSSL::SSL::SSLSocket.new(socket, context)
      tls_socket.sync_close = true # closing the TLS socket also closes the TCP socket
      tls_socket.connect
      socket = tls_socket
    end
  rescue
    # `socket` is still the raw TCP socket here; closing it releases the
    # descriptor whether or not an SSLSocket wrapper was created.
    socket.close unless socket.closed?
    raise
  end

  # Publish only a fully established connection.
  @con = socket
end
last_error()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 133
#
# Returns the error most recently recorded for the calling thread, or nil
# when nothing is stored under the current thread's object_id.
def last_error
  thread_key = Thread.current.object_id
  @last_error[thread_key]
end
pending_bytesize()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 208
#
# Size in bytes of the pending buffer; 0 when nothing is pending.
def pending_bytesize
  @pending ? @pending.bytesize : 0
end
post_with_time(tag, map, time)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 137
#
# Emits one event through #write and returns its result.
#
# tag::  event tag; "@tag_prefix." is prepended when a prefix is configured.
# map::  event record.
# time:: event time. Sent as an EventTime (seconds + nanoseconds) when
#        nanosecond precision is enabled and +time+ is a Time; otherwise
#        sent as time.to_i.
def post_with_time(tag, map, time)
  if @logger.debug?
    # Debug output is best effort: a record that cannot be rendered as JSON
    # must not prevent the event from being sent.
    @logger.debug do
      begin
        "event: #{tag} #{map.to_json}"
      rescue
        nil
      end
    end
  end

  full_tag = @tag_prefix ? "#{@tag_prefix}.#{tag}" : tag
  timestamp =
    if @nanosecond_precision && time.is_a?(Time)
      EventTime.new(time.to_i, time.nsec)
    else
      time.to_i
    end

  write [full_tag, timestamp, map]
end
Private Instance Methods
call_buffer_overflow_handler(pending)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 356
#
# Invokes the configured buffer overflow handler, if any, with the data that
# could not be sent.
#
# pending:: the buffered payload being handed over.
#
# Returns the handler's result, or nil when no handler is configured.
# A StandardError raised by the handler is logged and not propagated, so a
# faulty callback cannot break the caller. Non-StandardError exceptions
# (Interrupt, SystemExit, NoMemoryError, ...) are deliberately NOT swallowed.
def call_buffer_overflow_handler(pending)
  @buffer_overflow_handler.call(pending) if @buffer_overflow_handler
rescue StandardError => e
  @logger.error("FluentLogger: Can't call buffer overflow handler: #{e}")
end
connect!()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 337
#
# Opens the connection via create_socket! and enables sync mode on it.
# On success the failure history and the "already logged" flag are reset.
#
# On failure the current time is appended to the failure history (capped at
# RECONNECT_WAIT_MAX_COUNT entries), the reconnect error is logged once when
# the history reaches @log_reconnect_error_threshold, and the original
# exception is re-raised.
def connect!
  create_socket!
  @con.sync = true
  @connect_error_history.clear
  @logged_reconnect_error = false
rescue => e
  history = @connect_error_history
  history << Time.now.to_i
  history.shift if history.size > RECONNECT_WAIT_MAX_COUNT

  threshold_reached = history.size >= @log_reconnect_error_threshold
  if threshold_reached && !@logged_reconnect_error
    log_reconnect_error
    @logged_reconnect_error = true
  end

  raise e
end
log_reconnect_error()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 364
#
# Logs a connection failure, including the endpoint, the number of recorded
# failures and the exception currently being handled ($!). Intended to be
# called from inside a rescue clause.
def log_reconnect_error
  attempts = @connect_error_history.size
  @logger.error("FluentLogger: Can't connect to #{connection_string}(#{attempts} retried): #{$!}")
end
send_data(data)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 296
#
# Writes +data+ to the connection, connecting first when necessary.
#
# In non-blocking mode the work is delegated to send_data_nonblock.
# Otherwise the method waits until the socket is writable and performs a
# blocking write.
#
# Returns the number of bytes written as reported by the underlying write.
def send_data(data)
  connect! unless connect?
  return send_data_nonblock(data) if @use_nonblock

  _readable, writable = IO.select([], [@con])
  # Defer Timeout::Error while the write is in progress.
  Thread.handle_interrupt(::Timeout::Error => :never) do
    writable.first.write(data)
  end
end
send_data_nonblock(data)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 324
#
# Sends +data+ with write_nonblock, repeating with the unsent tail until
# every byte has been accepted. Exceptions raised by write_nonblock are not
# handled here.
#
# Returns the total number of bytes written.
def send_data_nonblock(data)
  total = data.bytesize
  sent = @con.write_nonblock(data)

  until sent >= total
    sent += @con.write_nonblock(data.byteslice(sent, total - sent))
  end

  sent
end
set_last_error(e)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 368
#
# Records +e+ as the latest error for the calling thread (keyed by the
# thread's object_id) and returns it.
def set_last_error(e)
  # TODO: Check non GVL env
  thread_key = Thread.current.object_id
  @last_error[thread_key] = e
end
set_tls_version(context)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 218
#
# Pins the TLS protocol version on +context+.
#
# When the context supports min_version=, both min_version and max_version
# are set to @tls_options[:version] (default OpenSSL::SSL::TLS1_2_VERSION).
# Otherwise ssl_version is set instead (default :TLSv1_2).
def set_tls_version(context)
  requested = @tls_options[:version]

  if context.respond_to?(:min_version=)
    pinned = requested || OpenSSL::SSL::TLS1_2_VERSION
    context.min_version = pinned
    context.max_version = pinned
  else
    context.ssl_version = requested || :TLSv1_2
  end
end
suppress_sec()
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 240
#
# Back-off interval derived from the number of recorded connection
# failures: RECONNECT_WAIT scaled by RECONNECT_WAIT_INCR_RATE for each
# failure after the first, and RECONNECT_WAIT_MAX once the failure count
# reaches RECONNECT_WAIT_MAX_COUNT. #write compares it with the seconds
# elapsed since the last failure.
def suppress_sec
  failures = @connect_error_history.size
  return RECONNECT_WAIT_MAX unless failures < RECONNECT_WAIT_MAX_COUNT

  RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** (failures - 1))
end
to_msgpack(msg)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 228
#
# Serialises +msg+ with the shared packer and returns the packed string.
#
# If packing raises NoMethodError, the message is round-tripped through
# JSON first and then packed via #to_msgpack on the result.
#
# The shared packer is cleared in every case, including when an exception
# escapes, so partially packed bytes from a failed call can never be
# prepended to the next message. Exceptions other than the NoMethodError
# handled above propagate to the caller.
def to_msgpack(msg)
  @mon.synchronize do
    begin
      @packer.pack(msg).to_s
    rescue NoMethodError
      JSON.parse(JSON.generate(msg)).to_msgpack
    ensure
      @packer.clear
    end
  end
end
wait_writeable?(e)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 373
#
# Decides whether #write should absorb error +e+ (true) or re-raise it
# (false). Every error is absorbed except an exact IO::EAGAINWaitWritable,
# for which the configured @wait_writeable setting decides.
def wait_writeable?(e)
  return true unless e.instance_of?(IO::EAGAINWaitWritable)

  @wait_writeable
end
write(msg)
click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 248 def write(msg) begin data = to_msgpack(msg) rescue => e set_last_error(e) @logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}") return false end @mon.synchronize { if @pending @pending << data else @pending = data end # suppress reconnection burst if !@connect_error_history.empty? && pending_bytesize <= @limit if Time.now.to_i - @connect_error_history.last < suppress_sec return false end end begin written = send_data(@pending) if @pending.bytesize != written raise "Actual written data size(#{written} bytes) is different from the received data size(#{@pending.bytesize} bytes)." end @pending = nil true rescue => e unless wait_writeable?(e) raise e end set_last_error(e) if pending_bytesize > @limit @logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}") call_buffer_overflow_handler(@pending) @pending = nil end @con.close if connect? @con = nil false end } end