class Fluent::NSQOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 10
# Runs the superclass initializer, then loads the third-party libraries this
# plugin uses elsewhere: 'nsq' (Nsq::Producer) and 'yajl' (Yajl.dump).
# The libraries are loaded here rather than at file load time, so they are
# only required once an instance is actually created.
def initialize
  super
  %w[nsq yajl].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 16
# Validates the plugin settings after the superclass has processed +conf+.
#
# conf - the configuration object; forwarded unchanged to the superclass.
#
# Raises ConfigError when @nsqlookupd or @topic is nil/false, or when its
# string form is empty or whitespace-only.
def configure(conf)
  super

  # A blank value is as unusable as a missing one: start would otherwise build
  # a producer from it, so reject it here with the same error message.
  raise ConfigError, 'Missing nsqlookupd' if !@nsqlookupd || @nsqlookupd.to_s.strip.empty?
  raise ConfigError, 'Missing topic' if !@topic || @topic.to_s.strip.empty?
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 37
# Serializes a single event: packs tag, time and record into a three-element
# array and returns the result of calling to_msgpack on it. write reads the
# entries back as (tag, time, record) triples via chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 32
# Runs the superclass shutdown, then terminates the NSQ producer created by
# start.
def shutdown
  super
  # @producer is only assigned in start; guard so that shutting down an
  # instance that never started does not raise NoMethodError on nil.
  @producer.terminate if @producer
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 23
# Runs the superclass start, then creates the NSQ producer for @topic.
#
# @nsqlookupd is a comma-separated list of lookupd addresses. Each entry is
# stripped of surrounding whitespace and blank entries are dropped, so a value
# such as "host1:4161, host2:4161" yields clean addresses instead of one with
# a leading space.
def start
  super
  lookupds = @nsqlookupd.split(',').map(&:strip).reject(&:empty?)
  @producer = Nsq::Producer.new(
    nsqlookupd: lookupds,
    topic: @topic
  )
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 41
# Publishes every event in +chunk+ to NSQ as a JSON document.
#
# Each (tag, time, record) triple read from the chunk is skipped unless the
# record is a Hash. Otherwise a copy of the record is extended with :_key
# (the tag), :_ts (the raw time) and :'@timestamp' (the time rendered as a
# string) and written through @producer. A failure raised while dumping or
# writing one event is logged as a warning and does not stop the remaining
# events from being processed.
def write(chunk)
  return if chunk.empty?

  chunk.msgpack_each do |tag, time, record|
    next unless record.is_a?(Hash)

    # kibana/elasticsearch friendly
    iso_timestamp = Time.at(time).to_datetime.to_s
    # TODO get rid of this extra copy
    payload = record.merge(_key: tag, _ts: time, :'@timestamp' => iso_timestamp)

    begin
      message = Yajl.dump(payload)
      @producer.write(message)
    rescue StandardError => error
      log.warn("nsq: #{error}")
    end
  end
end