class EzPool
Generic connection pool class, e.g. for sharing a limited number of network connections among many threads. Note: connections are created lazily.
Example usage with block (faster):
  @pool = EzPool.new { Redis.new }
  @pool.with do |redis|
    redis.lpop('my-list') if redis.llen('my-list') > 0
  end
Using the optional timeout override (for that single invocation):
  @pool.with(timeout: 2.0) do |redis|
    redis.lpop('my-list') if redis.llen('my-list') > 0
  end
Example usage replacing an existing connection (slower):
  # wrap takes the options hash as a required positional argument
  $redis = EzPool.wrap({}) { Redis.new }

  def do_work
    $redis.lpop('my-list') if $redis.llen('my-list') > 0
  end
Note that there's no way to pass a disconnection function to this usage, nor any way to guarantee that subsequent calls will go to the same connection (if your connection has any concept of sessions, this may be important). We strongly recommend against using wrapped connections in production environments.
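To illustrate why calls on a wrapped connection may land on different connections, here is a minimal sketch that assumes the wrapper forwards each method call through its own checkout and checkin. `SketchPool`, `SketchWrapper` and `FakeConn` are hypothetical stand-ins for this example only; the actual `Wrapper` class is not shown on this page and may work differently.

```ruby
# Hypothetical stand-ins for illustration; not EzPool's actual classes.
class SketchPool
  def initialize(conns)
    @conns = conns          # pre-built "connections" for the demo
    @mutex = Mutex.new
  end

  # Lend out one connection for the duration of the block.
  def with
    conn = @mutex.synchronize { @conns.shift }
    begin
      yield conn
    ensure
      # Return it to the back of the list, so the next call gets another one.
      @mutex.synchronize { @conns.push(conn) }
    end
  end
end

class SketchWrapper
  def initialize(pool)
    @pool = pool
  end

  # Every forwarded call checks a connection out and back in again.
  def method_missing(name, *args, &block)
    @pool.with { |conn| conn.public_send(name, *args, &block) }
  end

  def respond_to_missing?(name, include_private = false)
    @pool.with { |conn| conn.respond_to?(name, include_private) } || super
  end
end

FakeConn = Struct.new(:id)

wrapped = SketchWrapper.new(SketchPool.new([FakeConn.new(1), FakeConn.new(2)]))
first_id  = wrapped.id   # served by the first connection
second_id = wrapped.id   # served by the second connection
```

Under this assumption, two consecutive calls are answered by two different connections, which is why session state cannot be relied on across wrapped calls.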
Accepts the following options:
- :size - number of connections to pool, defaults to 5
- :timeout - number of seconds to wait for a connection if none is currently available, defaults to 5
- :max_age - maximum number of seconds that a connection may be alive for; must be greater than 0 (connections are recycled on checkin/checkout)
- :connect_with - callable for creating a connection
- :disconnect_with - callable for shutting down a connection
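The interplay of :size, :timeout and lazy creation can be sketched with the standard library alone. `LazyPool` and `PoolTimeoutError` below are hypothetical names for this example; this is not EzPool's implementation (which uses a `TimedStack`), and the error EzPool raises on timeout is not shown on this page.

```ruby
# Hypothetical sketch; not EzPool's implementation.
class PoolTimeoutError < StandardError; end

class LazyPool
  attr_reader :created

  def initialize(size: 5, timeout: 5, &connect_with)
    @size = size
    @timeout = timeout
    @connect_with = connect_with
    @idle = []
    @created = 0              # connections are only built on demand
    @mutex = Mutex.new
    @returned = ConditionVariable.new
  end

  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @mutex.synchronize do
      loop do
        # Prefer an idle connection, then create one if under the size limit.
        return @idle.pop unless @idle.empty?
        if @created < @size
          @created += 1
          return @connect_with.call
        end
        # Otherwise wait for a checkin until the deadline passes.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise PoolTimeoutError, "no connection available" if remaining <= 0
        @returned.wait(@mutex, remaining)
      end
    end
  end

  def checkin(conn)
    @mutex.synchronize do
      @idle.push(conn)
      @returned.signal
    end
  end
end

pool = LazyPool.new(size: 1, timeout: 0.05) { Object.new }
pool.created         # 0, nothing has been created yet
conn = pool.checkout # the single connection is created here
pool.checkin(conn)
```

A second checkout while the only connection is still lent out would block for up to the timeout and then raise.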
Constants
- DEFAULTS
- GLOBAL_MONOTONIC_CLOCK
Clock that cannot be set and represents monotonic time since some unspecified starting point.
- VERSION
Public Class Methods
Returns the current time as tracked by the application monotonic clock.
@return [Float] The current monotonic time when `since` is not given, else the elapsed monotonic time between `since` and the current time
# File lib/ezpool/monotonic_time.rb, line 62
def monotonic_time
  GLOBAL_MONOTONIC_CLOCK.get_time
end
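A monotonic reading is only meaningful as a difference between two readings. The sketch below measures an elapsed interval with `Process.clock_gettime`; `monotonic_now` and `elapsed_since` are hypothetical helpers, since the listing above takes no `since` argument.

```ruby
# Read the monotonic clock directly, as get_time does in the listing.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: seconds elapsed since an earlier monotonic reading.
def elapsed_since(start)
  monotonic_now - start
end

started = monotonic_now
sleep 0.01
elapsed = elapsed_since(started) # a Float number of seconds, never negative
```

Unlike `Time.now`, this clock is not affected by changes to the system time, which makes it suitable for timeouts and connection ages.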
# File lib/ezpool.rb, line 55
def initialize(options = {}, &block)
  options = DEFAULTS.merge(options)

  @size = options.fetch(:size)
  @timeout = options.fetch(:timeout)
  @max_age = options.fetch(:max_age).to_f
  if @max_age <= 0
    raise ArgumentError.new(":max_age must be > 0")
  end

  if block_given?
    if options.include?(:connect_with)
      raise ArgumentError.new("Block passed to EzPool *and* :connect_with in options")
    else
      options[:connect_with] = block
    end
  end

  @manager = EzPool::ConnectionManager.new(options[:connect_with], options[:disconnect_with])
  @available = TimedStack.new(@manager, @size)
  @key = :"current-#{@available.object_id}"
  @checked_out_connections = Hash.new
  @mutex = Mutex.new
end
# File lib/ezpool.rb, line 48
def self.wrap(options, &block)
  if block_given?
    options[:connect_with] = block
  end
  Wrapper.new(options)
end
Public Instance Methods
# File lib/ezpool.rb, line 134
def checkin(conn)
  conn_wrapper = @mutex.synchronize do
    @checked_out_connections.delete(conn.object_id)
  end
  if conn_wrapper.nil?
    raise EzPool::CheckedInUnCheckedOutConnectionError
  end
  if expired? conn_wrapper
    @available.abandon(conn_wrapper)
  else
    @available.push(conn_wrapper)
  end
  nil
end
# File lib/ezpool.rb, line 117
def checkout(options = {})
  conn_wrapper = nil
  while conn_wrapper.nil? do
    timeout = options[:timeout] || @timeout
    conn_wrapper = @available.pop(timeout: timeout)
    if expired? conn_wrapper
      @available.abandon(conn_wrapper)
      conn_wrapper = nil
    end
  end
  @mutex.synchronize do
    @checked_out_connections[conn_wrapper.raw_conn.object_id] = conn_wrapper
  end
  conn_wrapper.raw_conn
end
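The bookkeeping shared by checkout and checkin (a hash keyed by the raw connection's object_id, guarded by a mutex) can be sketched in isolation. `CheckoutLedger` and `UnknownConnectionError` are hypothetical names for this example and are not part of EzPool.

```ruby
# Hypothetical sketch of the checked-out bookkeeping; not part of EzPool.
class UnknownConnectionError < StandardError; end

class CheckoutLedger
  def initialize
    @checked_out = {}
    @mutex = Mutex.new
  end

  # Record a connection as lent out, keyed by its object_id.
  def record(conn)
    @mutex.synchronize { @checked_out[conn.object_id] = conn }
    conn
  end

  # Forget a connection on return; raise if it was never recorded.
  def release(conn)
    found = @mutex.synchronize { @checked_out.delete(conn.object_id) }
    raise UnknownConnectionError, "connection was not checked out" if found.nil?
    found
  end

  # Number of connections currently lent out.
  def outstanding
    @mutex.synchronize { @checked_out.size }
  end
end

ledger = CheckoutLedger.new
conn = ledger.record(Object.new)
ledger.release(conn)   # fine: the connection was recorded
# A second ledger.release(conn) would raise UnknownConnectionError.
```

Keying by object_id means the caller has to hand back the very same object it received, not an equal copy.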
# File lib/ezpool.rb, line 83
def connect_with(&block)
  @manager.connect_with(&block)
end
# File lib/ezpool.rb, line 87
def disconnect_with(&block)
  @manager.disconnect_with(&block)
end
# File lib/ezpool/monotonic_time.rb, line 15
def get_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
# File lib/ezpool.rb, line 149
def shutdown
  if block_given?
    raise ArgumentError.new("shutdown no longer accepts a block; call #disconnect_with to set the disconnect method, or pass the disconnect: option to the EzPool initializer")
  end
  @available.shutdown
end
MRI
# File lib/ezpool.rb, line 93
def with(options = {})
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(options)
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      checkin conn
    end
  end
end
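The nested `Thread.handle_interrupt` calls defer asynchronous interrupts (such as `Thread#raise` or `Thread#kill`) while a connection is being taken and returned, and allow them again only while the caller's block runs. The sketch below reproduces that shape on a toy pool; `BorrowingPool` is a hypothetical stand-in with no locking and no timeout, meant for single-threaded illustration only.

```ruby
# Hypothetical stand-in; no locking, for single-threaded illustration only.
class BorrowingPool
  attr_reader :idle

  def initialize(conns)
    @idle = conns
  end

  def with
    # Defer asynchronous interrupts around the borrow/return bookkeeping.
    Thread.handle_interrupt(Exception => :never) do
      conn = @idle.pop
      begin
        # Allow interrupts again while the caller's block runs.
        Thread.handle_interrupt(Exception => :immediate) do
          yield conn
        end
      ensure
        # Runs whether the block returned normally or raised.
        @idle.push(conn)
      end
    end
  end
end

pool = BorrowingPool.new([:conn_a])
begin
  pool.with { |_conn| raise "boom" }
rescue RuntimeError
  # The connection has already been returned by the ensure clause.
end
pool.idle # [:conn_a]
```

The value of the caller's block is passed back out through both `handle_interrupt` calls, so `with` returns whatever the block returns.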
Private Instance Methods
# File lib/ezpool.rb, line 157
def expired?(connection_wrapper)
  if @max_age.finite?
    connection_wrapper.age > @max_age
  else
    false
  end
end
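The `finite?` guard suggests that an infinite :max_age disables recycling altogether; the contents of DEFAULTS are not shown on this page, so that reading is an assumption. The following sketch shows the age check on a hypothetical wrapper (`AgedConnection` and `expired_sketch?` are illustrative names, and the explicit `now:` argument exists only to make the example deterministic).

```ruby
# Hypothetical wrapper that remembers when its connection was created.
class AgedConnection
  attr_reader :raw_conn

  def initialize(raw_conn, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
    @raw_conn = raw_conn
    @created_at = now
  end

  # Seconds since creation, measured on the monotonic clock.
  def age(now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
    now - @created_at
  end
end

# Same logic as expired? above, with max_age and the clock passed explicitly.
def expired_sketch?(wrapper, max_age, now:)
  max_age.finite? && wrapper.age(now: now) > max_age
end

wrapper = AgedConnection.new(:conn, now: 100.0)
expired_sketch?(wrapper, 30.0, now: 120.0)            # false, age is 20 s
expired_sketch?(wrapper, 30.0, now: 131.0)            # true, age is 31 s
expired_sketch?(wrapper, Float::INFINITY, now: 1e9)   # false, never expires
```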