class Fluent::KestrelInput

Attributes

kestrel[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kestrel.rb, line 15
# Builds the plugin instance via the superclass constructor, then loads the
# libraries this plugin relies on. The requires happen here rather than at
# file load time, so they are only paid when an instance is created.
def initialize
  super
  %w[kestrel time].each { |feature| require feature }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kestrel.rb, line 21
# Validates the plugin settings and prepares the values used while polling.
#
# conf - configuration object, consumed by the superclass implementation.
#
# Raises ConfigError when @queue or @host is unset, or when @tag is unset.
# On success it stores a TimeFormatter in @timef and a frozen hash of the
# :raw, :peek and :timeout settings in @options.
def configure(conf)
  super

  # Both connection settings must be present (truthy) before going further.
  raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required." unless [@queue, @host].all?
  raise ConfigError, "[kestrel config error]:'tag' option is required." unless @tag

  @timef = TimeFormatter.new(@time_format, @localtime)

  # Frozen so the same hash can be handed to every get call unchanged.
  settings = { :raw => @raw, :peek => @peek, :timeout => @timeout }
  @options = settings.freeze
end
run() click to toggle source
# File lib/fluent/plugin/in_kestrel.rb, line 50
# Polling loop: repeatedly fetches from @queue through @kestrel and emits
# each fetched value under @tag, stamped with Engine.now.
#
# When a fetch returns nothing the loop pauses for one second before trying
# again. Any StandardError raised inside the loop ends the loop; it is logged
# through $log (message plus backtrace) instead of being propagated.
def run
  loop do
    message = @kestrel.get(@queue, @options)
    if message
      Engine.emit(@tag, Engine.now, message)
    else
      # Nothing fetched: back off briefly before the next attempt.
      sleep 1
    end
  end
rescue => e
  $log.error "unexpected error.", :error => e.to_s
  $log.error_backtrace
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kestrel.rb, line 45
# Stops the polling thread, then runs the superclass shutdown.
#
# The worker thread executes #run, whose loop only ends when an error is
# raised. Waiting on it with a bare join therefore blocks shutdown for as
# long as the worker is healthy. The thread is killed first and joined
# afterwards so that shutdown always completes. Killing a thread that has
# already finished is harmless.
#
# If no worker thread was ever created (@thread is nil) the thread handling
# is skipped and only the superclass shutdown runs.
def shutdown
  if @thread
    @thread.kill
    @thread.join
  end
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kestrel.rb, line 38
# Runs the superclass start, connects a Kestrel client to "<host>:<port>"
# and launches the background thread that executes #run.
def start
  super

  endpoint = @host + ":#{@port}"
  @kestrel = Kestrel::Client.new(endpoint)
  @thread = Thread.new { run }
end