class LogStash::Outputs::Kafka
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here's a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8      |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9      |2.0.0 - 2.3.x |3.x.x  |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9      |2.4.x - 5.x.x |4.x.x  |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x  |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x  |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

This output supports connecting to Kafka over:

- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed.
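For instance, a minimal sketch of turning on SSL; the option names match this plugin's SSL settings, while the path and password values are placeholders for your environment:

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                 # placeholder password
  }
}
----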
The only required configuration is the topic_id. The default codec is plain, so events will be persisted on the broker in plain format. By default, Logstash encodes not only the message itself but also a timestamp and hostname. If you do not want anything but your message passing through, you should make the output configuration something like:
[source,ruby]
----
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
----
For more information see kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
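Many of the producer settings in that document map to plugin options of the same name (see `create_producer` below). A hedged sketch of common throughput tuning; the values are illustrative only, not recommendations:

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    compression_type => "snappy"  # a Kafka producer compression codec
    batch_size => 16384           # bytes per partition batch (illustrative value)
    linger_ms => 5                # wait up to 5 ms to fill a batch (illustrative value)
  }
}
----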
Public Instance Methods
# File lib/logstash/outputs/kafka.rb, line 291
def close
  @producer.close
end
# File lib/logstash/outputs/kafka.rb, line 211
def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
# File lib/logstash/outputs/kafka.rb, line 206
def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
# File lib/logstash/outputs/kafka.rb, line 178
def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil?
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end

  @producer = create_producer
  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
  end
end
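As `register` shows, only the two serializer class names above are accepted. A minimal sketch of selecting the byte-array serializer (the topic name is a placeholder):

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
----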
# File lib/logstash/outputs/kafka.rb, line 229
def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    futures.each_with_index do |future, i|
      begin
        result = future.get()
      rescue => e
        # TODO(sissel): Add metric to count failures, possibly by exception type.
        logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
        failures << batch[i]
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.",
                  :batch_size => batch.size, :failures => failures.size, :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
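Putting `register` and `retrying_send` together: a finite `retries` value bounds the loop above and can drop events, while `retry_backoff_ms` sets the sleep between retry passes. A hedged configuration sketch (values are illustrative):

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    retries => 3             # WARNING: finite retries can LOSE DATA if Kafka stays down
    retry_backoff_ms => 500  # illustrative delay between retry passes
  }
}
----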
Private Instance Methods
# File lib/logstash/outputs/kafka.rb, line 311
def create_producer
  begin
    props = java.util.Properties.new
    kafka = org.apache.kafka.clients.producer.ProducerConfig

    props.put(kafka::ACKS_CONFIG, acks)
    props.put(kafka::BATCH_SIZE_CONFIG, batch_size.to_s)
    props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    props.put(kafka::BUFFER_MEMORY_CONFIG, buffer_memory.to_s)
    props.put(kafka::COMPRESSION_TYPE_CONFIG, compression_type)
    props.put(kafka::CLIENT_ID_CONFIG, client_id) unless client_id.nil?
    props.put(kafka::KEY_SERIALIZER_CLASS_CONFIG, key_serializer)
    props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s)
    props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
    props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil?
    props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil?
    props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
    props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
    props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil?
    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s)
    props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s)
    props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer)

    props.put("security.protocol", security_protocol) unless security_protocol.nil?

    if security_protocol == "SSL"
      set_trustore_keystore_config(props)
    elsif security_protocol == "SASL_PLAINTEXT"
      set_sasl_config(props)
    elsif security_protocol == "SASL_SSL"
      set_trustore_keystore_config(props)
      set_sasl_config(props)
    end

    org.apache.kafka.clients.producer.KafkaProducer.new(props)
  rescue => e
    logger.error("Unable to create Kafka producer from given configuration",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
    raise e
  end
end
# File lib/logstash/outputs/kafka.rb, line 371
def set_sasl_config(props)
  java.lang.System.setProperty("java.security.auth.login.config", jaas_path) unless jaas_path.nil?
  java.lang.System.setProperty("java.security.krb5.conf", kerberos_config) unless kerberos_config.nil?

  props.put("sasl.mechanism", sasl_mechanism)
  if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil?
    raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI"
  end

  props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
  props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
end
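A sketch of a Kerberos (GSSAPI) setup consistent with the checks in `set_sasl_config`; all paths and the service name are placeholders for your environment:

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "GSSAPI"
    sasl_kerberos_service_name => "kafka"  # required when the mechanism is GSSAPI
    jaas_path => "/etc/kafka/jaas.conf"    # placeholder path
    kerberos_config => "/etc/krb5.conf"    # placeholder path
  }
}
----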
# File lib/logstash/outputs/kafka.rb, line 355
def set_trustore_keystore_config(props)
  if ssl_truststore_location.nil?
    raise LogStash::ConfigurationError, "ssl_truststore_location must be set when SSL is enabled"
  end

  props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
  props.put("ssl.truststore.location", ssl_truststore_location)
  props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil?

  # Client auth stuff
  props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil?
  props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil?
  props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil?
  props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil?
  props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil?
end
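If the broker also requests client authentication, the keystore options above come into play alongside the truststore. A hedged sketch (paths and passwords are placeholders):

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"  # placeholder
    ssl_truststore_password => "changeit"                 # placeholder
    ssl_keystore_location => "/path/to/keystore.jks"      # placeholder; enables client auth
    ssl_keystore_password => "changeit"                   # placeholder
    ssl_key_password => "changeit"                        # placeholder
  }
}
----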
# File lib/logstash/outputs/kafka.rb, line 297
def write_to_kafka(event, serialized_data)
  if @message_key.nil?
    record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
  else
    record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
  end
  prepare(record)
rescue LogStash::ShutdownSignal
  @logger.debug('Kafka producer got shutdown signal')
rescue => e
  @logger.warn('kafka producer threw exception, restarting', :exception => e)
end
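Because `write_to_kafka` runs both `@topic_id` and `@message_key` through `event.sprintf`, either can reference event fields. A sketch assuming events carry `type` and `user_id` fields (hypothetical field names):

[source,ruby]
----
output {
  kafka {
    topic_id => "logs-%{type}"   # topic chosen per event; assumes a `type` field
    message_key => "%{user_id}"  # key drives partition assignment; assumes a `user_id` field
  }
}
----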