class LogStash::Outputs::Fluentd

Constants

VERSION

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 87
def close
  buffer_flush(final: true)
end
convert_from_event_to_msgpack(event) click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 36
def convert_from_event_to_msgpack(event)
  entry = [(event.timestamp.to_i || Time.now.to_i), event.to_hash]
  begin
    entry.to_msgpack
  rescue ArgumentError, NoMethodError
    LogStash::Json.load(LogStash::Json.dump(entry)).to_msgpack
  end
end
flush(events, teardown = false) click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 64
def flush(events, teardown = false)
  @logger.debug "flushing #{events} events"
  return if events.size < 1

  data = [@tag, events.join].to_msgpack

  @logger.debug "sending chunk #{data.bytesize} bytes"
  connect.write data

  @logger.debug "done"
end
on_flush_error(e) click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 77
def on_flush_error(e)
  @logger.warn "flush error #{e.class}: #{e.message}"
end
on_full_buffer_error(opts={}) click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 82
def on_full_buffer_error(opts={})
  @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}"
end
receive(event) click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 55
def receive(event)
  @logger.debug "receive a event"

  buffer_receive(convert_from_event_to_msgpack(event))

  @logger.debug "buffered a event"
end
register() click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 46
def register
  buffer_initialize(
    max_items: @flush_size,
    max_interval: @flush_interval,
    logger: @logger
  )
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 92
def connect
  Stud::try do
    return TCPSocket.new(@host, @port)
  end
end