class Poseidon::Connection
High level internal interface to a remote broker. Provides access to the broker API. @api private
Constants
- API_VERSION
- REPLICA_ID
Attributes
host[R]
port[R]
Public Class Methods
new(host, port, client_id, socket_timeout_ms)
click to toggle source
Create a new connection
@param [String] host Host to connect to @param [Integer] port Port broker listens on @param [String] client_id Unique across processes?
# File lib/poseidon/connection.rb, line 30 def initialize(host, port, client_id, socket_timeout_ms) @host = host @port = port @client_id = client_id @socket_timeout_ms = socket_timeout_ms end
open(host, port, client_id, socket_timeout_ms) { |connection| ... }
click to toggle source
@yieldparam [Connection]
# File lib/poseidon/connection.rb, line 15 def self.open(host, port, client_id, socket_timeout_ms, &block) connection = new(host, port, client_id, socket_timeout_ms) yield connection ensure connection.close end
Public Instance Methods
close()
click to toggle source
Close broker connection
# File lib/poseidon/connection.rb, line 39 def close @socket && @socket.close end
fetch(max_wait_time, min_bytes, topic_fetches)
click to toggle source
Execute a fetch call
@param [Integer] max_wait_time @param [Integer] min_bytes @param [Integer] topic_fetches
# File lib/poseidon/connection.rb, line 68 def fetch(max_wait_time, min_bytes, topic_fetches) ensure_connected req = FetchRequest.new( request_common(:fetch), REPLICA_ID, max_wait_time, min_bytes, topic_fetches) send_request(req) read_response(FetchResponse) end
offset(offset_topic_requests)
click to toggle source
# File lib/poseidon/connection.rb, line 79 def offset(offset_topic_requests) ensure_connected req = OffsetRequest.new(request_common(:offset), REPLICA_ID, offset_topic_requests) send_request(req) read_response(OffsetResponse).topic_offset_responses end
produce(required_acks, timeout, messages_for_topics)
click to toggle source
Execute a produce call
@param [Integer] required_acks @param [Integer] timeout @param [Array<Protocol::MessagesForTopics>] messages_for_topics Messages to send @return [ProduceResponse]
# File lib/poseidon/connection.rb, line 49 def produce(required_acks, timeout, messages_for_topics) ensure_connected req = ProduceRequest.new( request_common(:produce), required_acks, timeout, messages_for_topics) send_request(req) if required_acks != 0 read_response(ProduceResponse) else true end end
topic_metadata(topic_names)
click to toggle source
Fetch metadata for topic_names
@param [Enumberable<String>] topic_names
A list of topics to retrive metadata for
@return [TopicMetadataResponse] metadata for the topics
# File lib/poseidon/connection.rb, line 93 def topic_metadata(topic_names) ensure_connected req = MetadataRequest.new( request_common(:metadata), topic_names) send_request(req) read_response(MetadataResponse) end
Private Instance Methods
ensure_connected()
click to toggle source
# File lib/poseidon/connection.rb, line 102 def ensure_connected if @socket.nil? || @socket.closed? begin @socket = TCPSocket.new(@host, @port) rescue SystemCallError raise_connection_failed_error end end end
ensure_read_or_timeout(maxlen)
click to toggle source
# File lib/poseidon/connection.rb, line 126 def ensure_read_or_timeout(maxlen) if IO.select([@socket], nil, nil, @socket_timeout_ms / 1000.0) @socket.read(maxlen) else raise TimeoutException.new end end
ensure_write_or_timeout(data)
click to toggle source
# File lib/poseidon/connection.rb, line 143 def ensure_write_or_timeout(data) if IO.select(nil, [@socket], nil, @socket_timeout_ms / 1000.0) @socket.write(data) else raise TimeoutException.new end end
next_correlation_id()
click to toggle source
# File lib/poseidon/connection.rb, line 160 def next_correlation_id @correlation_id ||= 0 @correlation_id += 1 end
raise_connection_failed_error()
click to toggle source
# File lib/poseidon/connection.rb, line 165 def raise_connection_failed_error raise ConnectionFailedError, "Failed to connect to #{@host}:#{@port}" end
read_response(response_class)
click to toggle source
# File lib/poseidon/connection.rb, line 112 def read_response(response_class) r = ensure_read_or_timeout(4) if r.nil? raise_connection_failed_error end n = r.unpack("N").first s = ensure_read_or_timeout(n) buffer = Protocol::ResponseBuffer.new(s) response_class.read(buffer) rescue Errno::ECONNRESET, SocketError, TimeoutException @socket = nil raise_connection_failed_error end
request_common(request_type)
click to toggle source
# File lib/poseidon/connection.rb, line 151 def request_common(request_type) RequestCommon.new( API_KEYS[request_type], API_VERSION, next_correlation_id, @client_id ) end
send_request(request)
click to toggle source
# File lib/poseidon/connection.rb, line 134 def send_request(request) buffer = Protocol::RequestBuffer.new request.write(buffer) ensure_write_or_timeout([buffer.to_s.bytesize].pack("N") + buffer.to_s) rescue Errno::EPIPE, Errno::ECONNRESET, TimeoutException @socket = nil raise_connection_failed_error end