class TellaPeer::Connections
Attributes
connections[W]
max_connections[W]
overriden_replies[W]
ping_log[W]
query_log[W]
reply_log[W]
seed[RW]
Public Class Methods
add_potential_connection(ip, port)
click to toggle source
# File lib/tella_peer/connections.rb, line 158
# Adds "ip:port" to the connection queue as a candidate peer.
#
# A newly queued address starts with a counter of 0; an address that is
# already queued keeps whatever counter it currently has.
#
# Returns the counter stored for the address.
def add_potential_connection(ip, port)
  queue = connection_queue
  address = [ip, port].join(':')
  queue[address] || (queue[address] = 0)
end
build(socket, remote_ip, remote_port, direction = :inbound)
click to toggle source
# File lib/tella_peer/connections.rb, line 76
# Registers a Connection for +socket+ under the key "remote_ip:remote_port".
#
# * Key already registered: a warning is logged and the existing connection
#   is returned; +socket+ is neither used nor closed here.
# * Table already holds +max_connections+ entries: +socket+ is closed and
#   nil is returned.
# * Otherwise: a new Connection is created, stored and returned.
#
# +direction+ is handed to Connection.new unchanged (default :inbound).
def build(socket, remote_ip, remote_port, direction = :inbound)
  key = "#{remote_ip}:#{remote_port}"

  if connections[key]
    # NOTE(review): the freshly supplied socket stays open in this branch;
    # confirm whether the caller is expected to close it.
    logger.warn "Tried to reconnect to existing connection #{key}"
  elsif connections.size >= max_connections
    # `>=`, not `>`: with `>` the table could grow to max_connections + 1.
    socket.close
    logger.info "Connection to #{key} closed due to max connections"
  else
    # Only report the connection as opened once it is actually accepted.
    logger.info "Connecting opened to #{key}"
    connections[key] = Connection.new(socket, remote_ip, remote_port, direction)
  end

  connections[key]
end
build_from_connections()
click to toggle source
# File lib/tella_peer/connections.rb, line 60
# Tries to fill the free connection slots with peers from the connection queue.
#
# For each free slot (max_connections - connections.size, computed once up
# front) a random queued address is sampled. Addresses that are already
# connected, or that equal this node's own "ip:port" as reported by Message,
# are skipped. Every real attempt increments the candidate's counter in the
# queue and is limited to 15 seconds. Any StandardError raised during an
# attempt is logged at debug level and the loop moves on to the next slot.
def build_from_connections
  (max_connections - connections.size).times do
    candidate = nil
    begin
      candidate = connection_queue.keys.sample
      # The port must be interpolated as well; the previous literal
      # ":Message.port" could never match, so this node dialled itself.
      own_address = "#{Message.ip.join('.')}:#{Message.port}"
      next if candidate.nil? || candidate == own_address || connections.key?(candidate)

      connection_queue[candidate] = connection_queue.fetch(candidate, 0) + 1
      logger.debug "Connecting to #{candidate}"
      Timeout.timeout(15) do
        connection = connect_as_client(*candidate.split(':'))
        # connect_as_client yields nil when the connection was refused
        # (e.g. the table filled up meanwhile); nothing to watch then.
        connection.watch if connection
      end
    rescue StandardError => e
      logger.debug "Unable to connect to #{candidate}"
      logger.debug e
    end
  end
end
clear_logs()
click to toggle source
# File lib/tella_peer/connections.rb, line 27
# Replaces the ping log and the query log with fresh, empty hashes.
# No other state is modified (the reply log in particular is kept).
#
# Returns the new, empty query log.
def clear_logs
  @ping_log = Hash.new
  @query_log = Hash.new
end
close_connection(key)
click to toggle source
# File lib/tella_peer/connections.rb, line 52
# Removes the entry stored under +key+ from the connection table.
# Only the table entry is dropped; nothing is called on the connection.
#
# Returns the removed connection, or nil when +key+ was not registered.
def close_connection(key)
  table = connections
  table.delete(key)
end
connect_as_client(remote_ip, remote_port)
click to toggle source
# File lib/tella_peer/connections.rb, line 166
# Opens a TCP socket to remote_ip:remote_port and registers it through
# +build+ as an :outbound connection.
#
# Returns whatever +build+ returns for that socket.
def connect_as_client(remote_ip, remote_port)
  socket = TCPSocket.open(remote_ip, remote_port)
  build(socket, remote_ip, remote_port, :outbound)
end
connection_queue()
click to toggle source
# File lib/tella_peer/connections.rb, line 36
# Hash of candidate peer addresses ("ip:port" => counter), created empty
# on first access and reused afterwards.
def connection_queue
  @connection_queue = {} unless @connection_queue
  @connection_queue
end
connections()
click to toggle source
# File lib/tella_peer/connections.rb, line 32
# Table of registered connections, keyed by "ip:port". Created empty on
# first access and reused afterwards.
def connections
  return @connections if @connections

  @connections = {}
end
count_message(type, direction)
click to toggle source
# File lib/tella_peer/connections.rb, line 151
# Increments the counter for message +type+ in the given +direction+.
#
# +direction+ selects one of the two tallies created here, :in or :out.
# The increment itself runs while holding +mutex+.
#
# Returns the updated count.
def count_message(type, direction)
  @message_counts ||= { in: {}, out: {} }
  mutex.synchronize do
    tally = @message_counts[direction]
    tally[type] = tally.fetch(type, 0) + 1
  end
end
flood(message, except_to: nil)
click to toggle source
# File lib/tella_peer/connections.rb, line 100
# Sends +message+ to every registered connection, skipping the one that
# compares equal to +except_to+ (when given).
#
# Returns the connection table.
def flood(message, except_to: nil)
  connections.each_value do |peer|
    next if peer == except_to

    peer.send_message(message)
  end
end
logger()
click to toggle source
# File lib/tella_peer/connections.rb, line 162
# Returns the logger exposed by TellaPeer.logger; this class keeps no
# logger of its own.
def logger
  shared_logger = TellaPeer.logger
  shared_logger
end
max_connections()
click to toggle source
# File lib/tella_peer/connections.rb, line 15
# Connection limit. Falls back to 10 (and remembers that value) when no
# limit has been assigned.
def max_connections
  @max_connections = 10 unless @max_connections
  @max_connections
end
message_counts()
click to toggle source
# File lib/tella_peer/connections.rb, line 144
# Returns a snapshot of the message counters, shaped as
# { in: { type => count }, out: { type => count } }.
#
# The snapshot is taken while holding +mutex+ and copies the per-direction
# hashes as well. A plain +dup+ of the outer hash would hand out the live
# inner hashes, so readers would see (and could alter) counters that
# count_message keeps updating.
def message_counts
  @message_counts ||= { in: {}, out: {} }
  mutex.synchronize do
    @message_counts.each_with_object({}) do |(direction, counts), snapshot|
      snapshot[direction] = counts.dup
    end
  end
end
mutex()
click to toggle source
# File lib/tella_peer/connections.rb, line 56
# Mutex used to guard the message counters; created on first access and
# reused afterwards.
#
# NOTE(review): the lazy creation itself is not synchronized - confirm the
# first call happens before multiple threads can reach it.
def mutex
  @mutex = Mutex.new unless @mutex
  @mutex
end
overriden_replies()
click to toggle source
# File lib/tella_peer/connections.rb, line 48
# Array collecting replaced replies (store_reply appends
# [time, key, previous_text] entries). Created empty on first access.
def overriden_replies
  return @overriden_replies if @overriden_replies

  @overriden_replies = []
end
ping()
click to toggle source
# File lib/tella_peer/connections.rb, line 92
# Builds a new Ping and floods it to every connection.
# Returns the result of +flood+.
def ping
  message = Ping.new
  flood(message)
end
ping_log()
click to toggle source
# File lib/tella_peer/connections.rb, line 19
# Hash of ping message ids that have been recorded (see seen_ping?).
# Created empty on first access and reused afterwards.
def ping_log
  @ping_log = {} unless @ping_log
  @ping_log
end
query()
click to toggle source
# File lib/tella_peer/connections.rb, line 96
# Builds a new Query and floods it to every connection.
# Returns the result of +flood+.
def query
  message = Query.new
  flood(message)
end
query_log()
click to toggle source
# File lib/tella_peer/connections.rb, line 23
# Hash of query message ids that have been recorded (see seen_query?).
# Created empty on first access and reused afterwards.
def query_log
  return @query_log if @query_log

  @query_log = {}
end
reply_log()
click to toggle source
# File lib/tella_peer/connections.rb, line 44
# Hash mapping a connection key to the last reply text stored for it
# (written by store_reply). Created empty on first access.
def reply_log
  @reply_log = {} unless @reply_log
  @reply_log
end
seed_connections()
click to toggle source
# File lib/tella_peer/connections.rb, line 139
# Queues the configured +seed+ as a potential connection. +seed+ is
# splatted into add_potential_connection(ip, port), so it has to expand to
# exactly those two arguments.
#
# Returns self, which allows chaining.
def seed_connections
  tap { add_potential_connection(*seed) }
end
seen_ping?(message_id, from: nil)
click to toggle source
# File lib/tella_peer/connections.rb, line 106
# Reports whether a ping with +message_id+ has been recorded before.
#
# * Already recorded: returns the value stored for it (the +from+ given
#   when it was first recorded).
# * Not recorded: stores +from+ under +message_id+ when +from+ is given,
#   and returns false. Without +from+ nothing is recorded.
def seen_ping?(message_id, from: nil)
  # key? is a direct hash lookup; keys.include? built and scanned an
  # array of every logged id on each call.
  return ping_log[message_id] if ping_log.key?(message_id)

  ping_log[message_id] = from if from
  false
end
seen_query?(message_id, from: nil)
click to toggle source
# File lib/tella_peer/connections.rb, line 115
# Reports whether a query with +message_id+ has been recorded before.
#
# * Already recorded: returns the value stored for it (the +from+ given
#   when it was first recorded).
# * Not recorded: stores +from+ under +message_id+ when +from+ is given,
#   and returns false. Without +from+ nothing is recorded.
def seen_query?(message_id, from: nil)
  # key? is a direct hash lookup; keys.include? built and scanned an
  # array of every logged id on each call.
  return query_log[message_id] if query_log.key?(message_id)

  query_log[message_id] = from if from
  false
end
start_time()
click to toggle source
# File lib/tella_peer/connections.rb, line 11
# Time captured on the first call to this method; later calls return the
# same Time object.
def start_time
  @start_time = Time.now unless @start_time
  @start_time
end
store_reply(message)
click to toggle source
# File lib/tella_peer/connections.rb, line 124
# Records the text of a reply +message+ under its connection key.
#
# Replies whose key equals Message.key are ignored (returns nil).
# Otherwise:
# * the matching connection, if registered, gets its +text+ updated;
# * when the text is new or differs from the stored one, the reply is
#   logged (logger.info and message.log), and a previously stored text is
#   appended to overriden_replies as [Time.now, key, previous_text];
# * the reply log entry for the key is set to the new text.
#
# Returns the stored text.
def store_reply(message)
  key = message.connection_key
  return if key == Message.key

  peer = connections[key]
  peer.text = message.text if peer

  previous = reply_log[key]
  unchanged = previous && previous == message.text
  unless unchanged
    overriden_replies << [Time.now, key, previous] if previous
    logger.info "#{key} replied with #{message.text}"
    message.log
  end

  reply_log[key] = message.text
end
uptime()
click to toggle source
# File lib/tella_peer/connections.rb, line 7
# Seconds elapsed since +start_time+, as a non-negative Float.
#
# The operands used to be reversed (start_time - Time.now), which produced
# a negative, ever-decreasing value.
def uptime
  # Resolve start_time first: on the very first call it is captured now,
  # and must not end up later than the "current" timestamp below.
  started_at = start_time
  Time.now - started_at
end
web_connection_queue()
click to toggle source
# File lib/tella_peer/connections.rb, line 40
# Returns a shallow copy of the connection queue, so the caller can read
# or modify the copy without touching the queue itself.
def web_connection_queue
  queue_copy = connection_queue.dup
  queue_copy
end