class CC::Kafka::Producer
Constants
- InvalidScheme
Public Class Methods
new(url, client_id = nil)
click to toggle source
# File lib/cc/kafka/producer.rb, line 10
# Remembers the connection settings and logs them at debug level.
# Nothing is parsed or connected here.
#
# @param url [String] endpoint URL (handed to URI.parse by #uri)
# @param client_id [Object, nil] optional client identifier; forwarded to
#   the Poseidon producer when the URL scheme is "kafka"
def initialize(url, client_id = nil)
  @client_id = client_id
  @url = url
  message = "initialized client for #{@url} (id: #{@client_id.inspect})"
  Kafka.logger.debug(message)
end
Public Instance Methods
close()
click to toggle source
# File lib/cc/kafka/producer.rb, line 36
# Closes the underlying producer, if one has been created.
#
# Reads @producer directly instead of going through #producer, so closing
# a client that never sent anything does not build a producer (and cannot
# raise InvalidScheme) merely to close it. The memoized reference is
# cleared afterwards, even when the producer's #close raises, so a later
# send builds a fresh producer rather than reusing the closed one.
#
# @return [Object, nil] whatever the producer's #close returns, or nil
#   when no producer existed
def close
  return unless @producer

  @producer.close
ensure
  @producer = nil
end
send_message(data, key = nil)
click to toggle source
# File lib/cc/kafka/producer.rb, line 17
# Serializes +data+ with BSON and hands it to the underlying producer.
#
# On any StandardError the current producer is closed and dropped, and the
# original error is re-raised. Closing is best-effort: a failure while
# closing is logged at debug level and never replaces the error that
# caused it. Dropping the memoized producer means the next call builds a
# fresh one instead of reusing the closed one.
#
# @param data [Object] payload passed to BSON.serialize
# @param key [Object, nil] optional message key, passed through unchanged
# @return [Object] whatever the producer's #send_message returns
# @raise [StandardError] the error raised while building the producer,
#   serializing, or sending
def send_message(data, key = nil)
  Kafka.logger.debug("data: #{data.inspect}, key: #{key.inspect}")
  producer.send_message(BSON.serialize(data).to_s, key)
rescue StandardError => error
  begin
    # Use the ivar, not #producer: if building the producer is what
    # failed, there is nothing to close and we must not retry the build.
    @producer.close if @producer
  rescue StandardError => close_error
    Kafka.logger.debug("failed to close producer: #{close_error.inspect}")
  ensure
    @producer = nil
  end
  raise error
end
send_snapshot_document(collection:, document:, snapshot_id:)
click to toggle source
# File lib/cc/kafka/producer.rb, line 26
# Sends one snapshot document as a message keyed by the snapshot id.
#
# A String +snapshot_id+ is first converted with BSON::ObjectId(); any
# other value is used as given. The id is merged into a copy of
# +document+ under :snapshot_id, and its #to_s form is the message key.
#
# @param collection [Object] value stored under :collection in the message
# @param document [#merge] document body; not mutated (merge returns a copy)
# @param snapshot_id [String, Object] snapshot identifier
# @return [Object] the result of #send_message
def send_snapshot_document(collection:, document:, snapshot_id:)
  oid =
    if snapshot_id.is_a?(String)
      BSON::ObjectId(snapshot_id)
    else
      snapshot_id
    end

  tagged_document = document.merge(snapshot_id: oid)
  envelope = {
    type: "document",
    collection: collection,
    document: tagged_document
  }

  send_message(envelope, oid.to_s)
end
Private Instance Methods
choose_producer()
click to toggle source
# File lib/cc/kafka/producer.rb, line 46
# Builds the producer implementation matching the URL scheme:
# "http" and "https" use HTTP (the latter with a trailing +true+ argument),
# "kafka" uses Poseidon and additionally receives the client id.
#
# @return [Object] a new HTTP or Poseidon producer
# @raise [InvalidScheme] for any other scheme, including nil
def choose_producer
  case (requested = uri.scheme)
  when "kafka"
    Poseidon.new(host, port, topic, @client_id)
  when "https"
    HTTP.new(host, port, topic, true)
  when "http"
    HTTP.new(host, port, topic)
  else
    raise InvalidScheme, "invalid scheme #{requested.inspect}"
  end
end
host()
click to toggle source
# File lib/cc/kafka/producer.rb, line 59
# Host component of the configured URL, as reported by the parsed URI.
def host
  uri.host
end
port()
click to toggle source
# File lib/cc/kafka/producer.rb, line 63
# Port component of the configured URL, as reported by the parsed URI.
def port
  uri.port
end
producer()
click to toggle source
# File lib/cc/kafka/producer.rb, line 42
# Lazily builds the producer on first use via #choose_producer and
# memoizes it in @producer for subsequent calls.
def producer
  return @producer if @producer

  @producer = choose_producer
end
scheme()
click to toggle source
# File lib/cc/kafka/producer.rb, line 55
# Scheme component of the configured URL, as reported by the parsed URI.
def scheme
  uri.scheme
end
topic()
click to toggle source
# File lib/cc/kafka/producer.rb, line 67
# Topic name taken from the URL path: the element at index 1 after
# splitting the path on "/" (for "/events/extra" that is "events").
# Returns nil when the split yields fewer than two elements, e.g. for an
# empty path or "/".
def topic
  _leading, name = uri.path.split("/")
  name
end
uri()
click to toggle source
# File lib/cc/kafka/producer.rb, line 71
# Parses @url with URI.parse on first use and memoizes the result in @uri.
def uri
  return @uri if @uri

  @uri = URI.parse(@url)
end