class Fluent::NSQInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 26
def initialize
  super
  require 'cool.io'
  require 'nsq'
  require 'yajl'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 33
def configure(conf)
  super

  fail ConfigError, 'Missing nsqlookupd' unless @nsqlookupd
  fail ConfigError, 'Missing topic' unless @topic
  fail ConfigError, 'Missing channel' unless @channel
  fail ConfigError, 'in_flight needs to be bigger than 0' unless @in_flight > 0
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 55
def shutdown
  super
  @running = false
  @consumer.terminate
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 42
def start
  super
  lookupds = @nsqlookupd.split(',')
  @consumer = Nsq::Consumer.new(
    nsqlookupd: lookupds,
    topic: @topic,
    channel: @channel,
    max_in_flight: @in_flight
  )
  @running = true
  @thread = Thread.new(&method(:consume))
end

Private Instance Methods

consume() click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 62
def consume
  while @running
    consume_one
  end
end
consume_one() click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 68
def consume_one
  msg = @consumer.pop
  record = Yajl.load(msg.body.force_encoding('UTF-8'))
  record_tag = tag_for_record(record)
  record_time = time_for_record(record, msg)
  Engine.emit(record_tag, record_time, record)
  msg.finish
rescue => e
  log.warn("nsq: #{e}")
  msg.requeue if msg
end
tag_for_record(record) click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 80
def tag_for_record(record)
  case @tag_source
  when :static
    @tag
  when :key
    record[@tag]
  when :topic
    @topic
  end
end
time_for_record(record, msg) click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 91
def time_for_record(record, msg)
  if @time_key
    record[@time_key]
  else
    msg.timestamp.to_i
  end
end