class SemanticLogger::Appender::Kafka

Attributes

client_id[RW]
connect_timeout[RW]
delivery_interval[RW]
delivery_threshold[RW]
key[RW]
partition[RW]
partition_key[RW]
producer[R]
seed_brokers[RW]
socket_timeout[RW]
ssl_ca_cert[RW]
ssl_client_cert[RW]
ssl_client_cert_key[RW]
topic[RW]

Public Class Methods

new(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, topic: "log_messages", partition: nil, partition_key: nil, key: nil, delivery_threshold: 100, delivery_interval: 10, metrics: true, **args, &block) click to toggle source

Send log messages to Kafka in JSON format.

Kafka Parameters:

seed_brokers: [Array<String>, String]
  The list of brokers used to initialize the client. Either an Array of connections,
  or a comma separated string of connections.
  Connections can either be a string of "host:port" or a full URI with a scheme.
  If there's a scheme it's ignored and only host/port are used.

client_id: [String]
  The identifier for this application.
  Default: semantic-logger

topic: [String]
  Topic to publish log messages to.
  Default: 'log_messages'

partition: [Integer]
  The partition that the message should be written to.
  Default: nil

partition_key: [String]
  The key that should be used to assign a partition.
  Default: nil

key: [String]
  The message key.
  Default: nil

connect_timeout: [Integer]
  The timeout setting for connecting to brokers.
  Default: nil

socket_timeout: [Integer]
  The timeout setting for socket connections.
  Default: nil

ssl_ca_cert: [String, Array<String>]
  A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
  Default: nil

ssl_client_cert: [String]
  A PEM encoded client cert to use with a SSL connection.
  Must be used in combination with ssl_client_cert_key.
  Default: nil

ssl_client_cert_key: [String]
  A PEM encoded client cert key to use with a SSL connection.
  Must be used in combination with ssl_client_cert.
  Default: nil

 delivery_threshold: [Integer]
   Number of messages between triggering a delivery of messages to Apache Kafka.
   Default: 100

 delivery_interval: [Integer]
   Number of seconds between triggering a delivery of messages to Apache Kafka.
   Default: 10

Semantic Logger Parameters:

level: [:trace | :debug | :info | :warn | :error | :fatal]
  Override the log level for this appender.
  Default: SemanticLogger.default_level

formatter: [Object|Proc|Symbol|Hash]
  An instance of a class that implements #call, or a Proc to be used to format
  the output from this appender
  Default: :json (See: #default_formatter)

filter: [Regexp|Proc]
  RegExp: Only include log messages where the class name matches the supplied
  regular expression. All other messages will be ignored.
  Proc: Only include log messages where the supplied Proc returns true
        The Proc must return true or false.

host: [String]
  Name of this host to appear in log messages.
  Default: SemanticLogger.host

application: [String]
  Name of this application to appear in log messages.
  Default: SemanticLogger.application

metrics: [Boolean]
  Whether to send metrics-only events to Kafka.
  Default: true
Calls superclass method SemanticLogger::Subscriber::new
# File lib/semantic_logger/appender/kafka.rb, line 118
# Create a Kafka appender.
#
# Stores the Kafka client and producer settings on the instance, forwards
# `metrics`, any remaining keyword arguments and the optional block to the
# superclass constructor, and finally calls #reopen.
def initialize(seed_brokers:,
               client_id: "semantic-logger",
               connect_timeout: nil,
               socket_timeout: nil,
               ssl_ca_cert: nil,
               ssl_client_cert: nil,
               ssl_client_cert_key: nil,
               topic: "log_messages",
               partition: nil,
               partition_key: nil,
               key: nil,
               delivery_threshold: 100,
               delivery_interval: 10,
               metrics: true,
               **args,
               &block)
  # Broker connection settings.
  @seed_brokers    = seed_brokers
  @client_id       = client_id
  @connect_timeout = connect_timeout
  @socket_timeout  = socket_timeout

  # SSL certificates.
  @ssl_ca_cert         = ssl_ca_cert
  @ssl_client_cert     = ssl_client_cert
  @ssl_client_cert_key = ssl_client_cert_key

  # Where each message is published.
  @topic         = topic
  @partition     = partition
  @partition_key = partition_key
  @key           = key

  # Producer delivery settings.
  @delivery_threshold = delivery_threshold
  @delivery_interval  = delivery_interval

  super(metrics: metrics, **args, &block)

  # The settings above must be in place before reopen reads them.
  reopen
end

Public Instance Methods

close() click to toggle source
# File lib/semantic_logger/appender/kafka.rb, line 160
# Shut down the producer and close the Kafka client when they are present,
# then clear both references. Returns nil.
def close
  @producer.shutdown unless @producer.nil?
  @producer = nil

  @kafka.close unless @kafka.nil?
  @kafka = nil
end
default_formatter() click to toggle source

Use JSON Formatter by default.

# File lib/semantic_logger/appender/kafka.rb, line 174
# Formatter used when none is supplied: a new JSON formatter instance.
def default_formatter
  formatter_class = SemanticLogger::Formatters::Json
  formatter_class.new
end
flush() click to toggle source

Restart producer thread since there is no other way to flush.

# File lib/semantic_logger/appender/kafka.rb, line 179
# Flush by restarting the producer.
#
# Shuts down the current async producer and replaces it with a new one created
# from the same Kafka client, using this appender's delivery_threshold and
# delivery_interval.
#
# Does nothing when there is no Kafka client (for example after #close has
# reset @kafka to nil) instead of raising NoMethodError on nil.
#
# Returns the new producer, or nil when there is no client.
def flush
  return unless @kafka

  # The producer reference can be nil even when a client exists; skip the
  # shutdown in that case and just create a new producer.
  @producer&.shutdown
  @producer = @kafka.async_producer(
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval
  )
end
log(log) click to toggle source

Forward log messages to Kafka producer thread.

# File lib/semantic_logger/appender/kafka.rb, line 168
# Format the log entry with this appender's formatter and hand the result to
# the Kafka producer, together with the configured topic, partition,
# partition key and message key. Returns whatever the producer returns.
def log(log)
  payload = formatter.call(log, self)
  routing = {
    topic:         topic,
    partition:     partition,
    partition_key: partition_key,
    key:           key
  }
  @producer.produce(payload, **routing)
end
reopen() click to toggle source
# File lib/semantic_logger/appender/kafka.rb, line 142
# Create a new Kafka client from this appender's connection settings, then
# create a new async producer from that client using the configured delivery
# threshold and interval. Returns the new producer.
def reopen
  client_options = {
    seed_brokers:        seed_brokers,
    client_id:           client_id,
    connect_timeout:     connect_timeout,
    socket_timeout:      socket_timeout,
    ssl_ca_cert:         ssl_ca_cert,
    ssl_client_cert:     ssl_client_cert,
    ssl_client_cert_key: ssl_client_cert_key,
    logger:              logger
  }
  @kafka = ::Kafka.new(**client_options)

  producer_options = {
    delivery_threshold: delivery_threshold,
    delivery_interval:  delivery_interval
  }
  @producer = @kafka.async_producer(**producer_options)
end