class Tapyrus::Network::Pool

peer pool class.

Attributes

chain[R]
logger[R]
max_outbound[R]
mutex[R]
node[R]
peer_discovery[R]
peers[R]
pending_peers[R]
started[RW]

Public Class Methods

new(node, chain, configuration) click to toggle source
# File lib/tapyrus/network/pool.rb, line 26
# Build an idle peer pool: no peers are tracked yet and the pool is not started.
#
# node          - object stored as the pool's node.
# chain         - object stored as the pool's chain.
# configuration - kept for later use and handed to PeerDiscovery.
def initialize(node, chain, configuration)
  # Collaborators supplied by the caller.
  @node = node
  @chain = chain
  @configuration = configuration

  # Connection bookkeeping.
  @peers = []
  @pending_peers = []
  @max_outbound = MAX_OUTBOUND_CONNECTIONS
  @started = false

  # Services owned by the pool.
  @logger = Tapyrus::Logger.create(:debug)
  @peer_discovery = PeerDiscovery.new(configuration)
  @mutex = Mutex.new
end

Public Instance Methods

broadcast(tx) click to toggle source

Broadcast a transaction to all connected peers.

# File lib/tapyrus/network/pool.rb, line 82
# Hand the transaction to every peer in the pool via Peer#broadcast_tx.
#
# tx - the transaction object passed through unchanged to each peer.
def broadcast(tx)
  peers.each do |connected_peer|
    connected_peer.broadcast_tx(tx)
  end
end
filter_add(element) click to toggle source

add element to bloom filter.

# File lib/tapyrus/network/pool.rb, line 92
# Ask every peer in the pool to add the element to its bloom filter.
#
# element - value forwarded unchanged to Peer#send_filter_add.
def filter_add(element)
  peers.each do |connected_peer|
    connected_peer.send_filter_add(element)
  end
end
filter_clear() click to toggle source

clear bloom filter.

# File lib/tapyrus/network/pool.rb, line 97
# Ask every peer in the pool to clear its bloom filter.
def filter_clear
  peers.each do |connected_peer|
    connected_peer.send_filter_clear
  end
end
filter_load(peer) click to toggle source

Send the node's bloom filter to the given peer.

# File lib/tapyrus/network/pool.rb, line 87
# Send the node's bloom filter to a single peer.
#
# peer - the peer that receives the filter through send_filter_load.
def filter_load(peer)
  bloom_filter = node.bloom
  peer.send_filter_load(bloom_filter)
end
handle_close_peer(peer) click to toggle source
# File lib/tapyrus/network/pool.rb, line 65
# React to a closed peer: forget it and try to connect to other addresses.
#
# Does nothing while the pool is not started. Otherwise the peer is removed
# from both the connected and the pending list, and connect is called with
# every discovered address that is neither in use nor the closed peer's host.
#
# peer - the peer whose connection was closed.
def handle_close_peer(peer)
  return unless started

  [peers, pending_peers].each { |list| list.delete(peer) }

  discovered = peer_discovery.peers
  excluded_hosts = peers.map(&:host) + pending_peers.map(&:host) + [peer.host]
  connect(discovered - excluded_hosts)
end
handle_error(e) click to toggle source
# File lib/tapyrus/network/pool.rb, line 101
# Handle an error reported to the pool by logging it and shutting down.
#
# The error is written to the log before the pool is terminated, so the
# reason for the shutdown is not lost.
#
# e - the reported error; logged via #inspect, so any object is accepted.
#
# Returns whatever terminate returns.
def handle_error(e)
  # NOTE(review): assumes the pool logger responds to #error like Ruby's
  # standard Logger; only #debug is used elsewhere in this class - confirm.
  logger.error("peer pool error: #{e.inspect}")
  terminate
end
handle_new_peer(peer) click to toggle source

detect new peer connection.

# File lib/tapyrus/network/pool.rb, line 51
# Register a freshly connected peer with the pool.
#
# Under the pool mutex the peer gets an id and is appended to the connected
# list; if no connected peer is primary yet, this one is marked primary and
# starts the block header download. Afterwards the peer is dropped from the
# pending list and, when the node has a wallet, receives the bloom filter.
#
# peer - the peer that has just connected.
def handle_new_peer(peer)
  logger.debug "connected new peer #{peer.addr}."

  mutex.synchronize do
    peer.id = allocate_peer_id
    if peers.none?(&:primary?)
      peer.primary = true
      peer.start_block_header_download
    end
    peers.push(peer)
  end

  pending_peers.delete(peer)
  return unless node.wallet

  filter_load(peer)
end
start() click to toggle source

Connect to other peers and begin network activity.

# File lib/tapyrus/network/pool.rb, line 40
# Connect to the discovered peers and mark the pool as started.
#
# Raises RuntimeError if the pool is already started.
# Returns true.
def start
  raise 'Cannot start a peer pool twice.' if started

  logger.debug 'Start connecting other peers.'
  connect(peer_discovery.peers)

  # Flagged only after the connection attempts have been kicked off.
  @started = true
end
terminate() click to toggle source

terminate peers.

# File lib/tapyrus/network/pool.rb, line 74
# Close every connected and pending peer and stop the pool.
#
# The pool is flagged as stopped before any connection is closed, so a close
# callback that reaches handle_close_peer returns early instead of trying to
# reconnect. Both the connected and the pending list are emptied afterwards;
# stale pending entries would otherwise keep counting against the outbound
# limit checked in connect.
#
# Returns false (the new value of started).
def terminate
  @started = false

  # Iterate over a snapshot so that callbacks which mutate the pool's lists
  # cannot make the loop skip a peer.
  (peers + pending_peers).each { |peer| peer.close('terminate') }

  @peers = []
  @pending_peers = []
  @started
end

Private Instance Methods

allocate_peer_id() click to toggle source

allocate new peer id

# File lib/tapyrus/network/pool.rb, line 113
# Pick the smallest non-negative integer that no peer in the pool uses as id.
#
# Returns 0 when the pool has no peers.
def allocate_peer_id
  taken_ids = peers.map(&:id)
  candidate = 0
  candidate += 1 while taken_ids.include?(candidate)
  candidate
end
connect(addr_list) click to toggle source
# File lib/tapyrus/network/pool.rb, line 119
# Start connection attempts to the given addresses on the chain's default port.
#
# The addresses are walked with an EM::Iterator. For each address a Peer is
# created, remembered as pending and asked to connect, but only while the
# number of pending plus connected peers is below MAX_OUTBOUND_CONNECTIONS.
# Once that limit is reached the iterator is not advanced any further.
#
# addr_list - list of peer addresses to try.
def connect(addr_list)
  default_port = Tapyrus.chain_params.default_port
  iterator = EM::Iterator.new(addr_list, Tapyrus::PARALLEL_THREAD)

  iterator.each do |address, cursor|
    next unless pending_peers.size + peers.size < MAX_OUTBOUND_CONNECTIONS

    candidate = Peer.new(address, default_port, self, @configuration)
    pending_peers << candidate
    candidate.connect
    cursor.next
  end
end
primary_peer() click to toggle source

get primary peer

# File lib/tapyrus/network/pool.rb, line 108
# Look up the first connected peer that reports itself as primary.
#
# Returns that peer, or nil when no peer is primary.
def primary_peer
  peers.find { |candidate| candidate.primary? }
end