class RubyKafkaOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 28
def initialize
  super
  require "kafka"
  require "zookeeper"
  require "json"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 35
def configure(conf)
  super
  @seed_brokers = []
  if @zookeepers
    z = Zookeeper.new(@zookeepers)
    z.get_children(:path => '/brokers/ids')[:children].each do |id|
      broker = Yajl.load(z.get(:path => "/brokers/ids/#{id}")[:data])
      @seed_brokers.push("#{broker['host']}:#{broker['port']}")
    end
    z.close
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end

  if @seed_brokers.empty? and @brokers
    @seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  raise Fluent::ConfigError, "Broker has not been set." if @seed_brokers.empty?

  kafka = Kafka.new(seed_brokers: @seed_brokers, logger: @use_kafka_log ? log : nil)
  @producer = kafka.async_producer(
    max_queue_size: @max_queue_size,
    delivery_threshold: @delivery_threshold,
    delivery_interval: @delivery_interval,
  )
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_ruby_kafka.rb, line 72
def emit(tag, es, chain)
  chain.next
  es.each do |time,record|
    record['time'] = time if @output_include_time
    record['tag'] = tag if @output_include_tag
    topic = record['topic'] || @default_topic || tag

    data = JSON.dump(record)
    retry_counter = 0
    begin
      retry_counter += 1
      @producer.produce(data, topic: topic)
    rescue Kafka::BufferOverflow
      if retry_counter <= @retry_count
        log.warn "Buffer overflow, backing off for 1s. #{retry_counter} time."
        sleep 1
        retry
      else
        raise
      end
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 67
def shutdown
  super
  @producer.shutdown if @producer
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ruby_kafka.rb, line 63
def start
  super
end