class Fluent::Logger::FluentLogger

Constants

BUFFER_LIMIT
RECONNECT_WAIT
RECONNECT_WAIT_INCR_RATE
RECONNECT_WAIT_MAX
RECONNECT_WAIT_MAX_COUNT

Attributes

limit[RW]
log_reconnect_error_threshold[RW]
logger[RW]

Public Class Methods

new(tag_prefix = nil, *args) click to toggle source
Calls superclass method
# File lib/fluent/logger/fluent_logger.rb, line 64
# Builds a logger and makes a first connection attempt.
#
# tag_prefix - optional prefix, stored in @tag_prefix.
# args       - either the legacy positional form (host, port) or a single
#              Hash of options that is merged over the defaults below.
#
# A failure of the initial connect! is recorded via set_last_error and
# logged; it is not raised to the caller. An at_exit hook calling #close is
# registered for every instance.
def initialize(tag_prefix = nil, *args)
  super()

  options = { :host => 'localhost', :port => 24224, :use_nonblock => false }

  first_arg = args.first
  case first_arg
  when String, Symbol
    # backward compatible: positional (host, port)
    host, port = args
    options[:host] = host
    options[:port] = port if port
  when Hash
    options.update(first_arg)
  end

  @tag_prefix = tag_prefix
  @host, @port = options.values_at(:host, :port)
  @socket_path = options[:socket_path]
  @nanosecond_precision = options[:nanosecond_precision]
  @use_nonblock = options[:use_nonblock]
  @tls_options = options[:tls_options]

  @factory = MessagePack::Factory.new
  @factory.register_type(EventTime::TYPE, EventTime) if @nanosecond_precision
  @packer = @factory.packer

  @mon = Monitor.new
  @pending = nil
  @connect_error_history = []

  @limit = options[:buffer_limit] || BUFFER_LIMIT
  @log_reconnect_error_threshold = options[:log_reconnect_error_threshold] || RECONNECT_WAIT_MAX_COUNT

  @buffer_overflow_handler = options[:buffer_overflow_handler]

  # Use the caller's logger when given; otherwise log to STDERR at DEBUG or
  # INFO depending on the :debug option.
  @logger = options[:logger]
  unless @logger
    @logger = ::Logger.new(STDERR)
    @logger.level = options[:debug] ? ::Logger::DEBUG : ::Logger::INFO
  end

  # Defaults to true unless the caller passed the key explicitly.
  @wait_writeable = options.fetch(:wait_writeable, true)

  @last_error = {}

  begin
    connect!
  rescue => e
    set_last_error(e)
    @logger.error "Failed to connect fluentd: #{$!}"
    @logger.error "Connection will be retried."
  end

  at_exit { close }
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 147
# Attempts to send any pending data, then closes and forgets the connection.
#
# A send failure is recorded via set_last_error, logged, and the unsent
# buffer is passed to call_buffer_overflow_handler. The pending buffer is
# discarded either way.
#
# Returns self.
def close
  @mon.synchronize do
    unsent = @pending
    if unsent
      begin
        send_data(unsent)
      rescue => e
        set_last_error(e)
        @logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
        call_buffer_overflow_handler(unsent)
      end
    end

    @con.close if connect?
    @pending = nil
    @con = nil
  end
  self
end
connect?() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 165
# Reports whether a connection object exists and is not closed.
# Returns @con itself (nil) when there is no connection.
def connect?
  socket = @con
  return socket unless socket

  !socket.closed?
end
connection_string() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 204
# Describes the configured endpoint for log messages: the socket path when
# one is set, otherwise "host:port".
def connection_string
  return "#{@socket_path}" if @socket_path

  "#{@host}:#{@port}"
end
create_socket!() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 169
# Opens the transport and stores it in @con.
#
# - With @socket_path set, connects a UNIXSocket to that path.
# - Otherwise connects a TCPSocket to @host:@port and, when @tls_options is
#   set, wraps it in an OpenSSL::SSL::SSLSocket and performs the handshake.
#
# The TCP socket is published to @con only after TLS setup has fully
# succeeded. If building the SSL context, reading cert/key files or the
# handshake raises, the TCP socket is closed and @con is left untouched, so
# connect? cannot report a half-configured plaintext socket as usable.
#
# Returns the connected socket. Raises whatever the socket or OpenSSL calls
# raise.
def create_socket!
  if @socket_path
    @con = UNIXSocket.new(@socket_path)
  else
    tcp = TCPSocket.new(@host, @port)
    if @tls_options
      begin
        context = OpenSSL::SSL::SSLContext.new
        if @tls_options[:insecure]
          context.verify_mode = OpenSSL::SSL::VERIFY_NONE
        else
          context.set_params({})
          context.verify_mode = OpenSSL::SSL::VERIFY_PEER
          cert_store = OpenSSL::X509::Store.new
          if @tls_options[:use_default_ca]
            cert_store.set_default_paths
          end
          if @tls_options[:ca]
            cert_store.add_file(@tls_options[:ca])
          end

          context.cert = OpenSSL::X509::Certificate.new(File.read(@tls_options[:cert])) if @tls_options[:cert]
          context.key = OpenSSL::PKey::read(File.read(@tls_options[:key]), @tls_options[:key_passphrase]) if @tls_options[:key]
          context.ciphers = @tls_options[:ciphers] || "ALL:!aNULL:!eNULL:!SSLv2".freeze
          context.cert_store = cert_store
        end
        set_tls_version(context)

        tls = OpenSSL::SSL::SSLSocket.new(tcp, context)
        # closing the TLS socket also closes the underlying TCP socket
        tls.sync_close = true
        tls.connect
      rescue
        # Never leave an open, unencrypted socket behind a failed TLS setup.
        tcp.close unless tcp.closed?
        raise
      end
      @con = tls
    else
      @con = tcp
    end
    @con
  end
end
last_error() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 133
# Looks up the error stored for the calling thread, keyed by the thread's
# object_id. Returns nil when nothing is stored under that key.
def last_error
  thread_key = Thread.current.object_id
  @last_error[thread_key]
end
pending_bytesize() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 208
# Size in bytes of the pending buffer, or 0 when there is none.
def pending_bytesize
  @pending ? @pending.bytesize : 0
end
post_with_time(tag, map, time) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 137
# Posts one event record with an explicit timestamp.
#
# tag  - event tag; prefixed with "@tag_prefix." when a prefix is set.
# map  - the record payload.
# time - converted with #to_i, or wrapped in an EventTime (seconds plus
#        nanoseconds) when nanosecond precision is enabled and time is a Time.
#
# Returns whatever #write returns.
def post_with_time(tag, map, time)
  if @logger.debug?
    # serialization problems in the debug line must never break posting
    @logger.debug { "event: #{tag} #{map.to_json}" rescue nil }
  end

  full_tag = @tag_prefix ? "#{@tag_prefix}.#{tag}" : tag
  stamp =
    if @nanosecond_precision && time.is_a?(Time)
      EventTime.new(time.to_i, time.nsec)
    else
      time.to_i
    end

  write [full_tag, stamp, map]
end

Private Instance Methods

call_buffer_overflow_handler(pending) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 356
# Passes the dropped buffer to the configured overflow handler, if any.
#
# Anything the handler raises (rescued at the Exception level) is logged
# and swallowed so a faulty handler cannot break the caller.
def call_buffer_overflow_handler(pending)
  handler = @buffer_overflow_handler
  handler.call(pending) if handler
rescue Exception
  @logger.error("FluentLogger: Can't call buffer overflow handler: #{$!}")
end
connect!() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 337
# Opens the connection and resets the failure bookkeeping.
#
# On failure the current time is appended to @connect_error_history (capped
# at RECONNECT_WAIT_MAX_COUNT entries), the failure is logged once the
# history reaches @log_reconnect_error_threshold (and only until the next
# successful connect), and the original error is re-raised.
def connect!
  create_socket!
  @con.sync = true
  @connect_error_history.clear
  @logged_reconnect_error = false
rescue => e
  history = @connect_error_history
  history << Time.now.to_i
  history.shift if history.size > RECONNECT_WAIT_MAX_COUNT

  threshold_reached = history.size >= @log_reconnect_error_threshold
  if threshold_reached && !@logged_reconnect_error
    log_reconnect_error
    @logged_reconnect_error = true
  end

  raise e
end
log_reconnect_error() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 364
# Logs the connection failure currently being handled ($!) together with
# the endpoint and the number of recorded failures.
def log_reconnect_error
  attempts = @connect_error_history.size
  @logger.error("FluentLogger: Can't connect to #{connection_string}(#{attempts} retried): #{$!}")
end
send_data(data) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 296
# Writes data to the connection, connecting first when necessary.
#
# In non-blocking mode the work is delegated to send_data_nonblock.
# Otherwise the call waits with IO.select until the socket is writable and
# then writes, with Timeout::Error interrupts deferred for the duration of
# the write.
#
# Returns the value of the underlying write call.
def send_data(data)
  connect! unless connect?
  return send_data_nonblock(data) if @use_nonblock

  _readable, writable = IO.select([], [@con])
  Thread.handle_interrupt(::Timeout::Error => :never) do
    # block timeout error during IO#write
    writable.first.write(data)
  end
end
send_data_nonblock(data) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 324
# Writes all of data with write_nonblock, repeating with the unsent tail
# until every byte has been accepted.
#
# Returns the total number of bytes written. Exceptions raised by
# write_nonblock propagate to the caller.
def send_data_nonblock(data)
  total = data.bytesize
  sent = @con.write_nonblock(data)
  sent += @con.write_nonblock(data.byteslice(sent, total - sent)) while sent < total
  sent
end
set_last_error(e) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 368
# Stores e as the last error of the calling thread, keyed by the thread's
# object_id. Returns e.
def set_last_error(e)
  # TODO: Check non GVL env
  thread_key = Thread.current.object_id
  @last_error[thread_key] = e
end
set_tls_version(context) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 218
# Pins the TLS protocol version on an SSL context.
#
# Uses @tls_options[:version] when given. Contexts that respond to
# min_version= get both min_version and max_version set to the same value
# (default OpenSSL::SSL::TLS1_2_VERSION); other contexts get ssl_version
# set instead (default :TLSv1_2).
def set_tls_version(context)
  requested = @tls_options[:version]
  if context.respond_to?(:min_version=)
    pinned = requested || OpenSSL::SSL::TLS1_2_VERSION
    context.min_version = pinned
    context.max_version = pinned
  else
    context.ssl_version = requested || :'TLSv1_2'
  end
end
suppress_sec() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 240
# Computes the reconnect back-off from the number of recorded connect
# failures: RECONNECT_WAIT scaled by RECONNECT_WAIT_INCR_RATE per earlier
# failure, or RECONNECT_WAIT_MAX once RECONNECT_WAIT_MAX_COUNT failures
# have been recorded.
def suppress_sec
  failures = @connect_error_history.size
  return RECONNECT_WAIT_MAX if failures >= RECONNECT_WAIT_MAX_COUNT

  RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** (failures - 1))
end
to_msgpack(msg) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 228
# Serializes msg to a MessagePack string using the shared packer.
#
# When the packer raises NoMethodError, msg is round-tripped through JSON
# and the resulting plain structure is serialized instead.
#
# The packer is cleared in an ensure clause: if packing raises part-way
# through (any error, including one from the JSON fallback), bytes already
# appended to the packer would otherwise stay there and be prepended to the
# next message.
#
# Returns the serialized String. Errors other than the handled
# NoMethodError propagate to the caller.
def to_msgpack(msg)
  @mon.synchronize do
    begin
      @packer.pack(msg).to_s
    rescue NoMethodError
      JSON.parse(JSON.generate(msg)).to_msgpack
    ensure
      @packer.clear
    end
  end
end
wait_writeable?(e) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 373
# Decides whether a send error should be handled by the caller instead of
# re-raised. Only an error that is exactly an IO::EAGAINWaitWritable is
# subject to the @wait_writeable setting; every other error yields true.
def wait_writeable?(e)
  return true unless e.instance_of?(IO::EAGAINWaitWritable)

  @wait_writeable
end
write(msg) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 248
# Serializes msg, appends it to the pending buffer and tries to send the
# whole buffer.
#
# Returns true when the pending buffer was sent and cleared. Returns false
# when serialization failed, when sending was skipped by the reconnect
# back-off, or when sending failed with an error that wait_writeable?
# accepts. Errors that wait_writeable? rejects are re-raised.
def write(msg)
  begin
    data = to_msgpack(msg)
  rescue => e
    # A message that cannot be serialized is dropped: record, log, give up.
    set_last_error(e)
    @logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
    return false
  end

  @mon.synchronize {
    # Queue the new record behind anything that is still unsent.
    if @pending
      @pending << data
    else
      @pending = data
    end

    # suppress reconnection burst: after a recorded connect failure, and
    # while the buffer is still within @limit, skip the send attempt until
    # suppress_sec seconds have passed since the most recent failure. The
    # data stays in @pending.
    if !@connect_error_history.empty? && pending_bytesize <= @limit
      if Time.now.to_i - @connect_error_history.last < suppress_sec
        return false
      end
    end

    begin
      written = send_data(@pending)
      # A short write is treated as a failure and handled by the rescue below.
      if @pending.bytesize != written
        raise "Actual written data size(#{written} bytes) is different from the received data size(#{@pending.bytesize} bytes)."
      end

      @pending = nil
      true
    rescue => e
      # Re-raise when wait_writeable? says this error must not be absorbed;
      # in that case the connection and the pending buffer are left as is.
      unless wait_writeable?(e)
        raise e
      end
      set_last_error(e)
      # Once the buffer has grown past @limit, hand it to the overflow
      # handler and drop it; otherwise keep it for the next attempt.
      if pending_bytesize > @limit
        @logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
        call_buffer_overflow_handler(@pending)
        @pending = nil
      end
      # Discard the connection so the next send_data reconnects.
      @con.close if connect?
      @con = nil
      false
    end
  }
end