class Racecar::Config

Constants

STATISTICS_DISABLED_VALUE

Attributes

error_handler[R]

The error handler is exposed as a read-only attribute; it must be set directly on the object by passing a block to on_error.

logger[RW]
parallel_workers[RW]
subscriptions[RW]

Public Class Methods

new(env: ENV) click to toggle source
Calls superclass method
# File lib/racecar/config.rb, line 184
# Builds a config object. +env+ is forwarded unchanged to the superclass
# initializer; afterwards this class installs its own defaults.
#
# env - object handed to the superclass as its +env:+ argument (defaults to ENV).
def initialize(env: ENV)
  super(env: env)

  # Log to standard output until another logger is assigned.
  @logger = Logger.new(STDOUT)

  # Start without any subscriptions.
  @subscriptions = []

  # Default error handler: an empty proc that does nothing when called.
  @error_handler = proc {}
end

Public Instance Methods

inspect() click to toggle source
# File lib/racecar/config.rb, line 191
# Renders the configuration as text, one "name = value" line per variable
# returned by self.class.variables. Each value is looked up with +get+ and
# shown in its +inspect+ form; lines are separated by newlines.
def inspect
  lines = self.class.variables.map do |variable|
    name = variable.name
    [name, get(name).inspect].join(" = ")
  end

  lines.join("\n")
end
load_consumer_class(consumer_class) click to toggle source
# File lib/racecar/config.rb, line 212
# Copies consumer-level settings from +consumer_class+ onto this config.
#
# Group id resolution, in order of precedence:
#   1. the group id declared on the consumer class,
#   2. the group id already present on this config,
#   3. a generated id: the optional +group_id_prefix+ followed by the
#      dasherized, downcased name of the consumer class.
#
# +parallel_workers+ and +subscriptions+ are always taken from the consumer
# class. +max_wait_time+ is only overridden when the consumer class provides
# one. +pidfile+ falls back to "<group_id>.pid" when it is not already set.
def load_consumer_class(consumer_class)
  self.group_id = consumer_class.group_id || group_id

  unless group_id
    # Configurable and optional prefix:
    prefix = group_id_prefix

    # MyFunnyConsumer => my-funny-consumer
    dasherized_name = consumer_class.name.gsub(/([a-z])([A-Z])/, '\1-\2').downcase

    self.group_id = [prefix, dasherized_name].compact.join
  end

  self.parallel_workers = consumer_class.parallel_workers
  self.subscriptions = consumer_class.subscriptions
  self.max_wait_time = consumer_class.max_wait_time || max_wait_time
  self.pidfile ||= "#{group_id}.pid"
end
max_wait_time_ms() click to toggle source
# File lib/racecar/config.rb, line 180
# Returns +max_wait_time+ multiplied by 1000 (presumably converting seconds
# to milliseconds, as the method name suggests).
def max_wait_time_ms
  wait_time = max_wait_time
  wait_time * 1_000
end
on_error(&handler) click to toggle source
# File lib/racecar/config.rb, line 229
# Registers the given block as this config's error handler.
#
# The block is stored as-is in @error_handler, replacing any previous handler.
# Calling this without a block stores nil. Returns the stored value.
def on_error(&handler)
  @error_handler = handler
end
rdkafka_consumer() click to toggle source
# File lib/racecar/config.rb, line 233
# Builds the settings hash derived from the +consumer+ entries.
#
# Each entry of +consumer+ is split on its first "=" into a key and a value,
# with surrounding whitespace stripped from both. The result of
# +rdkafka_security_config+ is merged on top, so security settings win when
# a key appears in both.
def rdkafka_consumer
  overrides = consumer.map { |entry| entry.split("=", 2).map(&:strip) }.to_h
  overrides.merge(rdkafka_security_config)
end
rdkafka_producer() click to toggle source
# File lib/racecar/config.rb, line 241
# Builds the settings hash derived from the +producer+ entries.
#
# Each entry of +producer+ is split on its first "=" into a key and a value,
# with surrounding whitespace stripped from both. The result of
# +rdkafka_security_config+ is merged on top, so security settings win when
# a key appears in both.
def rdkafka_producer
  overrides = producer.map { |entry| entry.split("=", 2).map(&:strip) }.to_h
  overrides.merge(rdkafka_security_config)
end
statistics_interval_ms() click to toggle source
# File lib/racecar/config.rb, line 172
# Returns +statistics_interval+ multiplied by 1000 when
# Rdkafka::Config.statistics_callback is set; otherwise returns
# STATISTICS_DISABLED_VALUE.
def statistics_interval_ms
  return STATISTICS_DISABLED_VALUE unless Rdkafka::Config.statistics_callback

  statistics_interval * 1000
end
validate!() click to toggle source
# File lib/racecar/config.rb, line 198
# Checks the configuration for inconsistent settings.
#
# Raises ConfigError when:
#   * +brokers+ is empty,
#   * +socket_timeout+ is not strictly greater than +max_wait_time+, or
#   * +max_pause_timeout+ is set while +pause_with_exponential_backoff?+ is off.
#
# The checks run in that order and the first failure is the one reported.
def validate!
  raise ConfigError, "`brokers` must not be empty" if brokers.empty?

  raise ConfigError, "`socket_timeout` must be longer than `max_wait_time`" if socket_timeout <= max_wait_time

  timeout_without_backoff = max_pause_timeout && !pause_with_exponential_backoff?
  raise ConfigError, "`max_pause_timeout` only makes sense when `pause_with_exponential_backoff` is enabled" if timeout_without_backoff
end

Private Instance Methods

rdkafka_security_config() click to toggle source
# File lib/racecar/config.rb, line 251
# Maps this config's security-related settings onto their dotted
# configuration keys and returns them as a hash. Settings whose value is nil
# are left out; other falsy values (such as false) are kept.
def rdkafka_security_config
  settings = {
    "security.protocol" => security_protocol,

    # SSL / TLS settings
    "enable.ssl.certificate.verification" => ssl_verify_hostname,
    "ssl.ca.location" => ssl_ca_location,
    "ssl.crl.location" => ssl_crl_location,
    "ssl.keystore.location" => ssl_keystore_location,
    "ssl.keystore.password" => ssl_keystore_password,
    "ssl.certificate.location" => ssl_certificate_location,
    "ssl.key.location" => ssl_key_location,
    "ssl.key.password" => ssl_key_password,

    # SASL settings
    "sasl.mechanism" => sasl_mechanism,
    "sasl.kerberos.service.name" => sasl_kerberos_service_name,
    "sasl.kerberos.principal" => sasl_kerberos_principal,
    "sasl.kerberos.kinit.cmd" => sasl_kerberos_kinit_cmd,
    "sasl.kerberos.keytab" => sasl_kerberos_keytab,
    "sasl.kerberos.min.time.before.relogin" => sasl_kerberos_min_time_before_relogin,
    "sasl.username" => sasl_username,
    "sasl.password" => sasl_password,
  }

  # Drop only the entries that were never configured (nil).
  settings.reject { |_key, value| value.nil? }
end