class Cql::Protocol::CqlProtocolHandler

This class wraps a single connection and translates between request/response frames and raw bytes.

You send requests with send_request, and receive responses through the returned future.

Instances of this class are thread safe.

@example Sending an OPTIONS request

future = protocol_handler.send_request(Cql::Protocol::OptionsRequest.new)
response = future.get
puts "These options are supported: #{response.options}"

Attributes

keyspace[R]

@return [String] the current keyspace for the underlying connection

Public Class Methods

new(connection, scheduler, protocol_version, compressor=nil) click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 22
# Sets up a protocol handler around a connection.
#
# @param connection the connection to wrap; it must respond to `on_data`
#   and `on_closed`, plus the methods the rest of this class delegates to
# @param scheduler used to schedule request timeouts (see {#send_request})
# @param protocol_version handed to the frame encoder
# @param compressor an optional compressor shared by the frame encoder and
#   the frame decoder
def initialize(connection, scheduler, protocol_version, compressor=nil)
  @connection = connection
  @scheduler = scheduler
  @compressor = compressor
  # One slot per stream ID; a nil slot means that stream ID is free.
  @promises = Array.new(128)
  @read_buffer = CqlByteBuffer.new
  @frame_encoder = FrameEncoder.new(protocol_version, @compressor)
  @frame_decoder = FrameDecoder.new(@compressor)
  @current_frame = FrameDecoder::NULL_FRAME
  @request_queue_in = []
  @request_queue_out = []
  @event_listeners = []
  @data = {}
  @lock = Mutex.new
  @closed_promise = Promise.new
  @keyspace = nil
  # Register the callbacks last: receive_data and socket_closed read the
  # state initialized above, so they must not be reachable before it exists.
  @connection.on_data(&method(:receive_data))
  @connection.on_closed(&method(:socket_closed))
end

Public Instance Methods

[](key) click to toggle source

@see {#[]=}
@return the value associated with the key

# File lib/cql/protocol/cql_protocol_handler.rb, line 69
# Looks up a value previously stored with {#[]=}.
#
# @param key the key the value was stored under
# @return the value associated with the key
def [](key)
  @lock.synchronize { @data[key] }
end
[]=(key, value) click to toggle source

Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.

# File lib/cql/protocol/cql_protocol_handler.rb, line 60
# Stores an arbitrary value on this protocol handler under the given key.
#
# @param key the key to store the value under
# @param value the value to associate with the key
def []=(key, value)
  @lock.synchronize { @data[key] = value }
end
close() click to toggle source

Closes the underlying connection.

@return [Cql::Future] a future that completes when the connection has closed

# File lib/cql/protocol/cql_protocol_handler.rb, line 161
# Asks the underlying connection to close and returns the future of the
# promise that #socket_closed resolves once the connection reports that it
# has closed.
#
# @return [Cql::Future] a future that completes when the connection has closed
def close
  @connection.close
  @closed_promise.future
end
closed?() click to toggle source

@return [true, false] true if the underlying connection is closed

# File lib/cql/protocol/cql_protocol_handler.rb, line 82
# Delegates to the underlying connection.
#
# @return [true, false] true if the underlying connection is closed
def closed?
  @connection.closed?
end
connected?() click to toggle source

@return [true, false] true if the underlying connection is connected

# File lib/cql/protocol/cql_protocol_handler.rb, line 77
# Delegates to the underlying connection.
#
# @return [true, false] true if the underlying connection is connected
def connected?
  @connection.connected?
end
host() click to toggle source

Returns the hostname of the underlying connection

@return [String] the hostname

# File lib/cql/protocol/cql_protocol_handler.rb, line 45
# Returns the hostname of the underlying connection.
#
# @return [String] the hostname
def host
  @connection.host
end
on_closed(&listener) click to toggle source

Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.

@yieldparam error [nil, Error] the error that caused the connection to close, if any
# File lib/cql/protocol/cql_protocol_handler.rb, line 92
# Registers a listener that is notified when the underlying connection has
# closed. The same listener is attached to both outcomes of the close
# future, so it is called on a clean close as well as on a failure.
#
# @yieldparam error [nil, Error] the error that caused the connection to
#   close, if any
def on_closed(&listener)
  closed_future = @closed_promise.future
  closed_future.on_value(&listener)
  closed_future.on_failure(&listener)
end
on_event(&listener) click to toggle source

Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.

@yieldparam event [Cql::Protocol::EventResponse] an event sent by the server

# File lib/cql/protocol/cql_protocol_handler.rb, line 102
# Registers a listener for server sent events.
#
# @yieldparam event [Cql::Protocol::EventResponse] an event sent by the server
def on_event(&listener)
  @lock.synchronize do
    # Replace the array instead of appending in place, so an array that was
    # already read under the lock by another method is never mutated.
    @event_listeners = @event_listeners + [listener]
  end
end
port() click to toggle source

Returns the port of the underlying connection

@return [Integer] the port

# File lib/cql/protocol/cql_protocol_handler.rb, line 52
# Returns the port of the underlying connection.
#
# @return [Integer] the port
def port
  @connection.port
end
send_request(request, timeout=nil) click to toggle source

Serializes and sends a request over the underlying connection.

Returns a future that will resolve to the response. When the connection closes the futures of all active requests will be failed with the error that caused the connection to close, or with a connection closed error when it closed without an error.

When `timeout` is specified the future will fail with {Cql::TimeoutError} after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request the channel occupied by the request will not be reused.

@param [Cql::Protocol::Request] request
@param [Float] timeout an optional number of seconds to wait until failing the request

@return [Cql::Future<Cql::Protocol::Response>] a future that resolves to the response
# File lib/cql/protocol/cql_protocol_handler.rb, line 125
# Serializes and sends a request over the underlying connection.
#
# When a stream ID is free the request is written immediately. Otherwise the
# request is encoded up front and queued; #flush_request_queue sends it once
# a completed request frees a stream ID.
#
# @param [Cql::Protocol::Request] request
# @param [Float] timeout an optional number of seconds to wait until failing
#   the request
# @return [Cql::Future<Cql::Protocol::Response>] a future that resolves to
#   the response, or an already failed future when the connection is closed
def send_request(request, timeout=nil)
  return Future.failed(NotConnectedError.new) if closed?
  promise = RequestPromise.new(request, @frame_encoder)
  # Reserving a stream ID and falling back to the queue must happen in one
  # critical section. If the lock is released between the two steps, a
  # completing request can free a stream ID and flush a still-empty queue,
  # leaving this request queued although a stream ID is available.
  id = @lock.synchronize do
    stream_id = next_stream_id
    if stream_id
      @promises[stream_id] = promise
    else
      promise.encode_frame
      @request_queue_in << promise
    end
    stream_id
  end
  if id
    # Write outside the lock; the slot is already reserved for this request.
    @connection.write do |buffer|
      @frame_encoder.encode_frame(request, id, buffer)
    end
  end
  if timeout
    @scheduler.schedule_timer(timeout).on_value do
      promise.time_out!
    end
  end
  promise.future
end

Private Instance Methods

complete_request(id, response) click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 223
# Handles the response for the request that occupies the given stream ID.
#
# Frees the stream ID, records the keyspace when the response is a
# SetKeyspaceResultResponse, gives queued requests a chance to use the freed
# stream ID, and finally fulfills the request's promise.
#
# @param id the stream ID the response arrived on
# @param response the decoded response body
def complete_request(id, response)
  promise = @lock.synchronize do
    pending = @promises[id]
    @promises[id] = nil
    pending
  end
  if response.is_a?(Protocol::SetKeyspaceResultResponse)
    @keyspace = response.keyspace
  end
  # Flush before fulfilling, so queued requests are sent before any
  # listeners on the promise run.
  flush_request_queue
  # The slot can be empty when a response arrives for a stream ID with no
  # pending request; ignore it instead of raising NoMethodError on nil.
  # Responses to requests that already timed out are dropped.
  promise.fulfill(response) if promise && !promise.timed_out?
end
flush_request_queue() click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 241
# Sends queued requests for as long as there are free stream IDs.
#
# Requests are queued on @request_queue_in and drained from
# @request_queue_out; the inbound queue is swapped in only when the outbound
# queue has been emptied. Queued requests that have timed out are discarded
# without consuming a stream ID.
def flush_request_queue
  @lock.synchronize do
    if @request_queue_out.empty? && !@request_queue_in.empty?
      @request_queue_out, @request_queue_in = @request_queue_in, []
    end
  end
  while true
    stream_id = nil
    frame = nil
    discarded = false
    @lock.synchronize do
      if @request_queue_out.any? && (stream_id = next_stream_id)
        pending = @request_queue_out.shift
        if pending.timed_out?
          discarded = true
        else
          frame = pending.frame
          @promises[stream_id] = pending
        end
      end
    end
    # A timed out request never reserved the stream ID, so try again.
    next if discarded
    break unless stream_id
    # The frame was encoded before a stream ID was known; stamp the real one
    # into it and write it outside the lock.
    @frame_encoder.change_stream_id(stream_id, frame)
    @connection.write(frame)
  end
end
next_stream_id() click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 298
# Finds the lowest stream ID that has no pending request.
#
# Callers in this class hold @lock while calling this, because it reads
# @promises.
#
# @return [Integer, nil] the index of the first nil slot in @promises, or nil
#   when every slot is occupied
def next_stream_id
  @promises.index(nil)
end
notify_event_listeners(event_response) click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 209
# Passes a server sent event to every registered event listener.
#
# @param event_response the decoded event to hand to the listeners
def notify_event_listeners(event_response)
  # Only read the reference under the lock and iterate outside it, so
  # listeners never run while the lock is held.
  listeners = @lock.synchronize { @event_listeners }
  return if listeners.empty?
  listeners.each do |listener|
    begin
      # Use the argument rather than reading @current_frame again, so the
      # method delivers exactly the event it was given.
      listener.call(event_response)
    rescue StandardError
      # Deliberately best-effort: an error in one listener must not stop
      # the remaining listeners from being notified.
    end
  end
end
receive_data(data) click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 195
# Appends newly received bytes to the read buffer and dispatches every frame
# that can be completely decoded from it.
#
# Frames with stream ID -1 go to the event listeners; every other frame
# completes the request registered under its stream ID. A partially decoded
# frame is kept in @current_frame and resumed on the next call.
#
# @param data the bytes received from the connection
def receive_data(data)
  @read_buffer << data
  # Resume the frame left over from the previous call, if any.
  @current_frame = @frame_decoder.decode_frame(@read_buffer, @current_frame)
  while (frame = @current_frame).complete?
    stream_id = frame.stream_id
    if stream_id == -1
      notify_event_listeners(frame.body)
    else
      complete_request(stream_id, frame.body)
    end
    # Start decoding the next frame from whatever is left in the buffer.
    @current_frame = @frame_decoder.decode_frame(@read_buffer)
  end
end
socket_closed(cause) click to toggle source
# File lib/cql/protocol/cql_protocol_handler.rb, line 277
# Called when the underlying connection has closed.
#
# Fails every request that is in flight or queued, then resolves the close
# promise. Requests fail with the cause of the close, or with an
# Io::ConnectionClosedError when the connection closed without an error.
#
# @param cause [nil, Error] the error that closed the connection, if any
def socket_closed(cause)
  failure = cause || Io::ConnectionClosedError.new
  abandoned = @lock.synchronize do
    # Collect everything first, then reset the state, so the promises can
    # be failed after the lock has been released.
    pending = @promises.compact + @request_queue_in + @request_queue_out
    @promises.fill(nil)
    @request_queue_in.clear
    @request_queue_out.clear
    pending
  end
  abandoned.each { |promise| promise.fail(failure) }
  cause ? @closed_promise.fail(cause) : @closed_promise.fulfill
end