class Fluent::RtailOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rtail.rb, line 18
# Builds the plugin instance via the superclass constructor, then loads the
# libraries the other methods rely on.
def initialize
  super
  # Loaded when an instance is created: MultiJson is used by send_message,
  # and 'socket' provides the UDPSocket opened in configure.
  %w[multi_json socket].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rtail.rb, line 24 def configure(conf) super @socket = UDPSocket.new @socket.connect(@host, @port) end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rtail.rb, line 30
# Serializes one event as a msgpack-encoded `[tag, time, record]` array.
# (write reads events back from the chunk with msgpack_each.)
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of calling `to_msgpack` on the three-element array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_rtail.rb, line 34
# Sends every event in a buffer chunk to the configured socket.
#
# Each event is decoded with the chunk's `msgpack_each`. An event is skipped
# when get_log_stream_id or get_content returns a falsy value (those helpers
# log the warning themselves); otherwise it is handed to send_message.
#
# Any StandardError raised while processing is logged (message, then
# backtrace) and NOT re-raised, so the events remaining in the chunk after a
# failure are not sent.
#
# @param chunk an object responding to `msgpack_each`, yielding
#   `tag, time, record` for each buffered event
def write(chunk)
  # Iterate the chunk directly; wrapping it in an Enumerator and rebinding the
  # `chunk` parameter added an allocation without changing what is yielded.
  chunk.msgpack_each do |tag, time, record|
    log_stream_id = get_log_stream_id(tag, time, record)
    next unless log_stream_id

    content = get_content(tag, time, record)
    next unless content

    send_message(log_stream_id, time, content)
  end
rescue => e
  log.error e.message
  log.error_backtrace e.backtrace
end
Private Instance Methods
get_content(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rtail.rb, line 67
# Picks the message content for one event.
#
# When @use_record_as_content is truthy the record itself is returned
# untouched. Otherwise the value under @content_key is looked up: a truthy
# value is returned as a String (`to_s`); a falsy value (nil or false) logs a
# warning and is returned as-is, which makes write skip the event.
#
# @param tag    the event tag (used only in the warning text)
# @param time   the event time (used only in the warning text)
# @param record the event record, indexed with `[]`
# @return the record, the stringified value, or the falsy value that was found
def get_content(tag, time, record)
  return record if @use_record_as_content

  value = record[@content_key]
  return value.to_s if value

  log.warn("'#{@content_key}' key does not exist: #{[tag, time, record].inspect}")
  value
end
get_log_stream_id(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rtail.rb, line 53
# Picks the log stream id for one event.
#
# When @use_tag_as_id is truthy the tag is the id. Otherwise the value under
# @id_key in the record is returned unchanged; if that value is falsy a
# warning is logged first, and the falsy result makes write skip the event.
#
# @param tag    the event tag
# @param time   the event time (used only in the warning text)
# @param record the event record, indexed with `[]`
# @return the tag, or whatever `record[@id_key]` holds
def get_log_stream_id(tag, time, record)
  return tag if @use_tag_as_id

  stream_id = record[@id_key]
  log.warn("'#{@id_key}' key does not exist: #{[tag, time, record].inspect}") unless stream_id
  stream_id
end
send_message(log_stream_id, time, content)
click to toggle source
# File lib/fluent/plugin/out_rtail.rb, line 83
# Encodes one message as JSON and writes it to @socket (the UDP socket that
# configure opens and connects).
#
# The JSON object has three string keys: 'id', 'timestamp' and 'content'.
# 'timestamp' is `time.to_f * 1000` truncated to an Integer.
# NOTE(review): that is milliseconds if `time` is in seconds — confirm.
#
# @param log_stream_id value stored under 'id'
# @param time          value responding to `to_f`
# @param content       value stored under 'content'
# @return whatever `@socket.send` returns
def send_message(log_stream_id, time, content)
  millis = (time.to_f * 1000).to_i
  payload = {
    'id' => log_stream_id,
    'timestamp' => millis,
    'content' => content,
  }
  encoded = MultiJson.dump(payload)
  # Flags argument 0: plain send on the already-connected socket.
  @socket.send(encoded, 0)
end