class Franz::Output::Kafka

Kafka output for Franz.

Public Class Methods

new(opts={})

Start a new output in the background. We'll consume from the input queue and ship events to Kafka.

@param [Hash] opts options for the output
@option opts [Queue] :input ([]) "input" queue
@option opts [Hash] :output ({...}) "output" configuration (Kafka producer settings; see defaults below)
@option opts [Logger] :logger (Logger.new(STDOUT)) logger to use
@option opts [Array<String>] :tags ([]) tags appended to each event
@option opts [Boolean] :foreground (false) join the consumer thread in the caller

# File lib/franz/output/kafka.rb, line 22
def initialize opts={}
  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: {
      topic: 'franz',
      flush_interval: 10,
      flush_size: 500,
      client_id: @@host,
      type: 'sync',
      compression_codec: 'snappy',
      metadata_refresh_interval_ms: 600000,
      max_send_retries: 3,
      retry_backoff_ms: 100,
      required_acks: 0,
      ack_timeout_ms: 1500,
      socket_timeout_ms: 10000
    }
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @logger = opts[:logger]

  @stop = false
  @foreground = opts[:foreground]

  @flush_size = opts[:output].delete :flush_size
  @flush_interval = opts[:output].delete :flush_interval
  @topic = opts[:output].delete :topic

  @kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ]
  @kafka_client_id = opts[:output].delete :client_id
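  # Symbolize any string values (e.g. 'snappy', 'sync') among the remaining
  # producer options before they're handed to Poseidon in kafka_connect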
  @kafka_config = opts[:output].map { |k,v|
    [ k, v.is_a?(String) ? v.to_sym : v ]
  }

  kafka_connect

  @lock = Mutex.new
  @messages = []


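  # Flusher thread: every @flush_interval seconds, ship whatever has been buffered.
  # N.B. this handle is overwritten just below, so join/stop only manage the consumer thread.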
  @thread = Thread.new do
    until @stop
      @lock.synchronize do
        num_messages = kafka_send @messages
        log.debug \
          event: 'periodic flush',
          num_messages: num_messages
      end

      sleep @flush_interval
    end
  end


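  # Consumer thread: drain the input queue, apply tags, and buffer events as Kafka
  # messages, flushing early whenever the buffer reaches @flush_size.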
  @thread = Thread.new do
    until @stop
      event = opts[:input].shift

      unless opts[:tags].empty?
        event['tags'] ||= []
        event['tags']  += opts[:tags]
      end

      log.debug \
        event: 'publish',
        raw: event

      payload = JSON::generate(event)

      @lock.synchronize do
        @messages << Poseidon::MessageToSend.new(@topic, payload)

        if @messages.size >= @flush_size
          num_messages = kafka_send @messages
          log.debug \
            event: 'flush',
            num_messages: num_messages
        end
      end

    end
  end

  log.info event: 'output started'

  @thread.join if @foreground
end
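
A minimal usage sketch (not from the Franz docs; the broker addresses and the events queue are placeholders), assuming an upstream Franz input pushes event hashes onto the queue:

  require 'franz/output/kafka'

  events = Queue.new   # filled by an upstream Franz input

  output = Franz::Output::Kafka.new \
    input: events,
    tags: %w[ staging ],
    output: {
      brokers: %w[ kafka01:9092 kafka02:9092 ],   # placeholder broker list
      topic: 'franz',
      flush_interval: 5,
      flush_size: 1000
    }

Anything not overridden under :output falls back to the defaults above; whatever remains after :brokers, :client_id, :topic, :flush_interval, and :flush_size are removed is passed straight through to Poseidon::Producer.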

Public Instance Methods

join()

Join the Output thread. Effective at most once; subsequent calls (or calls after stop) are no-ops.

# File lib/franz/output/kafka.rb, line 117
def join
  return if @foreground
  @foreground = true
  @thread.join
end
stop()

Stop the Output thread. Effective at most once; subsequent calls (or calls after join) are no-ops.

# File lib/franz/output/kafka.rb, line 125
def stop
  return if @foreground
  @foreground = true
  @thread.kill
  log.info event: 'output stopped'
end
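
Both join and stop guard on @foreground, so whichever runs first wins and later calls are no-ops. A shutdown sketch, reusing the output instance from the example above:

  output.stop   # background use: kill the consumer thread and log 'output stopped'
  # ...or...
  output.join   # foreground use: park the calling thread on the consumer thread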

Private Instance Methods

kafka_connect()
# File lib/franz/output/kafka.rb, line 136
def kafka_connect
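  # (Re)build the Poseidon producer from the brokers, client id, and remaining options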
  @kafka = Poseidon::Producer.new \
    @kafka_brokers,
    @kafka_client_id,
    Hash[@kafka_config]
end
kafka_send(messages)
# File lib/franz/output/kafka.rb, line 143
def kafka_send messages
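  # N.B. operates on the shared @messages buffer; the messages argument is unused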
  return 0 if @messages.empty?
  @kafka.send_messages @messages
  @statz.inc :num_output, @messages.length
  size = @messages.size
  @messages = []
  return size
rescue Poseidon::Errors::UnableToFetchMetadata
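  # Metadata fetch failed (e.g. brokers unreachable): reconnect, back off briefly, retry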
  log.warn event: 'output dropped'
  kafka_connect
  sleep 1
  retry
end
log()
# File lib/franz/output/kafka.rb, line 134
def log ; @logger end