class SemanticLogger::Appender::Kafka
Attributes
client_id[RW]
connect_timeout[RW]
delivery_interval[RW]
delivery_threshold[RW]
key[RW]
partition[RW]
partition_key[RW]
producer[R]
seed_brokers[RW]
socket_timeout[RW]
ssl_ca_cert[RW]
ssl_client_cert[RW]
ssl_client_cert_key[RW]
topic[RW]
Public Class Methods
new(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, metrics: true, **args, &block)
click to toggle source
Send log messages to Kafka
in JSON format.
Kafka
Parameters:
seed_brokers: [Array<String>, String] The list of brokers used to initialize the client. Either an Array of connections, or a comma separated string of connections. Connections can either be a string of "host:port" or a full URI with a scheme. If there's a scheme it's ignored and only host/port are used. client_id: [String] The identifier for this application. Default: semantic-logger topic: [String] Topic to publish log messages to. Default: 'log_messages' partition: [Integer] The partition that the message should be written to. Default: nil partition_key: [String] The key that should be used to assign a partition. Default: nil key: [String] The message key. Default: nil connect_timeout: [Integer] The timeout setting for connecting to brokers. Default: nil socket_timeout: [Integer] The timeout setting for socket connections. Default: nil ssl_ca_cert: [String, Array<String>] A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection. Default: nil ssl_client_cert: [String] A PEM encoded client cert to use with a SSL connection. Must be used in combination with ssl_client_cert_key. Default: nil ssl_client_cert_key: [String] A PEM encoded client cert key to use with a SSL connection. Must be used in combination with ssl_client_cert. Default: nil delivery_threshold: [Integer] Number of messages between triggering a delivery of messages to Apache Kafka. Default: 100 delivery_interval: [Integer] Number of seconds between triggering a delivery of messages to Apache Kafka. Default: 10
Semantic Logger
Parameters:
level: [:trace | :debug | :info | :warn | :error | :fatal] Override the log level for this appender. Default: SemanticLogger.default_level formatter: [Object|Proc|Symbol|Hash] An instance of a class that implements #call, or a Proc to be used to format the output from this appender. Default: :raw_json (See: #call) filter: [Regexp|Proc] RegExp: Only include log messages where the class name matches the supplied regular expression. All other messages will be ignored. Proc: Only include log messages where the supplied Proc returns true. The Proc must return true or false. host: [String] Name of this host to appear in log messages. Default: SemanticLogger.host application: [String] Name of this application to appear in log messages. Default: SemanticLogger.application metrics: [Boolean] Also send metrics-only events to Kafka. Default: true
Calls superclass method
SemanticLogger::Subscriber::new
# File lib/semantic_logger/appender/kafka.rb, line 118 def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, metrics: true, **args, &block) @seed_brokers = seed_brokers @client_id = client_id @connect_timeout = connect_timeout @socket_timeout = socket_timeout @ssl_ca_cert = ssl_ca_cert @ssl_client_cert = ssl_client_cert @ssl_client_cert_key = ssl_client_cert_key @topic = topic @partition = partition @partition_key = partition_key @key = key @delivery_threshold = delivery_threshold @delivery_interval = delivery_interval super(metrics: metrics, **args, &block) reopen end
Public Instance Methods
close()
click to toggle source
# File lib/semantic_logger/appender/kafka.rb, line 160 def close @producer&.shutdown @producer = nil @kafka&.close @kafka = nil end
default_formatter()
click to toggle source
Use JSON Formatter by default.
# File lib/semantic_logger/appender/kafka.rb, line 174 def default_formatter SemanticLogger::Formatters::Json.new end
flush()
click to toggle source
Restart producer thread since there is no other way to flush.
# File lib/semantic_logger/appender/kafka.rb, line 179 def flush @producer.shutdown @producer = @kafka.async_producer( delivery_threshold: delivery_threshold, delivery_interval: delivery_interval ) end
log(log)
click to toggle source
Forward log messages to Kafka
producer thread.
# File lib/semantic_logger/appender/kafka.rb, line 168 def log(log) json = formatter.call(log, self) @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key) end
reopen()
click to toggle source
# File lib/semantic_logger/appender/kafka.rb, line 142 def reopen @kafka = ::Kafka.new( seed_brokers: seed_brokers, client_id: client_id, connect_timeout: connect_timeout, socket_timeout: socket_timeout, ssl_ca_cert: ssl_ca_cert, ssl_client_cert: ssl_client_cert, ssl_client_cert_key: ssl_client_cert_key, logger: logger ) @producer = @kafka.async_producer( delivery_threshold: delivery_threshold, delivery_interval: delivery_interval ) end