class Que::Connection
Constants
- CAST_PROCS
Procs used to convert strings from Postgres into Ruby types.
Attributes
wrapped_connection[R]
Public Class Methods
new(connection)
click to toggle source
# File lib/que/connection.rb, line 44
# Builds a wrapper around an existing connection object.
#
# @param connection the object stored as +wrapped_connection+; every
#   query method of this class delegates to it.
def initialize(connection)
  @wrapped_connection = connection

  # Commands (symbols) that execute_prepared has already prepared through
  # this wrapper; starts out empty.
  @prepared_statements = Set.new
end
wrap(conn)
click to toggle source
# File lib/que/connection.rb, line 28
# Returns the wrapper for +conn+, creating and caching one if necessary.
#
# * An object that already matches the receiver is returned untouched.
# * A PG::Connection is wrapped once; the wrapper is memoized on the
#   connection itself in the +@que_wrapper+ instance variable so later
#   calls hand back the same wrapper.
# * Anything else is rejected.
#
# @param conn the object to wrap
# @return the wrapper associated with +conn+
# @raise [Error] when +conn+ is neither of the supported kinds
def wrap(conn)
  if self === conn
    conn
  elsif PG::Connection === conn
    # Reuse the cached wrapper when one has already been attached.
    if conn.instance_variable_defined?(:@que_wrapper)
      return conn.instance_variable_get(:@que_wrapper)
    end

    conn.instance_variable_set(:@que_wrapper, new(conn))
  else
    raise Error, "Unsupported input for Connection.wrap: #{conn.class}"
  end
end
Public Instance Methods
drain_notifications()
click to toggle source
# File lib/que/connection.rb, line 118
# Consumes notifications by calling next_notification repeatedly, discarding
# each one, until it returns nil.
def drain_notifications
  loop do
    notification = next_notification
    break if notification.nil?
  end
end
execute(command, params = [])
click to toggle source
# File lib/que/connection.rb, line 49
# Runs one SQL command on the wrapped connection and returns the converted
# result.
#
# @param command [Symbol, String] a Symbol is looked up in SQL; a String is
#   used as the SQL text itself
# @param params [Array] bind parameters; passed through convert_params
#   before being sent
# @return whatever convert_result produces for the raw result
# @raise [Error] when +command+ is neither a Symbol nor a String
def execute(command, params = [])
  sql =
    case command
    when Symbol
      SQL[command]
    when String
      command
    else
      raise Error, "Bad command! #{command.inspect}"
    end

  bound = convert_params(params)

  result =
    Que.run_sql_middleware(sql, bound) do
      # Some versions of the PG gem dislike an empty/nil params argument,
      # so the parameterless call is used when there is nothing to bind.
      next wrapped_connection.async_exec(sql) if bound.empty?

      wrapped_connection.async_exec_params(sql, bound)
    end

  # The log payload reports the original command and the converted params.
  Que.internal_log(:connection_execute, self) do
    {
      backend_pid: backend_pid,
      command:     command,
      params:      bound,
      ntuples:     result.ntuples,
    }
  end

  convert_result(result)
end
execute_prepared(command, params = nil)
click to toggle source
# File lib/que/connection.rb, line 81
# Runs a named command as a prepared statement, preparing it on first use.
#
# Falls back to plain execute when Que.use_prepared_statements is off or
# the connection is currently inside a transaction.
#
# @param command [Symbol] key into SQL; also used to build the statement
#   name ("que_<command>")
# @param params [Array, nil] bind parameters; nil means "no parameters"
# @return whatever convert_result (or execute, on the fallback path)
#   produces
# @raise [PG::InvalidSqlStatementName] if the statement is still unknown to
#   the server right after it has been (re-)prepared
def execute_prepared(command, params = nil)
  Que.assert(Symbol, command)

  # Normalise the nil default once. The fallback path hands params to
  # execute, which maps over them and would otherwise fail on nil.
  params ||= []

  if !Que.use_prepared_statements || in_transaction?
    return execute(command, params)
  end

  name = "que_#{command}"

  begin
    unless @prepared_statements.include?(command)
      wrapped_connection.prepare(name, SQL[command])
      @prepared_statements.add(command)
      prepared_just_now = true
    end

    # NOTE(review): unlike execute, params are not run through
    # convert_params here - confirm that callers only pass plain values.
    convert_result(wrapped_connection.exec_prepared(name, params))
  rescue ::PG::InvalidSqlStatementName
    # If the statement was prepared during this very attempt there is
    # nothing left to try, so let the error propagate.
    raise if prepared_just_now

    # Reconnections on ActiveRecord can cause the same connection
    # objects to refer to new backends, so recover as well as we can:
    # forget the cached statement and prepare it again. The retry happens
    # at most once because the next attempt sets prepared_just_now.
    Que.log level: :warn, event: :reprepare_statement, command: command
    @prepared_statements.delete(command)
    retry
  end
end
in_transaction?()
click to toggle source
# File lib/que/connection.rb, line 126
# Reports whether the wrapped connection's transaction status is anything
# other than PG::PQTRANS_IDLE.
#
# @return [Boolean]
def in_transaction?
  status = wrapped_connection.transaction_status
  status != ::PG::PQTRANS_IDLE
end
next_notification()
click to toggle source
# File lib/que/connection.rb, line 114
# Asks the wrapped connection for its next pending notification.
#
# @return whatever +wrapped_connection.notifies+ returns;
#   drain_notifications treats nil as "nothing left"
def next_notification
  connection = wrapped_connection
  connection.notifies
end
server_version()
click to toggle source
# File lib/que/connection.rb, line 122
# Delegates to the wrapped connection's +server_version+.
#
# @return the value reported by the wrapped connection
def server_version
  connection = wrapped_connection
  connection.server_version
end
Private Instance Methods
convert_params(params)
click to toggle source
# File lib/que/connection.rb, line 132
# Prepares bind parameters for sending to the database.
#
# @param params [Array] raw parameter values
# @return [Array] a new array in which Time values are formatted as strings,
#   Arrays and Hashes are serialized with Que.serialize_json, and every
#   other value is passed through unchanged
def convert_params(params)
  params.map do |value|
    case value
    when Array, Hash
      # Structured values are sent as JSON.
      Que.serialize_json(value)
    when Time
      # The pg gem unfortunately doesn't convert fractions of time
      # instances, so format them explicitly with microsecond precision.
      value.strftime('%Y-%m-%d %H:%M:%S.%6N %z')
    else
      value
    end
  end
end
convert_result(result)
click to toggle source
# File lib/que/connection.rb, line 176
# Turns a raw result into an array of row hashes with symbol keys, casting
# values through CAST_PROCS where a converter exists for the column's type.
#
# @param result an object responding to +to_a+ (row hashes keyed by field
#   name), +fields+ (field names in column order) and +ftype(index)+
# @return [Array<Hash>] one hash per row; keys are the field names as
#   symbols, nil values are left as nil
def convert_result(result)
  output = result.to_a

  # A row hash can hold only one entry per field name. If a name repeats,
  # handling it once per column would delete an already-removed key on the
  # second pass and overwrite the converted value with nil. So each distinct
  # name is processed once, using the column index of its last occurrence.
  # NOTE(review): assumes the row hash keeps the last duplicate column's
  # value - confirm against the pg gem.
  last_index_by_field = {}
  result.fields.each_with_index do |field, index|
    last_index_by_field[field] = index
  end

  last_index_by_field.each do |field, index|
    symbol    = field.to_sym
    converter = CAST_PROCS[result.ftype(index)]

    output.each do |row|
      value = row.delete(field)
      # Only non-nil values are cast; a missing converter passes the raw
      # value through.
      value = converter.call(value) if converter && value
      row[symbol] = value
    end
  end

  output
end