class Beanpool::Connections

Attributes

connections[R]
ip_array[R]

Public Class Methods

new(ip_array, debug) click to toggle source
# File lib/beanpool/connections.rb, line 8
# Stores the configuration, prepares the empty bookkeeping hashes, opens the
# connections via build_connections and finally creates the lock used by the
# job put/reserve methods.
#
# @param ip_array [#each] addresses; each element is handed to Beaneater.new
# @param debug [Object] when truthy, notify prints to standard output
def initialize(ip_array, debug)
  @ip_array, @debug = ip_array, debug
  @troubled_ips = {}
  @connections = {}
  # build_connections reads @ip_array / @debug and fills @connections, so it
  # has to run after the assignments above.
  build_connections
  @mutex = Mutex.new
end

Public Instance Methods

all_stats(tube_name) click to toggle source
# File lib/beanpool/connections.rb, line 134
# Merges the stats of one tube across every connection into a single hash.
#
# Merge rules, applied per stat key:
# * Integer values are added together.
# * The 'name' key collects the connection keys (not the tube name), joined
#   with '|'.
# * Any other value is converted with to_s and joined with '|'.
#
# @param tube_name [Object] key passed to connection.tubes[...]
# @return [Hash] merged stats; empty when there are no connections
def all_stats(tube_name)
  merged = {}
  @connections.each do |ip, connection|
    connection.tubes[tube_name].stats.to_h.each do |name, val|
      numeric = val.is_a?(Integer)
      # What this connection contributes for the current key.
      piece = if numeric
                val.to_i
              elsif name == 'name'
                ip.to_s
              else
                val.to_s
              end

      seen = merged[name]
      merged[name] = if seen.nil?
                       piece
                     elsif numeric
                       seen + piece
                     else
                       seen + '|' + piece
                     end
    end
  end
  merged
end
build_connections() click to toggle source
# File lib/beanpool/connections.rb, line 34
# Opens one Beaneater connection per entry of @ip_array and stores it in
# @connections under that entry.
#
# A connection failure is reported through notify and the entry is skipped;
# an entry that is itself an Array aborts the whole run.
#
# @raise [RuntimeError] when an entry of @ip_array is an Array
# @return [Object] whatever @ip_array.each returns
def build_connections
  @ip_array.each do |address|
    # Deliberately outside the begin/rescue below so it is not swallowed.
    raise 'Only single IP for beaneater' if address.is_a?(Array)

    begin
      @connections[address] = Beaneater.new(address)
    rescue => failure
      notify(failure)
      notify("Failed to add #{address}")
    end
  end
end
check_times() click to toggle source
# File lib/beanpool/connections.rb, line 65
# Walks the troubled addresses and tries to reconnect every one whose
# recorded time is more than 60 seconds old.
#
# On success the new connection is stored in @connections and the address is
# removed from @troubled_ips. On failure the error is reported and the
# recorded time is reset, so the next attempt happens about a minute later.
#
# @return [Hash] @troubled_ips
def check_times
  # The block's result decides removal: true drops the entry, false keeps it.
  @troubled_ips.delete_if do |ip, info|
    notify("Checking troubled #{ip}")
    next false unless info[:time] < Time.now - 60

    begin
      @connections[ip] = Beaneater.new(ip)
      notify("Re-added to live: #{ip}")
      true
    rescue => failure
      notify(failure)
      # Keep retrying every minute
      info[:time] = Time.now
      false
    end
  end
end
close() click to toggle source
# File lib/beanpool/connections.rb, line 46
# Calls close on every connection currently held in @connections.
#
# @return [Hash] @connections
def close
  @connections.each_value(&:close)
end
connection_sample() click to toggle source
# File lib/beanpool/connections.rb, line 59
# Runs check_times first, then picks a random key of @connections.
#
# @return [Object, nil] one key of @connections, or nil when it is empty
def connection_sample
  check_times
  @connections.keys.sample
end
get_job_from_tube(timeout = nil, tube_name = 'default') click to toggle source
# File lib/beanpool/connections.rb, line 82
# Reserves a job from the given tube on one sampled connection while holding
# @mutex.
#
# @param timeout [Object, nil] passed straight through to reserve
# @param tube_name [String] tube to reserve from
# @return [Object, nil] the reserved job; nil on Beaneater::TimedOutError or
#   on any other StandardError (in which case the error is reported and the
#   address is handed to put_ip_in_timeout_and_reload)
def get_job_from_tube(timeout = nil, tube_name = 'default')
  @mutex.synchronize do
    ip_id = connection_sample
    connection = @connections[ip_id]
    begin
      connection.tubes[tube_name].reserve(timeout)
    rescue Beaneater::TimedOutError
      # Nothing became available within the timeout.
      nil
    rescue => failure
      notify(failure)
      notify("Exception IP: #{ip_id}")
      put_ip_in_timeout_and_reload(ip_id)
      nil
    end
  end
end
keystring_hash(hash) click to toggle source
# File lib/beanpool/connections.rb, line 126
# Builds a new Hash whose keys are the to_s form of the given hash's keys.
# Values are carried over unchanged and the argument is not modified. When
# two keys share the same string form, the later one wins.
#
# @param hash [#keys, #[]] source mapping
# @return [Hash] copy keyed by strings
def keystring_hash(hash)
  hash.keys.map { |key| [key.to_s, hash[key]] }.to_h
end
notify(object) click to toggle source
# File lib/beanpool/connections.rb, line 17
# Prints a diagnostic to standard output when @debug is truthy; otherwise
# does nothing.
#
# * Exception: prints "EXCEPTION: <message>", then the message followed by
#   the backtrace lines that do not contain ".rvm".
# * Hash, Array, String: printed with puts.
# * Anything else is ignored.
#
# @param object [Exception, Hash, Array, String, Object] thing to report
# @return [nil]
def notify(object)
  return unless @debug

  case object
  when Exception
    # backtrace is nil for an exception that was never raised, and the array
    # it returns belongs to the exception: filter a copy instead of editing
    # it in place.
    frames = Array(object.backtrace).reject { |frame| frame.include?('.rvm') }
    puts "EXCEPTION: #{object.message}"
    puts [object.message.to_s, *frames].join("\n")
  when Hash, Array, String
    puts object
  end
end
put_ip_in_timeout_and_reload(ip) click to toggle source
# File lib/beanpool/connections.rb, line 52
# Moves an address from the live pool to the troubled list, stamping it with
# the current time, and reports the move through notify.
#
# Nothing happens when the pool holds one connection or fewer, so the last
# remaining connection is never removed.
#
# @param ip [Object] key of @connections to bench
# @return [Object, nil] nil when skipped, otherwise the result of notify
def put_ip_in_timeout_and_reload(ip)
  return if @connections.size <= 1

  @troubled_ips[ip] = { time: Time.now }
  # The guard above already guarantees at least two entries at this point.
  @connections.delete(ip)
  notify("Added #{ip} to troubled")
end
put_job_to_tube(body, options) click to toggle source
# File lib/beanpool/connections.rb, line 100
# Puts a job on a tube of one sampled connection while holding @mutex.
#
# Recognised options (keys may be strings or symbols; they are normalised
# with keystring_hash): "pri" (default 32000), "ttr" (default 60), "delay"
# (converted with to_i), "tube_name" (default 'default') and
# "reset_use_tube".
#
# If the first put raises, the error is reported, the address is handed to
# put_ip_in_timeout_and_reload and the put is attempted once more on a newly
# sampled connection; an error from that second attempt is not rescued here.
#
# @param body [Object] job body passed to put
# @param options [Hash] see above
# @return [Object, nil] result of tubes.use when "reset_use_tube" is given
#   and a connection is present, otherwise nil
def put_job_to_tube(body, options)
  @mutex.synchronize do
    opts = keystring_hash(options)
    tube_name = opts['tube_name'] || 'default'
    reset_use_tube = opts['reset_use_tube']
    job_settings = {
      :pri => opts['pri'] || 32000,
      :delay => opts['delay'].to_i,
      :ttr => opts['ttr'] || 60
    }
    # One definition of the put shared by the first attempt and the retry.
    enqueue = ->(target) { target.tubes[tube_name].put(body, **job_settings) }

    ip_id = connection_sample
    connection = @connections[ip_id]
    begin
      notify("BEANPOOL: Putting to #{tube_name}")
      enqueue.call(connection)
    rescue => failure
      notify(failure)
      put_ip_in_timeout_and_reload(ip_id)
      ip_id = connection_sample
      connection = @connections[ip_id]
      enqueue.call(connection)
    end
    # Force default tube reset if requested.
    connection.tubes.use(reset_use_tube) if connection && reset_use_tube
  end
end
stats(tube_name, stat_name) click to toggle source
# File lib/beanpool/connections.rb, line 151
# Adds up one numeric stat of a tube across all connections.
#
# A connection contributes nothing when its tubes lookup or the tube's stats
# is falsy. Each contribution is read with the symbol form of stat_name and
# converted with to_i.
#
# @param tube_name [Object] key passed to connection.tubes[...]
# @param stat_name [#to_sym] name of the stat to sum
# @return [Integer] total over all connections; 0 when there are none
def stats(tube_name, stat_name)
  @connections.each_value.reduce(0) do |total, connection|
    tube = connection.tubes[tube_name]
    tube_stats = tube && tube.stats
    tube_stats ? total + tube_stats[stat_name.to_sym].to_i : total
  end
end