class Fluent::RtailOutput

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_rtail.rb, line 18
# Sets up the plugin instance: runs the superclass initializer, then
# lazily loads the libraries this plugin relies on (the JSON encoder and
# the socket library), in that order.
def initialize
  super
  %w[multi_json socket].each { |library| require library }
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_rtail.rb, line 24
# Applies the plugin configuration via the superclass, then opens a UDP
# socket and connects it to the configured @host / @port pair.
#
# @param conf the configuration object handed to the superclass
def configure(conf)
  super
  udp = UDPSocket.new
  # Store the socket before connecting so @socket is set even if the
  # connect call raises.
  @socket = udp
  udp.connect(@host, @port)
end
format(tag, time, record)
# File lib/fluent/plugin/out_rtail.rb, line 30
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling #to_msgpack on the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
write(chunk)
# File lib/fluent/plugin/out_rtail.rb, line 34
 # Sends every event in the chunk as one message.
 #
 # For each [tag, time, record] triple the stream id is resolved first;
 # the content is only looked up when an id was found, and the message is
 # only sent when both are present. Any StandardError raised while
 # processing is logged (message and backtrace) and not re-raised.
 #
 # @param chunk an object responding to #msgpack_each
 def write(chunk)
  events = chunk.to_enum(:msgpack_each)

  events.each do |tag, time, record|
    stream_id = get_log_stream_id(tag, time, record)
    # Short-circuit keeps get_content from running when the id is missing.
    body = stream_id && get_content(tag, time, record)

    send_message(stream_id, time, body) if body
  end
rescue => err
  log.error err.message
  log.error_backtrace err.backtrace
end

Private Instance Methods

get_content(tag, time, record)
# File lib/fluent/plugin/out_rtail.rb, line 67
# Picks the message content for an event.
#
# When @use_record_as_content is set the whole record is returned as-is.
# Otherwise the value stored under @content_key is returned as a String;
# if that value is nil or false a warning is logged and the falsy value
# itself is returned.
#
# @param tag the event tag (used only in the warning text)
# @param time the event time (used only in the warning text)
# @param record the event record, indexed with @content_key
# @return the record, the stringified content, or nil/false when absent
def get_content(tag, time, record)
  return record if @use_record_as_content

  value = record[@content_key]
  return value.to_s if value

  log.warn("'#{@content_key}' key does not exist: #{[tag, time, record].inspect}")
  value
end
get_log_stream_id(tag, time, record)
# File lib/fluent/plugin/out_rtail.rb, line 53
# Resolves the log stream id for an event.
#
# When @use_tag_as_id is set the tag itself is the id. Otherwise the id is
# read from the record under @id_key; a nil or false value triggers a
# warning and is returned unchanged.
#
# @param tag the event tag
# @param time the event time (used only in the warning text)
# @param record the event record, indexed with @id_key
# @return the tag, the record's id value, or nil/false when absent
def get_log_stream_id(tag, time, record)
  return tag if @use_tag_as_id

  record[@id_key].tap do |stream_id|
    log.warn("'#{@id_key}' key does not exist: #{[tag, time, record].inspect}") unless stream_id
  end
end
send_message(log_stream_id, time, content)
# File lib/fluent/plugin/out_rtail.rb, line 83
# Encodes one message as JSON and sends it over @socket.
#
# The payload has the keys 'id', 'timestamp' and 'content', in that order.
# The timestamp is the time converted to a float, multiplied by 1000 and
# truncated to an integer.
#
# @param log_stream_id the value stored under 'id'
# @param time the event time; must respond to #to_f
# @param content the value stored under 'content'
# @return whatever @socket.send returns
def send_message(log_stream_id, time, content)
  epoch_millis = (time.to_f * 1000).to_i

  payload = MultiJson.dump(
    'id' => log_stream_id,
    'timestamp' => epoch_millis,
    'content' => content
  )

  @socket.send(payload, 0)
end