class TellaPeer::Connections

Attributes

connections[W]
max_connections[W]
overriden_replies[W]
ping_log[W]
query_log[W]
reply_log[W]
seed[RW]

Public Class Methods

add_potential_connection(ip, port) click to toggle source
# File lib/tella_peer/connections.rb, line 158
# Queues "ip:port" as a candidate peer.
#
# A candidate that is not queued yet is stored with the value 0; one that is
# already queued keeps whatever value it currently has.
#
# @param ip   address part of the queue key
# @param port port part of the queue key
# @return the value stored for the candidate (0 when newly added)
def add_potential_connection(ip, port)
  candidate = [ip, port].join(':')
  queue = connection_queue
  queue[candidate] || (queue[candidate] = 0)
end
build(socket, remote_ip, remote_port, direction = :inbound) click to toggle source
# File lib/tella_peer/connections.rb, line 76
# Registers a Connection for an already-open socket under the key
# "remote_ip:remote_port".
#
# * If the key is already registered, a warning is logged and the existing
#   connection is returned.
# * If the registry already holds +max_connections+ entries, the socket is
#   closed and nil is returned.
# * Otherwise a new Connection is created, stored and returned.
#
# @param socket      the open socket for the peer (must respond to +close+)
# @param remote_ip   peer address, used for the registry key
# @param remote_port peer port, used for the registry key
# @param direction   passed through to Connection.new (defaults to :inbound)
# @return the connection registered under the key, or nil when refused
def build(socket, remote_ip, remote_port, direction = :inbound)
  key = "#{remote_ip}:#{remote_port}"
  if connections[key]
    # NOTE(review): +socket+ is neither stored nor closed on this path —
    # confirm whether the duplicate socket should be closed here.
    logger.warn "Tried to reconnect to existing connection #{key}"
  else
    logger.info "Connecting opened to #{key}"
    # Strictly below the cap: the previous `size > max` check admitted one
    # connection more than max_connections.
    if connections.size < max_connections
      connections[key] ||= Connection.new(socket, remote_ip, remote_port, direction)
    else
      socket.close
      logger.info "Connection to #{key} closed due to max connections"
    end
  end
  connections[key]
end
build_from_connections() click to toggle source
# File lib/tella_peer/connections.rb, line 60
# Tries to fill the free connection slots from the connection queue.
#
# Makes (max_connections - connections.size) attempts. Each attempt picks a
# random queued candidate and skips it when it is already connected or when it
# is this node's own "ip:port". Otherwise the candidate's queue value is
# incremented, connect_as_client is called with the split address, and +watch+
# is called on the result, all within a 15 second timeout. Any StandardError
# raised during an attempt (including the timeout) is logged at debug level
# and the loop moves on.
#
# @return [Integer] the number of attempts requested (result of Integer#times)
def build_from_connections
  (max_connections - connections.size).times do
    begin
      candidate = connection_queue.keys.sample
      next unless candidate
      next if connections.key?(candidate)
      # The port must be interpolated: the previous literal ":Message.port"
      # never matched, so this node could dial itself.
      # Presumably Message.ip is an array of octets — TODO confirm.
      next if candidate == "#{Message.ip.join('.')}:#{Message.port}"

      connection_queue[candidate] = connection_queue.fetch(candidate, 0) + 1
      logger.debug "Connecting to #{candidate}"
      Timeout.timeout(15) { connect_as_client(*candidate.split(':')).watch }
    rescue StandardError => e
      logger.debug "Unable to connect to #{candidate}"
      logger.debug e
    end
  end
end
clear_logs() click to toggle source
# File lib/tella_peer/connections.rb, line 27
# Replaces the ping log and the query log with fresh empty hashes.
#
# @return [Hash] the new, empty query log
def clear_logs
  @ping_log = Hash.new
  @query_log = Hash.new
end
close_connection(key) click to toggle source
# File lib/tella_peer/connections.rb, line 52
# Removes the entry stored under +key+ from the connection registry.
#
# Only the registry entry is deleted here; nothing else is called on the
# removed connection.
#
# @param key registry key of the connection to drop
# @return the removed connection, or nil when the key was not registered
def close_connection(key)
  registry = connections
  registry.delete(key)
end
connect_as_client(remote_ip, remote_port) click to toggle source
# File lib/tella_peer/connections.rb, line 166
# Opens a TCP socket to the given peer and registers it through +build+ as an
# :outbound connection.
#
# @param remote_ip   address to connect to
# @param remote_port port to connect to
# @return whatever +build+ returns for the new socket
def connect_as_client(remote_ip, remote_port)
  socket = TCPSocket.open(remote_ip, remote_port)
  build(socket, remote_ip, remote_port, :outbound)
end
connection_queue() click to toggle source
# File lib/tella_peer/connections.rb, line 36
# Lazily created hash of candidate peers, keyed by "ip:port" strings.
#
# @return [Hash] the same hash on every call
def connection_queue
  return @connection_queue if @connection_queue

  @connection_queue = {}
end
connections() click to toggle source
# File lib/tella_peer/connections.rb, line 32
# Lazily created registry of live connections, keyed by "ip:port" strings.
#
# @return [Hash] the same hash on every call
def connections
  @connections || (@connections = {})
end
count_message(type, direction) click to toggle source
# File lib/tella_peer/connections.rb, line 151
# Increments the counter for one message type in one direction.
#
# The update runs while holding +mutex+.
#
# @param type      counter key inside the direction bucket
# @param direction bucket to update; the counts hash is created with the
#                  keys :in and :out
# @return [Integer] the counter value after the increment
def count_message(type, direction)
  @message_counts ||= {in: {}, out: {}}
  mutex.synchronize do
    bucket = @message_counts[direction]
    bucket[type] = bucket.fetch(type, 0) + 1
  end
end
flood(message, except_to: nil) click to toggle source
# File lib/tella_peer/connections.rb, line 100
# Sends +message+ to every registered connection, optionally skipping one.
#
# @param message   object handed to each connection's +send_message+
# @param except_to connection to skip (compared with ==); nil skips nothing
# @return [Hash] the connection registry
def flood(message, except_to: nil)
  connections.each_value do |peer|
    next if peer == except_to

    peer.send_message(message)
  end
end
logger() click to toggle source
# File lib/tella_peer/connections.rb, line 162
# Shortcut to the logger exposed by the top-level TellaPeer namespace.
#
# @return whatever TellaPeer.logger returns
def logger
  ::TellaPeer.logger
end
max_connections() click to toggle source
# File lib/tella_peer/connections.rb, line 15
# Connection cap; falls back to 10 (and remembers it) when nothing is set.
#
# @return the configured cap, or 10 by default
def max_connections
  @max_connections = 10 unless @max_connections
  @max_connections
end
message_counts() click to toggle source
# File lib/tella_peer/connections.rb, line 144
# Returns a snapshot of the per-direction message counters.
#
# The snapshot is built while holding +mutex+ and copies each direction's
# hash as well, so later counter updates do not show up in it and callers
# cannot modify the live counters through it. (A plain +dup+ of the outer
# hash would still share the inner :in / :out hashes.)
#
# @return [Hash] { in: { type => count }, out: { type => count } }
def message_counts
  @message_counts ||= {in: {}, out: {}}
  mutex.synchronize do
    @message_counts.each_with_object({}) do |(direction, counts), snapshot|
      snapshot[direction] = counts.dup
    end
  end
end
mutex() click to toggle source
# File lib/tella_peer/connections.rb, line 56
# Lazily created Mutex used to guard the message counters.
#
# @return [Mutex] the same mutex on every call
def mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end
overriden_replies() click to toggle source
# File lib/tella_peer/connections.rb, line 48
# Lazily created list of replies that were replaced by a newer reply.
#
# @return [Array] the same array on every call
def overriden_replies
  @overriden_replies || (@overriden_replies = [])
end
ping() click to toggle source
# File lib/tella_peer/connections.rb, line 92
# Builds a new Ping and floods it to every connection.
#
# @return whatever +flood+ returns
def ping
  message = Ping.new
  flood(message)
end
ping_log() click to toggle source
# File lib/tella_peer/connections.rb, line 19
# Lazily created hash of ping message ids that have been recorded.
#
# @return [Hash] the same hash on every call (until it is replaced)
def ping_log
  @ping_log = {} unless @ping_log
  @ping_log
end
query() click to toggle source
# File lib/tella_peer/connections.rb, line 96
# Builds a new Query and floods it to every connection.
#
# @return whatever +flood+ returns
def query
  message = Query.new
  flood(message)
end
query_log() click to toggle source
# File lib/tella_peer/connections.rb, line 23
# Lazily created hash of query message ids that have been recorded.
#
# @return [Hash] the same hash on every call (until it is replaced)
def query_log
  @query_log = {} unless @query_log
  @query_log
end
reply_log() click to toggle source
# File lib/tella_peer/connections.rb, line 44
# Lazily created hash holding the most recent reply text per connection key.
#
# @return [Hash] the same hash on every call
def reply_log
  return @reply_log if @reply_log

  @reply_log = {}
end
seed_connections() click to toggle source
# File lib/tella_peer/connections.rb, line 139
# Queues the configured seed as a potential connection.
#
# +seed+ is splatted into add_potential_connection, so it is expected to
# supply the ip and the port, in that order.
#
# @return self, so calls can be chained
def seed_connections
  seed_address = seed
  add_potential_connection(*seed_address)
  self
end
seen_ping?(message_id, from: nil) click to toggle source
# File lib/tella_peer/connections.rb, line 106
# Reports whether a ping with +message_id+ has been recorded, recording it on
# first sight when an origin is supplied.
#
# @param message_id id of the ping to look up
# @param from       origin to remember for an unseen id; when nil (the
#                   default) nothing is recorded
# @return the value recorded for +message_id+ when it is already in the ping
#         log, otherwise false
def seen_ping?(message_id, from: nil)
  # Hash#key? is a direct lookup; keys.include? built and scanned an array
  # of every logged id on each call.
  return ping_log[message_id] if ping_log.key?(message_id)

  ping_log[message_id] = from if from
  false
end
seen_query?(message_id, from: nil) click to toggle source
# File lib/tella_peer/connections.rb, line 115
# Reports whether a query with +message_id+ has been recorded, recording it
# on first sight when an origin is supplied.
#
# @param message_id id of the query to look up
# @param from       origin to remember for an unseen id; when nil (the
#                   default) nothing is recorded
# @return the value recorded for +message_id+ when it is already in the query
#         log, otherwise false
def seen_query?(message_id, from: nil)
  # Hash#key? is a direct lookup; keys.include? built and scanned an array
  # of every logged id on each call.
  return query_log[message_id] if query_log.key?(message_id)

  query_log[message_id] = from if from
  false
end
start_time() click to toggle source
# File lib/tella_peer/connections.rb, line 11
# Time of the first call to this method; remembered for all later calls.
#
# @return [Time]
def start_time
  @start_time = Time.now unless @start_time
  @start_time
end
store_reply(message) click to toggle source
# File lib/tella_peer/connections.rb, line 124
# Records the reply text carried by +message+ for the connection it names.
#
# Replies whose connection key equals Message.key are ignored. Otherwise:
# * a registered connection with that key gets its +text+ updated;
# * when there is no previous reply for the key, or the text differs from it,
#   the reply is logged at info level and +message.log+ is called, and a
#   previous reply (if any) is appended to +overriden_replies+ as
#   [time, key, previous_text];
# * the reply log entry for the key is set to the new text.
#
# @param message reply object responding to +connection_key+, +text+ and +log+
# @return the stored text, or nil when the reply was ignored
def store_reply(message)
  key = message.connection_key
  return if key == Message.key

  peer = connections[key]
  peer.text = message.text if peer

  previous = reply_log[key]
  changed = !previous || previous != message.text
  if changed
    overriden_replies << [Time.now, key, previous] if previous
    logger.info "#{key} replied with #{message.text}"
    message.log
  end

  reply_log[key] = message.text
end
uptime() click to toggle source
# File lib/tella_peer/connections.rb, line 7
def uptime
  start_time - Time.now
end
web_connection_queue() click to toggle source
# File lib/tella_peer/connections.rb, line 40
# Shallow copy of the connection queue, so callers can read or modify the
# result without touching the live queue hash.
#
# @return [Hash] a new hash with the same keys and values
def web_connection_queue
  queue = connection_queue
  queue.dup
end