class Beanpool::Connections
Attributes
connections[R]
ip_array[R]
Public Class Methods
new(ip_array, debug)
click to toggle source
# File lib/beanpool/connections.rb, line 8 def initialize(ip_array, debug) @ip_array = ip_array @troubled_ips = {} @connections = {} @debug = debug build_connections @mutex = Mutex.new end
Public Instance Methods
all_stats(tube_name)
click to toggle source
# File lib/beanpool/connections.rb, line 134 def all_stats(tube_name) return_struct = {} @connections.each do |ip, connection| stat = connection.tubes[tube_name].stats stat.to_h.each do |name, val| return_struct[name] = if val.is_a?(Integer) return_struct[name].nil? ? val.to_i : return_struct[name] + val.to_i elsif name == 'name' return_struct[name].nil? ? ip.to_s : return_struct[name] + '|' + ip.to_s else return_struct[name].nil? ? val.to_s : return_struct[name] + '|' + val.to_s end end end return return_struct end
build_connections()
click to toggle source
# File lib/beanpool/connections.rb, line 34 def build_connections @ip_array.each do |ip| raise 'Only single IP for beaneater' if ip.is_a?(Array) begin @connections[ip] = Beaneater.new(ip) rescue => ex notify(ex) notify("Failed to add #{ip}") end end end
check_times()
click to toggle source
# File lib/beanpool/connections.rb, line 65 def check_times @troubled_ips.each do |k, v| notify("Checking troubled #{k}") if v[:time] < Time.now - 60 begin @connections[k] = Beaneater.new(k) notify("Re-added to live: #{k}") @troubled_ips.delete(k) rescue => ex notify(ex) # Keep retrying every minute v[:time] = Time.now end end end end
close()
click to toggle source
# File lib/beanpool/connections.rb, line 46 def close @connections.each do |_k, v| v.close end end
connection_sample()
click to toggle source
# File lib/beanpool/connections.rb, line 59 def connection_sample check_times ip_id = @connections.keys.sample ip_id end
get_job_from_tube(timeout = nil, tube_name = 'default')
click to toggle source
# File lib/beanpool/connections.rb, line 82 def get_job_from_tube(timeout = nil, tube_name = 'default') @mutex.synchronize do ip_id = connection_sample connection = @connections[ip_id] begin job = connection.tubes[tube_name].reserve(timeout) return job rescue Beaneater::TimedOutError return nil rescue => ex notify(ex) notify("Exception IP: #{ip_id}") put_ip_in_timeout_and_reload(ip_id) return nil end end end
keystring_hash(hash)
click to toggle source
# File lib/beanpool/connections.rb, line 126 def keystring_hash(hash) new_hash = {} hash.keys.each do |k| new_hash[k.to_s] = hash[k] end new_hash end
notify(object)
click to toggle source
# File lib/beanpool/connections.rb, line 17 def notify(object) if @debug if object.is_a? Exception backtrace_array = object.backtrace backtrace_array.reject! { |x| x =~ /\.rvm/ } backtrace_array.unshift(object.message.to_s) raw_string = backtrace_array.join("\n") puts "EXCEPTION: #{object.message}" puts raw_string elsif object.is_a?(Hash) || object.is_a?(Array) puts object elsif object.is_a?(String) puts object end end end
put_ip_in_timeout_and_reload(ip)
click to toggle source
# File lib/beanpool/connections.rb, line 52 def put_ip_in_timeout_and_reload(ip) return unless @connections.size > 1 @troubled_ips[ip] = { time: Time.now } @connections.delete(ip) unless @connections.size < 2 notify("Added #{ip} to troubled") end
put_job_to_tube(body, options)
click to toggle source
# File lib/beanpool/connections.rb, line 100 def put_job_to_tube(body, options) @mutex.synchronize do options = keystring_hash(options) pri = options["pri"] || 32000 ttr = options["ttr"] || 60 reset_use_tube = options['reset_use_tube'] tube_name = options["tube_name"] || 'default' delay = (options["delay"]).to_i ip_id = connection_sample connection = @connections[ip_id] begin notify("BEANPOOL: Putting to #{tube_name}") connection.tubes[tube_name].put(body, :pri => pri, :delay => delay, :ttr => ttr) rescue => ex notify(ex) put_ip_in_timeout_and_reload(ip_id) ip_id = connection_sample connection = @connections[ip_id] connection.tubes[tube_name].put(body, :pri => pri, :delay => delay, :ttr => ttr) end # Force default tube reset if requested. connection.tubes.use(reset_use_tube) if connection && reset_use_tube end end
stats(tube_name, stat_name)
click to toggle source
# File lib/beanpool/connections.rb, line 151 def stats(tube_name, stat_name) value = 0 @connections.each do |_k, v| tube = v.tubes[tube_name] if tube beaneater_stats = tube.stats value += beaneater_stats[stat_name.to_sym].to_i if beaneater_stats end end value end