class LogStash::Inputs::Kafka

This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.

Here's a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:

[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |
|==========================================================

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers are designed to be backwards compatible. For example, the 0.9 broker is compatible with both the 0.8 and 0.9 consumer APIs, but not the other way around.

This input supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

Security is disabled by default and can be turned on as needed.
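
For example, a minimal sketch of an input that connects over SSL might look like the following (the broker address, topic, truststore path, and password are placeholders, not recommendations):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9093"         # placeholder broker address
    topics => ["logs"]                                           # placeholder topic
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/client.truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                        # placeholder password
  }
}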

The Logstash Kafka consumer handles group management and uses Kafka's default offset management strategy, which stores committed offsets in Kafka topics.

By default, Logstash instances form a single logical consumer group that subscribes to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you can run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic are distributed across all Logstash instances with the same `group_id`.

Ideally, the number of threads should match the number of partitions for a perfect balance; threads in excess of the partition count will sit idle.
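
As a sketch, assume a topic with four partitions: two Logstash instances, each running this configuration with `consumer_threads => 2` and a shared `group_id`, give four consumer threads in total, so every partition is read by exactly one thread (all values here are illustrative):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9092"  # placeholder broker address
    topics => ["logs"]                                     # placeholder topic with 4 partitions
    group_id => "logstash"                                 # same group_id on every instance
    consumer_threads => 2                                  # 2 instances x 2 threads = 4 partitions
  }
}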

For more information, see https://kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration reference: https://kafka.apache.org/documentation.html#consumerconfigs

Public Instance Methods

kafka_consumers()
# File lib/logstash/inputs/kafka.rb, line 238
def kafka_consumers
  @runner_consumers
end
register()
# File lib/logstash/inputs/kafka.rb, line 220
def register
  @runner_threads = []
end
run(logstash_queue)
# File lib/logstash/inputs/kafka.rb, line 225
def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
  @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
  @runner_threads.each { |t| t.join }
end
stop()
# File lib/logstash/inputs/kafka.rb, line 232
def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end

Private Instance Methods

create_consumer(client_id)
# File lib/logstash/inputs/kafka.rb, line 285
def create_consumer(client_id)
  begin
    props = java.util.Properties.new
    kafka = org.apache.kafka.clients.consumer.ConsumerConfig

    props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms)
    props.put(kafka::AUTO_OFFSET_RESET_CONFIG, auto_offset_reset) unless auto_offset_reset.nil?
    props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    props.put(kafka::CHECK_CRCS_CONFIG, check_crcs) unless check_crcs.nil?
    props.put(kafka::CLIENT_ID_CONFIG, client_id)
    props.put(kafka::CONNECTIONS_MAX_IDLE_MS_CONFIG, connections_max_idle_ms) unless connections_max_idle_ms.nil?
    props.put(kafka::ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit)
    props.put(kafka::EXCLUDE_INTERNAL_TOPICS_CONFIG, exclude_internal_topics) unless exclude_internal_topics.nil?
    props.put(kafka::FETCH_MAX_BYTES_CONFIG, fetch_max_bytes) unless fetch_max_bytes.nil?
    props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms) unless fetch_max_wait_ms.nil?
    props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes) unless fetch_min_bytes.nil?
    props.put(kafka::GROUP_ID_CONFIG, group_id)
    props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms) unless heartbeat_interval_ms.nil?
    props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes) unless max_partition_fetch_bytes.nil?
    props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil?
    props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil?
    props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
    props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
    props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
    props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
    props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil?
    props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil?
    props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms) unless session_timeout_ms.nil?
    props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    props.put("security.protocol", security_protocol) unless security_protocol.nil?

    if security_protocol == "SSL"
      set_trustore_keystore_config(props)
    elsif security_protocol == "SASL_PLAINTEXT"
      set_sasl_config(props)
    elsif security_protocol == "SASL_SSL"
      set_trustore_keystore_config(props)
      set_sasl_config(props)
    end

    org.apache.kafka.clients.consumer.KafkaConsumer.new(props)
  rescue => e
    logger.error("Unable to create Kafka consumer from given configuration",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
    raise e
  end
end
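
As the method above shows, each plugin option is copied into the Java consumer properties, and options left as `nil` fall back to the Kafka client defaults. A hedged sketch of overriding a few of them (the values are illustrative, and are written as strings because they are passed straight to `props.put`):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9092"  # placeholder broker address
    topics => ["logs"]                                     # placeholder topic
    auto_offset_reset => "earliest"                        # becomes auto.offset.reset
    max_poll_records => "500"                              # becomes max.poll.records
    session_timeout_ms => "30000"                          # becomes session.timeout.ms
  }
}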
set_sasl_config(props)
# File lib/logstash/inputs/kafka.rb, line 350
def set_sasl_config(props)
  java.lang.System.setProperty("java.security.auth.login.config",jaas_path) unless jaas_path.nil?
  java.lang.System.setProperty("java.security.krb5.conf",kerberos_config) unless kerberos_config.nil?

  props.put("sasl.mechanism",sasl_mechanism)
  if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil?
    raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI"
  end

  props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
  props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
end
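
For Kerberos (GSSAPI), the options consumed above correspond to a configuration along these lines (a sketch only; the JAAS path, krb5 path, and service name depend on your environment):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9095"  # placeholder broker address
    topics => ["logs"]                                     # placeholder topic
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "GSSAPI"
    sasl_kerberos_service_name => "kafka"                  # service name, environment-specific
    jaas_path => "/etc/logstash/kafka_client_jaas.conf"    # placeholder path
    kerberos_config => "/etc/krb5.conf"                    # placeholder path
  }
}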
set_trustore_keystore_config(props)
# File lib/logstash/inputs/kafka.rb, line 337
def set_trustore_keystore_config(props)
  props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
  props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil?
  props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil?

  # Client auth stuff
  props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil?
  props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil?
  props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil?
  props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil?
  props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil?
end
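
If the brokers also require client certificate authentication, the keystore options handled above can be set alongside the truststore settings (paths and passwords are placeholders):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9093"         # placeholder broker address
    topics => ["logs"]                                            # placeholder topic
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/client.truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                         # placeholder password
    ssl_keystore_location => "/path/to/client.keystore.jks"       # placeholder path
    ssl_keystore_password => "changeit"                           # placeholder password
    ssl_key_password => "changeit"                                # placeholder password
  }
}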
thread_runner(logstash_queue, consumer)
# File lib/logstash/inputs/kafka.rb, line 243
def thread_runner(logstash_queue, consumer)
  Thread.new do
    begin
      unless @topics_pattern.nil?
        nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
        pattern = java.util.regex.Pattern.compile(@topics_pattern)
        consumer.subscribe(pattern, nooplistener)
      else
        consumer.subscribe(topics);
      end
      codec_instance = @codec.clone
      while !stop?
        records = consumer.poll(poll_timeout_ms)
        next unless records.count > 0
        for record in records do
          codec_instance.decode(record.value.to_s) do |event|
            decorate(event)
            if @decorate_events
              event.set("[@metadata][kafka][topic]", record.topic)
              event.set("[@metadata][kafka][consumer_group]", @group_id)
              event.set("[@metadata][kafka][partition]", record.partition)
              event.set("[@metadata][kafka][offset]", record.offset)
              event.set("[@metadata][kafka][key]", record.key)
              event.set("[@metadata][kafka][timestamp]", record.timestamp)
            end
            logstash_queue << event
          end
        end
        # Manual offset commit
        if @enable_auto_commit == "false"
          consumer.commitSync
        end
      end
    rescue org.apache.kafka.common.errors.WakeupException => e
      raise e if !stop?
    ensure
      consumer.close
    end
  end
end
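
Note that the Kafka metadata fields above are only added when `decorate_events` is enabled, and because they live under `[@metadata]` they are available to filters and outputs but are not shipped as part of the event itself. A hedged usage sketch (the broker and Elasticsearch hosts are placeholders):

input {
  kafka {
    bootstrap_servers => "kafka-broker.example.com:9092"  # placeholder broker address
    topics => ["logs"]                                     # placeholder topic
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]                            # placeholder host
    # route events to an index named after the source Kafka topic
    index => "kafka-%{[@metadata][kafka][topic]}"
  }
}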