class Discover::Service
Public Class Methods
new(client, name, filters={})
click to toggle source
# File lib/discover.rb, line 173 def initialize(client, name, filters={}) @client = client @name = name @filters = filters @current = Condition.new @instances = {} @watchers = [] async.process_updates end
Public Instance Methods
each_leader(&block)
click to toggle source
# File lib/discover.rb, line 192 def each_leader(&block) leader = self.leader block.call leader if leader each_update(false) do |update| if leader.nil? || (update.offline? && leader && update.address == leader.address) leader = self.leader block.call leader if leader end end end
each_update(include_current = true, &block)
click to toggle source
# File lib/discover.rb, line 204 def each_update(include_current = true, &block) # Since updates are coming from a Proc being called in a different # Actor (the RPCClient), we need to suspend update notifications # here to avoid race conditions where we could potentially miss # updates between initializing the Watcher and adding it to @watchers watcher = pause_updates do watcher = Watcher.new(block) if include_current online.each { |u| watcher.notify u } end @watchers << watcher watcher end watcher.wait end
leader()
click to toggle source
# File lib/discover.rb, line 188 def leader online.sort_by(&:created).first end
online()
click to toggle source
# File lib/discover.rb, line 183 def online @current.wait if @current @instances.values end
Private Instance Methods
matches_filters?(update)
click to toggle source
# File lib/discover.rb, line 251 def matches_filters?(update) @filters.all? do |key, val| update.attributes[key] == val end end
pause_updates(&block)
click to toggle source
# File lib/discover.rb, line 257 def pause_updates(&block) @pause_updates = Condition.new result = block.call c, @pause_updates = @pause_updates, nil c.broadcast result end
process_updates()
click to toggle source
# File lib/discover.rb, line 226 def process_updates @client.request('Agent.Subscribe', {'Name' => @name}) do |update| update = Update.from_hash(update) if @current && update.sentinel? c, @current = @current, nil c.broadcast next end if matches_filters?(update) if update.online? @instances[update.address] = update else @instances.delete(update.address) end @pause_updates.wait if @pause_updates @watchers.each { |w| w.notify update } end end @watchers.each(&:done) # TODO: handle disconnect end