class Protobuf::Rpc::ServiceDirectory
Constants
- DEFAULT_ADDRESS
- DEFAULT_PORT
- DEFAULT_TIMEOUT
Attributes
address[W]
port[W]
Public Class Methods
address()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 67
# Returns the configured address, falling back to (and remembering)
# DEFAULT_ADDRESS when none has been assigned yet.
def self.address
  @address = DEFAULT_ADDRESS unless @address
  @address
end
new()
click to toggle source
Instance Methods
# File lib/protobuf/rpc/service_directory.rb, line 87
# A new directory starts out in the blank state produced by #reset: no
# listener thread, no socket, and empty listing tables.
def initialize
  reset
end
port()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 71
# Returns the configured port, falling back to (and remembering)
# DEFAULT_PORT when none has been assigned yet.
def self.port
  @port = DEFAULT_PORT unless @port
  @port
end
start() { |self| ... }
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 75
# Class-level entry point. When a block is supplied the class itself is
# yielded to it first (before anything is started); afterwards the call is
# forwarded to `instance.start`, whose result is returned.
def self.start
  yield self if block_given?

  instance.start
end
stop()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 80
# Class-level convenience that forwards to `instance.stop` and returns its
# result.
def self.stop
  instance.stop
end
Public Instance Methods
all_listings_for(service)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 91
# Returns every listing currently known for +service+, in random order.
#
# service - anything whose #to_s is the service name used as the index key.
#
# Returns an Array of listings; empty when the directory is not running or
# when no listing advertises the service.
def all_listings_for(service)
  return [] unless running?

  # Revive the listener before consulting the index, exactly as #lookup does.
  # If revival were gated on the service already being known, a dead listener
  # could never be restarted through this method for a service that has not
  # been seen yet -- and it would stay unseen because nothing reads beacons.
  start_listener_thread if listener_dead?

  service_name = service.to_s
  # key? avoids triggering the index's default block, which would insert an
  # empty entry for every unknown name that is asked about.
  return [] unless @listings_by_service.key?(service_name)

  @listings_by_service[service_name].entries.shuffle
end
each_listing(&block)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 100
# Yields every known listing to the given block.
#
# The listener thread is revived first when it has died, but only while the
# directory is running. When the directory is stopped there is no socket, so
# a listener started in that state could only fail on every pass of its
# receive loop and be retried endlessly.
#
# Returns whatever Hash#each_value returns for the uuid table.
def each_listing(&block)
  start_listener_thread if running? && listener_dead?
  @listings_by_uuid.each_value(&block)
end
listener_dead?()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 112
# True when there is no listener thread at all, or when the thread that was
# created is no longer alive.
def listener_dead?
  return true if @thread.nil?

  !@thread.alive?
end
lookup(service)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 105
# Picks one listing at random for +service+.
#
# service - anything whose #to_s is the service name used as the index key.
#
# Returns a listing, or nil when the directory is not running or the service
# is unknown. A dead listener thread is revived on the way.
def lookup(service)
  return unless running?

  start_listener_thread if listener_dead?

  service_name = service.to_s
  # key? keeps the index's default block from inserting an empty entry.
  @listings_by_service[service_name].entries.sample if @listings_by_service.key?(service_name)
end
restart()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 116
# Stops the directory and then starts it again, returning the result of
# #start.
def restart
  stop
  start
end
running?()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 121
# Reports the running flag as a strict boolean (true/false), whatever value
# the underlying instance variable holds.
def running?
  @running ? true : false
end
start()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 125
# Brings the directory up. Calling it while already running skips the socket
# setup, but a dead listener thread is still revived.
#
# Returns self.
def start
  if !running?
    init_socket
    # Block form: the message is only built if the logger evaluates it.
    logger.info do
      sign_message("listening to udp://#{self.class.address}:#{self.class.port}")
    end
    @running = true
  end

  start_listener_thread if listener_dead?

  self
end
start_listener_thread()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 136
# Spawns the thread that executes #run, unless a listener thread already
# exists and is alive.
#
# Returns the new Thread, or nil when nothing had to be started.
def start_listener_thread
  @thread = Thread.new { send(:run) } unless @thread.try(:alive?)
end
stop()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 141
# Shuts the directory down: clears the running flag, kills and joins the
# listener thread (if any), closes the socket (if any) and finally wipes all
# state through #reset.
#
# Returns the result of #reset.
def stop
  logger.info { sign_message("Stopping directory") }

  @running = false

  # The thread is killed before the socket is closed.
  killed_listener = @thread.try(:kill)
  killed_listener.try(:join)
  @socket.try(:close)

  reset
end
Private Instance Methods
add_or_update_listing(uuid, server)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 153
# Records a server announcement: refreshes the listing already stored under
# +uuid+, or creates and stores a new one. The listing is then (re)indexed
# under each of its services and a notification is fired for the action
# taken (:updated or :added).
#
# uuid   - key of the listing in the uuid table.
# server - server description handed to Listing.new / Listing#update.
def add_or_update_listing(uuid, server)
  existing = @listings_by_uuid[uuid]
  action = existing ? :updated : :added

  listing =
    if existing
      existing.update(server)
      existing
    else
      @listings_by_uuid[uuid] = Listing.new(server)
    end

  # NOTE(review): services are only ever added to the index here. If an
  # update can drop a service from a listing, the old index entry presumably
  # lingers until the listing is removed -- confirm against Listing#update.
  listing.services.each { |service| @listings_by_service[service] << listing }

  trigger(action, listing)
  logger.debug { sign_message("#{action} server: #{server.inspect}") }
end
init_socket()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 173
# Creates the UDP socket the directory listens on and binds it to the
# class-level address and port.
#
# The socket is published in @socket only once it has been bound. If
# configuring or binding fails, the half-initialised socket is closed instead
# of being left open and assigned, and the original error is re-raised.
#
# Returns the result of UDPSocket#bind.
def init_socket
  socket = UDPSocket.new

  begin
    socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, true)

    # SO_REUSEPORT is not defined on every platform.
    if defined?(::Socket::SO_REUSEPORT)
      socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEPORT, true)
    end

    bind_result = socket.bind(self.class.address, self.class.port.to_i)
  rescue
    socket.close
    raise
  end

  @socket = socket
  bind_result
end
process_beacon(beacon)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 184
# Applies one decoded beacon to the directory. A HEARTBEAT adds or refreshes
# the sender's listing, a FLATLINE removes it, and any other beacon type is
# ignored. Beacons without a server or without a server uuid are logged and
# skipped.
def process_beacon(beacon)
  server = beacon.server
  uuid = server.try(:uuid)

  unless server && uuid
    return logger.info { sign_message("Ignoring incomplete beacon: #{beacon.inspect}") }
  end

  case beacon.beacon_type
  when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT
    add_or_update_listing(uuid, server)
  when ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE
    remove_listing(uuid)
  end
end
read_beacon()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 200
# Receives one datagram (up to 2048 bytes) from the socket and decodes it
# into a Beacon.
#
# Returns the decoded beacon.
def read_beacon
  payload, sender = @socket.recvfrom(2048)

  ::Protobuf::Rpc::DynamicDiscovery::Beacon.decode(payload).tap do |beacon|
    # Favor the address captured by the socket over whatever the sender put
    # in the message (element 3 of the sender info is presumably the numeric
    # address -- see UDPSocket#recvfrom).
    beacon.try(:server).try(:address=, sender[3])
  end
end
remove_expired_listings()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 211
# Removes every listing that reports itself as expired.
#
# The expired uuids are collected first and removed afterwards, so the uuid
# table is never modified while it is being iterated. #remove_listing both
# deletes from that table and fires a notification, so anything reacting to
# the removal can touch the table without hitting a hash that is
# mid-iteration.
def remove_expired_listings
  logger.debug { sign_message("Removing expired listings") }

  expired_uuids = @listings_by_uuid.select { |_uuid, listing| listing.expired? }.keys
  expired_uuids.each { |uuid| remove_listing(uuid) }
end
remove_listing(uuid)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 218
# Drops the listing stored under +uuid+ from the uuid table and from every
# per-service set, then fires a :removed notification for it.
#
# Returns nil without doing anything when no listing is stored for +uuid+.
def remove_listing(uuid)
  listing = @listings_by_uuid[uuid]
  return unless listing

  logger.debug { sign_message("Removing listing: #{listing.inspect}") }

  @listings_by_service.each_value { |listings| listings.delete(listing) }

  removed = @listings_by_uuid.delete(uuid)
  trigger(:removed, removed)
end
reset()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 230
# Returns the directory to its blank state: no thread, no socket, an empty
# uuid table, and a service index that lazily creates an empty Set for any
# service name it is asked about.
def reset
  @thread = @socket = nil
  @listings_by_uuid = {}
  @listings_by_service = Hash.new { |index, service| index[service] = Set.new }
end
run()
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 237
# Body of the listener thread: waits for beacons on the socket, applies them,
# and sweeps expired listings every 5 seconds. Never returns on its own.
#
# Errors are logged and the loop carries on, but they are rescued per step
# inside the loop rather than by restarting the whole method. Restarting
# would recompute next_sweep on every failure, so a steady stream of
# undecodable beacons could postpone the expiry sweep indefinitely.
def run
  sweep_interval = 5 # sweep expired listings every 5 seconds
  next_sweep = Time.now.to_i + sweep_interval

  log_failure = lambda do |error|
    logger.debug { sign_message("ERROR: (#{error.class}) #{error.message}\n#{error.backtrace.join("\n")}") }
  end

  loop do
    begin
      # Wait at most until the next sweep is due, but never less than 0.1s.
      timeout = [next_sweep - Time.now.to_i, 0.1].max
      readable = IO.select([@socket], nil, nil, timeout)
      process_beacon(read_beacon) if readable
    rescue => e
      # NOTE(review): a failure that repeats on every pass (e.g. an unusable
      # socket) is still retried without delay, as it was before.
      log_failure.call(e)
    end

    next if Time.now.to_i < next_sweep

    begin
      remove_expired_listings
    rescue => e
      log_failure.call(e)
    ensure
      # Reschedule even after a failed sweep so it is not retried in a tight
      # loop.
      next_sweep = Time.now.to_i + sweep_interval
    end
  end
end
trigger(action, listing)
click to toggle source
# File lib/protobuf/rpc/service_directory.rb, line 256
# Publishes an ActiveSupport notification named "directory.listing.<action>"
# with the affected listing in the payload under :listing.
def trigger(action, listing)
  event_name = "directory.listing.#{action}"
  ::ActiveSupport::Notifications.instrument(event_name, :listing => listing)
end