class LogStash::Inputs::Customkafka
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
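A minimal pipeline configuration might look like the following sketch. The option names (`bootstrap_servers`, `topics`, `group_id`) correspond to the source shown later on this page; the input block name `customkafka` is assumed from the class name, and the broker address and topic are placeholders.

[source,ruby]
----
input {
  customkafka {                              # block name assumed from the class name
    bootstrap_servers => "localhost:9092"    # placeholder broker address
    topics => ["example-topic"]              # placeholder topic
    group_id => "logstash"
  }
}
----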
Here's a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8      |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9      |2.0.0 - 2.3.x |3.x.x  |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9      |2.4.x - 5.x.x |4.x.x  |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x  |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x  |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
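For example, SSL to the brokers can be enabled with settings along these lines (a sketch; the option names mirror `set_trustore_keystore_config` below, and the listener address, path, and password are placeholders):

[source,ruby]
----
input {
  customkafka {                                            # block name assumed from the class name
    bootstrap_servers => "broker1:9093"                    # placeholder SSL listener
    topics => ["example-topic"]
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"   # placeholder path
    ssl_truststore_password => "changeit"                  # placeholder password
  }
}
----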
The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.
Logstash instances by default form a single logical group to subscribe to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.
Ideally you should have as many threads as the number of partitions for a perfect balance; more threads than partitions means that some threads will be idle.
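For example, two Logstash instances sharing a 16-partition topic could each run a configuration like this sketch (the partition count, thread count, and names are illustrative):

[source,ruby]
----
input {
  customkafka {                            # block name assumed from the class name
    bootstrap_servers => "localhost:9092"
    topics => ["example-topic"]            # assumed to have 16 partitions
    group_id => "logstash"                 # same group_id on both instances
    consumer_threads => 8                  # 2 instances x 8 threads = 16 partitions, none idle
  }
}
----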
For more information see kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs
Public Instance Methods
# File lib/logstash/inputs/customkafka.rb, line 240
def kafka_consumers
  @runner_consumers
end
# File lib/logstash/inputs/customkafka.rb, line 222
def register
  @runner_threads = []
end
# File lib/logstash/inputs/customkafka.rb, line 227
def run(logstash_queue)
  # Create one Kafka consumer per configured consumer thread, wrap each in a
  # runner thread, and block until all runner threads finish.
  @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
  @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
  @runner_threads.each { |t| t.join }
end
# File lib/logstash/inputs/customkafka.rb, line 234
def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
Private Instance Methods
# File lib/logstash/inputs/customkafka.rb, line 287
def create_consumer(client_id)
  begin
    props = java.util.Properties.new
    kafka = org.apache.kafka.clients.consumer.ConsumerConfig

    props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms)
    props.put(kafka::AUTO_OFFSET_RESET_CONFIG, auto_offset_reset) unless auto_offset_reset.nil?
    props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    props.put(kafka::CHECK_CRCS_CONFIG, check_crcs) unless check_crcs.nil?
    props.put(kafka::CLIENT_ID_CONFIG, client_id)
    props.put(kafka::CONNECTIONS_MAX_IDLE_MS_CONFIG, connections_max_idle_ms) unless connections_max_idle_ms.nil?
    props.put(kafka::ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit)
    props.put(kafka::EXCLUDE_INTERNAL_TOPICS_CONFIG, exclude_internal_topics) unless exclude_internal_topics.nil?
    props.put(kafka::FETCH_MAX_BYTES_CONFIG, fetch_max_bytes) unless fetch_max_bytes.nil?
    props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms) unless fetch_max_wait_ms.nil?
    props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes) unless fetch_min_bytes.nil?
    props.put(kafka::GROUP_ID_CONFIG, group_id)
    props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms) unless heartbeat_interval_ms.nil?
    props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes) unless max_partition_fetch_bytes.nil?
    props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil?
    props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil?
    props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
    props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
    props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
    props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
    props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil?
    props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil?
    props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms) unless session_timeout_ms.nil?
    props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    props.put("security.protocol", security_protocol) unless security_protocol.nil?
    if security_protocol == "SSL"
      set_trustore_keystore_config(props)
    elsif security_protocol == "SASL_PLAINTEXT"
      set_sasl_config(props)
    elsif security_protocol == "SASL_SSL"
      set_trustore_keystore_config(props)
      set_sasl_config(props)
    end

    org.apache.kafka.clients.consumer.KafkaConsumer.new(props)
  rescue => e
    logger.error("Unable to create Kafka consumer from given configuration",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
    raise e
  end
end
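Most of the consumer properties above are exposed one-to-one as plugin options, so the underlying Kafka consumer can be tuned directly from the pipeline configuration. A sketch (values are illustrative and shown as strings, matching how they are passed into the Java `Properties` object):

[source,ruby]
----
input {
  customkafka {                            # block name assumed from the class name
    bootstrap_servers => "localhost:9092"
    topics => ["example-topic"]
    auto_offset_reset => "earliest"        # AUTO_OFFSET_RESET_CONFIG
    max_poll_records => "500"              # MAX_POLL_RECORDS_CONFIG
    session_timeout_ms => "30000"          # SESSION_TIMEOUT_MS_CONFIG
  }
}
----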
# File lib/logstash/inputs/customkafka.rb, line 352
def set_sasl_config(props)
  java.lang.System.setProperty("java.security.auth.login.config", jaas_path) unless jaas_path.nil?
  java.lang.System.setProperty("java.security.krb5.conf", kerberos_config) unless kerberos_config.nil?

  props.put("sasl.mechanism", sasl_mechanism)
  if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil?
    raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI"
  end

  props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
  props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
  props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
end
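Putting the SASL pieces together, a Kerberos-secured input might be configured roughly as follows (a sketch; the option names match `set_sasl_config` above, and the listener address and file paths are placeholders):

[source,ruby]
----
input {
  customkafka {                                     # block name assumed from the class name
    bootstrap_servers => "broker1:9094"             # placeholder SASL listener
    topics => ["example-topic"]
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "GSSAPI"
    sasl_kerberos_service_name => "kafka"           # required when the mechanism is GSSAPI
    jaas_path => "/etc/logstash/kafka_jaas.conf"    # placeholder path
    kerberos_config => "/etc/krb5.conf"             # placeholder path
  }
}
----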
# File lib/logstash/inputs/customkafka.rb, line 339
def set_trustore_keystore_config(props)
  props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
  props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil?
  props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil?

  # Client auth stuff
  props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil?
  props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil?
  props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil?
  props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil?

  props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil?
end
# File lib/logstash/inputs/customkafka.rb, line 245
def thread_runner(logstash_queue, consumer)
  Thread.new do
    begin
      # Subscribe either to a topic name pattern or to the explicit topic list
      unless @topics_pattern.nil?
        nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
        pattern = java.util.regex.Pattern.compile(@topics_pattern)
        consumer.subscribe(pattern, nooplistener)
      else
        consumer.subscribe(topics)
      end
      # Each runner thread gets its own codec instance
      codec_instance = @codec.clone
      while !stop?
        records = consumer.poll(poll_timeout_ms)
        next unless records.count > 0
        for record in records do
          codec_instance.decode(record.value.to_s) do |event|
            decorate(event)
            if @decorate_events
              event.set("[@metadata][kafka][topic]", record.topic)
              event.set("[@metadata][kafka][consumer_group]", @group_id)
              event.set("[@metadata][kafka][partition]", record.partition)
              event.set("[@metadata][kafka][offset]", record.offset)
              event.set("[@metadata][kafka][key]", record.key)
              event.set("[@metadata][kafka][timestamp]", record.timestamp)
            end
            logstash_queue << event
          end
        end
        # Manual offset commit
        if @enable_auto_commit == "false"
          consumer.commitSync
        end
      end
    rescue org.apache.kafka.common.errors.WakeupException => e
      raise e if !stop?
    ensure
      consumer.close
    end
  end
end
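The `[@metadata][kafka]` fields set above are only populated when `decorate_events` is enabled, and because they live under `@metadata` they are not shipped by most outputs unless copied onto the event. A sketch of one way to surface them (the `mutate` filter is standard Logstash; field and topic names are illustrative):

[source,ruby]
----
input {
  customkafka {                            # block name assumed from the class name
    bootstrap_servers => "localhost:9092"
    topics => ["example-topic"]
    decorate_events => true                # populates [@metadata][kafka][...] in thread_runner
  }
}

filter {
  mutate {
    # Copy selected Kafka metadata onto the event so it survives serialization
    add_field => {
      "kafka_topic"  => "%{[@metadata][kafka][topic]}"
      "kafka_offset" => "%{[@metadata][kafka][offset]}"
    }
  }
}
----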