class Discover::Service

Public Class Methods

new(client, name, filters={}) click to toggle source
# File lib/discover.rb, line 173
# Sets up the service state and starts processing updates asynchronously.
#
# client  - object used by #process_updates to issue the subscribe request.
# name    - service name sent with the subscribe request.
# filters - Hash of attribute => value pairs an update must match
#           (defaults to no filtering).
def initialize(client, name, filters = {})
  # Identity of the subscription.
  @client  = client
  @name    = name
  @filters = filters

  # Bookkeeping consulted by #online, #each_update and #process_updates.
  @instances = {}
  @watchers  = []
  @current   = Condition.new

  # All state must exist before the update loop is kicked off.
  async.process_updates
end

Public Instance Methods

each_leader(&block) click to toggle source
# File lib/discover.rb, line 192
# Calls +block+ with the current leader (if there is one), then keeps
# watching updates and calls +block+ again each time a new leader is
# determined.
#
# A new leader is looked up when no leader is known yet, or when an
# offline update arrives for the address of the known leader.
def each_leader(&block)
  current = self.leader
  block.call current if current

  # Only future updates are of interest; the current state was handled above.
  each_update(false) do |update|
    unless current.nil?
      # A leader is known: ignore everything except its own departure.
      next unless update.offline? && update.address == current.address
    end

    current = self.leader
    block.call current if current
  end
end
each_update(include_current = true, &block) click to toggle source
# File lib/discover.rb, line 204
# Registers +block+ as a watcher for updates and then waits on it.
#
# include_current - when true (the default), the watcher is first notified
#                   of every entry returned by #online.
#
# Returns the result of waiting on the watcher.
def each_update(include_current = true, &block)
  # Updates are delivered by a Proc invoked in a different Actor (the
  # RPCClient). Notifications are suspended while the Watcher is created
  # and registered, so no update can slip through between initializing
  # the Watcher and adding it to @watchers.
  registered = pause_updates do
    Watcher.new(block).tap do |w|
      online.each { |u| w.notify u } if include_current
      @watchers << w
    end
  end

  registered.wait
end
leader() click to toggle source
# File lib/discover.rb, line 188
# Returns the online instance with the smallest +created+ value, or nil
# when nothing is online.
def leader
  # min_by finds the minimum in a single pass; sorting the whole list just
  # to take its first element is unnecessary work.
  online.min_by(&:created)
end
online() click to toggle source
# File lib/discover.rb, line 183
# Returns the values currently held in @instances.
#
# While @current is set, the call first waits on it before reading
# @instances (#process_updates clears and broadcasts it on a sentinel).
def online
  pending = @current
  pending.wait if pending

  @instances.values
end

Private Instance Methods

matches_filters?(update) click to toggle source
# File lib/discover.rb, line 251
# Returns true when every key/value pair in @filters equals the
# corresponding entry of +update.attributes+, false otherwise.
# With no filters, every update matches.
def matches_filters?(update)
  @filters.each do |name, expected|
    # Stop at the first mismatch.
    return false unless update.attributes[name] == expected
  end

  true
end
pause_updates(&block) click to toggle source
# File lib/discover.rb, line 257
# Runs +block+ while @pause_updates holds a fresh Condition, then clears
# @pause_updates and broadcasts the condition so anything waiting on it
# (see #process_updates) can continue.
#
# Returns the value of +block+. Exceptions raised by +block+ propagate to
# the caller after the pause has been lifted.
def pause_updates(&block)
  @pause_updates = Condition.new

  begin
    block.call
  ensure
    # Always lift the pause, even when the block raises; otherwise
    # @pause_updates would stay set and waiters would never be released.
    c, @pause_updates = @pause_updates, nil
    # Guard against nil so this cleanup can never raise and mask the
    # block's own exception (e.g. if another call already cleared it).
    c.broadcast if c
  end
end
process_updates() click to toggle source
# File lib/discover.rb, line 226
# Subscribes to updates for @name through @client and applies each one to
# the local state. After the request call returns, every registered
# watcher is told it is done.
def process_updates
  @client.request('Agent.Subscribe', {'Name' => @name}) do |update|
    update = Update.from_hash(update)

    # While @current is still set, a sentinel update releases everyone
    # waiting on it (see #online) and is otherwise not processed.
    # NOTE(review): presumably the sentinel marks the end of the initial
    # state dump - confirm against the agent protocol.
    if @current && update.sentinel?
      c, @current = @current, nil
      c.broadcast
      next
    end

    if matches_filters?(update)
      # Keep @instances keyed by address: store online updates, drop the
      # entry for anything else.
      if update.online?
        @instances[update.address] = update
      else
        @instances.delete(update.address)
      end

      # Hold back notifications while #pause_updates is active; note that
      # @instances has already been modified at this point.
      @pause_updates.wait if @pause_updates
      @watchers.each { |w| w.notify update }
    end
  end
  @watchers.each(&:done)
  # TODO: handle disconnect
end