class Que::Connection

Constants

CAST_PROCS

Procs used to convert strings from Postgres into Ruby types.

Attributes

wrapped_connection[R]

Public Class Methods

new(connection) click to toggle source
# File lib/que/connection.rb, line 44
# Builds a wrapper around +connection+.
#
# connection - the object exposed afterwards as wrapped_connection.
#              Presumably a PG::Connection, since that is what .wrap
#              passes to new — confirm for any other callers.
def initialize(connection)
  @wrapped_connection = connection
  # Commands that have already been prepared through this wrapper;
  # consulted and updated by execute_prepared.
  @prepared_statements = Set.new
end
wrap(conn) click to toggle source
# File lib/que/connection.rb, line 28
# Returns the wrapper associated with +conn+, creating it if necessary.
#
# conn - either something the receiver already matches via === (returned
#        untouched) or a PG::Connection.
#
# A PG::Connection keeps its wrapper in its own @que_wrapper instance
# variable, so wrapping the same connection twice hands back the same
# object. Whatever is stored there is returned as-is once the variable is
# defined.
#
# Raises Error for any other kind of input.
def wrap(conn)
  # Already wrapped: nothing to do.
  return conn if self === conn

  unless PG::Connection === conn
    raise Error, "Unsupported input for Connection.wrap: #{conn.class}"
  end

  if conn.instance_variable_defined?(:@que_wrapper)
    return conn.instance_variable_get(:@que_wrapper)
  end

  # instance_variable_set returns the value it stored, i.e. the new wrapper.
  conn.instance_variable_set(:@que_wrapper, new(conn))
end

Public Instance Methods

drain_notifications() click to toggle source
# File lib/que/connection.rb, line 118
# Discards notifications by calling next_notification repeatedly until it
# returns nil. Always returns nil.
def drain_notifications
  until next_notification.nil?
    # Each notification is intentionally thrown away.
  end
end
execute(command, params = []) click to toggle source
# File lib/que/connection.rb, line 49
# Runs a statement on the wrapped connection and returns the rows after
# they have been passed through convert_result.
#
# command - a Symbol (looked up in SQL) or a String of raw SQL.
# params  - bind values; passed through convert_params before use.
#
# Raises Error when command is neither a Symbol nor a String.
def execute(command, params = [])
  sql = case command
        when String then command
        when Symbol then SQL[command]
        else raise Error, "Bad command! #{command.inspect}"
        end

  bound = convert_params(params)

  pg_result = Que.run_sql_middleware(sql, bound) do
    # Some versions of the PG gem dislike an empty/nil params argument,
    # so the parameterless call is used when there is nothing to bind.
    next wrapped_connection.async_exec(sql) if bound.empty?

    wrapped_connection.async_exec_params(sql, bound)
  end

  Que.internal_log(:connection_execute, self) do
    {
      backend_pid: backend_pid,
      command:     command,
      params:      bound,
      ntuples:     pg_result.ntuples,
    }
  end

  convert_result(pg_result)
end
execute_prepared(command, params = nil) click to toggle source
# File lib/que/connection.rb, line 81
# Runs one of the named statements in SQL as a prepared statement,
# preparing it the first time this wrapper sees the command.
#
# Delegates to execute instead when Que.use_prepared_statements is falsy
# or when in_transaction? is true.
#
# command - Symbol key into SQL; the statement is named "que_<command>".
# params  - Array of bind values, or nil for none. On the prepared path
#           they are handed to exec_prepared as given (they are not run
#           through convert_params).
#
# Returns the result of execute on the fallback path, otherwise the output
# of convert_result.
# Raises ::PG::InvalidSqlStatementName when the statement is reported as
# unknown right after it was prepared.
def execute_prepared(command, params = nil)
  Que.assert(Symbol, command)

  if !Que.use_prepared_statements || in_transaction?
    # execute maps over its params, so this method's nil default must be
    # normalized to an empty list; passing nil explicitly would bypass
    # execute's own [] default and blow up with NoMethodError.
    return execute(command, params || [])
  end

  name = "que_#{command}"

  begin
    unless @prepared_statements.include?(command)
      wrapped_connection.prepare(name, SQL[command])
      @prepared_statements.add(command)
      # Remembered across `retry` so that a second failure is not retried.
      prepared_just_now = true
    end

    convert_result(
      wrapped_connection.exec_prepared(name, params)
    )
  rescue ::PG::InvalidSqlStatementName => error
    # Reconnections on ActiveRecord can cause the same connection
    # objects to refer to new backends, so recover as well as we can.

    unless prepared_just_now
      Que.log level: :warn, event: :reprepare_statement, command: command
      # Forget the statement so the retry prepares it again.
      @prepared_statements.delete(command)
      retry
    end

    raise error
  end
end
in_transaction?() click to toggle source
# File lib/que/connection.rb, line 126
# True whenever the wrapped connection reports any transaction status
# other than ::PG::PQTRANS_IDLE.
def in_transaction?
  status = wrapped_connection.transaction_status
  status != ::PG::PQTRANS_IDLE
end
next_notification() click to toggle source
# File lib/que/connection.rb, line 114
# Returns whatever wrapped_connection.notifies returns.
# drain_notifications treats a nil return as "nothing left to read".
def next_notification
  wrapped_connection.notifies
end
server_version() click to toggle source
# File lib/que/connection.rb, line 122
# Returns the value of wrapped_connection.server_version unchanged.
def server_version
  wrapped_connection.server_version
end

Private Instance Methods

convert_params(params) click to toggle source
# File lib/que/connection.rb, line 132
# Prepares bind values for the pg gem, returning a new Array.
#
# - Array and Hash values are serialized with Que.serialize_json.
# - Time values become strings with microsecond precision, because the pg
#   gem unfortunately doesn't convert fractions of time instances.
# - Everything else is passed through untouched.
def convert_params(params)
  params.map do |value|
    case value
    when Array, Hash then Que.serialize_json(value)
    when Time        then value.strftime('%Y-%m-%d %H:%M:%S.%6N %z')
    else value
    end
  end
end
convert_result(result) click to toggle source
# File lib/que/connection.rb, line 176
# Turns a result into an Array of Hashes keyed by Symbol column names.
#
# For each column, the value is moved from its String key to the matching
# Symbol key. When CAST_PROCS has an entry for the column's ftype, truthy
# values are passed through that proc first; nil values are left alone.
def convert_result(result)
  rows = result.to_a

  result.fields.each_with_index do |column, position|
    key  = column.to_sym
    cast = CAST_PROCS[result.ftype(position)]

    rows.each do |row|
      raw = row.delete(column)
      row[key] = cast && raw ? cast.call(raw) : raw
    end
  end

  rows
end