class RubyKafkaOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 28
# Runs the superclass initializer, then loads the libraries this plugin
# relies on. The requires live here (rather than at file level) so the
# libraries are only loaded once an instance is actually created.
def initialize
  super
  %w[kafka zookeeper json].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 35
# Resolves the Kafka seed brokers and builds the asynchronous producer.
#
# Broker discovery order:
# 1. When @zookeepers is set, every id under /brokers/ids is read from
#    Zookeeper and turned into a "host:port" entry.
# 2. When that yields nothing and @brokers is set, @brokers is treated as a
#    comma-separated list of "host:port" entries.
#
# @param conf the plugin configuration; only forwarded to the superclass here
# @raise [Fluent::ConfigError] when neither source yields a broker
def configure(conf)
  super

  @seed_brokers = []

  if @zookeepers
    z = Zookeeper.new(@zookeepers)
    begin
      # Array() guards against a nil child list (presumably what the client
      # returns when the path does not exist - TODO confirm).
      Array(z.get_children(path: '/brokers/ids')[:children]).each do |id|
        broker = Yajl.load(z.get(path: "/brokers/ids/#{id}")[:data])
        @seed_brokers.push("#{broker['host']}:#{broker['port']}")
      end
    ensure
      # Always release the Zookeeper connection, even when a lookup or the
      # JSON parsing above raises.
      z.close
    end
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end

  if @seed_brokers.empty? && @brokers
    # split already returns a one-element array when there is no comma, so no
    # special case is needed. Surrounding whitespace and blank entries are
    # dropped so "a:9092, b:9092" works and an empty string counts as unset.
    @seed_brokers = @brokers.split(",").map(&:strip).reject(&:empty?)
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  raise Fluent::ConfigError, "Broker has not been set." if @seed_brokers.empty?

  kafka = Kafka.new(seed_brokers: @seed_brokers, logger: @use_kafka_log ? log : nil)
  @producer = kafka.async_producer(
    max_queue_size: @max_queue_size,
    delivery_threshold: @delivery_threshold,
    delivery_interval: @delivery_interval
  )
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_ruby_kafka.rb, line 72
# Serializes every event of the stream to JSON and hands it to the producer.
#
# The topic is taken from the record's 'topic' key, falling back to
# @default_topic and finally to the event tag. When the producer reports a
# Kafka::BufferOverflow the call is retried after a one second pause, up to
# @retry_count times; after that the error is re-raised.
#
# @param tag [String] tag of the event stream
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; advanced before any event is produced
def emit(tag, es, chain)
  chain.next

  es.each do |time, record|
    # Work on a shallow copy: the 'time'/'tag' keys added below must not leak
    # into the caller's record, which may be shared with other consumers of
    # the same event stream (and may be frozen).
    payload = record.dup
    payload['time'] = time if @output_include_time
    payload['tag'] = tag if @output_include_tag

    topic = payload['topic'] || @default_topic || tag
    data = JSON.dump(payload)

    attempts = 0
    begin
      attempts += 1
      @producer.produce(data, topic: topic)
    rescue Kafka::BufferOverflow
      # Out of retries: propagate the overflow to the caller.
      raise if attempts > @retry_count

      log.warn "Buffer overflow, backing off for 1s. #{attempts} time."
      sleep 1
      retry
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 67
# Runs the superclass shutdown first, then shuts the producer down.
# Nothing is done for the producer when @producer was never set.
def shutdown
  super
  return unless @producer

  @producer.shutdown
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 63
# Delegates entirely to the superclass; this plugin adds no start-up work
# of its own here.
def start
  super()
end