class Smith::Agency

Attributes

agent_processes[R]
agents[R]

Public Class Methods

new(opts={}) click to toggle source
# File lib/smith/application/agency.rb, line 15
def initialize(opts={})
  @agent_processes = AgentCache.new
end

Public Instance Methods

setup_queues() click to toggle source
# File lib/smith/application/agency.rb, line 19
def setup_queues
  Messaging::Receiver.new(QueueDefinitions::Agency_control.call, :auto_ack => false) do |receiver|
    receiver.subscribe do |payload, responder|

      completion = EM::Completion.new.tap do |c|
        c.completion do |value|
          responder.ack
          responder.reply(Smith::ACL::AgencyCommandResponse.new(:response => value))
        end
      end

      begin
        Command.run(payload.command, payload.args, :agency => self,  :agents => @agent_processes, :responder => completion)
      rescue Command::UnknownCommandError => e
        responder.reply("Unknown command: #{payload.command}")
      end
    end
  end

  Messaging::Receiver.new(QueueDefinitions::Agent_lifecycle) do |receiver|
    receiver.subscribe do |payload, r|
      case payload
      when Smith::ACL::AgentDead
        dead(payload)
      when Smith::ACL::AgentAcknowledgeStart
        acknowledge_start(payload)
      when Smith::ACL::AgentAcknowledgeStop
        acknowledge_stop(payload)
      else
        logger.warn { "Unknown command received on #{QueueDefinitions::Agent_lifecycle.name} queue: #{payload.state}" }
      end
    end
  end

  Messaging::Receiver.new(QueueDefinitions::Agent_keepalive) do |receiver|
    receiver.subscribe do |payload, r|
      keep_alive(payload)
    end
  end
end
start_monitoring() click to toggle source
# File lib/smith/application/agency.rb, line 60
def start_monitoring
  # @agent_monitor = AgentMonitoring.new(@agent_processes)
  # @agent_monitor.start_monitoring
end
stop(&blk) click to toggle source

Stop the agency. This will wait for one second to ensure that any messages are flushed.

# File lib/smith/application/agency.rb, line 67
def stop(&blk)
  if blk
    Smith.stop(true, &blk)
  else
    Smith.stop(true)
  end
end

Private Instance Methods

acknowledge_start(agent_data) click to toggle source
# File lib/smith/application/agency.rb, line 77
def acknowledge_start(agent_data)
  agent_exists?(agent_data.uuid) do |agent_process|
    agent_process.pid = agent_data.pid
    agent_process.started_at = agent_data.started_at
    agent_process.singleton = agent_data.singleton
    agent_process.monitor = agent_data.monitor
    agent_process.metadata = agent_data.metadata
    agent_process.acknowledge_start
  end
end
acknowledge_stop(agent_data) click to toggle source
# File lib/smith/application/agency.rb, line 88
def acknowledge_stop(agent_data)
  agent_exists?(agent_data.uuid) do |agent_process|
    agent_process.acknowledge_stop
  end
end
agent_exists?(uuid, error_proc=->{} click to toggle source
# File lib/smith/application/agency.rb, line 113
def agent_exists?(uuid, error_proc=->{}, &blk)
  agent = @agent_processes[uuid]
  if agent
    blk.call(agent)
  else
    error_proc.call
  end
end
dead(agent_data) click to toggle source
# File lib/smith/application/agency.rb, line 94
def dead(agent_data)
  agent_exists?(agent_data.uuid) do |agent_process|
    if agent_process.no_process_running
      logger.fatal { "Agent is dead: #{agent_process.name}, UUID: #{agent_process.uuid}, PID: #{agent_process.pid}" }
    end
  end
end
keep_alive(agent_data) click to toggle source
# File lib/smith/application/agency.rb, line 102
def keep_alive(agent_data)
  agent_exists?(agent_data.uuid) do |agent_process|
    agent_process.last_keep_alive = agent_data.time
    logger.verbose { "Agent keep alive: #{agent_data.uuid}: #{agent_data.time}" }

    # We need to call save explicitly here as the keep alive is not part of
    # the state_machine which is the thing that writes the state to disc.
    agent_process.save
  end
end