class EventMachine::Synchrony::ConnectionPool

Public Class Methods

new(opts, &block) click to toggle source
# File lib/em-synchrony/connection_pool.rb, line 7
def initialize(opts, &block)
  @reserved  = {}   # map of in-progress connections
  @available = []   # pool of free connections
  @pending   = []   # pending reservations (FIFO)

  opts[:size].times do
    @available.push(block.call) if block_given?
  end
end

Public Instance Methods

execute(async) { |conn| ... } click to toggle source

Choose first available connection and pass it to the supplied block. This will block indefinitely until there is an available connection to service the request.

# File lib/em-synchrony/connection_pool.rb, line 20
def execute(async)
  f = Fiber.current

  begin
    conn = acquire(f)
    yield conn
  ensure
    release(f) if not async
  end
end
pool_status() click to toggle source

Returns current pool utilization.

@return [Hash] Current utilization.

# File lib/em-synchrony/connection_pool.rb, line 34
def pool_status
  {
    available: @available.size,
    reserved: @reserved.size,
    pending: @pending.size
  }
end

Private Instance Methods

acquire(fiber) click to toggle source

Acquire a lock on a connection and assign it to executing fiber

  • if connection is available, pass it back to the calling block

  • if pool is full, yield the current fiber until connection is available

# File lib/em-synchrony/connection_pool.rb, line 47
def acquire(fiber)
  if conn = @available.pop
    @reserved[fiber.object_id] = conn
    conn
  else
    Fiber.yield @pending.push fiber
    acquire(fiber)
  end
end
method_missing(method, *args, &blk) click to toggle source

Allow the pool to behave as the underlying connection

If the requesting method begins with “a” prefix, then hijack the callbacks and errbacks to fire a connection pool release whenever the request is complete. Otherwise yield the connection within execute method and release once it is complete (assumption: fiber will yield until data is available, or request is complete)

# File lib/em-synchrony/connection_pool.rb, line 77
def method_missing(method, *args, &blk)
  async = (method[0,1] == "a")

  execute(async) do |conn|
    df = conn.__send__(method, *args, &blk)

    if async
      fiber = Fiber.current
      df.callback { release(fiber) }
      df.errback { release(fiber) }
    end

    df
  end
end
release(fiber) click to toggle source

Release connection assigned to the supplied fiber and resume any other pending connections (which will immediately try to run acquire on the pool)

# File lib/em-synchrony/connection_pool.rb, line 60
def release(fiber)
  @available.push(@reserved.delete(fiber.object_id))

  if pending = @pending.shift
    pending.resume
  end
end