class Queight::ChannelPool
Constants
- DEFAULT_POOL_SIZE
- MAXIMUM_BUNNY_ERRORS
Public Class Methods
new(options)
click to toggle source
# File lib/queight/channel_pool.rb, line 10
# Builds an empty pool manager. No connection or pool is created here; the
# other methods create them lazily on first use.
#
# @param options [Hash] kept as-is; later read for :size and :id and merged
#   into the options handed to Bunny
def initialize(options)
  @options = options
  # Bookkeeping for wrappers built so far and consecutive Bunny failures.
  @wrappers = []
  @error_count = 0
  # Used by synch to serialise the reconnect decision.
  @lock = Monitor.new
end
Public Instance Methods
create_channel(prefetch = nil)
click to toggle source
# File lib/queight/channel_pool.rb, line 38
# Opens a new channel on the shared connection.
#
# @param prefetch when truthy, handed to the new channel's +prefetch+;
#   when nil/false the channel is returned untouched
# @return the newly created channel
def create_channel(prefetch = nil)
  conn.create_channel.tap do |new_channel|
    new_channel.prefetch(prefetch) if prefetch
  end
end
reset_cache!()
click to toggle source
# File lib/queight/channel_pool.rb, line 45
# Calls +reset_cache!+ on every channel wrapper built so far.
#
# @return [Array] the list of tracked wrappers
def reset_cache!
  @wrappers.each do |wrapper|
    wrapper.reset_cache!
  end
end
with_channel() { |channel| ... }
click to toggle source
# File lib/queight/channel_pool.rb, line 17
# Runs the given block with a channel taken from the regular channel pool,
# recording Bunny failures/successes via tracking_bunny_errors.
#
# @yield [channel] the object the pool hands out (presumably a channel
#   wrapper built by build_wrapper)
# @return the block's result
def with_channel
  tracking_bunny_errors do
    channel_pool.run do |pooled_channel|
      yield(pooled_channel)
    end
  end
end
with_subscribe_channel(prefetch) { |channel| ... }
click to toggle source
# File lib/queight/channel_pool.rb, line 29
# Yields a dedicated channel (created directly, not taken from a pool) with
# the given prefetch and closes it once the block has finished or raised.
#
# @param prefetch forwarded to create_channel
# @yield [channel] the freshly created channel
# @return the block's result
def with_subscribe_channel(prefetch)
  # Declared at method scope so the ensure clause can see it: a variable first
  # assigned inside the block below would be local to that block.
  channel = nil
  tracking_bunny_errors do
    channel = create_channel(prefetch)
    yield(channel)
  end
ensure
  # Still nil when create_channel itself failed; nothing to close then, and
  # the original error must not be masked.
  channel.close if channel
end
with_transactional_channel() { |channel| ... }
click to toggle source
# File lib/queight/channel_pool.rb, line 23
# Same as with_channel, but the channel comes from the separate pool kept for
# transactional use.
#
# @yield [channel] the object handed out by the transactional pool
# @return the block's result
def with_transactional_channel
  tracking_bunny_errors do
    transactional_channel_pool.run do |pooled_channel|
      yield(pooled_channel)
    end
  end
end
Private Instance Methods
build_wrapper()
click to toggle source
# File lib/queight/channel_pool.rb, line 63
# Creates a channel on the shared connection, wraps it in a ChannelWrapper and
# remembers the wrapper so reset_cache! can reach it later.
#
# @return [ChannelWrapper] the wrapper built by this call
def build_wrapper
  wrapper = ChannelWrapper.new(conn.create_channel)
  @wrappers << wrapper
  # Return the local, not @wrappers.last: if another caller appends in
  # between, "last" would be somebody else's wrapper.
  wrapper
end
bunny_error()
click to toggle source
# File lib/queight/channel_pool.rb, line 105
# Records one more Bunny failure and, once the count exceeds
# MAXIMUM_BUNNY_ERRORS, tears the connection down via reconnect!.
#
# @return whatever reconnect! returns, or nil when no reconnect was needed
def bunny_error
  # Increment and threshold check happen under the same lock so concurrent
  # failures cannot lose an update between the read and the write.
  synch do
    @error_count += 1
    reconnect! if @error_count > MAXIMUM_BUNNY_ERRORS
  end
end
bunny_options()
click to toggle source
# File lib/queight/channel_pool.rb, line 68
# Options handed to Bunny.new: the caller's options plus a :properties hash
# whose :information entry identifies this client.
#
# Caller-supplied :properties are kept and merged rather than replaced
# wholesale; :information is always set here (from :id, or a default).
#
# @return [Hash] a new hash; @options itself is not modified
def bunny_options
  # `|| {}` also covers an explicit :properties => nil.
  supplied_properties = @options[:properties] || {}
  @options.merge(
    :properties => supplied_properties.merge(
      :information => @options.fetch(:id, "Queight (anonymous)")
    )
  )
end
channel_pool()
click to toggle source
# File lib/queight/channel_pool.rb, line 59
# Lazily built, memoised pool for regular channels.
#
# @return the pool created by new_pool("pool:channels")
def channel_pool
  @channel_pool = new_pool("pool:channels") unless @channel_pool
  @channel_pool
end
conn()
click to toggle source
# File lib/queight/channel_pool.rb, line 51
# The shared Bunny connection, created and started on first use and cached
# until reconnect! clears it.
#
# @return the started Bunny connection
def conn
  @conn ||= begin
    connection = Bunny.new(bunny_options)
    connection.start
    connection
  end
end
new_pool(name)
click to toggle source
# File lib/queight/channel_pool.rb, line 76
# Builds a HotTub pool whose members are produced by build_wrapper.
#
# @param name [String] passed to the pool as its :name option
# @return [HotTub::Pool]
def new_pool(name)
  settings = pool_options.merge(:name => name)
  HotTub::Pool.new(settings) { build_wrapper }
end
no_bunny_error()
click to toggle source
# File lib/queight/channel_pool.rb, line 110
# Clears the Bunny failure counter; tracking_bunny_errors calls this after a
# block completes without raising.
#
# @return [Integer] 0
def no_bunny_error
  @error_count = 0
end
pool_options()
click to toggle source
# File lib/queight/channel_pool.rb, line 80
# Options shared by both pools: members are closed via :close, the pool is
# sized by pool_size and capped at three times that.
#
# @return [Hash]
def pool_options
  base_size = pool_size
  {
    :close => :close,
    :size => base_size,
    :max_size => (base_size * 3),
  }
end
pool_size()
click to toggle source
# File lib/queight/channel_pool.rb, line 88
# Configured pool size: the :size option when given, DEFAULT_POOL_SIZE
# otherwise.
def pool_size
  @options.fetch(:size) { DEFAULT_POOL_SIZE }
end
reconnect!()
click to toggle source
# File lib/queight/channel_pool.rb, line 118
# Drops everything tied to the current connection: drains both pools, closes
# the connection if there is one, and resets the cached state so the next
# call to conn builds a fresh connection.
#
# The state reset runs even when draining or closing raises (likely on an
# already-broken connection), so a stale connection is never kept cached.
# The error itself still propagates.
#
# @return [Integer] the reset error count (0)
def reconnect!
  begin
    channel_pool.drain!
    transactional_channel_pool.drain!
    @conn.close if @conn
  ensure
    @conn = nil
    @wrappers = []
    @error_count = 0
  end
  @error_count
end
synch() { || ... }
click to toggle source
# File lib/queight/channel_pool.rb, line 114
# Runs the block while holding this instance's monitor.
#
# @return the block's result
def synch
  @lock.synchronize do
    yield
  end
end
tracking_bunny_errors() { || ... }
click to toggle source
# File lib/queight/channel_pool.rb, line 92
# Runs the block and keeps the failure counter up to date: a clean run resets
# it, a Bunny::Exception bumps it (possibly triggering a reconnect) and is
# then re-raised to the caller.
#
# @return the block's result
# @raise [Bunny::Exception] the error raised by the block
def tracking_bunny_errors
  result = yield
  no_bunny_error
  result
rescue Bunny::Exception => error
  bunny_error
  raise error
end
transactional_channel_pool()
click to toggle source
# File lib/queight/channel_pool.rb, line 55
# Lazily built, memoised pool for transactional channels, kept separate from
# the regular channel pool.
#
# @return the pool created by new_pool("pool:tx_channels")
def transactional_channel_pool
  @tx_channel_pool = new_pool("pool:tx_channels") unless @tx_channel_pool
  @tx_channel_pool
end