class Vertica::Connection

A client for a Vertica server, which allows you to run queries against it.

Use {Vertica.connect} to establish a connection. Then, the {#query} method will allow you to run SQL queries against the database. For `COPY FROM STDIN` queries, use the {#copy} method instead. You can use {#interrupt} to interrupt long running queries. {#close} will close the connection to the server.

@attr_reader transaction_status [:no_transaction, :in_transaction, :failed_transaction] The current

transaction state of the session. This will be updated after every query.

@attr_reader parameters [Hash<String, String>] Connection parameters as provided by the server. @attr_reader options [Hash] The connection options provided to the constructor. See {#initialize}.

@example Running a buffered query against the database

connection = Vertica.connect(host: 'db_server', username: 'user', password: 'password', ...)
result = connection.query("SELECT id, name FROM my_table")
result.each do |row|
  puts "Row: #row['id']: #{row['name']}"
end
connection.close

@see Vertica.connect @see Vertica::Result

Constants

DEFAULT_IO_COPY_HANDLER_BLOCK_SIZE

Attributes

backend_key[R]
backend_pid[R]
options[R]
parameters[R]
session_id[R]
transaction_status[R]

Public Class Methods

new(host: nil, port: 5433, username: nil, password: nil, database: nil, interruptable: false, ssl: false, read_timeout: 600, debug: false, role: nil, search_path: nil, timezone: nil, autocommit: false, skip_startup: false, skip_initialize: false, user: nil) click to toggle source

Creates a connection the a Vertica server. @param host [String] The hostname to connect to. E.g. `localhost` @param port [Integer] The port to connect to. Defaults to `5433`. @param username [String] The username for the session. @param password [String] The password for the session. @param interruptable [true, false] Whether to make this session interruptible. Setting this to true

allows you to interrupt sessions and queries, but requires running a query during startup in order
to obtain the session id.

@param ssl [OpenSSL::SSL::SSLContext, Boolean] Set this to an OpenSSL::SSL::SSLContext instance to

require the connection to be encrypted using SSL/TLS. `true` will use the default SSL options.
Not every server has support for SSL encryption. In that case you'll have to leave this to false.

@param read_timeout [Integer] The number of seconds to wait for data on the connection. You should

set this to a sufficiently high value when executing complicated queries that require a long time
to be evaluated.

@param role [Array<String>, :all, :none, :default] A list of additional roles to enable for the session. See the

[Vertica documentation for `SET ROLE`](https://my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SET/SETROLE.htm).

@param timezone [String] The timezone to use for the session. See the

[Vertica documentation for `SET TIME ZONE`](https://my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SET/SETTIMEZONE.htm).

@param search_path [Array<String>] A list of schemas to use as search path. See the

[Vertica documentation for `SET SEARCH_PATH`](https://my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SET/SETSEARCH_PATH.htm).

@param autocommit [Boolean] Enable autocommit on the session. See [the Vertica documentation](my.vertica.com/docs/7.1.x/HTML/Content/Authoring/ConnectingToHPVertica/vsql/AUTOCOMMIT.htm)

for more information.

@param debug [Boolean] Setting this to true will log all the communication between client and server

to STDOUT. Useful when developing this library.
# File lib/vertica/connection.rb, line 53
def initialize(host: nil, port: 5433, username: nil, password: nil, database: nil, interruptable: false, ssl: false, read_timeout: 600, debug: false, role: nil, search_path: nil, timezone: nil, autocommit: false, skip_startup: false, skip_initialize: false, user: nil)
  reset_state
  @notice_handler = nil

  @options = {
    host: host,
    port: port.to_i,
    username: username || user,
    password: password,
    database: database,
    debug: debug,
    ssl: ssl,
    interruptable: interruptable,
    read_timeout: read_timeout,
    role: role,
    search_path: search_path,
    timezone: timezone,
    autocommit: autocommit,
  }

  boot_connection(skip_initialize: skip_initialize) unless skip_startup
end

Public Instance Methods

busy?() click to toggle source

@return [Boolean] Returns true iff the connection is in use.

# File lib/vertica/connection.rb, line 94
def busy?
  @mutex.locked?
end
cancel() click to toggle source

Cancels the current query.

@note Vertica's protocol is based on the PostgreSQL protocol. This method to cancel sessions

in PostgreSQL is accepted by the Vertica server, but I haven't actually observed queries
actually being cancelled when using this method. Vertica provides an alternative method, by
running `SELECT CLOSE_SESSION(session_id)` as a query on a different connection. See {#interrupt}.

@return [void] @see interrupt

# File lib/vertica/connection.rb, line 215
def cancel
  conn = self.class.new(skip_startup: true, **options)
  conn.write_message(Vertica::Protocol::CancelRequest.new(backend_pid, backend_key))
  conn.write_message(Vertica::Protocol::Flush.new)
  conn.close_socket
end
close() click to toggle source

Closes the connection to the Vertica server. @return [void]

# File lib/vertica/connection.rb, line 200
def close
  write_message(Vertica::Protocol::Terminate.new)
ensure
  close_socket
end
closed?() click to toggle source

@return [Boolean] Returns false iff the connection to the server is opened. @note Even if the connection is closed, it will be opened automatically if you use it.

# File lib/vertica/connection.rb, line 89
def closed?
  !opened?
end
copy(sql, source: nil, &block) click to toggle source

Loads data into Vertica using a `COPY table FROM STDIN` query.

@param sql [String] The `COPY … FROM STDIN` SQL command to run. @param source [String, IO] The source of the data to be copied. This can either be a filename, or

an IO object. If you don't specify a source, you'll need to provide a block that will provide the
data to be copied.

@yield A block that will be called with a writer that you can provided data to. If an exception is

raised in the block, the `COPY` command will be cancelled.

@yieldparam io [:write] An object that you can call write on to provide data to be loaded. @return [String] The kind of command that was executed on the server. This should always be `“COPY”`.

@example Loading data using an IO object as source

connection = Vertica.connect(host: 'db_server', username: 'user', password: 'password', ...)
File.open("filename.csv", "r") do |io|
  connection.copy("COPY my_table FROM STDIN ...", source: io)
end

@example Loading data using a filename as source

connection = Vertica.connect(host: 'db_server', username: 'user', password: 'password', ...)
connection.copy("COPY my_table FROM STDIN ...", source: "filename.csv")

@example Loading data using a callback

connection = Vertica.connect(host: 'db_server', username: 'user', password: 'password', ...)
connection.copy("COPY my_table FROM STDIN ...") do |io|
  io.write("my data")
  io.write("more data")
end

@see my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Statements/COPY/COPY.htm

Vertica's documentation for COPY.
# File lib/vertica/connection.rb, line 179
def copy(sql, source: nil, &block)
  copy_handler = if block_given?
    block
  elsif source && File.exist?(source.to_s)
    lambda { |data| file_copy_handler(source, data) }
  elsif source.respond_to?(:read) && source.respond_to?(:eof?)
    lambda { |data| io_copy_handler(source, data) }
  end

  run_in_mutex(Vertica::Query.new(self, sql, copy_handler: copy_handler))
end
inspect() click to toggle source

Returns a user-consumable string representation of this row. @return [String]

# File lib/vertica/connection.rb, line 193
def inspect
  safe_options = @options.reject { |name, _| name == :password }
  "#<Vertica::Connection:#{object_id} @parameters=#{@parameters.inspect} @backend_pid=#{@backend_pid}, @backend_key=#{@backend_key}, @transaction_status=#{@transaction_status}, @socket=#{@socket}, @options=#{safe_options.inspect}>"
end
interrupt() click to toggle source

Interrupts this session to the Vertica server, which will cancel the running query.

You'll have to call this method in a separate thread. It will open up a separate connection, and run `SELECT CLOSE_SESSION(current_session_id)` to close the current session. In order to be able to do this the client needs to know its session ID. You'll have to pass `interruptable: true` as a connection parameter (see {#initialize}) to make sure the connection will request its session id, by running `SELECT session_id FROM v_monitor.current_session` right after the connection is opened.

@return [void] @see interruptable? @see my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Functions/VerticaFunctions/CLOSE_SESSION.htm

Vertica's documentation for CLOSE_SESSION
# File lib/vertica/connection.rb, line 234
def interrupt
  raise Vertica::Error::InterruptImpossible, "Session cannopt be interrupted because the session ID is not known!" if session_id.nil?
  conn = self.class.new(skip_initialize: true, **options)
  conn.query("SELECT CLOSE_SESSION(#{Vertica.quote(session_id)})").the_value
ensure
  conn.close if conn
end
interruptable?() click to toggle source

Returns true iff the connection can be interrupted.

Connections can only be interrupted if the session ID is known, so it can run `SELECT CLOSE_SESSION(session_id)` using a separate connection. By passing `interruptable: true` as a connection parameter (see {#initialize}), the connection will discover its session id before you can use it, allowing it to be interrupted.

@return [Boolean] Returns true iff the connection can be interrupted. @see {#interrupt}

# File lib/vertica/connection.rb, line 112
def interruptable?
  !session_id.nil?
end
on_notice(&block) click to toggle source

Installs a hanlder for notices that may be sent from the server to the client.

You can only install one handler; if you call this method again it will replace the previous handler.

@return [void]

# File lib/vertica/connection.rb, line 248
def on_notice(&block)
  @notice_handler = block
end
opened?() click to toggle source

@return [Boolean] Returns true iff the connection to the server is opened. @note The connection will be opened automatically if you use it.

# File lib/vertica/connection.rb, line 83
def opened?
  @socket && @backend_pid && @transaction_status
end
process_message(message) click to toggle source

Processes a backend message that was received from the socket. @note This method is for internal use only; you should not call it directly. @return [void] @private

# File lib/vertica/connection.rb, line 285
def process_message(message)
  case message
  when Vertica::Protocol::ErrorResponse
    raise Vertica::Error::ConnectionError.new(message.error_message)
  when Vertica::Protocol::NoticeResponse
    @notice_handler.call(message) if @notice_handler
  when Vertica::Protocol::BackendKeyData
    @backend_pid = message.pid
    @backend_key = message.key
  when Vertica::Protocol::ParameterStatus
    @parameters[message.name] = message.value
  when Vertica::Protocol::ReadyForQuery
    @transaction_status = message.transaction_status
    @mutex.unlock
  else
    raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}"
  end
end
query(sql, &block) click to toggle source

Runs a SQL query against the database.

@overload query(sql)

Runs a query against the database, and return the full result as a {Vertica::Result}
instance.

@note This requires the entire result to be buffered in memory, which may cause problems
  for queries with large results. Consider using the unbuffered version instead.

@param sql [String] The SQL command to run.
@return [Vertica::Result]
@raise [Vertica::Error::ConnectionError] The connection to the server failed.
@raise [Vertica::Error::QueryError] The server sent an error response indicating that
  the provided query cannot be evaluated.

@overload query(sql, &block)

Runs a query against the database, and yield every {Vertica::Row row} to the provided
block.

@param sql [String] The SQL command to run.
@yield The provided block will be called for every row in the result.
@yieldparam row [Vertica::Row]
@return [String] The kind of command that was executed, e.g. `"SELECT"`.
@raise [Vertica::Error::ConnectionError] The connection to the server failed.
@raise [Vertica::Error::QueryError] The server sent an error response indicating that
  the provided query cannot be evaluated.

@see my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/SELECT.htm

Vertica's documentation for SELECT.
# File lib/vertica/connection.rb, line 145
def query(sql, &block)
  run_in_mutex(Vertica::Query.new(self, sql, row_handler: block))
end
read_message() click to toggle source

Reads a backend message from the socket. @note This method is for internal use only; you should not call it directly. @return [Vertica::Protocol::BackendMessage] @raise [Vertica::Error::ConnectionError] @private

# File lib/vertica/connection.rb, line 270
def read_message
  type, size = read_bytes(5).unpack('aN')
  raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4
  message = Vertica::Protocol::BackendMessage.factory(type, read_bytes(size - 4))
  puts "<= #{message.inspect}" if options.fetch(:debug)
  return message
rescue SystemCallError, IOError => e
  close_socket
  raise Vertica::Error::ConnectionError.new(e.message)
end
ready_for_query?() click to toggle source

@return [Boolean] Returns true iff the connection is ready to handle queries.

# File lib/vertica/connection.rb, line 99
def ready_for_query?
  !busy?
end
ssl?() click to toggle source

@return [Boolean] Returns true iff the connection is encrypted.

# File lib/vertica/connection.rb, line 77
def ssl?
  Object.const_defined?('OpenSSL') && @socket.kind_of?(OpenSSL::SSL::SSLSocket)
end
write_message(message) click to toggle source

Writes a frontend message to the socket. @note This method is for internal use only; you should not call it directly. @return [void] @raise [Vertica::Error::ConnectionError] @private

# File lib/vertica/connection.rb, line 257
def write_message(message)
  puts "=> #{message.inspect}" if options.fetch(:debug)
  write_bytes(message.to_bytes)
rescue SystemCallError, IOError => e
  close_socket
  raise Vertica::Error::ConnectionError.new(e.message)
end

Protected Instance Methods

boot_connection(skip_initialize: false) click to toggle source
# File lib/vertica/connection.rb, line 445
def boot_connection(skip_initialize: false)
  startup_connection
  initialize_connection unless skip_initialize
end
close_socket() click to toggle source
# File lib/vertica/connection.rb, line 432
def close_socket
  @socket.close if @socket
rescue SystemCallError, IOError
  # ignore
ensure
  reset_state
end
file_copy_handler(input_file, output) click to toggle source
# File lib/vertica/connection.rb, line 345
def file_copy_handler(input_file, output)
  File.open(input_file, 'r') do |input|
    io_copy_handler(input, output)
  end
end
initialize_connection() click to toggle source
# File lib/vertica/connection.rb, line 403
def initialize_connection
  @session_id = query("SELECT session_id FROM v_monitor.current_session").the_value if options[:interruptable]
  initialize_connection_with_role
  initialize_connection_with_search_path
  initialize_connection_with_timezone
  initialize_connection_with_autocommit
end
initialize_connection_with_autocommit() click to toggle source
# File lib/vertica/connection.rb, line 428
def initialize_connection_with_autocommit
  query("SET AUTOCOMMIT TO ON") if options[:autocommit]
end
initialize_connection_with_role() click to toggle source
# File lib/vertica/connection.rb, line 411
def initialize_connection_with_role
  case options[:role]
  when :all, :none, :default
    query("SET ROLE #{options[:role].to_s.upcase}")
  when String, Array
    query("SET ROLE #{Vertica.quote(options[:role])}")
  end
end
initialize_connection_with_search_path() click to toggle source
# File lib/vertica/connection.rb, line 420
def initialize_connection_with_search_path
  query("SET SEARCH_PATH TO #{Vertica.quote(options[:search_path])}") if options[:search_path]
end
initialize_connection_with_timezone() click to toggle source
# File lib/vertica/connection.rb, line 424
def initialize_connection_with_timezone
  query("SET TIME ZONE TO #{Vertica.quote(options[:timezone])}") if options[:timezone]
end
io_copy_handler(input, output) click to toggle source
# File lib/vertica/connection.rb, line 351
def io_copy_handler(input, output)
  until input.eof?
    output << input.read(DEFAULT_IO_COPY_HANDLER_BLOCK_SIZE)
  end
end
io_select(exception) click to toggle source
# File lib/vertica/connection.rb, line 378
def io_select(exception)
  readers, writers = nil, nil
  readers = [socket] if exception.is_a?(IO::WaitReadable)
  writers = [socket] if exception.is_a?(IO::WaitWritable)
  if IO.select(readers, writers, nil, @options[:read_timeout]).nil?
    close
    raise Vertica::Error::TimedOutError.new("Connection timed out.")
  end
end
read_bytes(n) click to toggle source
# File lib/vertica/connection.rb, line 357
def read_bytes(n)
  bytes = ""
  until bytes.length == n
    begin
      bytes << socket.read_nonblock(n - bytes.length)
    rescue IO::WaitReadable, IO::WaitWritable => wait_error
      io_select(wait_error)
      retry
    end
  end
  bytes
end
reset_connection() click to toggle source
# File lib/vertica/connection.rb, line 440
def reset_connection
  close
  boot_connection
end
reset_state() click to toggle source
# File lib/vertica/connection.rb, line 450
def reset_state
  @parameters         = {}
  @session_id         = nil
  @backend_pid        = nil
  @backend_key        = nil
  @transaction_status = nil
  @socket             = nil
  @mutex              = Mutex.new.lock
end
run_in_mutex(job) click to toggle source
# File lib/vertica/connection.rb, line 328
def run_in_mutex(job)
  boot_connection if closed?
  if @mutex.try_lock
    begin
      job.run
    rescue StandardError
      @mutex.unlock if @mutex.locked?
      raise
    end
  else
    raise Vertica::Error::SynchronizeError.new(job)
  end
end
socket() click to toggle source
# File lib/vertica/connection.rb, line 308
def socket
  @socket ||= begin
    raw_socket = TCPSocket.new(@options[:host], @options[:port].to_i)
    if @options[:ssl]
      require 'openssl'
      raw_socket.write(Vertica::Protocol::SslRequest.new.to_bytes)
      if raw_socket.read(1) == 'S'
        ssl_context = @options[:ssl].is_a?(OpenSSL::SSL::SSLContext) ? @options[:ssl] : OpenSSL::SSL::SSLContext.new
        raw_socket = OpenSSL::SSL::SSLSocket.new(raw_socket, ssl_context)
        raw_socket.sync = true
        raw_socket.connect
      else
        raise Vertica::Error::SSLNotSupported.new("SSL requested but server doesn't support it.")
      end
    end

    raw_socket
  end
end
startup_connection() click to toggle source
# File lib/vertica/connection.rb, line 388
def startup_connection
  write_message(Vertica::Protocol::Startup.new(@options[:username], @options[:database]))
  message = nil
  begin
    case message = read_message
    when Vertica::Protocol::Authentication
      if message.code != Vertica::Protocol::Authentication::OK
        write_message(Vertica::Protocol::Password.new(@options[:password], auth_method: message.code, user: @options[:username], salt: message.salt))
      end
    else
      process_message(message)
    end
  end until message.kind_of?(Vertica::Protocol::ReadyForQuery)
end
write_bytes(bytes) click to toggle source
# File lib/vertica/connection.rb, line 370
def write_bytes(bytes)
  bytes_written = socket.write_nonblock(bytes)
  write_bytes(bytes[bytes_written...bytes.size]) if bytes_written < bytes.size
rescue IO::WaitReadable, IO::WaitWritable => wait_error
  io_select(wait_error)
  retry
end