class Racecar::Config
Constants
- STATISTICS_DISABLED_VALUE
Attributes
error_handler[R]
The error handler must be set directly on the object.
logger[RW]
parallel_workers[RW]
subscriptions[RW]
Public Class Methods
new(env: ENV)
click to toggle source
Calls superclass method
# File lib/racecar/config.rb, line 184 def initialize(env: ENV) super(env: env) @error_handler = proc {} @subscriptions = [] @logger = Logger.new(STDOUT) end
Public Instance Methods
inspect()
click to toggle source
# File lib/racecar/config.rb, line 191 def inspect self.class.variables .map(&:name) .map {|key| [key, get(key).inspect].join(" = ") } .join("\n") end
load_consumer_class(consumer_class)
click to toggle source
# File lib/racecar/config.rb, line 212 def load_consumer_class(consumer_class) self.group_id = consumer_class.group_id || self.group_id self.group_id ||= [ # Configurable and optional prefix: group_id_prefix, # MyFunnyConsumer => my-funny-consumer consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase, ].compact.join self.parallel_workers = consumer_class.parallel_workers self.subscriptions = consumer_class.subscriptions self.max_wait_time = consumer_class.max_wait_time || self.max_wait_time self.pidfile ||= "#{group_id}.pid" end
max_wait_time_ms()
click to toggle source
# File lib/racecar/config.rb, line 180 def max_wait_time_ms max_wait_time * 1000 end
on_error(&handler)
click to toggle source
# File lib/racecar/config.rb, line 229 def on_error(&handler) @error_handler = handler end
rdkafka_consumer()
click to toggle source
# File lib/racecar/config.rb, line 233 def rdkafka_consumer consumer_config = consumer.map do |param| param.split("=", 2).map(&:strip) end.to_h consumer_config.merge!(rdkafka_security_config) consumer_config end
rdkafka_producer()
click to toggle source
# File lib/racecar/config.rb, line 241 def rdkafka_producer producer_config = producer.map do |param| param.split("=", 2).map(&:strip) end.to_h producer_config.merge!(rdkafka_security_config) producer_config end
statistics_interval_ms()
click to toggle source
# File lib/racecar/config.rb, line 172 def statistics_interval_ms if Rdkafka::Config.statistics_callback statistics_interval * 1000 else STATISTICS_DISABLED_VALUE end end
validate!()
click to toggle source
# File lib/racecar/config.rb, line 198 def validate! if brokers.empty? raise ConfigError, "`brokers` must not be empty" end if socket_timeout <= max_wait_time raise ConfigError, "`socket_timeout` must be longer than `max_wait_time`" end if max_pause_timeout && !pause_with_exponential_backoff? raise ConfigError, "`max_pause_timeout` only makes sense when `pause_with_exponential_backoff` is enabled" end end
Private Instance Methods
rdkafka_security_config()
click to toggle source
# File lib/racecar/config.rb, line 251 def rdkafka_security_config { "security.protocol" => security_protocol, "enable.ssl.certificate.verification" => ssl_verify_hostname, "ssl.ca.location" => ssl_ca_location, "ssl.crl.location" => ssl_crl_location, "ssl.keystore.location" => ssl_keystore_location, "ssl.keystore.password" => ssl_keystore_password, "ssl.certificate.location" => ssl_certificate_location, "ssl.key.location" => ssl_key_location, "ssl.key.password" => ssl_key_password, "sasl.mechanism" => sasl_mechanism, "sasl.kerberos.service.name" => sasl_kerberos_service_name, "sasl.kerberos.principal" => sasl_kerberos_principal, "sasl.kerberos.kinit.cmd" => sasl_kerberos_kinit_cmd, "sasl.kerberos.keytab" => sasl_kerberos_keytab, "sasl.kerberos.min.time.before.relogin" => sasl_kerberos_min_time_before_relogin, "sasl.username" => sasl_username, "sasl.password" => sasl_password, }.compact end