module PryRemoteEm::Broker

Attributes

host[R]
listening[R]
listening?[R]
port[R]

Public Class Methods

connected?() click to toggle source
# File lib/pry-remote-em/broker.rb, line 106
def connected?
  @connected
end
hbeats() click to toggle source
# File lib/pry-remote-em/broker.rb, line 102
def hbeats
  @hbeats ||= {}
end
log() click to toggle source
# File lib/pry-remote-em/broker.rb, line 52
def log
  return opts[:logger] if opts[:logger]
  @log ||= Logger.new(STDERR)
end
new(opts = {}, &blk) click to toggle source
# File lib/pry-remote-em/broker.rb, line 181
def initialize(opts = {}, &blk)
  @opts = opts
  @ids = []
end
opts() click to toggle source
# File lib/pry-remote-em/broker.rb, line 48
def opts
  @opts ||= {}
end
register(description) click to toggle source
# File lib/pry-remote-em/broker.rb, line 61
def register(description)
  client { |c| c.send_register_server(description[:id], description[:urls], description[:name], description[:details], description[:metrics]) }
end
register_server(id, description) click to toggle source
# File lib/pry-remote-em/broker.rb, line 69
def register_server(id, description)
  servers[id] = description
  watch_heartbeats(id)
  log.info("[pry-remote-em broker] registered #{id} #{description.inspect}")
end
restart() click to toggle source
# File lib/pry-remote-em/broker.rb, line 30
def restart
  log.info("[pry-remote-em broker] restarting on pryem://#{host}:#{port}")
  @waiting   = nil
  @client    = nil
  run(@host, @port, @opts) do
    PryRemoteEm.servers.each do |id, description|
      next unless EM.get_sockname(description[:server])
      register(
        id: description[:id],
        urls: description[:urls],
        name: description[:name],
        details: description[:details],
        metrics: PryRemoteEm::Metrics.list
      )
    end
  end
end
run(host = nil, port = nil, opts = {}) { |self| ... } click to toggle source
# File lib/pry-remote-em/broker.rb, line 13
def run(host = nil, port = nil, opts = {})
  host ||= ENV['PRYEMBROKER'].nil? || ENV['PRYEMBROKER'].empty? ? DEFAULT_BROKER_HOST : ENV['PRYEMBROKER']
  port ||= ENV['PRYEMBROKERPORT'].nil? || ENV['PRYEMBROKERPORT'].empty? ? DEFAULT_BROKER_PORT : ENV['PRYEMBROKERPORT']
  port = port.to_i if port.kind_of?(String)
  raise "root permission required for port below 1024 (#{port})" if port < 1024 && Process.euid != 0
  @host = host
  @port = port
  opts = opts.dup
  # Brokers cannot use SSL directly. If they do then when a proxy request to an SSL server is received
  # the client and server will not be able to negotiate a SSL session. The proxied traffic can be SSL
  # encrypted, but the SSL session will be between the client and the server.
  opts[:tls] = false
  @opts = opts
  start_server(host, port, opts) unless @listening || ENV['PRYEMREMOTEBROKER'] || @opts[:remote_broker]
  client { |c| yield self } if block_given?
end
servers() click to toggle source
# File lib/pry-remote-em/broker.rb, line 57
def servers
  @servers ||= {}
end
timers() click to toggle source
# File lib/pry-remote-em/broker.rb, line 98
def timers
  @timers ||= {}
end
unregister(id) click to toggle source
# File lib/pry-remote-em/broker.rb, line 65
def unregister(id)
  client { |c| c.send_unregister_server(id) }
end
unregister_server(id, reason: nil) click to toggle source
# File lib/pry-remote-em/broker.rb, line 81
def unregister_server(id, reason: nil)
  server = servers.delete(id) or return
  log.warn("[pry-remote-em broker] unregister by #{reason || 'unknown reason'} #{id} #{server.inspect}")
  timer = timers.delete(id)
  timer.cancel if timer
  hbeats.delete(id)
end
update_server(server, description) click to toggle source
# File lib/pry-remote-em/broker.rb, line 75
def update_server(server, description)
  server.update(urls: description[:urls], name: description[:name])
  server[:details].update(description[:details])
  server[:metrics].update(description[:metrics])
end
watch_heartbeats(id) click to toggle source
# File lib/pry-remote-em/broker.rb, line 89
def watch_heartbeats(id)
  interval = ENV['PRYEMHBCHECK'].nil? || ENV['PRYEMHBCHECK'].empty? ? HEARTBEAT_CHECK_INTERVAL : ENV['PRYEMHBCHECK']
  timers[id] ||= EM::PeriodicTimer.new(interval) do
    if !hbeats[id] || (Time.new - hbeats[id]) > 20
      unregister_server(id, reason: 'heartbeats checker')
    end
  end
end

Private Class Methods

client() { |client| ... } click to toggle source
# File lib/pry-remote-em/broker.rb, line 128
def client(&blk)
  raise ArgumentError.new('A block is required') unless block_given?
  if @client
    yield @client
    return
  end

  if @waiting
    @waiting << blk
  else
    @waiting = [blk]
    EM.connect(host, port, Client::Broker, @opts) do |client|
      client.errback { |e| raise(e || 'broker client error') }
      client.callback do
        @client = client
        while (w = @waiting.shift)
          w.call(client)
        end
      end
    end
  end
end
start_server(host, port, opts) click to toggle source
# File lib/pry-remote-em/broker.rb, line 112
def start_server(host, port, opts)
  EM.start_server(host, port, PryRemoteEm::Broker, opts)
  log.info("[pry-remote-em broker] listening on #{opts[:tls] ? 'pryems' : 'pryem'}://#{host}:#{port}")
  @listening = true
rescue => error
  if error.message.include?('port is in use')
    if opts[:raise_if_port_in_use]
      raise
    else
      # A broker is already listening on this port, we can do nothing
    end
  else
    raise
  end
end

Public Instance Methods

log() click to toggle source
# File lib/pry-remote-em/broker.rb, line 186
def log
  Broker.log
end
peer_ip() click to toggle source
# File lib/pry-remote-em/broker.rb, line 203
def peer_ip
  return @peer_ip if @peer_ip
  return '' if get_peername.nil?
  @peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
  @peer_ip = '127.0.0.1' if @peer_ip == '::1' # Little hack to avoid segmentation fault in EventMachine 1.2.0.1 while connecting to PryRemoteEm Server from localhost with IPv6 address
  @peer_ip
end
peer_port() click to toggle source
# File lib/pry-remote-em/broker.rb, line 211
def peer_port
  return @peer_port if @peer_port
  return '' if get_peername.nil?
  peer_ip # Fills peer_port too
  @peer_port
end
post_init() click to toggle source
# File lib/pry-remote-em/broker.rb, line 190
def post_init
  port, ip = Socket.unpack_sockaddr_in(get_peername)
  log.info("[pry-remote-em broker] received client connection from #{ip}:#{port}")
  send_banner("PryRemoteEm #{VERSION} #{@opts[:tls] ? 'pryems' : 'pryem'}")
  @opts[:tls] ? start_tls : send_server_list(Broker.servers)
end
receive_proxy_connection(url) click to toggle source
# File lib/pry-remote-em/broker.rb, line 175
def receive_proxy_connection(url)
  log.info("[pry-remote-em broker] proxying to #{url}")
  url = URI.parse(url)
  EM.connect(url.host, url.port, Client::Proxy, self)
end
receive_register_server(id, urls, name, details, metrics) click to toggle source
# File lib/pry-remote-em/broker.rb, line 158
def receive_register_server(id, urls, name, details, metrics)
  @ids.push(id)
  description = { urls: urls, name: name, details: details, metrics: metrics }
  Broker.hbeats[id] = Time.new
  server = Broker.servers[id]
  if server
    Broker.update_server(server, description)
  else
    Broker.register_server(id, description)
  end
end
receive_server_reload_list() click to toggle source
# File lib/pry-remote-em/broker.rb, line 154
def receive_server_reload_list
  send_server_list(Broker.servers)
end
receive_unregister_server(id) click to toggle source
# File lib/pry-remote-em/broker.rb, line 170
def receive_unregister_server(id)
  server = Broker.servers[id]
  Broker.unregister_server(id, reason: 'remote command') if server
end
ssl_handshake_completed() click to toggle source
# File lib/pry-remote-em/broker.rb, line 218
def ssl_handshake_completed
  log.info("[pry-remote-em broker] TLS connection established (#{peer_ip}:#{peer_port})")
  send_server_list(Broker.servers)
end
start_tls() click to toggle source
Calls superclass method
# File lib/pry-remote-em/broker.rb, line 197
def start_tls
  log.debug("[pry-remote-em broker] starting TLS (#{peer_ip}:#{peer_port})")
  send_start_tls
  super(@opts[:tls].is_a?(Hash) ? @opts[:tls] : {})
end
unbind() click to toggle source
# File lib/pry-remote-em/broker.rb, line 223
def unbind
  @ids.each do |id|
    Broker.unregister_server(id, reason: 'disconnection')
  end
end