class CC::Kafka::Producer::HTTP

Constants

HTTPError
HTTP_TIMEOUT

Public Class Methods

new(host, port, topic, ssl = false) click to toggle source
# File lib/cc/kafka/producer/http.rb, line 8
# Builds a producer that publishes to a single topic over HTTP.
#
# Nothing is connected here; the arguments are only stored for later use
# by #send_message.
#
# @param host  address handed to Net::HTTP.new when a message is sent
# @param port  port handed to Net::HTTP.new when a message is sent
# @param topic value sent in the "Topic" request header
# @param ssl   truthy to have #send_message enable TLS with peer verification
def initialize(host, port, topic, ssl = false)
  @host, @port, @topic, @ssl = host, port, topic, ssl
end

Public Instance Methods

close() click to toggle source
# File lib/cc/kafka/producer/http.rb, line 39
# Intentionally does nothing: #send_message creates its HTTP client as a
# local variable on every call, so this object holds no connection to
# release.
#
# @return [nil]
def close
  nil
end
send_message(message, key) click to toggle source
# File lib/cc/kafka/producer/http.rb, line 15
# Publishes one message by POSTing it to "/" on the configured host.
#
# The topic travels in the "Topic" header and, when given, the key in the
# "Key" header; the message itself is the request body.
#
# @param message request body to send
# @param key     value for the "Key" header; the header is omitted when
#                key is nil or false
# @return [nil] when the server answers with a success response
# @raise [HTTPError] when the response is not a Net::HTTPSuccess
def send_message(message, key)
  Kafka.logger.debug("sending message over HTTP")

  # The same limit applies to opening the connection and to reading the reply.
  client = Net::HTTP.new(@host, @port).tap do |conn|
    conn.open_timeout = HTTP_TIMEOUT
    conn.read_timeout = HTTP_TIMEOUT
  end

  if ssl?
    client.use_ssl = true
    client.verify_mode = OpenSSL::SSL::VERIFY_PEER
    add_ssl_certificates(client)
  end

  post = Net::HTTP::Post.new("/")
  post["Topic"] = @topic
  post["Key"] = key if key
  post.body = message

  response = client.request(post)
  return if response.is_a?(Net::HTTPSuccess)

  raise HTTPError, "request not successful: (#{response.code}) #{response.body}"
end

Private Instance Methods

add_ssl_certificates(http) click to toggle source
# File lib/cc/kafka/producer/http.rb, line 49
# Applies the TLS material configured on Kafka to an HTTP client.
#
# * Kafka.ssl_ca_file, when set, becomes the client's CA file.
# * Kafka.ssl_pem_file, when set, is read once and must contain both the
#   client certificate and its private key; both are installed on the
#   client.
#
# The private key is parsed with OpenSSL::PKey.read so that any key type
# OpenSSL understands (RSA, EC, ...) is accepted rather than RSA only.
#
# @param http client object responding to #ca_file=, #cert= and #key=
# @raise [OpenSSL::X509::CertificateError] if no certificate can be parsed
# @raise [OpenSSL::PKey::PKeyError] if no private key can be parsed
def add_ssl_certificates(http)
  if (ca_file = Kafka.ssl_ca_file)
    Kafka.logger.debug("CA certificate: #{ca_file}")
    http.ca_file = ca_file
  end

  return unless (pem_file = Kafka.ssl_pem_file)

  Kafka.logger.debug("PEM certificate: #{pem_file}")
  pem = File.read(pem_file)
  http.cert = OpenSSL::X509::Certificate.new(pem)
  # Auto-detect the key algorithm instead of assuming RSA.
  http.key = OpenSSL::PKey.read(pem)
end
ssl?() click to toggle source
# File lib/cc/kafka/producer/http.rb, line 45
# Whether requests should be sent over TLS.
#
# Returns the +ssl+ value given to the constructor as-is (not coerced to
# true/false); callers only rely on its truthiness.
def ssl?
  @ssl
end