class Poseidon::BrokerPool
BrokerPool allows you to send API calls to a broker's Connection.
@api private
Public Class Methods
new(client_id, seed_brokers, socket_timeout_ms)
@param [String] client_id
# File lib/poseidon/broker_pool.rb, line 18
def initialize(client_id, seed_brokers, socket_timeout_ms)
  @connections = {}
  @brokers = {}
  @client_id = client_id
  @seed_brokers = seed_brokers
  @socket_timeout_ms = socket_timeout_ms
end
open(client_id, seed_brokers, socket_timeout_ms) { |broker_pool| ... }
@yieldparam [BrokerPool]
# File lib/poseidon/broker_pool.rb, line 9
def self.open(client_id, seed_brokers, socket_timeout_ms, &block)
  broker_pool = new(client_id, seed_brokers, socket_timeout_ms)
  yield broker_pool
ensure
  broker_pool.close
end
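The block form of open follows the usual Ruby resource pattern: build the object, yield it, and close it in an ensure clause. The sketch below illustrates that pattern with DemoPool, a hypothetical stand-in that does not talk to Kafka. The client id, broker address, timeout value, and the `if pool` guard are assumptions of this sketch, not part of Poseidon.

```ruby
# Hypothetical stand-in for a broker pool, used only to show the
# block-form open/ensure pattern. It opens no network connections.
class DemoPool
  attr_reader :closed

  def self.open(*args)
    pool = new(*args)
    yield pool
  ensure
    # Runs whether the block returns normally or raises.
    # The nil guard is an addition of this sketch.
    pool.close if pool
  end

  def initialize(client_id, seed_brokers, socket_timeout_ms)
    @client_id = client_id
    @seed_brokers = seed_brokers
    @socket_timeout_ms = socket_timeout_ms
    @closed = false
  end

  def close
    @closed = true
  end
end

seen_pool = nil
result = DemoPool.open("demo_client", ["localhost:9092"], 10_000) do |pool|
  seen_pool = pool
  :done
end
```

The value of the block becomes the return value of open, and the pool is closed by the time open returns.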
Public Instance Methods
close()
Closes all open connections to brokers
# File lib/poseidon/broker_pool.rb, line 57
def close
  @brokers.values(&:close)
  @brokers = {}
end
Also aliased as: shutdown
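When reading the close listing, keep in mind that Ruby's Hash#values takes no block and silently ignores one if given. The sketch below uses ClosableStub, a hypothetical stand-in for a connection, to contrast passing a block to values with iterating explicitly. It is an illustration of the Ruby behavior only, not a statement of how Poseidon should be changed.

```ruby
# Hypothetical stand-in for an object that can be closed.
class ClosableStub
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

connections = { 1 => ClosableStub.new, 2 => ClosableStub.new }

# Hash#values ignores the block, so close is never called here.
connections.values(&:close)
closed_after_values = connections.values.map(&:closed)

# each_value yields every stored object, so close runs on each one.
connections.each_value(&:close)
closed_after_each = connections.values.map(&:closed)
```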
execute_api_call(broker_id, api_call, *args)
Executes an api call on the connection
@param [Integer] broker_id id of the broker we want to execute it on
@param [Symbol] api_call the api call we want to execute (:produce, :fetch, etc.)
# File lib/poseidon/broker_pool.rb, line 52
def execute_api_call(broker_id, api_call, *args)
  connection(broker_id).send(api_call, *args)
end
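The dispatch in execute_api_call relies on Object#send: the symbol names a method on the connection, and the remaining arguments are forwarded unchanged. A minimal sketch of that mechanism follows. FakeConnection, DemoDispatcher, and the argument lists of produce and fetch are hypothetical and do not reflect the real Connection API.

```ruby
# Hypothetical connection exposing two API calls as ordinary methods.
class FakeConnection
  def produce(topic, messages)
    [:produce, topic, messages.size]
  end

  def fetch(topic, offset)
    [:fetch, topic, offset]
  end
end

# Simplified pool holding one pre-built connection per broker id.
class DemoDispatcher
  def initialize(connections)
    @connections = connections
  end

  def execute_api_call(broker_id, api_call, *args)
    # send looks up the method named by api_call and forwards args to it.
    @connections.fetch(broker_id).send(api_call, *args)
  end
end

dispatcher = DemoDispatcher.new({ 1 => FakeConnection.new })
produce_result = dispatcher.execute_api_call(1, :produce, "events", ["a", "b"])
fetch_result = dispatcher.execute_api_call(1, :fetch, "events", 42)
```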
fetch_metadata(topics)
# File lib/poseidon/broker_pool.rb, line 26
def fetch_metadata(topics)
  @seed_brokers.each do |broker|
    if metadata = fetch_metadata_from_broker(broker, topics)
      Poseidon.logger.debug { "Fetched metadata\n" + metadata.to_s }
      return metadata
    end
  end
  raise Errors::UnableToFetchMetadata
end
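fetch_metadata tries each seed broker in order, returns the first successful result, and raises only when every broker fails. The sketch below reproduces that control flow without any networking. The fetch_from helper, the reachable list, the broker names, and the local UnableToFetchMetadata class are all hypothetical.

```ruby
# Local error class for this sketch; not the Poseidon::Errors constant.
class UnableToFetchMetadata < StandardError; end

# Hypothetical lookup: returns metadata for reachable brokers, nil otherwise.
def fetch_from(broker, reachable)
  reachable.include?(broker) ? { source: broker } : nil
end

# Walk the seed list in order and stop at the first broker that answers.
def first_metadata(seed_brokers, reachable)
  seed_brokers.each do |broker|
    metadata = fetch_from(broker, reachable)
    return metadata if metadata
  end
  raise UnableToFetchMetadata
end

seeds = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
metadata = first_metadata(seeds, ["kafka2:9092", "kafka3:9092"])
```

Here the first seed is unreachable, so the result comes from the second one and the third is never contacted.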
update_known_brokers(brokers)
Update the brokers we know about
TODO break connection when a broker's info changes?
@param [Hash<Integer,Hash>] brokers
Hash of broker_id => { :host => host, :port => port }
# File lib/poseidon/broker_pool.rb, line 42
def update_known_brokers(brokers)
  @brokers.update(brokers)
  nil
end
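Hash#update merges in place: entries for existing broker ids are overwritten and new ids are added. The sketch below shows the effect on a hash of the documented shape. The host names and ports are made-up values.

```ruby
# Known brokers, keyed by broker id, in the documented shape.
brokers = { 1 => { :host => "kafka1", :port => 9092 } }

# Hypothetical fresh metadata: broker 1 moved, broker 2 is new.
latest = {
  1 => { :host => "kafka1-new", :port => 9092 },
  2 => { :host => "kafka2", :port => 9092 }
}

# update mutates the receiver; the entry for id 1 is replaced.
brokers.update(latest)
```

Because only the info hash is replaced, an object built earlier from the old entry for broker 1 is not affected by this call, which is the situation the TODO above asks about.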
Private Instance Methods
connection(broker_id)
# File lib/poseidon/broker_pool.rb, line 74
def connection(broker_id)
  @connections[broker_id] ||= new_connection(broker_id)
end
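The `||=` in connection memoizes per broker id: the right-hand side runs only when the hash has no truthy entry for that id. A small sketch of the idiom, using a hypothetical DemoCache whose new_connection returns a string instead of a real connection:

```ruby
# Hypothetical cache that counts how often a connection is built.
class DemoCache
  attr_reader :builds

  def initialize
    @connections = {}
    @builds = 0
  end

  def connection(broker_id)
    # Builds on first access, then reuses the stored object.
    @connections[broker_id] ||= new_connection(broker_id)
  end

  private

  def new_connection(broker_id)
    @builds += 1
    "connection-to-#{broker_id}"
  end
end

cache = DemoCache.new
first = cache.connection(1)
second = cache.connection(1)
cache.connection(2)
```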
fetch_metadata_from_broker(broker, topics)
# File lib/poseidon/broker_pool.rb, line 65
def fetch_metadata_from_broker(broker, topics)
  host, port = broker.split(":")
  Connection.open(host, port, @client_id, @socket_timeout_ms) do |connection|
    connection.topic_metadata(topics)
  end
rescue Connection::ConnectionFailedError
  return nil
end
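Seed brokers are given as "host:port" strings, and String#split returns both halves as strings. The sketch below shows the destructuring on a made-up address. The conversion with Integer is an optional step added for illustration and is not taken from the listing.

```ruby
# Hypothetical seed broker address.
seed = "localhost:9092"

# Destructure the two parts; both are Strings at this point.
host, port = seed.split(":")

# Optional: convert when a numeric port is needed.
port_number = Integer(port)
```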
new_connection(broker_id)
# File lib/poseidon/broker_pool.rb, line 78
def new_connection(broker_id)
  info = @brokers[broker_id]
  if info.nil?
    raise UnknownBroker
  end
  Connection.new(info[:host], info[:port], @client_id, @socket_timeout_ms)
end