class Fluent::NSQOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 10 def initialize super require 'nsq' require 'yajl' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 16 def configure(conf) super fail ConfigError, 'Missing nsqlookupd' unless @nsqlookupd fail ConfigError, 'Missing topic' unless @topic end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 37 def format(tag, time, record) [tag, time, record].to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 32 def shutdown super @producer.terminate end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 23 def start super lookupds = @nsqlookupd.split(',') @producer = Nsq::Producer.new( nsqlookupd: lookupds, topic: @topic ) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 41 def write(chunk) return if chunk.empty? chunk.msgpack_each do |tag, time, record| next unless record.is_a? Hash # TODO get rid of this extra copy tagged_record = record.merge( :_key => tag, :_ts => time, :'@timestamp' => Time.at(time).to_datetime.to_s # kibana/elasticsearch friendly ) begin @producer.write(Yajl.dump(tagged_record)) rescue => e log.warn("nsq: #{e}") end end end