class Queight::ChannelPool

Constants

DEFAULT_POOL_SIZE
MAXIMUM_BUNNY_ERRORS

Public Class Methods

new(options)
# File lib/queight/channel_pool.rb, line 10
# Sets up an empty pool; nothing is connected here.
#
# options - Hash of settings. Elsewhere in this class :id and :size are
#           read from it, and the whole hash is merged into the options
#           handed to Bunny.
def initialize(options)
  # Guards the reconnect decision made in bunny_error.
  @lock = Monitor.new
  # Number of Bunny errors seen since the last success or reconnect.
  @error_count = 0
  # Every ChannelWrapper built so far, kept so reset_cache! can reach them.
  @wrappers = []
  @options = options
end

Public Instance Methods

create_channel(prefetch = nil)
# File lib/queight/channel_pool.rb, line 38
# Opens a fresh channel on the shared connection.
#
# prefetch - optional value forwarded to the channel's #prefetch; skipped
#            when nil or false.
#
# Returns the newly created channel.
def create_channel(prefetch = nil)
  conn.create_channel.tap do |new_channel|
    new_channel.prefetch(prefetch) if prefetch
  end
end
reset_cache!()
# File lib/queight/channel_pool.rb, line 45
# Asks every wrapper built by this pool to reset its cache.
#
# Returns the array of wrappers.
def reset_cache!
  @wrappers.each { |wrapper| wrapper.reset_cache! }
end
with_channel() { |channel| ... }
# File lib/queight/channel_pool.rb, line 17
# Checks a channel out of the regular pool and yields it to the caller's
# block, with Bunny errors counted by tracking_bunny_errors.
#
# Returns whatever the pool's #run returns for the block.
def with_channel
  tracking_bunny_errors do
    channel_pool.run do |pooled_channel|
      yield(pooled_channel)
    end
  end
end
with_subscribe_channel(prefetch) { |channel| ... }
# File lib/queight/channel_pool.rb, line 29
# Creates a dedicated (non-pooled) channel, yields it to the caller's
# block, and closes it afterwards, whether the block returned or raised.
#
# prefetch - value passed through to create_channel.
#
# Returns the value of the block. Exceptions raised while creating the
# channel or inside the block propagate unchanged.
def with_subscribe_channel(prefetch)
  # Declared here so the variable is visible to the ensure clause below;
  # a variable first assigned inside the block would be block-local.
  channel = nil

  tracking_bunny_errors do
    channel = create_channel(prefetch)
    yield(channel)
  end
ensure
  # Nothing to close if create_channel itself failed.
  channel.close if channel
end
with_transactional_channel() { |channel| ... }
# File lib/queight/channel_pool.rb, line 23
# Same as with_channel, but the channel comes from the separate
# transactional pool.
#
# Returns whatever the pool's #run returns for the block.
def with_transactional_channel
  tracking_bunny_errors do
    transactional_channel_pool.run do |pooled_channel|
      yield(pooled_channel)
    end
  end
end

Private Instance Methods

build_wrapper()
# File lib/queight/channel_pool.rb, line 63
# Wraps a newly created channel in a ChannelWrapper and records the
# wrapper in @wrappers.
#
# Returns the new wrapper.
def build_wrapper
  wrapper = ChannelWrapper.new(conn.create_channel)
  @wrappers << wrapper
  wrapper
end
bunny_error()
# File lib/queight/channel_pool.rb, line 105
# Records one more Bunny failure and, once the count exceeds
# MAXIMUM_BUNNY_ERRORS, reconnects while holding the lock.
#
# The counter itself is incremented before the lock is taken.
def bunny_error
  @error_count += 1

  synch do
    next unless @error_count > MAXIMUM_BUNNY_ERRORS

    reconnect!
  end
end
bunny_options()
# File lib/queight/channel_pool.rb, line 68
# Builds the option hash given to Bunny: a copy of @options with
# :properties replaced by a hash carrying an :information entry.
# The entry is the configured :id, or a generic label when none is set.
#
# Returns a new Hash; @options is not modified.
def bunny_options
  client_label = @options.fetch(:id, "Queight (anonymous)")
  @options.merge(:properties => { :information => client_label })
end
channel_pool()
# File lib/queight/channel_pool.rb, line 59
# Returns the regular channel pool, building it on first use and
# reusing the same instance afterwards.
def channel_pool
  return @channel_pool if @channel_pool

  @channel_pool = new_pool("pool:channels")
end
conn()
# File lib/queight/channel_pool.rb, line 51
# Returns the shared Bunny connection, creating and starting it on
# first use. The connection is only remembered once #start has returned.
def conn
  return @conn if @conn

  connection = Bunny.new(bunny_options)
  connection.start
  @conn = connection
end
new_pool(name)
# File lib/queight/channel_pool.rb, line 76
# Builds a HotTub pool whose members are produced by build_wrapper.
#
# name - value stored under :name in the pool options.
#
# Returns the new HotTub::Pool.
def new_pool(name)
  named_options = pool_options.merge(:name => name)

  HotTub::Pool.new(named_options) do
    build_wrapper
  end
end
no_bunny_error()
# File lib/queight/channel_pool.rb, line 110
# Resets the Bunny error counter to zero. tracking_bunny_errors calls
# this after its block has returned without raising.
def no_bunny_error
  @error_count = 0
end
pool_options()
# File lib/queight/channel_pool.rb, line 80
# Options shared by both pools: members are closed via #close, the
# pool size comes from pool_size, and :max_size is three times that.
#
# Returns a new Hash on every call.
def pool_options
  base_size = pool_size

  { :close => :close, :size => base_size, :max_size => base_size * 3 }
end
pool_size()
# File lib/queight/channel_pool.rb, line 88
# Returns the configured :size option, or DEFAULT_POOL_SIZE when the
# option is absent.
def pool_size
  @options.fetch(:size) { DEFAULT_POOL_SIZE }
end
reconnect!()
# File lib/queight/channel_pool.rb, line 118
# Tears down the current connection state so the next call to conn
# starts from scratch: drains both pools, closes the connection if one
# exists, then forgets the connection, the wrappers and the error count.
def reconnect!
  channel_pool.drain!
  transactional_channel_pool.drain!

  stale_connection = @conn
  stale_connection.close if stale_connection

  @conn = nil
  @wrappers = []
  @error_count = 0
end
synch() { || ... }
# File lib/queight/channel_pool.rb, line 114
# Runs the given block while holding @lock.
#
# Returns the block's value.
def synch
  @lock.synchronize do
    yield
  end
end
tracking_bunny_errors() { || ... }
# File lib/queight/channel_pool.rb, line 92
# Runs the block and keeps the error counter up to date: a normal
# return clears the counter, a Bunny::Exception bumps it (possibly
# triggering a reconnect) and is then re-raised to the caller.
#
# Returns the block's value. Other exception types pass through
# without touching the counter.
def tracking_bunny_errors
  begin
    outcome = yield
    no_bunny_error
    outcome
  rescue Bunny::Exception
    bunny_error
    raise
  end
end
transactional_channel_pool()
# File lib/queight/channel_pool.rb, line 55
# Returns the pool used for transactional channels, building it on
# first use and reusing the same instance afterwards.
def transactional_channel_pool
  return @tx_channel_pool if @tx_channel_pool

  @tx_channel_pool = new_pool("pool:tx_channels")
end