class Fluent::KestrelOutput

Attributes

kestrel[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 17
# Builds the plugin instance and loads the libraries it relies on.
#
# The libraries are required here, when an instance is created, rather than
# when this file is loaded.
def initialize
  super
  %w[kestrel time].each { |lib| require lib }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 23
# Validates the plugin settings and derives the values used when formatting.
#
# After the superclass has processed +conf+ this method:
# * raises ConfigError unless both @queue and @host are set,
# * builds the time formatter from @time_format and @localtime,
# * picks the field separator from conf['field_separator']
#   ('SPACE' => ' ', 'COMMA' => ',', anything else => tab),
# * precomputes "<remove_prefix>." and its length when @remove_prefix is set.
#
# @param conf [#[]] configuration element; only 'field_separator' is read here
# @raise [ConfigError] when 'host' or 'queue' is missing
def configure(conf)
  super

  raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required." unless [@queue, @host].all?

  @timef = TimeFormatter.new(@time_format, @localtime)

  separators = { 'SPACE' => ' ', 'COMMA' => ',' }
  @f_separator = separators.fetch(conf['field_separator'], "\t")

  # Nothing more to derive when no prefix stripping was requested.
  return unless @remove_prefix

  @remove_prefix_string = @remove_prefix + '.'
  @remove_prefix_length = @remove_prefix_string.length
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_kestrel.rb, line 53
# Serializes one event into the form stored in the buffer.
#
# When @remove_prefix is configured and the tag starts with "<remove_prefix>."
# followed by at least one more character, that leading part is dropped from
# the tag. A tag equal to the bare prefix (or to "<remove_prefix>." alone) is
# left unchanged.
#
# @param tag [String] event tag
# @param time [Object] event time, handed to @timef.format when
#   @output_include_time is set
# @param record [Object] event record, packed as-is
# @return the result of Array#to_msgpack on [tag_str, time_str, record], where
#   tag_str / time_str are '' when the corresponding @output_include_* flag is
#   off, otherwise the value followed by @f_separator
def format(tag, time, record)
  # Uses && rather than and/or: `and` and `or` share one precedence level, so
  # mixing them groups operands left-to-right instead of as written.
  strip_prefix = @remove_prefix &&
                 tag.start_with?(@remove_prefix_string) &&
                 tag.length > @remove_prefix_length
  tag = tag[@remove_prefix_length..-1] if strip_prefix

  time_str = @output_include_time ? @timef.format(time) + @f_separator : ''
  tag_str = @output_include_tag ? tag + @f_separator : ''

  [tag_str, time_str, record].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 49
# Shuts the plugin down by delegating entirely to the superclass; this class
# adds no teardown steps of its own.
# NOTE(review): the client stored in @kestrel by #start is not explicitly
# released here — confirm whether the Kestrel client needs closing.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kestrel.rb, line 43
# Starts the plugin: after the superclass has started, creates the Kestrel
# client for "<host>:<port>" and stores it in @kestrel.
def start
  super

  address = @host + ":#{@port}"
  @kestrel = Kestrel::Client.new(address)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_kestrel.rb, line 72
# Sends every event stored in a buffer chunk to the configured Kestrel queue.
#
# Each entry unpacked from the chunk is the triple written by #format
# (tag string, time string, record). The payload pushed to Kestrel is the time
# string, then the tag string, then the record rendered as JSON.
#
# @param chunk [#open] buffer chunk; #open must yield a stream readable by
#   MessagePack::Unpacker
def write(chunk)
  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each do |tag, time, record|
        payload = "#{time}#{tag}#{record.to_json}"

        # TTL and raw flag are positional arguments. Writing `ttl=@ttl` in an
        # argument list would only assign a throwaway local, not bind a keyword.
        @kestrel.set(@queue, payload, @ttl, @raw)
      end
    rescue EOFError
      # EOFError is raised when the end of the chunk is reached; it marks
      # normal completion, not a failure.
    end
  end
end