class Fluent::KestrelOutput
Attributes
kestrel[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 17 def initialize super require 'kestrel' require 'time' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 23 def configure(conf) super unless @queue && @host raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required." end @timef = TimeFormatter.new(@time_format, @localtime) @f_separator = case conf['field_separator'] when 'SPACE' then ' ' when 'COMMA' then ',' else "\t" end if @remove_prefix @remove_prefix_string = @remove_prefix + '.' @remove_prefix_length = @remove_prefix_string.length end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_kestrel.rb, line 53 def format(tag, time, record) if tag == @remove_prefix or @remove_prefix and (tag[0, @remove_prefix_length] == @remove_prefix_string and tag.length > @remove_prefix_length) tag = tag[@remove_prefix_length..-1] end time_str = if @output_include_time @timef.format(time) + @f_separator else '' end tag_str = if @output_include_tag tag + @f_separator else '' end [tag_str, time_str, record].to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 49 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 43 def start super @kestrel = Kestrel::Client.new(@host + ":" + @port.to_s) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_kestrel.rb, line 72 def write(chunk) chunk.open { |io| begin MessagePack::Unpacker.new(io).each{ |tag, time, record| data = "#{time}#{tag}#{record.to_json}" @kestrel.set(@queue, data, ttl=@ttl, raw=@raw) } rescue EOFError # EOFError always occured when reached end of chunk. end } end