class KafkaCommand::Configuration
Constants
- CLUSTER_KEYS
- HOST_REGEX
Attributes
clusters[R]
config[R]
errors[R]
Public Class Methods
load!(file_path)
click to toggle source
# File lib/kafka_command/configuration.rb, line 60
# Parses the file at +file_path+ with parse_yaml and assigns the result to
# KafkaCommand.config. When the resulting configuration reports itself as
# invalid, its errors are printed to standard output; an invalid
# configuration is only reported here, not raised.
#
# @param file_path [String] path handed to parse_yaml
# @return [nil]
def self.load!(file_path)
  KafkaCommand.config = parse_yaml(file_path)
  return unless KafkaCommand.config.invalid?

  puts "KafkaCommand improperly configured. #{KafkaCommand.config.errors}"
end
new(config_hash)
click to toggle source
# File lib/kafka_command/configuration.rb, line 34
# Builds a configuration from a hash keyed by environment name.
#
# The section for the current environment is looked up with
# ENV['RAILS_ENV']. A nil or false +config_hash+ is treated as an empty
# hash, so the missing configuration is reported by #valid? instead of
# raising NoMethodError here.
# NOTE(review): nil/false is presumably what the YAML parser yields for an
# empty file - confirm against the Psych version in use.
#
# @param config_hash [Hash, nil] environment name => settings for that environment
def initialize(config_hash)
  @config = (config_hash || {})[ENV['RAILS_ENV']]
  # Clusters are only read when an environment section was actually found.
  @clusters = config['clusters'] if config.present?
  @errors = []
end
parse_yaml(file_path)
click to toggle source
# File lib/kafka_command/configuration.rb, line 56
# Reads the file at +file_path+, renders its contents as an ERB template
# against this method's binding, and parses the rendered text as YAML.
#
# NOTE(review): YAML.load may instantiate arbitrary objects depending on the
# Psych version; confirm the file is always trusted before relying on it.
#
# @param file_path [String] path of the ERB/YAML file to read
# @return [Object] whatever YAML.load produces for the rendered text
def self.parse_yaml(file_path)
  # Deliberately a single expression: any local variable introduced in this
  # method would become visible to the template through +binding+.
  YAML.load(
    ERB.new(File.read(file_path)).result(binding)
  )
end
Public Instance Methods
invalid?()
click to toggle source
# File lib/kafka_command/configuration.rb, line 52
# Logical inverse of #valid?.
#
# @return [Boolean] false when valid? returns a truthy value, true otherwise
def invalid?
  return false if valid?

  true
end
valid?()
click to toggle source
# File lib/kafka_command/configuration.rb, line 40
# Re-runs validation from scratch and reports whether it passed.
#
# The error list is reset on every call. A blank config records a single
# error and returns false without running any further checks; otherwise
# validate! is run and the result reflects whether any errors were recorded.
#
# @return [Boolean]
def valid?
  @errors = []

  if config.present?
    validate!
    errors.none?
  else
    errors << 'No config specified for environment'
    false
  end
end
Private Instance Methods
client_certificate(cluster)
click to toggle source
# File lib/kafka_command/configuration.rb, line 142
# Returns the cluster's 'ssl_client_cert' entry when it is truthy, otherwise
# its 'ssl_client_cert_file_path' entry.
#
# @param cluster [Hash] settings of a single cluster
# @return [Object, nil] the first truthy of the two entries, else the second entry
def client_certificate(cluster)
  inline_cert = cluster['ssl_client_cert']
  return inline_cert if inline_cert

  cluster['ssl_client_cert_file_path']
end
client_certificate_key(cluster)
click to toggle source
# File lib/kafka_command/configuration.rb, line 146
# Returns the cluster's 'ssl_client_cert_key' entry when it is truthy,
# otherwise its 'ssl_client_cert_key_file_path' entry.
#
# @param cluster [Hash] settings of a single cluster
# @return [Object, nil] the first truthy of the two entries, else the second entry
def client_certificate_key(cluster)
  inline_key = cluster['ssl_client_cert_key']
  return inline_key if inline_key

  cluster['ssl_client_cert_key_file_path']
end
validate!()
click to toggle source
# File lib/kafka_command/configuration.rb, line 70
# Runs validate_clusters. Any StandardError raised while validating is
# converted into a single generic entry in +errors+, so validation never
# raises to the caller; the details of the original exception are not kept.
#
# @return [Object] the result of validate_clusters, or +errors+ after a rescue
def validate!
  validate_clusters
rescue StandardError
  errors << 'Kafka Command is configured incorrectly'
end
validate_broker(broker)
click to toggle source
# File lib/kafka_command/configuration.rb, line 108
# Records an error unless +broker+ matches HOST_REGEX.
#
# Values that cannot be matched at all (nil, or anything without #match?,
# such as an Integer parsed from YAML) are reported with the same
# broker-specific message instead of raising NoMethodError.
#
# @param broker [Object] a single seed broker entry
# @return [Array<String>, nil] +errors+ when an error was added, nil otherwise
def validate_broker(broker)
  return if broker.respond_to?(:match?) && broker.match?(HOST_REGEX)

  errors << 'Broker must be a valid host/port combination'
end
validate_cluster(cluster)
click to toggle source
# File lib/kafka_command/configuration.rb, line 87
# Validates the settings of a single cluster, appending to +errors+.
#
# Checks run in order and stop at the first of these failures:
# 1. a key that is not listed in CLUSTER_KEYS (only the first one is reported);
# 2. a missing or blank list of seed brokers.
# When both pass, every seed broker is validated, followed by the SSL and
# SASL settings.
#
# @param cluster [Hash] settings of a single cluster
def validate_cluster(cluster)
  unknown_keys = cluster.keys.reject { |key| CLUSTER_KEYS.include?(key) }
  unless unknown_keys.empty?
    errors << "Invalid cluster option, #{unknown_keys.first}"
    return
  end

  # Seed brokers may be given as a list or as one comma-separated string.
  brokers = cluster['seed_brokers']
  brokers = brokers.split(',') if brokers.is_a?(String)

  if brokers&.compact.blank?
    errors << 'Must specify a list of seed brokers'
    return
  end

  # Iterates the uncompacted list, so nil entries still reach validate_broker.
  brokers.each { |broker| validate_broker(broker) }
  validate_ssl(cluster)
  validate_sasl(cluster)
end
validate_clusters()
click to toggle source
# File lib/kafka_command/configuration.rb, line 76
# Validates every configured cluster, or records an error when +clusters+
# is blank.
#
# @return [Object, nil] nil when no clusters are configured, otherwise the
#   result of iterating +clusters+
def validate_clusters
  if clusters.blank?
    errors << 'Cluster must be provided'
    nil
  else
    # Only the second element of each yielded pair is validated.
    clusters.each { |_name, settings| validate_cluster(settings) }
  end
end
validate_sasl(cluster)
click to toggle source
# File lib/kafka_command/configuration.rb, line 130
# Records an error when exactly one of 'sasl_scram_username' and
# 'sasl_scram_password' is present for the cluster. Having both or neither
# is accepted.
#
# @param cluster [Hash] settings of a single cluster
# @return [Array<String>, nil] +errors+ when an error was added, nil otherwise
def validate_sasl(cluster)
  has_username = cluster['sasl_scram_username'].present?
  has_password = cluster['sasl_scram_password'].present?

  if has_username && !has_password
    errors << 'Initialized with `sasl_scram_username` but no `sasl_scram_password`. Please provide both.'
  elsif has_password && !has_username
    errors << 'Initialized with `sasl_scram_password` but no `sasl_scram_username`. Please provide both.'
  end
end
validate_ssl(cluster)
click to toggle source
# File lib/kafka_command/configuration.rb, line 114
# Checks that the cluster's SSL settings are consistent, appending to
# +errors+ when they are not:
# - with a certificate authority, a client certificate and its key must be
#   given together or not at all;
# - without a certificate authority, neither may be given.
#
# @param cluster [Hash] settings of a single cluster
# @return [Array<String>, nil] +errors+ when an error was added, nil otherwise
def validate_ssl(cluster)
  authority = certificate_authority(cluster)
  cert = client_certificate(cluster)
  key = client_certificate_key(cluster)

  if authority && cert && !key
    errors << 'Initialized with `ssl_client_cert` but no `ssl_client_cert_key`. Please provide both.'
  elsif authority && !cert && key
    errors << 'Initialized with `ssl_client_cert_key`, but no `ssl_client_cert`. Please provide both.'
  elsif !authority && (cert || key)
    errors << 'Cannot provide client certificate/key without a certificate authority'
  end
end