class FiberConnectionPool

Constants

RESERVED_TTL_SECS
SAVED_DATA_TTL_SECS
VERSION

Attributes

saved_data[RW]
size[R]
treated_exceptions[RW]

Public Class Methods

new(opts) { || ... } click to toggle source

Initializes the pool with ‘size’ instances running the given block to get each one. Ex:

pool = FiberConnectionPool.new(:size => 5) { MyConnection.new }
# File lib/fiber_connection_pool.rb, line 19
# Build a pool of opts[:size] connections, running the given block once per
# connection to create it.
#
# opts - Hash of options. opts[:size] is converted with to_i and must be > 0.
#
# Raises ArgumentError when the resulting size is not positive.
def initialize(opts)
  requested = opts[:size].to_i
  raise ArgumentError.new('size > 0 is mandatory') if requested <= 0

  @size = requested

  # per-fiber bookkeeping
  @reserved        = {} # connections currently in use, keyed by fiber
  @keep_connection = {} # fibers that asked to keep their connection
  @pending         = [] # fibers waiting for a connection (FIFO)

  # exception classes after which the connection needs further treatment
  @treated_exceptions = [ PlaceholderException ]

  # save_data machinery
  @save_data_requests = {} # blocks to be run to gather data
  @saved_data         = {} # gathered data, keyed by fiber

  # timestamps used to decide when the periodic cleanups run
  @last_reserved_cleanup = Time.now
  @last_data_cleanup     = Time.now

  # the free connections themselves, one block call per slot
  @available = Array.new(@size) { yield }
end

Public Instance Methods

acquire(fiber = nil, opts = { :keep => true }) click to toggle source

Acquire a lock on a connection and assign it to the given fiber. If no connection is available, the fiber is pushed onto the pending array and yields until it is resumed.

If :keep => true is given (by default), connection is kept, you must call ‘release’ at some point

Ex:

def transaction
  @pool.acquire          # reserve one instance for this fiber
  @pool.query 'BEGIN'    # start SQL transaction

  yield                  # perform queries inside the transaction

  @pool.query 'COMMIT'   # confirm it
rescue => ex
  @pool.query 'ROLLBACK' # discard it
  raise ex
ensure
  @pool.release          # always release it back
end
# File lib/fiber_connection_pool.rb, line 166
# Reserve a connection for a fiber and return it.
#
# fiber - the fiber that will own the connection (Fiber.current when nil).
# opts  - Hash; when opts[:keep] is truthy the fiber is flagged so that the
#         connection stays reserved until release is called.
#
# When the pool is empty the fiber is queued on @pending and execution
# yields; once resumed, the whole procedure is attempted again.
def acquire(fiber = nil, opts = { :keep => true })
  owner = fiber.nil? ? Fiber.current : fiber

  # Plain `while` on purpose: Kernel#loop would swallow a StopIteration.
  while true
    @keep_connection[owner] = true if opts[:keep]

    # already reserved? then use it
    held = @reserved[owner]
    return held if held

    conn = @available.pop
    if conn
      @reserved[owner] = conn
      return conn
    end

    # nothing free: wait in line, retry after being resumed
    Fiber.yield @pending.push(owner)
  end
end
clear_save_data_requests() click to toggle source

Clear any save_data requests in the pool. No data will be saved after this, unless new requests are added with save_data.

# File lib/fiber_connection_pool.rb, line 71
# Forget every registered save_data block by swapping in a brand new, empty
# Hash (the previous Hash object is left untouched).
def clear_save_data_requests
  @save_data_requests = Hash.new
end
gathered_data() click to toggle source

Return the gathered data for this fiber

# File lib/fiber_connection_pool.rb, line 64
# Look up whatever has been stored in @saved_data for the fiber that is
# currently running (nil when nothing was stored for it).
def gathered_data
  current = Fiber.current
  @saved_data[current]
end
has_connection?(conn) click to toggle source

True if the given connection is anywhere inside the pool

# File lib/fiber_connection_pool.rb, line 100
# Tell whether +conn+ is known to the pool, either among the free
# connections or among the ones currently reserved by a fiber.
def has_connection?(conn)
  @available.include?(conn) || @reserved.value?(conn)
end
query(sql, *args) click to toggle source

Avoid method_missing stack for ‘query’

# File lib/fiber_connection_pool.rb, line 92
# Explicit 'query' entry point, so this call does not go through
# method_missing. Runs conn.query(sql, *args) inside execute; note that
# execute is given the method name 'query' and only the extra +args+.
def query(sql, *args)
  execute('query', args) { |conn| conn.query(sql, *args) }
end
recreate_connection(new_conn) click to toggle source

DEPRECATED: use with_failed_connection

# File lib/fiber_connection_pool.rb, line 105
# DEPRECATED: thin wrapper kept for old callers. Delegates to
# with_failed_connection, ignoring the failed connection it yields and
# answering +new_conn+ as the replacement.
def recreate_connection(new_conn)
  with_failed_connection { |_failed| new_conn }
end
release(fiber = nil) click to toggle source

Release the connection assigned to the supplied fiber and resume the next pending fiber, if any (which will immediately try to run acquire on the pool). Also perform cleanup if the TTL has passed.

# File lib/fiber_connection_pool.rb, line 184
# Give the connection reserved by +fiber+ back to the pool.
#
# fiber - the fiber whose connection is released (Fiber.current when nil).
#
# The fiber's keep flag is always cleared. When the fiber holds no reserved
# connection (e.g. release called twice) nothing is added to the pool:
# pushing the nil returned by Hash#delete would leave a bogus entry among
# the available connections.
#
# Returns whatever notify_new_is_available returns.
def release(fiber = nil)
  fiber = Fiber.current if fiber.nil?
  @keep_connection.delete fiber

  # only real connections go back to the front of the pool
  conn = @reserved.delete(fiber)
  @available.unshift conn unless conn.nil?

  # try cleanup
  reserved_cleanup if (Time.now - @last_reserved_cleanup) >= RESERVED_TTL_SECS

  notify_new_is_available
end
release_data(fiber) click to toggle source

Delete any saved_data for given fiber

# File lib/fiber_connection_pool.rb, line 77
# Drop the entry stored for +fiber+ in @saved_data and return the removed
# value (nil when there was none).
def release_data(fiber)
  store = @saved_data
  store.delete(fiber)
end
reserved_cleanup() click to toggle source

Release any connections still reserved by dead fibers, and delete the keep flags of dead fibers

# File lib/fiber_connection_pool.rb, line 134
# Sweep state left behind by fibers that are no longer alive: their reserved
# connections are released and their keep flags removed. The cleanup
# timestamp is refreshed first.
def reserved_cleanup
  @last_reserved_cleanup = Time.now

  # work on a snapshot of the keys; liveness is checked one fiber at a time
  @reserved.keys.each do |fiber|
    release(fiber) unless fiber.alive?
  end

  flags = @keep_connection.dup
  flags.each_key do |fiber|
    @keep_connection.delete(fiber) unless fiber.alive?
  end
end
save_data(key, &block) click to toggle source

Add a save_data request to the pool. The given block will be executed after each successful call to -any- method on the connection. The connection, the method name and the call arguments are passed to the block.

The returned value will be saved in pool.gathered_data, and will be kept as long as the fiber stays alive.

Ex:

# (...right after pool's creation...)
pool.save_data(:hey_or_hoo) do |conn, method, args|
  next 'hey' if method == 'query'
  'hoo'
end

# (...from a reactor fiber...)
pool.query('select anything from anywhere')
puts pool.gathered_data[:hey_or_hoo]
  => 'hey'
# File lib/fiber_connection_pool.rb, line 58
# Register +block+ under +key+ in the save_data requests, replacing any
# block previously stored under the same key. Returns the stored block.
def save_data(key, &block)
  @save_data_requests.store(key, block)
end
save_data_cleanup() click to toggle source

Delete any saved_data held for dead fibers

# File lib/fiber_connection_pool.rb, line 83
# Remove from @saved_data every entry whose fiber is no longer alive, then
# record the time of this cleanup.
def save_data_cleanup
  @saved_data.delete_if { |fiber, _data| !fiber.alive? }
  @last_data_cleanup = Time.now
end
with_failed_connection() { |bad_conn| ... } click to toggle source

Identify the connection that just failed for current fiber. Pass it to the given block, which must return a valid instance of connection. After that, put the new connection into the pool in failed connection’s place. Raises NoReservedConnection if cannot find the failed connection instance.

# File lib/fiber_connection_pool.rb, line 114
# Swap the connection reserved by the current fiber for a new one.
#
# Yields the failed connection; the block's value is used as the
# replacement. Every occurrence of the failed connection is removed from
# both the free list and the reserved map before the replacement goes in.
#
# Raises NoReservedConnection when the current fiber has nothing reserved.
def with_failed_connection
  current = Fiber.current
  failed = @reserved[current]
  raise NoReservedConnection.new if failed.nil?

  replacement = yield failed

  @available.delete_if { |conn| conn == failed }
  @reserved.delete_if { |_fiber, conn| conn == failed }

  unless @keep_connection[current]
    # not manually acquired: hand the new one straight to the pool
    @available.unshift replacement
    return notify_new_is_available
  end

  # manually acquired: the fiber keeps holding a (now valid) connection
  @reserved[current] = replacement
end

Private Instance Methods

execute(method, args) { |conn| ... } click to toggle source

Choose first available connection and pass it to the supplied block. This will block (yield) indefinitely until there is an available connection to service the request.

After running the block, save requested data and release the connection.

# File lib/fiber_connection_pool.rb, line 210
# Run the given block with a connection reserved for the current fiber.
#
# method - name of the operation, handed to process_save_data.
# args   - arguments of the operation, handed to process_save_data.
#
# Returns the block's value. After a successful run the requested data is
# saved and the connection released, unless the fiber is flagged in
# @keep_connection. Exceptions are always re-raised: for the classes listed
# in treated_exceptions the connection stays reserved, for anything else it
# is released (again, unless kept).
def execute(method, args)
  current = Fiber.current
  begin
    # get a connection and use it
    conn = acquire(current, :keep => false)
    result = yield conn

    # save anything requested
    process_save_data(current, conn, method, args)

    # successful run, release unless the fiber keeps its connection
    release(current) unless @keep_connection[current]

    result
  rescue *treated_exceptions
    # leave the connection reserved so it can be dealt with afterwards
    raise
  rescue Exception => error
    # failed, but not one of the treated classes: release unless keeping
    release(current) unless @keep_connection[current]
    raise error
  end
end
method_missing(method, *args, &blk) click to toggle source

Allow the pool to behave as the underlying connection

Yield the connection within execute method and release once it is complete (assumption: fiber will yield while waiting for IO, allowing the reactor run other fibers)

# File lib/fiber_connection_pool.rb, line 254
# Forward any unknown call to a pooled connection: the call is replayed on
# the connection yielded by execute, with the same arguments and block.
def method_missing(method, *args, &blk)
  execute(method, args) { |conn| conn.send(method, *args, &blk) }
end
notify_new_is_available() click to toggle source
# File lib/fiber_connection_pool.rb, line 198
# Wake up the fiber at the head of the pending queue, if there is one.
# Returns the value of its resume, or nil when nobody was waiting.
def notify_new_is_available
  waiter = @pending.shift
  waiter.resume if waiter
end
process_save_data(fiber, conn, method, args) click to toggle source

Run each save_data_block over the given connection and save the data for the given fiber. Also perform cleanup if TTL is past

# File lib/fiber_connection_pool.rb, line 239
# Call every registered save_data block with (conn, method, args) and store
# each result under its key in the data kept for +fiber+. Afterwards, run
# save_data_cleanup when at least SAVED_DATA_TTL_SECS have elapsed since the
# last one.
def process_save_data(fiber, conn, method, args)
  @save_data_requests.each_pair do |name, collector|
    # the per-fiber Hash is only created when there is something to store
    (@saved_data[fiber] ||= {})[name] = collector.call(conn, method, args)
  end

  # try cleanup
  elapsed = Time.now - @last_data_cleanup
  save_data_cleanup if elapsed >= SAVED_DATA_TTL_SECS
end