class CC::Kafka::Producer::HTTP
Constants
- HTTPError
- HTTP_TIMEOUT
Public Class Methods
new(host, port, topic, ssl = false)
click to toggle source
# File lib/cc/kafka/producer/http.rb, line 8 def initialize(host, port, topic, ssl = false) @host = host @port = port @topic = topic @ssl = ssl end
Public Instance Methods
close()
click to toggle source
# File lib/cc/kafka/producer/http.rb, line 39 def close # no-op end
send_message(message, key)
click to toggle source
# File lib/cc/kafka/producer/http.rb, line 15 def send_message(message, key) Kafka.logger.debug("sending message over HTTP") http = Net::HTTP.new(@host, @port) http.open_timeout = HTTP_TIMEOUT http.read_timeout = HTTP_TIMEOUT if ssl? http.use_ssl = true http.verify_mode = OpenSSL::SSL::VERIFY_PEER add_ssl_certificates(http) end request = Net::HTTP::Post.new("/") request["Topic"] = @topic request["Key"] = key if key request.body = message response = http.request(request) unless response.is_a?(Net::HTTPSuccess) raise HTTPError, "request not successful: (#{response.code}) #{response.body}" end end
Private Instance Methods
add_ssl_certificates(http)
click to toggle source
# File lib/cc/kafka/producer/http.rb, line 49 def add_ssl_certificates(http) if Kafka.ssl_ca_file Kafka.logger.debug("CA certificate: #{Kafka.ssl_ca_file}") http.ca_file = Kafka.ssl_ca_file end if Kafka.ssl_pem_file Kafka.logger.debug("PEM certificate: #{Kafka.ssl_pem_file}") pem = File.read(Kafka.ssl_pem_file) http.cert = OpenSSL::X509::Certificate.new(pem) http.key = OpenSSL::PKey::RSA.new(pem) end end
ssl?()
click to toggle source
# File lib/cc/kafka/producer/http.rb, line 45 def ssl? @ssl end