class EzPool

Generic connection pool class for e.g. sharing a limited number of network connections among many threads. Note: Connections are lazily created.

Example usage with block (faster):

@pool = EzPool.new { Redis.new }

@pool.with do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Using optional timeout override (for that single invocation)

@pool.with(timeout: 2.0) do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Example usage replacing an existing connection (slower):

$redis = EzPool.wrap { Redis.new }

def do_work
  $redis.lpop('my-list') if $redis.llen('my-list') > 0
end

Note that there's no way to pass a disconnection function to this usage, nor any way to guarantee that subsequent calls will go to the same connection (if your connection has any concept of sessions, this may be important). We strongly recommend against using wrapped connections in production environments.

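Accepts the following options (read by the initializer; unspecified values fall back to DEFAULTS):

- :size - size handed to the internal stack of available connections
- :timeout - default timeout used when checking out a connection; can be overridden per call
- :max_age - maximum connection age; a connection older than this is abandoned instead of reused. Must be > 0
- :connect_with - callable used to create a connection (alternative to passing a block)
- :disconnect_with - callable used to disconnect a connection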

Constants

DEFAULTS
GLOBAL_MONOTONIC_CLOCK

Clock that cannot be set and represents monotonic time since some unspecified starting point.

@!visibility private

VERSION

Public Class Methods

monotonic_time() click to toggle source

Returns the current time as tracked by the application monotonic clock.

@return [Float] The current monotonic time, measured from an unspecified starting point
# File lib/ezpool/monotonic_time.rb, line 62
# Reads the current time from the application-wide monotonic clock.
#
# @return the value reported by GLOBAL_MONOTONIC_CLOCK#get_time
def monotonic_time
  clock = GLOBAL_MONOTONIC_CLOCK
  clock.get_time
end
new(options = {}, &block) click to toggle source
# File lib/ezpool.rb, line 55
# Builds a pool from +options+ layered on top of DEFAULTS.
#
# The connection factory may be given either as a block or as the
# +:connect_with+ option, but not both.
#
# @param options [Hash] overrides for DEFAULTS (+:size+, +:timeout+,
#   +:max_age+, +:connect_with+, +:disconnect_with+)
# @raise [ArgumentError] if +:max_age+ is not greater than zero, or if a
#   block is passed together with +:connect_with+
def initialize(options = {}, &block)
  settings = DEFAULTS.merge(options)

  @size = settings.fetch(:size)
  @timeout = settings.fetch(:timeout)
  @max_age = settings.fetch(:max_age).to_f

  raise ArgumentError, ":max_age must be > 0" if @max_age <= 0

  if block
    if settings.include?(:connect_with)
      raise ArgumentError, "Block passed to EzPool *and* :connect_with in options"
    end

    settings[:connect_with] = block
  end

  connector = settings[:connect_with]
  disconnector = settings[:disconnect_with]
  @manager = EzPool::ConnectionManager.new(connector, disconnector)

  @available = TimedStack.new(@manager, @size)
  @key = "current-#{@available.object_id}".to_sym

  @checked_out_connections = {}
  @mutex = Mutex.new
end
wrap(options, &block) click to toggle source
# File lib/ezpool.rb, line 48
# Builds a Wrapper from +options+.
#
# +options+ now defaults to an empty hash so the documented form
# <tt>EzPool.wrap { Redis.new }</tt> works without an explicit argument.
# The caller's hash is never modified: when a block is given, a copy
# carrying the block under +:connect_with+ is passed on instead.
#
# @param options [Hash] pool options handed to Wrapper.new
# @yield optional connection factory, stored as +:connect_with+
#   (replaces any +:connect_with+ already present in +options+)
# @return [Wrapper]
def self.wrap(options = {}, &block)
  # Hash#merge returns a new hash, so frozen or shared option hashes are safe.
  options = options.merge(connect_with: block) if block_given?
  Wrapper.new(options)
end

Public Instance Methods

checkin(conn) click to toggle source
# File lib/ezpool.rb, line 134
# Returns a previously checked-out connection to the pool.
#
# The connection's wrapper is looked up (and removed) by the connection's
# object_id under the mutex. An expired wrapper is abandoned; otherwise it
# is pushed back onto the stack of available connections.
#
# @param conn the raw connection obtained from #checkout
# @return [nil]
# @raise [EzPool::CheckedInUnCheckedOutConnectionError] if +conn+ is not
#   currently recorded as checked out
def checkin(conn)
  wrapper = @mutex.synchronize { @checked_out_connections.delete(conn.object_id) }
  raise EzPool::CheckedInUnCheckedOutConnectionError if wrapper.nil?

  expired?(wrapper) ? @available.abandon(wrapper) : @available.push(wrapper)
  nil
end
checkout(options = {}) click to toggle source
# File lib/ezpool.rb, line 117
# Takes a connection out of the pool.
#
# Wrappers are popped from the available stack until a non-expired one is
# found; expired wrappers are abandoned along the way. The chosen wrapper is
# recorded under the raw connection's object_id so #checkin can find it.
#
# @param options [Hash] +:timeout+ overrides the pool-wide timeout for
#   each pop performed by this call
# @return the raw connection held by the selected wrapper
def checkout(options = {})
  wrapper = nil
  loop do
    wait = options[:timeout] || @timeout
    wrapper = @available.pop(timeout: wait)
    if expired?(wrapper)
      @available.abandon(wrapper)
      wrapper = nil
    end
    break unless wrapper.nil?
  end

  @mutex.synchronize { @checked_out_connections[wrapper.raw_conn.object_id] = wrapper }
  wrapper.raw_conn
end
connect_with(&block) click to toggle source
# File lib/ezpool.rb, line 83
# Forwards the given block to the connection manager's +connect_with+.
#
# @yield block handed on to the manager
# @return whatever the manager's +connect_with+ returns
def connect_with(&connector)
  @manager.connect_with(&connector)
end
disconnect_with(&block) click to toggle source
# File lib/ezpool.rb, line 87
# Forwards the given block to the connection manager's +disconnect_with+.
#
# @yield block handed on to the manager
# @return whatever the manager's +disconnect_with+ returns
def disconnect_with(&disconnector)
  @manager.disconnect_with(&disconnector)
end
get_time() click to toggle source

@!visibility private

# File lib/ezpool/monotonic_time.rb, line 15
# @!visibility private
#
# Reads the process's monotonic clock.
#
# @return [Float] clock reading in seconds (the unit is spelled out here;
#   it is also Process.clock_gettime's default)
def get_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
end
shutdown() click to toggle source
# File lib/ezpool.rb, line 149
# Shuts down the stack of available connections.
#
# Passing a block is rejected. The error message now names the
# +disconnect_with:+ option, which is the key the initializer actually
# reads; the previous text pointed at a +disconnect:+ option.
#
# @return whatever the available stack's +shutdown+ returns
# @raise [ArgumentError] if a block is given
def shutdown
  if block_given?
    raise ArgumentError, "shutdown no longer accepts a block; call #disconnect_with to set the disconnect method, or pass the disconnect_with: option to the EzPool initializer"
  end

  @available.shutdown
end
with(options = {}) { |conn| ... } click to toggle source

MRI

# File lib/ezpool.rb, line 93
# Checks out a connection, yields it to the caller's block, and checks it
# back in when the block finishes, whether it returns normally or raises.
#
# @param options [Hash] passed unchanged to #checkout (e.g. +:timeout+)
# @yield [conn] the checked-out connection
# @return the value of the block
def with(options = {})
  # Asynchronous interrupts are deferred for the whole checkout/checkin
  # bookkeeping so that a connection cannot be taken from the pool and then
  # lost before the ensure clause is in place.
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(options)
    begin
      # Interrupts are re-enabled only while the caller's block is running.
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      # Runs with interrupts deferred again (outer :never mask).
      checkin conn
    end
  end
end

Private Instance Methods

expired?(connection_wrapper) click to toggle source
# File lib/ezpool.rb, line 157
# Tells whether a wrapped connection has outlived the pool's +max_age+.
#
# When +@max_age+ is not finite the wrapper's age is never consulted and
# the answer is always +false+.
#
# @param connection_wrapper an object responding to +age+
# @return [Boolean]
def expired?(connection_wrapper)
  @max_age.finite? && connection_wrapper.age > @max_age
end