class CC::Kafka::Producer

Constants

InvalidScheme

Public Class Methods

new(url, client_id = nil) click to toggle source
# File lib/cc/kafka/producer.rb, line 10
def initialize(url, client_id = nil)
  @url = url
  @client_id = client_id

  Kafka.logger.debug("initialized client for #{@url} (id: #{@client_id.inspect})")
end

Public Instance Methods

close() click to toggle source
# File lib/cc/kafka/producer.rb, line 36
def close
  producer.close
end
send_message(data, key = nil) click to toggle source
# File lib/cc/kafka/producer.rb, line 17
def send_message(data, key = nil)
  Kafka.logger.debug("data: #{data.inspect}, key: #{key.inspect}")

  producer.send_message(BSON.serialize(data).to_s, key)
rescue
  producer.close
  raise
end
send_snapshot_document(collection:, document:, snapshot_id:) click to toggle source
# File lib/cc/kafka/producer.rb, line 26
def send_snapshot_document(collection:, document:, snapshot_id:)
  snapshot_id = BSON::ObjectId(snapshot_id) if snapshot_id.is_a?(String)
  data = {
    type: "document",
    collection: collection,
    document: document.merge(snapshot_id: snapshot_id)
  }
  send_message(data, snapshot_id.to_s)
end

Private Instance Methods

choose_producer() click to toggle source
# File lib/cc/kafka/producer.rb, line 46
def choose_producer
  case (scheme = uri.scheme)
  when "http" then HTTP.new(host, port, topic)
  when "https" then HTTP.new(host, port, topic, true)
  when "kafka" then Poseidon.new(host, port, topic, @client_id)
  else raise InvalidScheme, "invalid scheme #{scheme.inspect}"
  end
end
host() click to toggle source
# File lib/cc/kafka/producer.rb, line 59
def host
  uri.host
end
port() click to toggle source
# File lib/cc/kafka/producer.rb, line 63
def port
  uri.port
end
producer() click to toggle source
# File lib/cc/kafka/producer.rb, line 42
def producer
  @producer ||= choose_producer
end
scheme() click to toggle source
# File lib/cc/kafka/producer.rb, line 55
def scheme
  uri.scheme
end
topic() click to toggle source
# File lib/cc/kafka/producer.rb, line 67
def topic
  uri.path.split("/")[1]
end
uri() click to toggle source
# File lib/cc/kafka/producer.rb, line 71
def uri
  @uri ||= URI.parse(@url)
end