class Bosh::Monitor::AgentManager

Attributes

alerts_processed[R]
alerts_received[R]
heartbeats_received[R]
processor[RW]

Public Class Methods

new(event_processor) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 10
def initialize(event_processor)
  # hash of agent id to agent structure (see add_agent())
  @agents = { }

  # hash of deployment name to set of agent ids
  @deployments = { }

  @logger = Bhm.logger
  @heartbeats_received = 0
  @alerts_received = 0
  @alerts_processed = 0

  @processor = event_processor
end

Public Instance Methods

add_agent(deployment_name, vm_data) click to toggle source

Processes VM data from BOSH Director, extracts relevant agent data, wraps it into Agent object and adds it to a list of managed agents.

# File lib/bosh/monitor/agent_manager.rb, line 121
def add_agent(deployment_name, vm_data)
  unless vm_data.kind_of?(Hash)
    @logger.error("Invalid format for VM data: expected Hash, got #{vm_data.class}: #{vm_data}")
    return false
  end

  @logger.info("Adding agent #{vm_data['agent_id']} (#{vm_data['job']}/#{vm_data['id']}) to #{deployment_name}...")

  agent_id = vm_data['agent_id']

  if agent_id.nil?
    @logger.warn("No agent id for VM: #{vm_data}")
    return false
  end

  # Idle VMs, we don't care about them, but we still want to track them
  if vm_data['job'].nil?
    @logger.debug("VM with no job found: #{agent_id}")
  end

  agent = @agents[agent_id]

  if agent.nil?
    @logger.debug("Discovered agent #{agent_id}")
    agent = Agent.new(agent_id)
    @agents[agent_id] = agent
  end

  agent.deployment = deployment_name
  agent.job = vm_data['job']
  agent.index = vm_data['index']
  agent.cid = vm_data['cid']
  agent.instance_id = vm_data['id']

  @deployments[deployment_name] ||= Set.new
  @deployments[deployment_name] << agent_id
  true
end
agents_count() click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 64
def agents_count
  @agents.size
end
analyze_agent(agent_id) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 187
def analyze_agent(agent_id)
  agent = @agents[agent_id]
  ts = Time.now.to_i

  if agent.nil?
    @logger.error("Can't analyze agent #{agent_id} as it is missing from agents index, skipping...")
    return false
  end

  if agent.timed_out? && agent.rogue?
    # Agent has timed out but it was never
    # actually a proper member of the deployment,
    # so we don't really care about it
    remove_agent(agent.id)
    return
  end

  if agent.timed_out?
    @processor.process(:alert,
      severity: 2,
      source: agent.name,
      title: "#{agent.id} has timed out",
      created_at: ts,
      deployment: agent.deployment,
      job: agent.job,
      instance_id: agent.instance_id)
  end

  if agent.rogue?
    @processor.process(:alert,
      :severity => 2,
      :source => agent.name,
      :title => "#{agent.id} is not a part of any deployment",
      :created_at => ts)
  end

  true
end
analyze_agents() click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 160
def analyze_agents
  @logger.info "Analyzing agents..."
  started = Time.now

  processed = Set.new
  count = 0

  # Agents from managed deployments
  @deployments.each_pair do |deployment_name, agent_ids|
    agent_ids.each do |agent_id|
      analyze_agent(agent_id)
      processed << agent_id
      count += 1
    end
  end

  # Rogue agents (hey there Solid Snake)
  (@agents.keys.to_set - processed).each do |agent_id|
    @logger.warn("Agent #{agent_id} is not a part of any deployment")
    analyze_agent(agent_id)
    count += 1
  end

  @logger.info("Analyzed %s, took %s seconds" % [ pluralize(count, "agent"), Time.now - started ])
  count
end
deployments_count() click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 68
def deployments_count
  @deployments.size
end
get_agents_for_deployment(deployment_name) click to toggle source

Get a hash of agent id -> agent object for all agents associated with the deployment

# File lib/bosh/monitor/agent_manager.rb, line 26
def get_agents_for_deployment(deployment_name)
  agent_ids = @deployments[deployment_name]
  @agents.select { |key, value| agent_ids.include?(key) }
end
lookup_plugin(name, options = {}) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 31
def lookup_plugin(name, options = {})
  plugin_class = nil
  begin
    class_name = name.to_s.split("_").map(&:capitalize).join
    plugin_class = Bosh::Monitor::Plugins.const_get(class_name)
  rescue NameError => e
    raise PluginError, "Cannot find '#{name}' plugin"
  end

  plugin_class.new(options)
end
on_alert(agent, message) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 268
def on_alert(agent, message)
  if message.is_a?(Hash) && !message.has_key?("source")
    message["source"] = agent.name
  end

  @processor.process(:alert, message)
  @alerts_processed += 1
end
on_heartbeat(agent, message) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 277
def on_heartbeat(agent, message)
  agent.updated_at = Time.now

  if message.is_a?(Hash)
    message["timestamp"] = Time.now.to_i if message["timestamp"].nil?
    message["agent_id"] = agent.id
    message["deployment"] = agent.deployment
  end

  @processor.process(:heartbeat, message)
  @heartbeats_received += 1
end
on_shutdown(agent, message) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 290
def on_shutdown(agent, message)
  @logger.info("Agent '#{agent.id}' shutting down...")
  remove_agent(agent.id)
end
process_event(kind, subject, payload = {}) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 226
def process_event(kind, subject, payload = {})
  kind = kind.to_s
  agent_id = subject.split('.', 4).last
  agent = @agents[agent_id]

  if agent.nil?
    # There might be more than a single shutdown event,
    # we are only interested in processing it if agent
    # is still managed
    return if kind == "shutdown"

    @logger.warn("Received #{kind} from unmanaged agent: #{agent_id}")
    agent = Agent.new(agent_id)
    @agents[agent_id] = agent
  else
    @logger.debug("Received #{kind} from #{agent_id}: #{payload}")
  end

  case payload
  when String
    message = Yajl::Parser.parse(payload)
  when Hash
    message = payload
  end

  case kind.to_s
  when "alert"
    on_alert(agent, message)
  when "heartbeat"
    on_heartbeat(agent, message)
  when "shutdown"
    on_shutdown(agent, message)
  else
    @logger.warn("No handler found for '#{kind}' event")
  end

rescue Yajl::ParseError => e
  @logger.error("Cannot parse incoming event: #{e}")
rescue Bhm::InvalidEvent => e
  @logger.error("Invalid event: #{e}")
end
remove_agent(agent_id) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 110
def remove_agent(agent_id)
  @logger.info("Removing agent #{agent_id} from all deployments...")
  @agents.delete(agent_id)
  @deployments.each_pair do |deployment, agents|
    agents.delete(agent_id)
  end
end
remove_deployment(name) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 100
def remove_deployment(name)
  agent_ids = @deployments[name]

  agent_ids.to_a.each do |agent_id|
    @agents.delete(agent_id)
  end

  @deployments.delete(name)
end
setup_events() click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 43
def setup_events
  @processor.enable_pruning(Bhm.intervals.prune_events)
  Bhm.plugins.each do |plugin|
    @processor.add_plugin(lookup_plugin(plugin["name"], plugin["options"]), plugin["events"])
  end

  EM.schedule do
    Bhm.nats.subscribe("hm.agent.heartbeat.*") do |message, reply, subject|
      process_event(:heartbeat, subject, message)
    end

    Bhm.nats.subscribe("hm.agent.alert.*") do |message, reply, subject|
      process_event(:alert, subject, message)
    end

    Bhm.nats.subscribe("hm.agent.shutdown.*") do |message, reply, subject|
      process_event(:shutdown, subject, message)
    end
  end
end
sync_agents(deployment, vms) click to toggle source
# File lib/bosh/monitor/agent_manager.rb, line 85
def sync_agents(deployment, vms)
  managed_agent_ids = @deployments[deployment] || Set.new
  active_agent_ids  = Set.new

  vms.each do |vm|
    if add_agent(deployment, vm)
      active_agent_ids << vm["agent_id"]
    end
  end

  (managed_agent_ids - active_agent_ids).each do |agent_id|
    remove_agent(agent_id)
  end
end
sync_deployments(deployments) click to toggle source

Syncs deployments list received from director with HM deployments. @param deployments Array list of deployments returned by director

# File lib/bosh/monitor/agent_manager.rb, line 75
def sync_deployments(deployments)
  managed = Set.new(deployments.map { |d| d["name"] })
  all     = Set.new(@deployments.keys)

  (all - managed).each do |stale_deployment|
    @logger.warn("Found stale deployment #{stale_deployment}, removing...")
    remove_deployment(stale_deployment)
  end
end