class DEVp2p::PeerManager
connection strategy
For each service which requires peers: while peers.size < min_num_peers, generate a random node id, resolve the address of the closest node (ideally knowing which services it offers), and connect to that closest node.
Public Class Methods
new(app)
click to toggle source
Calls superclass method
# File lib/devp2p/peer_manager.rb, line 67
# Sets up the peer manager's state for +app+.
#
# Reads its settings from @config (presumably populated by the superclass
# initializer -- confirm against the base service class).
def initialize(app)
  super(app)
  logger.info "PeerManager init"

  p2p_config = @config[:p2p]

  # Connected peers, and public keys of peers that were excluded.
  @peers = []
  @excluded = []
  @stopped = false

  @wire_protocol = P2PProtocol
  @errors = if @config[:log_disconnects]
              PeerErrors.new
            else
              PeerErrorsBase.new
            end

  # setup nodeid based on privkey
  unless p2p_config.has_key?(:id)
    privkey = Utils.decode_hex(@config[:node][:privkey_hex])
    @config[:node][:id] = Crypto.privtopub(privkey)
  end

  # The two delays are handed to +sleep+ by the discovery loop; the connect
  # timeout is handed to create_connection.
  @connect_timeout = 2.0
  @connect_loop_delay = 0.5
  @discovery_delay = 0.5

  @host = p2p_config[:listen_host]
  @port = p2p_config[:listen_port]
end
Public Instance Methods
add(peer)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 122
# Appends +peer+ to the list of managed peers.
#
# Returns the peers array.
def add(peer)
  @peers << peer
end
add_error(*args)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 215
# Forwards all arguments unchanged to the error collector's +add+ and
# returns whatever that call returns.
def add_error(*args)
  @errors.add(*args)
end
broadcast(protocol, command_name, args=[], kwargs={}, num_peers=nil, exclude_peers=[])
click to toggle source
# File lib/devp2p/peer_manager.rb, line 156
# Sends the command +command_name+ of +protocol+ to connected peers.
#
# protocol      - key used to look the protocol up in each peer's +protocols+.
# command_name  - command suffix; "send_<command_name>" is invoked on the
#                 peer's protocol object.
# args          - positional arguments for the command (not modified).
# kwargs        - hash appended as the last positional argument.
# num_peers     - maximum number of peers to send to; nil means all
#                 matching peers. Must be positive when given.
# exclude_peers - peers that must not receive the command.
#
# Peers are picked at random among those that have +protocol+ and are not
# excluded. Returns the array of peers that were sampled.
# Raises ArgumentError when +num_peers+ is given and not greater than zero.
def broadcast(protocol, command_name, args=[], kwargs={}, num_peers=nil, exclude_peers=[])
  logger.debug "broadcasting", protocol: protocol, command: command_name,
    num_peers: num_peers, exclude_peers: exclude_peers.map(&:to_s)
  raise ArgumentError, 'invalid num_peers' unless num_peers.nil? || num_peers > 0

  peers_with_proto = @peers.select {|p| p.protocols.include?(protocol) && !exclude_peers.include?(p) }
  if peers_with_proto.empty?
    logger.debug "no peers with protocol found", protos: @peers.map {|p| p.protocols }
  end

  # Build the argument list once, on a copy: pushing kwargs onto +args+
  # inside the loop would mutate the caller's array and hand every peer
  # after the first an extra copy of kwargs.
  call_args = args + [kwargs]

  num_peers ||= peers_with_proto.size
  peers_with_proto.sample([num_peers, peers_with_proto.size].min).each do |peer|
    logger.debug "broadcasting to", proto: peer.protocols[protocol]
    peer.protocols[protocol].send "send_#{command_name}", *call_args
    peer.safe_to_read.wait
    logger.debug "broadcasting done", ts: Time.now
  end
end
connect(host, port, remote_pubkey)
click to toggle source
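Connects to the node at host:port and starts a peer on the resulting socket, passing remote_pubkey on to the new peer.
Returns true on success; on failure the error is logged and recorded, and false is returned. The manager's @connect_timeout is passed to create_connection.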
# File lib/devp2p/peer_manager.rb, line 182
# Opens a connection to +host+:+port+ and starts a peer on it.
#
# Returns true when the connection was created and handed to start_peer.
# Any StandardError is logged, recorded in @errors under "host:port", and
# turned into a false return value.
def connect(host, port, remote_pubkey)
  sock = create_connection(host, port, @connect_timeout)
  logger.debug "connecting to", peer: sock.peeraddr
  start_peer(sock, remote_pubkey)
  true
rescue Errno::ETIMEDOUT
  endpoint = "#{host}:#{port}"
  logger.debug "connection timeout", address: endpoint, timeout: @connect_timeout
  @errors.add(endpoint, 'connection timeout')
  false
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ECONNREFUSED => e
  endpoint = "#{host}:#{port}"
  logger.debug "connection error #{e}"
  @errors.add(endpoint, "connection error #{e}")
  false
rescue => e
  endpoint = "#{host}:#{port}"
  logger.debug e
  @errors.add(endpoint, "connection error #{e}")
  false
end
delete(peer)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 126
# Removes +peer+ from the list of managed peers.
#
# Returns the removed peer, or nil when it was not in the list.
def delete(peer)
  @peers.delete(peer)
end
exclude(peer)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 130
# Records the peer's public key in the exclusion list, then calls +stop+
# through +peer.async+.
#
# Returns the result of that stop call.
def exclude(peer)
  @excluded << peer.remote_pubkey
  peer.async.stop
end
handle_connection(socket)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 219
# Handles an incoming connection by starting a peer on +socket+.
#
# Returns what start_peer returns. If an EOFError is raised, the
# disconnect is logged and the socket is closed instead.
def handle_connection(socket)
  # peeraddr's second and third entries are used as port and host.
  _family, port, host = socket.peeraddr
  logger.debug("incoming connection", host: host, port: port)
  start_peer(socket)
rescue EOFError
  logger.debug("connection disconnected", host: host, port: port)
  socket.close
end
num_peers()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 205
# Returns the number of managed peers that are not stopped.
#
# An error is logged when the list still holds stopped peers.
def num_peers
  total = @peers.size
  active_count = @peers.count {|peer| !peer.stopped? }

  unless total == active_count
    logger.error "stopped peers in peers list", inlist: total, active: active_count
  end

  active_count
end
on_hello_received(proto, version, client_version_string, capabilities, listen_port, remote_pubkey)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 135
# Decides whether a peer that just sent its hello may stay connected.
#
# proto         - protocol instance that received the hello; its +peer+ is
#                 the peer being checked.
# remote_pubkey - public key announced by the remote node.
# The remaining parameters are accepted but not used here (only
# +listen_port+ is logged).
#
# Sends a disconnect and returns false when there are more peers than
# @config[:p2p][:max_peers], or when another managed peer already has the
# same remote public key. Returns true otherwise.
def on_hello_received(proto, version, client_version_string, capabilities, listen_port, remote_pubkey)
  logger.debug 'hello_received', listen_port: listen_port, peer: proto.peer, num_peers: @peers.size

  max_peers = @config[:p2p][:max_peers]
  if @peers.size > max_peers
    logger.debug "too many peers", max: max_peers
    proto.send_disconnect proto.class::Disconnect::Reason[:too_many_peers]
    return false
  end

  # Compare public keys, not Peer objects: a list of peers never includes a
  # pubkey, so the duplicate check must look at each peer's remote_pubkey.
  duplicate = @peers.any? {|p| p != proto.peer && p.remote_pubkey == remote_pubkey }
  if duplicate
    logger.debug "connected to that node already"
    proto.send_disconnect proto.class::Disconnect::Reason[:useless_peer]
    return false
  end

  true
end
start()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 93
# Starts the peer manager: opens the TCP listener on @host/@port, starts a
# ServiceListener on it through +async+, and launches the discovery loop in
# a background thread.
#
# Returns the discovery thread.
def start
  logger.info "starting peermanager"

  logger.info "starting tcp listener", host: @host, port: @port
  @server = TCPServer.new(@host, @port)

  @service_listener = ServiceListener.new(self, @server)
  @service_listener.async.start

  @discovery_loop = Thread.new {
    # Short pause before the first discovery round.
    sleep 0.1
    discovery_loop
  }
end
stop()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 108
# Stops the peer manager: closes the TCP listener, stops every managed
# peer, kills the discovery thread and marks the manager as stopped.
#
# Safe to call when the manager was never started (no listener and no
# discovery thread exist yet).
def stop
  logger.info "stopping peermanager"
  @server.close if @server

  # Iterate over a snapshot: a peer that removes itself from @peers while
  # stopping (presumably via #delete -- confirm in Peer#stop) would
  # otherwise cause the following peer to be skipped.
  @peers.dup.each(&:stop)

  # Guarded like @server above; the thread only exists after #start.
  @discovery_loop.kill if @discovery_loop
  @stopped = true
end
stopped?()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 118
# Reports the manager's stopped flag (the raw value of @stopped).
def stopped?
  @stopped
end
wired_services()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 152
# Returns the application's services that are WiredService instances,
# in the order given by +app.services.values+.
def wired_services
  registered = app.services.values
  registered.select do |service|
    service.is_a?(WiredService)
  end
end
Private Instance Methods
bootstrap(bootstrap_nodes=[])
click to toggle source
# File lib/devp2p/peer_manager.rb, line 235
# Tries to connect to every bootstrap node in +bootstrap_nodes+.
#
# bootstrap_nodes - list of node URIs; each is split into host, port and
#                   public key by Utils.host_port_pubkey_from_uri.
#
# A failed attempt is logged as a warning and does not stop the loop.
# Returns +bootstrap_nodes+.
def bootstrap(bootstrap_nodes=[])
  bootstrap_nodes.each do |uri|
    ip, port, pubkey = Utils.host_port_pubkey_from_uri uri
    logger.info 'connecting bootstrap server', uri: uri
    begin
      # connect rescues connection errors itself and reports them through
      # its return value, so that is what has to be checked here.
      unless connect(ip, port, pubkey)
        logger.warn "connecting bootstrap server failed", uri: uri
      end
    rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ETIMEDOUT
      # Kept in case a connect implementation lets these propagate.
      logger.warn "connecting bootstrap server failed: #{$!}"
    end
  end
end
create_connection(host, port, timeout)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 249
# Opens a TCP connection to +host+:+port+.
#
# timeout - connect timeout in seconds; nil waits without a limit.
#
# Returns a connected TCPSocket.
# Raises Errno::ETIMEDOUT when the connection is not established within
# +timeout+, and the usual Errno::* errors on other connection failures.
def create_connection(host, port, timeout)
  sock = ::Socket.tcp(host, port, connect_timeout: timeout)

  # Hand the descriptor over to a TCPSocket so the return type stays what
  # it was: callers use #peeraddr, which a plain Socket does not provide.
  begin
    tcp = ::TCPSocket.for_fd(sock.fileno)
  rescue
    sock.close
    raise
  end
  # The descriptor now belongs to +tcp+; stop +sock+ from closing it.
  sock.autoclose = false
  tcp
end
discovery_loop()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 269
# Background loop that keeps looking for new peers until the manager is
# stopped.
#
# While fewer than @config[:p2p][:min_peers] peers are active, each round
# asks the kademlia protocol to find a random node id, picks one of the two
# closest known neighbours at random and connects to it, unless that node
# is this node itself, is already connected, or was excluded.
#
# The loop ends when the discovery service cannot be reached. Any other
# error is printed to stdout together with the top of its backtrace.
def discovery_loop
  logger.info "waiting for bootstrap"
  sleep @discovery_delay

  until stopped?
    num = num_peers
    min = @config[:p2p][:min_peers]

    begin
      kademlia_proto = app.services.discovery.protocol.kademlia
    rescue NoMethodError # some point hit nil
      logger.error "Discovery service not available."
      break
    end

    if num < min
      logger.debug "missing peers", num_peers: num, min_peers: min, known: kademlia_proto.routing.size

      nodeid = Kademlia.random_nodeid
      kademlia_proto.find_node nodeid
      sleep @discovery_delay # give the lookup a moment before reading the routing table

      neighbours = kademlia_proto.routing.neighbours(nodeid, 2)
      if neighbours.empty?
        sleep @connect_loop_delay
        next
      end

      node = neighbours.sample
      local_pubkey = Crypto.privtopub Utils.decode_hex(@config[:node][:privkey_hex])

      # First matching reason wins; nil means the node is worth connecting to.
      skip_reason =
        if node.pubkey == local_pubkey
          'myself'
        elsif @peers.any? {|p| node.pubkey == p.remote_pubkey }
          'already connected'
        elsif @excluded.any? {|pubkey| node.pubkey == pubkey }
          'excluded peer'
        end

      if skip_reason
        logger.debug 'connecting random neighbour', node: node, skipped: true, reason: skip_reason
        next
      end

      logger.debug 'connecting random neighbour', node: node, skipped: false
      connect node.address.ip, node.address.tcp_port, node.pubkey
    end

    sleep @connect_loop_delay
  end
rescue => e
  puts e
  puts e.backtrace[0, 10].join("\n")
end
logger()
click to toggle source
# File lib/devp2p/peer_manager.rb, line 231
# Returns the manager's logger, creating it under the name "p2p.peermgr"
# on first use and reusing the same instance afterwards.
def logger
  return @logger if @logger

  @logger = Logger.new("p2p.peermgr")
end
start_peer(socket, remote_pubkey=nil)
click to toggle source
# File lib/devp2p/peer_manager.rb, line 253
# Creates a Peer for +socket+, registers it with the manager and starts it
# through +peer.async+.
#
# remote_pubkey - passed to the new peer; nil when not given.
#
# Returns the peer. When anything goes wrong (including the socket being
# closed right after the start), the error and the top of its backtrace are
# printed to stdout and nil is returned; the peer stays registered.
def start_peer(socket, remote_pubkey=nil)
  new_peer = Peer.new(self, socket, remote_pubkey)
  logger.debug "created new peer", peer: new_peer, fileno: socket.to_io.fileno

  add(new_peer)
  new_peer.async.start
  logger.debug "peer started", peer: new_peer, fileno: socket.to_io.fileno

  raise PeerError, 'connection closed' if socket.closed?

  new_peer
rescue => e
  puts e
  puts e.backtrace[0, 10].join("\n")
end