class Poseidon::Connection

High level internal interface to a remote broker. Provides access to the broker API. @api private

Constants

API_VERSION
REPLICA_ID

Attributes

host[R]
port[R]

Public Class Methods

new(host, port, client_id, socket_timeout_ms) click to toggle source

Create a new connection

@param [String] host Host to connect to @param [Integer] port Port broker listens on @param [String] client_id Unique across processes?

# File lib/poseidon/connection.rb, line 30
def initialize(host, port, client_id, socket_timeout_ms)
  @host = host
  @port = port

  @client_id = client_id
  @socket_timeout_ms = socket_timeout_ms
end
open(host, port, client_id, socket_timeout_ms) { |connection| ... } click to toggle source

@yieldparam [Connection]

# File lib/poseidon/connection.rb, line 15
def self.open(host, port, client_id, socket_timeout_ms, &block)
  connection = new(host, port, client_id, socket_timeout_ms)

  yield connection
ensure
  connection.close
end

Public Instance Methods

close() click to toggle source

Close broker connection

# File lib/poseidon/connection.rb, line 39
def close
  @socket && @socket.close
end
fetch(max_wait_time, min_bytes, topic_fetches) click to toggle source

Execute a fetch call

@param [Integer] max_wait_time @param [Integer] min_bytes @param [Integer] topic_fetches

# File lib/poseidon/connection.rb, line 68
def fetch(max_wait_time, min_bytes, topic_fetches)
  ensure_connected
  req = FetchRequest.new( request_common(:fetch),
                            REPLICA_ID,
                            max_wait_time,
                            min_bytes,
                            topic_fetches) 
  send_request(req)
  read_response(FetchResponse)
end
offset(offset_topic_requests) click to toggle source
# File lib/poseidon/connection.rb, line 79
def offset(offset_topic_requests)
  ensure_connected
  req = OffsetRequest.new(request_common(:offset),
                          REPLICA_ID,
                          offset_topic_requests)
  send_request(req)
  read_response(OffsetResponse).topic_offset_responses
end
produce(required_acks, timeout, messages_for_topics) click to toggle source

Execute a produce call

@param [Integer] required_acks @param [Integer] timeout @param [Array<Protocol::MessagesForTopics>] messages_for_topics Messages to send @return [ProduceResponse]

# File lib/poseidon/connection.rb, line 49
def produce(required_acks, timeout, messages_for_topics)
  ensure_connected
  req = ProduceRequest.new( request_common(:produce),
                            required_acks,
                            timeout,
                            messages_for_topics) 
  send_request(req)
  if required_acks != 0
    read_response(ProduceResponse)
  else
    true
  end
end
topic_metadata(topic_names) click to toggle source

Fetch metadata for topic_names

@param [Enumberable<String>] topic_names

A list of topics to retrive metadata for

@return [TopicMetadataResponse] metadata for the topics

# File lib/poseidon/connection.rb, line 93
def topic_metadata(topic_names)
  ensure_connected
  req = MetadataRequest.new( request_common(:metadata),
                             topic_names)
  send_request(req)
  read_response(MetadataResponse)
end

Private Instance Methods

ensure_connected() click to toggle source
# File lib/poseidon/connection.rb, line 102
def ensure_connected
  if @socket.nil? || @socket.closed?
    begin
      @socket = TCPSocket.new(@host, @port)
    rescue SystemCallError
      raise_connection_failed_error
    end
  end
end
ensure_read_or_timeout(maxlen) click to toggle source
# File lib/poseidon/connection.rb, line 126
def ensure_read_or_timeout(maxlen)
  if IO.select([@socket], nil, nil, @socket_timeout_ms / 1000.0)
     @socket.read(maxlen)
  else
     raise TimeoutException.new
  end
end
ensure_write_or_timeout(data) click to toggle source
# File lib/poseidon/connection.rb, line 143
def ensure_write_or_timeout(data)
  if IO.select(nil, [@socket], nil, @socket_timeout_ms / 1000.0)
    @socket.write(data)
  else
    raise TimeoutException.new
  end
end
next_correlation_id() click to toggle source
# File lib/poseidon/connection.rb, line 160
def next_correlation_id
  @correlation_id ||= 0
  @correlation_id  += 1
end
raise_connection_failed_error() click to toggle source
# File lib/poseidon/connection.rb, line 165
def raise_connection_failed_error
  raise ConnectionFailedError, "Failed to connect to #{@host}:#{@port}"
end
read_response(response_class) click to toggle source
# File lib/poseidon/connection.rb, line 112
def read_response(response_class)
  r = ensure_read_or_timeout(4)
  if r.nil?
    raise_connection_failed_error
  end
  n = r.unpack("N").first
  s = ensure_read_or_timeout(n)
  buffer = Protocol::ResponseBuffer.new(s)
  response_class.read(buffer)
rescue Errno::ECONNRESET, SocketError, TimeoutException
  @socket = nil
  raise_connection_failed_error
end
request_common(request_type) click to toggle source
# File lib/poseidon/connection.rb, line 151
def request_common(request_type)
  RequestCommon.new(
    API_KEYS[request_type],
    API_VERSION,
    next_correlation_id,
    @client_id
  )
end
send_request(request) click to toggle source
# File lib/poseidon/connection.rb, line 134
def send_request(request)
  buffer = Protocol::RequestBuffer.new
  request.write(buffer)
  ensure_write_or_timeout([buffer.to_s.bytesize].pack("N") + buffer.to_s)
rescue Errno::EPIPE, Errno::ECONNRESET, TimeoutException
  @socket = nil
  raise_connection_failed_error
end