class LogStash::Outputs::Kafka

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here's a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:

[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |
|==========================================================

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 and 0.9 consumer APIs, but not the other way around.

This output supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default, security is disabled but can be turned on as needed.
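
For example, a minimal SSL-enabled configuration might look like the following sketch. The truststore path and password are placeholders; the option names match the ones read by `set_trustore_keystore_config` below.

[source,ruby]
----
output {
  kafka {
    topic_id                => "mytopic"
    security_protocol       => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"   # placeholder path
    ssl_truststore_password => "changeit"                  # placeholder password
  }
}
----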

The only required configuration is the `topic_id`. The default codec is `plain`, so events are persisted on the broker in plain format; Logstash encodes not only the message itself but also a timestamp and the hostname. If you want nothing but your message to pass through, configure the output like this:

[source,ruby]
----
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
----

For more information, see https://kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: https://kafka.apache.org/documentation.html#newproducerconfigs

Public Instance Methods

close()
# File lib/logstash/outputs/kafka.rb, line 291
def close
  @producer.close
end
multi_receive(events)
# File lib/logstash/outputs/kafka.rb, line 211
def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  # Encoding an event fires the codec's on_event callback (wired up in
  # register), which builds a ProducerRecord and appends it to this
  # thread's batch via prepare().
  events.each do |event|
    break if event == LogStash::SHUTDOWN
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
prepare(record)
# File lib/logstash/outputs/kafka.rb, line 206
def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
register()
# File lib/logstash/outputs/kafka.rb, line 178
def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil? 
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end


  @producer = create_producer
  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer" 
  end
end
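
Only the two stock serializers pass the check above. Selecting the byte-array serializer is a one-line change; a sketch:

[source,ruby]
----
output {
  kafka {
    topic_id         => "mytopic"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
----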
retrying_send(batch)
# File lib/logstash/outputs/kafka.rb, line 229
def retrying_send(batch)
  remaining = @retries   # nil (the default) means retry forever

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    # Pair each record with its future so the two stay aligned even when
    # send() raises before a future is created.
    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        [record, @producer.send(record)]
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    futures.each do |record, future|
      begin
        future.get()   # block until the broker acks, or raise on failure
      rescue => e
        # TODO(sissel): Add metric to count failures, possibly by exception type.
        logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
        failures << record
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                                                                              :failures => failures.size,
                                                                              :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
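
The loop above is driven by the `retries` and `retry_backoff_ms` options. A sketch of a bounded-retry configuration, which (per the warning in `register`) accepts data loss once retries are exhausted:

[source,ruby]
----
output {
  kafka {
    topic_id         => "mytopic"
    retries          => 3     # give up after the initial attempt plus 3 retries
    retry_backoff_ms => 500   # sleep 0.5 s between rounds
  }
}
----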

Private Instance Methods

create_producer()
# File lib/logstash/outputs/kafka.rb, line 311
def create_producer
  begin
    props = java.util.Properties.new
    kafka = org.apache.kafka.clients.producer.ProducerConfig

    props.put(kafka::ACKS_CONFIG, acks)
    props.put(kafka::BATCH_SIZE_CONFIG, batch_size.to_s)
    props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    props.put(kafka::BUFFER_MEMORY_CONFIG, buffer_memory.to_s)
    props.put(kafka::COMPRESSION_TYPE_CONFIG, compression_type)
    props.put(kafka::CLIENT_ID_CONFIG, client_id) unless client_id.nil?
    props.put(kafka::KEY_SERIALIZER_CLASS_CONFIG, key_serializer)
    props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s)
    props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
    props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
    props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil?
    props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
    props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
    props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil?
    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s) 
    props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s)
    props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer)

    props.put("security.protocol", security_protocol) unless security_protocol.nil?

    if security_protocol == "SSL"
      set_trustore_keystore_config(props)
    elsif security_protocol == "SASL_PLAINTEXT"
      set_sasl_config(props)
    elsif security_protocol == "SASL_SSL"
      set_trustore_keystore_config(props)
      set_sasl_config(props)
    end


    org.apache.kafka.clients.producer.KafkaProducer.new(props)
  rescue => e
    logger.error("Unable to create Kafka producer from given configuration",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
    raise e
  end
end
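
Most of these producer properties map one-to-one onto plugin options of the same name. For instance (a sketch; the broker addresses are placeholders):

[source,ruby]
----
output {
  kafka {
    topic_id          => "mytopic"
    bootstrap_servers => "kafka1:9092,kafka2:9092"   # placeholder brokers
    acks              => "all"      # wait for the full ISR to acknowledge
    compression_type  => "snappy"
    linger_ms         => 5          # let the producer batch sends for up to 5 ms
  }
}
----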
set_sasl_config(props)
# File lib/logstash/outputs/kafka.rb, line 371
def set_sasl_config(props)
  java.lang.System.setProperty("java.security.auth.login.config",jaas_path) unless jaas_path.nil?
  java.lang.System.setProperty("java.security.krb5.conf",kerberos_config) unless kerberos_config.nil?

  props.put("sasl.mechanism",sasl_mechanism)
  if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil?
    raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI"
  end

  props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
  props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
end
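
Putting the options this method reads together, a Kerberos (GSSAPI) setup might look like the following sketch; the file paths are placeholders:

[source,ruby]
----
output {
  kafka {
    topic_id                   => "mytopic"
    security_protocol          => "SASL_PLAINTEXT"
    sasl_mechanism             => "GSSAPI"
    sasl_kerberos_service_name => "kafka"                     # required for GSSAPI
    jaas_path                  => "/etc/logstash/jaas.conf"   # placeholder path
    kerberos_config            => "/etc/krb5.conf"            # placeholder path
  }
}
----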
set_trustore_keystore_config(props)
# File lib/logstash/outputs/kafka.rb, line 355
def set_trustore_keystore_config(props)
  if ssl_truststore_location.nil?
    raise LogStash::ConfigurationError, "ssl_truststore_location must be set when SSL is enabled"
  end
  props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
  props.put("ssl.truststore.location", ssl_truststore_location)
  props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil?

  # Client auth stuff
  props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil?
  props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil?
  props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil?
  props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil?
  props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil?
end
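
If the broker also requires client authentication, add the keystore options handled in the "Client auth stuff" branch; a sketch with placeholder paths and passwords:

[source,ruby]
----
output {
  kafka {
    topic_id                => "mytopic"
    security_protocol       => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"   # placeholder
    ssl_keystore_location   => "/path/to/keystore.jks"     # placeholder
    ssl_keystore_password   => "changeit"                  # placeholder
    ssl_key_password        => "changeit"                  # placeholder
  }
}
----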
write_to_kafka(event, serialized_data)
# File lib/logstash/outputs/kafka.rb, line 297
def write_to_kafka(event, serialized_data)
  if @message_key.nil?
    record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
  else
    record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
  end
  prepare(record)
rescue LogStash::ShutdownSignal
  @logger.debug('Kafka producer got shutdown signal')
rescue => e
  @logger.warn('kafka producer threw exception, restarting',
               :exception => e)
end
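
Note that both `topic_id` and `message_key` go through `event.sprintf`, so either can interpolate event fields. A sketch:

[source,ruby]
----
output {
  kafka {
    topic_id    => "logs-%{type}"   # topic name resolved per event
    message_key => "%{host}"        # keyed records hash to a consistent partition
  }
}
----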