class Cql::Protocol::CqlProtocolHandler
This class wraps a single connection and translates between request/response frames and raw bytes.
You send requests with `send_request`, and receive responses through the returned future.
Instances of this class are thread safe.
@example Sending an OPTIONS request
future = protocol_handler.send_request(Cql::Protocol::OptionsRequest.new)
response = future.get
puts "These options are supported: #{response.options}"
Attributes
@return [String] the current keyspace for the underlying connection
Public Class Methods
# File lib/cql/protocol/cql_protocol_handler.rb, line 22
# Builds a protocol handler around +connection+ and subscribes it to the
# connection's data and close notifications.
#
# @param connection the connection to wrap; it is asked to call
#   receive_data via on_data and socket_closed via on_closed
# @param scheduler kept in @scheduler (used by send_request for timeouts)
# @param protocol_version handed to the frame encoder
# @param compressor optional; handed to both the frame encoder and decoder
def initialize(connection, scheduler, protocol_version, compressor=nil)
  @connection, @scheduler, @compressor = connection, scheduler, compressor

  @connection.on_data(&method(:receive_data))
  @connection.on_closed(&method(:socket_closed))

  # One slot per stream id; a nil slot means the id is free
  # (see next_stream_id, which searches for the first nil).
  @promises = Array.new(128)

  @read_buffer = CqlByteBuffer.new
  @frame_encoder = FrameEncoder.new(protocol_version, @compressor)
  @frame_decoder = FrameDecoder.new(@compressor)
  @current_frame = FrameDecoder::NULL_FRAME

  # Requests that could not get a stream id wait in these two queues.
  @request_queue_in, @request_queue_out = [], []
  @event_listeners = []
  @data = {}

  @lock = Mutex.new
  @closed_promise = Promise.new
  @keyspace = nil
end
Public Instance Methods
@see {#[]=} @return the value associated with the key
# File lib/cql/protocol/cql_protocol_handler.rb, line 69
# Reads a value previously associated with this handler through #[]=.
#
# @param key the key the value was stored under
# @return the value stored in @data for +key+
def [](key)
  # synchronize releases the mutex only if it was actually acquired. The
  # previous form took the lock inside the method-level ensure region, so a
  # failed acquisition (e.g. ThreadError on recursive locking) still ran
  # unlock and released a lock that this call never took.
  @lock.synchronize { @data[key] }
end
Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.
# File lib/cql/protocol/cql_protocol_handler.rb, line 60
# Associates +value+ with +key+ on this protocol handler.
#
# @param key the key to store the value under
# @param value the value to store
# @return the stored value
def []=(key, value)
  # synchronize releases the mutex only if it was actually acquired. The
  # previous form took the lock inside the method-level ensure region, so a
  # failed acquisition still ran unlock on a lock this call did not own.
  @lock.synchronize { @data[key] = value }
end
Closes the underlying connection.
@return [Cql::Future] a future that completes when the connection has closed
# File lib/cql/protocol/cql_protocol_handler.rb, line 161 def close @connection.close @closed_promise.future end
@return [true, false] true if the underlying connection is closed
# File lib/cql/protocol/cql_protocol_handler.rb, line 82 def closed? @connection.closed? end
@return [true, false] true if the underlying connection is connected
# File lib/cql/protocol/cql_protocol_handler.rb, line 77 def connected? @connection.connected? end
Returns the hostname of the underlying connection
@return [String] the hostname
# File lib/cql/protocol/cql_protocol_handler.rb, line 45 def host @connection.host end
Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.
@yieldparam error [nil, Error] the error that caused the connection to close, if any
# File lib/cql/protocol/cql_protocol_handler.rb, line 92 def on_closed(&listener) @closed_promise.future.on_value(&listener) @closed_promise.future.on_failure(&listener) end
Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.
@yieldparam event [Cql::Protocol::EventResponse] an event sent by the server
# File lib/cql/protocol/cql_protocol_handler.rb, line 102
# Adds +listener+ to the list of event listeners.
#
# @yieldparam event the event body handed to notify_event_listeners
# @return [Array] the new listener list
def on_event(&listener)
  # synchronize releases the mutex only if it was actually acquired. The
  # previous form took the lock inside the method-level ensure region, so a
  # failed acquisition still ran unlock on a lock this call did not own.
  @lock.synchronize do
    # Deliberately builds a new array instead of appending in place: a
    # reference to the old array taken under the lock stays unchanged and can
    # be iterated after the lock has been released.
    @event_listeners += [listener]
  end
end
Returns the port of the underlying connection
@return [Integer] the port
# File lib/cql/protocol/cql_protocol_handler.rb, line 52 def port @connection.port end
Serializes and sends a request over the underlying connection.
Returns a future that will resolve to the response. When the connection closes, the futures of all active requests will be failed with the error that caused the connection to close, or with an `Io::ConnectionClosedError` when it closed without an error.
When `timeout` is specified the future will fail with {Cql::TimeoutError} after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request, the channel occupied by the request will not be reused.
@param [Cql::Protocol::Request] request
@param [Float] timeout an optional number of seconds to wait until failing the request
@return [Cql::Future<Cql::Protocol::Response>] a future that resolves to the response
# File lib/cql/protocol/cql_protocol_handler.rb, line 125
# Sends +request+ over the wrapped connection, or queues it when no stream id
# is free, and returns the future of the promise tracking it.
#
# @param request the request to encode and send
# @param timeout optional; when given, a timer is scheduled on @scheduler and
#   the request's promise is told to time out when the timer fires
# @return the promise's future, or a future failed with NotConnectedError
#   when the handler is already closed
def send_request(request, timeout=nil)
  return Future.failed(NotConnectedError.new) if closed?

  promise = RequestPromise.new(request, @frame_encoder)
  id = nil
  # Claiming a stream id and falling back to the queue happen in ONE critical
  # section. Previously the lock was released in between, so a response
  # completing in that window could flush an empty queue and leave this
  # request stranded until some later response triggered another flush.
  @lock.synchronize do
    id = next_stream_id
    if id
      @promises[id] = promise
    else
      # No free stream id: have the promise encode its frame now; the stream
      # id is patched in when the queue is flushed.
      promise.encode_frame
      @request_queue_in << promise
    end
  end

  if id
    # The write itself happens outside the lock.
    @connection.write do |buffer|
      @frame_encoder.encode_frame(request, id, buffer)
    end
  end

  if timeout
    @scheduler.schedule_timer(timeout).on_value { promise.time_out! }
  end

  promise.future
end
Private Instance Methods
# File lib/cql/protocol/cql_protocol_handler.rb, line 223
# Handles the response for stream +id+: frees the stream slot, records a
# keyspace change, lets queued requests use the freed slot, and finally
# fulfills the waiting promise unless it has already timed out.
#
# NOTE(review): if no promise is registered for +id+ the timed_out? call below
# is made on nil — confirm the server can never send such a frame.
def complete_request(id, response)
  promise = @lock.synchronize do
    pending = @promises[id]
    @promises[id] = nil
    pending
  end

  @keyspace = response.keyspace if response.is_a?(Protocol::SetKeyspaceResultResponse)

  # The slot is free now, so a queued request may be able to go out.
  flush_request_queue

  promise.fulfill(response) unless promise.timed_out?
end
# File lib/cql/protocol/cql_protocol_handler.rb, line 241
# Sends as many queued requests as there are free stream ids.
#
# Requests wait in @request_queue_in; when @request_queue_out has run dry the
# two queues are swapped and requests are then taken from the front of
# @request_queue_out one at a time. Promises that have timed out while
# waiting are dropped without being sent.
def flush_request_queue
  @lock.synchronize do
    if @request_queue_out.empty? && !@request_queue_in.empty?
      @request_queue_out = @request_queue_in
      @request_queue_in = []
    end
  end

  while true
    id = nil
    frame = nil
    dropped = false

    @lock.synchronize do
      if @request_queue_out.any? && (id = next_stream_id)
        candidate = @request_queue_out.shift
        if candidate.timed_out?
          # The id was only looked up, never reserved, so it remains free.
          dropped = true
        else
          frame = candidate.frame
          @promises[id] = candidate
        end
      end
    end

    next if dropped
    break unless id

    # Outside the lock: stamp the claimed stream id onto the queued frame
    # (presumably pre-encoded by the promise — confirm) and write it.
    @frame_encoder.change_stream_id(id, frame)
    @connection.write(frame)
  end
end
# File lib/cql/protocol/cql_protocol_handler.rb, line 298
# Finds the lowest free stream id, i.e. the index of the first nil slot in
# @promises. Does not reserve the id.
#
# @return [Integer, nil] the free stream id, or nil when every slot is taken
def next_stream_id
  # Array#index already returns nil when nothing matches, so no extra
  # branching is needed (and 0 is truthy in Ruby, so id 0 is handled too).
  @promises.index(nil)
end
# File lib/cql/protocol/cql_protocol_handler.rb, line 209
# Calls every registered event listener with +event_response+.
#
# @param event_response the event to hand to each listener
# @return [Array, nil] the listeners that were called, or nil when there are none
def notify_event_listeners(event_response)
  # Take a reference to the listener list under the lock, then call the
  # listeners without holding it.
  listeners = @lock.synchronize { @event_listeners }
  return if listeners.empty?

  listeners.each do |listener|
    begin
      # Pass the event this method was given; previously the parameter was
      # ignored and @current_frame.body was read instead.
      listener.call(event_response)
    rescue StandardError
      # Best effort: one failing listener must not stop the others.
    end
  end
end
# File lib/cql/protocol/cql_protocol_handler.rb, line 195
# Data callback for the connection: appends +data+ to the read buffer and
# dispatches every complete frame that can be decoded from it.
#
# Frames with stream id -1 go to the event listeners; all other frames
# complete the request waiting on that stream id.
def receive_data(data)
  @read_buffer << data
  # Resume decoding of the frame left unfinished by the previous call.
  @current_frame = @frame_decoder.decode_frame(@read_buffer, @current_frame)

  while @current_frame.complete?
    stream_id = @current_frame.stream_id
    body = @current_frame.body

    if stream_id == -1
      notify_event_listeners(body)
    else
      complete_request(stream_id, body)
    end

    @current_frame = @frame_decoder.decode_frame(@read_buffer)
  end
end
# File lib/cql/protocol/cql_protocol_handler.rb, line 277
# Close callback for the connection: fails every request that is still
# waiting (in flight or queued) and completes the closed promise.
#
# @param cause the error that closed the connection, or nil for a clean close
def socket_closed(cause)
  request_failure_cause = cause || Io::ConnectionClosedError.new

  pending = @lock.synchronize do
    collected = @promises.compact
    collected.concat(@request_queue_in)
    collected.concat(@request_queue_out)
    @promises.fill(nil)
    @request_queue_in.clear
    @request_queue_out.clear
    collected
  end

  pending.each do |promise|
    # Leave timed-out promises alone, consistent with complete_request and
    # flush_request_queue, which both guard on timed_out?.
    # NOTE(review): presumably failing an already-failed promise raises and
    # would abort this loop before the closed promise is completed — confirm
    # against the Promise implementation.
    promise.fail(request_failure_cause) unless promise.timed_out?
  end

  if cause
    @closed_promise.fail(cause)
  else
    @closed_promise.fulfill
  end
end