class Cassandra::Protocol::CqlProtocolHandler
This class wraps a single connection and translates between request/response frames and raw bytes.
You send requests with send_request, and receive responses through the returned future.
Instances of this class are thread safe.
@example Sending an OPTIONS request
future = protocol_handler.send_request(Cassandra::Protocol::OptionsRequest.new)
response = future.get
puts "These options are supported: #{response.options}"
Constants
- HEARTBEAT
- TERMINATED
Attributes
@return [Exception] outstanding error, from a failed connection.
@return [String] the current keyspace for the underlying connection
@return [Integer] the version of the protocol to use in communicating with C*.
Public Class Methods
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 43
# Wires a protocol handler to a connection.
#
# @param connection the connection to wrap; its `on_data` and `on_closed`
#   hooks are pointed at this handler's `receive_data` and `socket_closed`
# @param scheduler stored and later used to schedule and cancel timers
# @param protocol_version [Integer] selects the frame codec: V4 when greater
#   than 3, V3 when greater than 2, V1 otherwise
# @param compressor handed to both the frame encoder and the frame decoder
# @param heartbeat_interval delay handed to the scheduler for heartbeat
#   timers (presumably seconds -- confirm against the scheduler)
# @param idle_timeout delay handed to the scheduler for the idle-termination
#   timer (presumably seconds -- confirm against the scheduler)
# @param requests_per_connection [Integer] size of the stream id pool; ids
#   run from 0 up to this value minus one
# @param custom_type_handlers forwarded to the V4 decoder only
def initialize(connection,
               scheduler,
               protocol_version,
               compressor = nil,
               heartbeat_interval = 30,
               idle_timeout = 60,
               requests_per_connection = 128,
               custom_type_handlers = {})
  @protocol_version = protocol_version
  @connection       = connection
  @scheduler        = scheduler
  @compressor       = compressor

  @connection.on_data(&method(:receive_data))
  @connection.on_closed(&method(:socket_closed))

  # Pool of free stream ids; an id is shifted off for every in-flight request.
  @streams  = Array.new(requests_per_connection) { |stream_id| stream_id }
  @promises = {}

  # Pick the codec namespace; only the V4 decoder takes the type handlers.
  codec, extra_decoder_args =
    if protocol_version > 3
      [V4, [custom_type_handlers]]
    elsif protocol_version > 2
      [V3, []]
    else
      [V1, []]
    end
  @frame_encoder = codec::Encoder.new(@compressor, protocol_version)
  @frame_decoder = codec::Decoder.new(self, @compressor, *extra_decoder_args)

  @request_queue_in   = []
  @request_queue_out  = []
  @event_listeners    = []
  @data               = {}
  @lock               = Mutex.new
  @closed_promise     = Ione::Promise.new
  @keyspace           = nil
  @heartbeat          = nil
  @terminate          = nil
  @heartbeat_interval = heartbeat_interval
  @idle_timeout       = idle_timeout
  @error              = nil
end
Public Instance Methods
@see {#[]=}
@return the value associated with the key
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 114
# Looks up a value previously stored on this handler.
#
# The read happens while holding the handler's lock.
#
# @param key the key to look up
# @return the value stored under +key+, as returned by the backing hash
def [](key)
  @lock.synchronize { @data[key] }
end
Associate arbitrary data with this protocol handler object. This is useful in situations where additional metadata can be loaded after the connection has been set up, or to keep statistics specific to the connection this protocol handler wraps.
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 105
# Stores a value on this handler under the given key.
#
# The write happens while holding the handler's lock.
#
# @param key the key to store under
# @param value the value to associate with +key+
def []=(key, value)
  @lock.synchronize { @data[key] = value }
end
Closes the underlying connection.
@return [Ione::Future] a future that completes when the connection has closed
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 199
# Cancels the pending heartbeat and idle-termination timers, then asks the
# scheduler to close the underlying connection.
#
# The timers are cancelled and cleared while holding the handler's lock, the
# same way +socket_closed+ does, so this cannot interleave with
# +schedule_heartbeat+ or +reschedule_termination+ replacing them.
#
# @param cause passed through to the connection's +close+
# @return the future of this handler's closed promise
def close(cause = nil)
  @lock.synchronize do
    if @heartbeat
      @scheduler.cancel_timer(@heartbeat)
      @heartbeat = nil
    end

    if @terminate
      @scheduler.cancel_timer(@terminate)
      @terminate = nil
    end
  end

  # The actual close is performed from a zero-delay timer callback rather
  # than inline.
  @scheduler.schedule_timer(0).on_value do
    @connection.close(cause)
  end

  @closed_promise.future
end
@return [true, false] true if the underlying connection is closed
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 127
# Reports whether the wrapped connection is closed.
#
# Pure delegation: the answer is whatever the connection's own +closed?+
# returns.
def closed?
  @connection.closed?
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 231
# Delivers a response to the request that was sent on stream +id+.
#
# The stream id is returned to the front of the free pool, the tracked
# keyspace is updated from the response, queued requests are flushed, and
# finally the request's promise is fulfilled unless it has already timed out.
#
# A response for a stream id with no registered promise is tolerated: the id
# is NOT put back in the pool (it was never taken, so recycling it could hand
# out the same id twice) and there is nothing to fulfill.
#
# @param id the stream id the response arrived on
# @param response the decoded response
def complete_request(id, response)
  promise = @lock.synchronize do
    pending = @promises.delete(id)
    @streams.unshift(id) if pending
    pending
  end

  @keyspace = response.keyspace if response.is_a?(Protocol::SetKeyspaceResultResponse)

  # Forget the current keyspace when the server reports that it was dropped.
  if response.is_a?(Protocol::SchemaChangeResultResponse) &&
     response.change == 'DROPPED' &&
     response.keyspace == @keyspace &&
     response.target == Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
    @keyspace = nil
  end

  flush_request_queue
  promise.fulfill(response) if promise && !promise.timed_out?
end
@return [true, false] true if the underlying connection is connected
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 122
# Reports whether the wrapped connection is connected.
#
# Pure delegation: the answer is whatever the connection's own +connected?+
# returns.
def connected?
  @connection.connected?
end
Returns the hostname of the underlying connection
@return [String] the hostname
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 90
# The host of the wrapped connection, exactly as the connection reports it.
def host
  @connection.host
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 217
# Hands an event response to every registered event listener, in
# registration order.
#
# Only the read of the listener list happens under the lock; the listeners
# themselves are invoked after it has been released.
#
# @param event_response the event to pass to each listener
def notify_event_listeners(event_response)
  listeners = @lock.synchronize { @event_listeners }
  return if listeners.empty?

  listeners.each { |listener| listener.call(event_response) }
end
Register to receive notification when the underlying connection has closed. If the connection closed abruptly the error will be passed to the listener, otherwise it will not receive any parameters.
@yieldparam error [nil, Error] the error that caused the connection to
close, if any
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 137
# Registers +listener+ for both outcomes of the closed promise's future: it
# is attached as the value callback and as the failure callback.
#
# @yieldparam error whatever the future passes to the callback that fires
def on_closed(&listener)
  closed_future = @closed_promise.future
  closed_future.on_value(&listener)
  closed_future.on_failure(&listener)
end
Register to receive server sent events, like schema changes, nodes going up or down, etc. To actually receive events you also need to send a REGISTER request for the events you wish to receive.
@yieldparam event [Cassandra::Protocol::EventResponse] an event sent by the server
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 147
# Adds +listener+ to the list of event listeners.
#
# The list is replaced by a new array rather than appended to in place, so
# an array that was read earlier is never mutated.
#
# @yieldparam event the event passed to the listener when one is dispatched
def on_event(&listener)
  @lock.synchronize { @event_listeners += [listener] }
end
Returns the port of the underlying connection
@return [Integer] the port
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 97
# The port of the wrapped connection, exactly as the connection reports it.
def port
  @connection.port
end
Serializes and sends a request over the underlying connection.
Returns a future that will resolve to the response. When the connection closes the futures of all active requests will be failed with the error that caused the connection to close, or nil.
When `timeout` is specified the future will fail with {Cassandra::Errors::TimeoutError} after that many seconds have passed. If a response arrives after that time it will be lost. If a response never arrives for the request the channel occupied by the request will not be reused.
@param [Cassandra::Protocol::Request] request
@param [Float] timeout an optional number of seconds to wait until
failing the request
@return [Ione::Future<Cassandra::Protocol::Response>] a future that resolves to
the response
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 170
# Sends a request, or queues it when no stream id is free.
#
# Claiming a stream id and falling back to the queue happen inside a single
# critical section. If the lock were released between the two steps, another
# thread could free a stream id and flush an (at that moment empty) queue,
# leaving this request queued with nothing left to trigger its write.
#
# @param request the request to send
# @param timeout passed to the request promise
# @param with_heartbeat [true, false] whether to (re)schedule the heartbeat
#   timer before sending
# @return the future of the request's promise, or an already-failed future
#   when the connection is closed
def send_request(request, timeout = nil, with_heartbeat = true)
  return Ione::Future.failed(Errors::IOError.new('Connection closed')) if closed?

  schedule_heartbeat if with_heartbeat
  promise = RequestPromise.new(request, timeout, @scheduler)

  id = @lock.synchronize do
    stream_id = next_stream_id
    if stream_id
      @promises[stream_id] = promise
    else
      @request_queue_in << promise
    end
    stream_id
  end

  # The write itself is done outside the lock.
  write_request(id, promise) if id
  promise.future
end
Private Instance Methods
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 334
# Writes queued requests for as long as free stream ids are available.
#
# Requests are queued on +@request_queue_in+ and drained from
# +@request_queue_out+; when the outgoing queue is empty the two are swapped.
#
# Requests that have already timed out are discarded BEFORE a stream id is
# claimed. Claiming the id first and then skipping the request would drop the
# id without ever returning it to the pool, shrinking the pool by one for
# every timed-out queued request.
def flush_request_queue
  @lock.synchronize do
    if @request_queue_out.empty? && !@request_queue_in.empty?
      @request_queue_out = @request_queue_in
      @request_queue_in = []
    end
  end

  loop do
    id = nil
    promise = nil

    @lock.synchronize do
      # Discard timed-out requests at the head of the queue.
      while (head = @request_queue_out.first) && head.timed_out?
        @request_queue_out.shift
      end

      if !@request_queue_out.empty? && (id = next_stream_id)
        promise = @request_queue_out.shift
        @promises[id] = promise
      end
    end

    # No id means either nothing is queued or no stream is free.
    break unless id

    # The write itself is done outside the lock.
    write_request(id, promise)
  end
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 440
# Takes the next free stream id from the front of the pool.
#
# No locking is done here; the visible callers invoke this while holding the
# handler's lock.
#
# @return the stream id, or nil when the pool is empty
def next_stream_id
  @streams.shift
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 329
# Callback for bytes arriving on the connection.
#
# Incoming data first resets the idle-termination timer and is then appended
# to the frame decoder.
#
# @param data the received bytes
def receive_data(data)
  reschedule_termination
  @frame_decoder << data
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 423
# Restarts the idle-termination timer.
#
# Does nothing when no idle timeout is configured. Otherwise any pending
# termination timer is cancelled and a new one is scheduled, both under the
# lock. When the new timer fires, the connection is closed with TERMINATED
# as the cause.
def reschedule_termination
  return unless @idle_timeout

  timer = @lock.synchronize do
    pending = @terminate
    @scheduler.cancel_timer(pending) if pending

    @terminate = @scheduler.schedule_timer(@idle_timeout)
  end

  # The callback is attached after the lock has been released.
  timer.on_value do
    @terminate = nil
    @connection.close(TERMINATED)
  end
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 405
# Restarts the heartbeat timer.
#
# Does nothing when no heartbeat interval is configured. Otherwise a pending,
# not yet resolved heartbeat timer is cancelled and a new one is scheduled,
# both under the lock. When the new timer fires, a HEARTBEAT request is sent
# without rescheduling the heartbeat; once that request yields a value the
# next heartbeat is scheduled.
def schedule_heartbeat
  return unless @heartbeat_interval

  timer = @lock.synchronize do
    previous = @heartbeat
    @scheduler.cancel_timer(previous) if previous && !previous.resolved?

    @heartbeat = @scheduler.schedule_timer(@heartbeat_interval)
  end

  # The callback is attached after the lock has been released.
  timer.on_value do
    send_request(HEARTBEAT, nil, false).on_value { schedule_heartbeat }
  end
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 370
# Callback for the connection having closed.
#
# A given cause is re-wrapped as an Errors::IOError carrying the same message
# and backtrace, and remembered in +@error+. Under the lock, both timers are
# cancelled and every in-flight and queued request promise is collected and
# removed. Outside the lock, each collected promise that has not already
# timed out is failed, and the closed promise is then failed with the cause
# or, when there is none, fulfilled.
#
# @param cause the error that closed the connection, or nil
def socket_closed(cause)
  if cause
    wrapped = Errors::IOError.new(cause.message)
    wrapped.set_backtrace(cause.backtrace)
    cause = wrapped
  end
  @error = cause

  request_failure_cause = cause || Errors::IOError.new('Connection closed')

  abandoned = @lock.synchronize do
    [@heartbeat, @terminate].each do |timer|
      @scheduler.cancel_timer(timer) if timer
    end
    @heartbeat = nil
    @terminate = nil

    pending = @promises.values + @request_queue_in + @request_queue_out
    @promises.clear
    @request_queue_in.clear
    @request_queue_out.clear
    pending
  end

  abandoned.each do |promise|
    promise.fail(request_failure_cause) unless promise.timed_out?
  end

  cause ? @closed_promise.fail(cause) : @closed_promise.fulfill
end
# File lib/cassandra/protocol/cql_protocol_handler.rb, line 363
# Encodes a request into the connection's write buffer and then asks the
# request's promise to start its timer.
#
# @param id the stream id to encode the request with
# @param request_promise the promise carrying the request to encode
def write_request(id, request_promise)
  @connection.write { |buffer| @frame_encoder.encode(buffer, request_promise.request, id) }
  request_promise.maybe_start_timer
end