class Cql::Client::AsynchronousPreparedStatement
@private
Public Class Methods
new(cql, execute_options_decoder, connection_manager, logger)
click to toggle source
@private
# File lib/cql/client/prepared_statement.rb, line 132 def initialize(cql, execute_options_decoder, connection_manager, logger) @cql = cql @execute_options_decoder = execute_options_decoder @connection_manager = connection_manager @logger = logger @request_runner = RequestRunner.new end
prepare(cql, execute_options_decoder, connection_manager, logger)
click to toggle source
# File lib/cql/client/prepared_statement.rb, line 140 def self.prepare(cql, execute_options_decoder, connection_manager, logger) statement = new(cql, execute_options_decoder, connection_manager, logger) futures = connection_manager.map do |connection| statement.prepare(connection) end Future.all(*futures).map(statement) rescue => e Future.failed(e) end
Public Instance Methods
add_to_batch(batch, connection, bound_args)
click to toggle source
@private
# File lib/cql/client/prepared_statement.rb, line 211 def add_to_batch(batch, connection, bound_args) statement_id = connection[self] unless statement_id raise NotPreparedError end unless bound_args.size == @raw_metadata.size raise ArgumentError, "Expected #{@raw_metadata.size} arguments, got #{bound_args.size}" end batch.add_prepared(statement_id, @raw_metadata, bound_args) end
batch(type=:logged, options=nil) { |pb| ... }
click to toggle source
# File lib/cql/client/prepared_statement.rb, line 169 def batch(type=:logged, options=nil) if type.is_a?(Hash) options = type type = :logged end b = AsynchronousBatch.new(type, @execute_options_decoder, @connection_manager, options) pb = AsynchronousPreparedStatementBatch.new(self, b) if block_given? yield pb pb.execute else pb end end
execute(*args)
click to toggle source
# File lib/cql/client/prepared_statement.rb, line 150 def execute(*args) connection = @connection_manager.random_connection if connection[self] f = run(args, connection) f.fallback do |e| raise e unless e.is_a?(QueryError) && e.code == QueryError::UNPREPARED prepare(connection).flat_map do run(args, connection) end end else prepare(connection).flat_map do run(args, connection) end end rescue => e Future.failed(e) end
prepare(connection)
click to toggle source
@private
# File lib/cql/client/prepared_statement.rb, line 185 def prepare(connection) prepare_request = Protocol::PrepareRequest.new(@cql) f = @request_runner.execute(connection, prepare_request) do |response| connection[self] = response.id unless @raw_metadata # NOTE: this is not thread safe, but the worst that could happen # is that we assign the same data multiple times @raw_metadata = response.metadata @metadata = ResultMetadata.new(@raw_metadata) @raw_result_metadata = response.result_metadata if @raw_result_metadata @result_metadata = ResultMetadata.new(@raw_result_metadata) end end hex_id = response.id.each_byte.map { |x| x.to_s(16).rjust(2, '0') }.join('') @logger.debug('Statement %s prepared on node %s (%s:%d)' % [hex_id, connection[:host_id].to_s, connection.host, connection.port]) end f.map(self) end
prepared?(connection)
click to toggle source
@private
# File lib/cql/client/prepared_statement.rb, line 206 def prepared?(connection) !!connection[self] end
Private Instance Methods
run(args, connection)
click to toggle source
# File lib/cql/client/prepared_statement.rb, line 224 def run(args, connection) bound_args = args.take(@raw_metadata.size) remaining_args = args.drop(@raw_metadata.size) unless bound_args.size == @raw_metadata.size && remaining_args.size <= 1 raise ArgumentError, "Expected #{@raw_metadata.size} arguments, got #{bound_args.size}" end options = @execute_options_decoder.decode_options(remaining_args.last) statement_id = connection[self] request_metadata = @raw_result_metadata.nil? request = Protocol::ExecuteRequest.new(statement_id, @raw_metadata, bound_args, request_metadata, options[:consistency], options[:serial_consistency], options[:page_size], options[:paging_state], options[:trace]) f = @request_runner.execute(connection, request, options[:timeout], @raw_result_metadata) if options.include?(:page_size) f = f.map { |result| AsynchronousPreparedPagedQueryResult.new(self, request, result, options) } end f end