class Que::ConnectionPool
Public Class Methods
new(&block)
click to toggle source
# File lib/que/connection_pool.rb, line 8 def initialize(&block) @connection_proc = block @checked_out = Set.new @mutex = Mutex.new @thread_key = "que_connection_pool_#{object_id}".to_sym end
Public Instance Methods
checkout() { |wrapped| ... }
click to toggle source
# File lib/que/connection_pool.rb, line 15 def checkout # Do some asserting to ensure that the connection pool we're using is # behaving properly. @connection_proc.call do |conn| # Did this pool already have a connection for this thread? preexisting = wrapped = current_connection begin if preexisting # If so, check that the connection we just got is the one we expect. if preexisting.wrapped_connection.backend_pid != conn.backend_pid raise Error, "Connection pool is not reentrant! previous: #{preexisting.wrapped_connection.inspect} now: #{conn.inspect}" end else # If not, make sure that it wasn't promised to any other threads. sync do Que.assert(@checked_out.add?(conn.backend_pid)) do "Connection pool didn't synchronize access properly! (entrance: #{conn.backend_pid})" end end self.current_connection = wrapped = Connection.wrap(conn) end yield(wrapped) ensure if preexisting.nil? # We're at the top level (about to return this connection to the # pool we got it from), so mark it as no longer ours. self.current_connection = nil sync do Que.assert(@checked_out.delete?(conn.backend_pid)) do "Connection pool didn't synchronize access properly! (exit: #{conn.backend_pid})" end end end end end end
execute(*args)
click to toggle source
# File lib/que/connection_pool.rb, line 56 def execute(*args) checkout { |conn| conn.execute(*args) } end
in_transaction?()
click to toggle source
# File lib/que/connection_pool.rb, line 60 def in_transaction? checkout { |conn| conn.in_transaction? } end
Private Instance Methods
current_connection()
click to toggle source
# File lib/que/connection_pool.rb, line 70 def current_connection Thread.current[@thread_key] end
current_connection=(c)
click to toggle source
# File lib/que/connection_pool.rb, line 74 def current_connection=(c) Thread.current[@thread_key] = c end
sync(&block)
click to toggle source
# File lib/que/connection_pool.rb, line 66 def sync(&block) @mutex.synchronize(&block) end