class Cql::Client::AsynchronousClient
@private
Constants
- DEFAULT_CONNECTION_TIMEOUT
- DEFAULT_CONSISTENCY
- DEFAULT_CQL_VERSIONS
- DEFAULT_PORT
- MAX_RECONNECTION_ATTEMPTS
Public Class Methods
new(options={})
click to toggle source
# File lib/cql/client/client.rb, line 212 def initialize(options={}) @compressor = options[:compressor] @cql_version = options[:cql_version] @logger = options[:logger] || NullLogger.new @protocol_version = options[:protocol_version] || 2 @io_reactor = options[:io_reactor] || Io::IoReactor.new @hosts = extract_hosts(options) @initial_keyspace = options[:keyspace] @connections_per_node = options[:connections_per_node] || 1 @lock = Mutex.new @request_runner = RequestRunner.new @keyspace_changer = KeyspaceChanger.new @connection_manager = ConnectionManager.new @execute_options_decoder = ExecuteOptionsDecoder.new(options[:default_consistency] || DEFAULT_CONSISTENCY) @port = options[:port] || DEFAULT_PORT @connection_timeout = options[:connection_timeout] || DEFAULT_CONNECTION_TIMEOUT @credentials = options[:credentials] @auth_provider = options[:auth_provider] || @credentials && Auth::PlainTextAuthProvider.new(*@credentials.values_at(:username, :password)) @connected = false @connecting = false @closing = false end
Public Instance Methods
batch(type=:logged, options=nil) { |b| ... }
click to toggle source
# File lib/cql/client/client.rb, line 315 def batch(type=:logged, options=nil) if type.is_a?(Hash) options = type type = :logged end b = AsynchronousBatch.new(type, @execute_options_decoder, @connection_manager, options) if block_given? yield b b.execute else b end end
close()
click to toggle source
# File lib/cql/client/client.rb, line 256 def close @lock.synchronize do return @closed_future if @closing @closing = true @closed_future = begin if @connecting f = @connected_future.recover f = f.flat_map { @io_reactor.stop } f = f.map(self) f else f = @io_reactor.stop f = f.map(self) f end end end @closed_future.on_complete(&method(:closed)) @closed_future end
connect()
click to toggle source
# File lib/cql/client/client.rb, line 235 def connect @lock.synchronize do raise ClientError, 'Cannot connect a closed client' if @closing || @closed return @connected_future if can_execute? @connecting = true @connected_future = begin f = @io_reactor.start f = f.flat_map { connect_with_protocol_version_fallback } f = f.flat_map { |connections| connect_to_all_peers(connections) } f = f.flat_map do |connections| @connection_manager.add_connections(connections) register_event_listener(@connection_manager.random_connection) end f = f.flat_map { use_keyspace(@connection_manager.snapshot, @initial_keyspace) } f.map(self) end end @connected_future.on_complete(&method(:connected)) @connected_future end
connected?()
click to toggle source
# File lib/cql/client/client.rb, line 277 def connected? @connected end
execute(cql, *args)
click to toggle source
# File lib/cql/client/client.rb, line 293 def execute(cql, *args) with_failure_handler do options_or_consistency = nil if args.last.is_a?(Symbol) || args.last.is_a?(Hash) options_or_consistency = args.pop end options = @execute_options_decoder.decode_options(options_or_consistency) request = Protocol::QueryRequest.new(cql, args, options[:type_hints], options[:consistency], options[:serial_consistency], options[:page_size], options[:paging_state], options[:trace]) f = execute_request(request, options[:timeout]) if options.include?(:page_size) f = f.map { |result| AsynchronousQueryPagedQueryResult.new(self, request, result, options) } end f end end
keyspace()
click to toggle source
# File lib/cql/client/client.rb, line 281 def keyspace @connection_manager.random_connection.keyspace end
prepare(cql)
click to toggle source
# File lib/cql/client/client.rb, line 309 def prepare(cql) with_failure_handler do AsynchronousPreparedStatement.prepare(cql, @execute_options_decoder, @connection_manager, @logger) end end
use(keyspace)
click to toggle source
# File lib/cql/client/client.rb, line 285 def use(keyspace) with_failure_handler do connections = @connection_manager.reject { |c| c.keyspace == keyspace } return Future.resolved if connections.empty? use_keyspace(connections, keyspace).map(nil) end end
Private Instance Methods
can_execute?()
click to toggle source
# File lib/cql/client/client.rb, line 430 def can_execute? !@closing && (@connecting || (@connected && @connection_manager.connected?)) end
closed(f)
click to toggle source
# File lib/cql/client/client.rb, line 415 def closed(f) @lock.synchronize do @closing = false @closed = true @connected = false if f.resolved? @logger.info('Cluster disconnect complete') else f.on_failure do |e| @logger.error('Cluster disconnect failed: %s' % e.message) end end end end
connect_to_all_peers(seed_connections, initial_keyspace=@initial_keyspace)
click to toggle source
# File lib/cql/client/client.rb, line 378 def connect_to_all_peers(seed_connections, initial_keyspace=@initial_keyspace) @logger.debug('Looking for additional nodes') peer_discovery = PeerDiscovery.new(seed_connections) peer_discovery.new_hosts.flat_map do |hosts| if hosts.empty? @logger.debug('No additional nodes found') Future.resolved(seed_connections) else @logger.debug('%d additional nodes found' % hosts.size) f = create_cluster_connector.connect_all(hosts, @connections_per_node) f = f.map do |discovered_connections| seed_connections + discovered_connections end f.recover(seed_connections) end end end
connect_with_protocol_version_fallback()
click to toggle source
# File lib/cql/client/client.rb, line 365 def connect_with_protocol_version_fallback f = create_cluster_connector.connect_all(@hosts, @connections_per_node) f.fallback do |error| if error.is_a?(QueryError) && error.code == QueryError::PROTOCOL_ERROR && @protocol_version > 1 @logger.warn('Could not connect using protocol version %d (will try again with %d): %s' % [@protocol_version, @protocol_version - 1, error.message]) @protocol_version -= 1 connect_with_protocol_version_fallback else raise error end end end
connected(f)
click to toggle source
# File lib/cql/client/client.rb, line 396 def connected(f) if f.resolved? @lock.synchronize do @connecting = false @connected = true end @logger.info('Cluster connection complete') else @lock.synchronize do @connecting = false @connected = false end f.on_failure do |e| @logger.error('Failed connecting to cluster: %s' % e.message) end close end end
create_cluster_connector()
click to toggle source
# File lib/cql/client/client.rb, line 349 def create_cluster_connector cql_version = @cql_version || DEFAULT_CQL_VERSIONS[@protocol_version] authentication_step = @protocol_version == 1 ? CredentialsAuthenticationStep.new(@credentials) : SaslAuthenticationStep.new(@auth_provider) protocol_handler_factory = lambda { |connection| Protocol::CqlProtocolHandler.new(connection, @io_reactor, @protocol_version, @compressor) } ClusterConnector.new( Connector.new([ ConnectStep.new(@io_reactor, protocol_handler_factory, @port, @connection_timeout, @logger), CacheOptionsStep.new, InitializeStep.new(cql_version, @compressor, @logger), authentication_step, CachePropertiesStep.new, ]), @logger ) end
execute_request(request, timeout=nil, connection=nil)
click to toggle source
# File lib/cql/client/client.rb, line 501 def execute_request(request, timeout=nil, connection=nil) f = @request_runner.execute(connection || @connection_manager.random_connection, request, timeout) f.map do |result| if result.is_a?(KeyspaceChanged) use(result.keyspace) nil else result end end end
extract_hosts(options)
click to toggle source
# File lib/cql/client/client.rb, line 339 def extract_hosts(options) if options[:hosts] && options[:hosts].any? options[:hosts].uniq elsif options[:host] options[:host].split(',').uniq else %w[localhost] end end
handle_topology_change(remaning_attempts=MAX_RECONNECTION_ATTEMPTS)
click to toggle source
# File lib/cql/client/client.rb, line 474 def handle_topology_change(remaning_attempts=MAX_RECONNECTION_ATTEMPTS) with_failure_handler do seed_connections = @connection_manager.snapshot f = connect_to_all_peers(seed_connections, keyspace) f.flat_map do |all_connections| new_connections = all_connections - seed_connections if new_connections.size > 0 f = use_keyspace(new_connections, keyspace) f.on_value do @connection_manager.add_connections(new_connections) end f elsif remaning_attempts > 0 timeout = 2**(MAX_RECONNECTION_ATTEMPTS - remaning_attempts) @logger.debug('Scheduling new peer discovery in %ds' % timeout) f = @io_reactor.schedule_timer(timeout) f.flat_map do handle_topology_change(remaning_attempts - 1) end else @logger.warn('Giving up looking for additional nodes') Future.resolved end end end end
register_event_listener(connection)
click to toggle source
# File lib/cql/client/client.rb, line 446 def register_event_listener(connection) register_request = Protocol::RegisterRequest.new(Protocol::TopologyChangeEventResponse::TYPE, Protocol::StatusChangeEventResponse::TYPE) f = execute_request(register_request, nil, connection) f.on_value do connection.on_closed do if connected? begin register_event_listener(@connection_manager.random_connection) rescue NotConnectedError # we had started closing down after the connection check end end end connection.on_event do |event| if event.change == 'UP' || event.change == 'NEW_NODE' @logger.debug('Received %s event' % event.change) unless @looking_for_nodes @looking_for_nodes = true handle_topology_change.on_complete do |f| @looking_for_nodes = false end end end end end f end
use_keyspace(connections, keyspace)
click to toggle source
# File lib/cql/client/client.rb, line 441 def use_keyspace(connections, keyspace) futures = connections.map { |connection| @keyspace_changer.use_keyspace(connection, keyspace) } Future.all(*futures) end
with_failure_handler() { || ... }
click to toggle source
# File lib/cql/client/client.rb, line 434 def with_failure_handler return Future.failed(NotConnectedError.new) unless can_execute? yield rescue => e Future.failed(e) end