class Smith::Agency
Attributes
agent_processes[R]
agents[R]
Public Class Methods
new(opts={})
click to toggle source
# File lib/smith/application/agency.rb, line 15
#
# Builds an agency whose agent bookkeeping starts out as a fresh AgentCache.
#
# @param opts [Hash] accepted for callers' convenience; this method does not
#   read it.
#
# NOTE(review): only @agent_processes is assigned here. If the `agents`
# reader listed for this class is backed by @agents, it will return nil
# unless something else sets it -- confirm.
def initialize(opts = {})
  @agent_processes = AgentCache.new
end
Public Instance Methods
setup_queues()
click to toggle source
# File lib/smith/application/agency.rb, line 19
#
# Subscribes the agency to its three inbound queues:
#
# * agency control  -- hands each incoming command to Command.run and
#   replies with an AgencyCommandResponse through an EM::Completion;
# * agent lifecycle -- routes dead / start-acknowledged / stop-acknowledged
#   messages to the matching private handler, warning on anything else;
# * agent keepalive -- passes every message to #keep_alive.
def setup_queues
  # auto_ack is switched off for this queue, so the ack is sent by hand
  # inside the completion callback below.
  Messaging::Receiver.new(QueueDefinitions::Agency_control.call, auto_ack: false) do |control_queue|
    control_queue.subscribe do |message, responder|
      completion = EM::Completion.new
      completion.completion do |result|
        responder.ack
        responder.reply(Smith::ACL::AgencyCommandResponse.new(response: result))
      end

      begin
        Command.run(message.command, message.args, agency: self, agents: @agent_processes, responder: completion)
      rescue Command::UnknownCommandError
        # NOTE(review): this path replies with a bare string and never calls
        # responder.ack, unlike the completion path -- confirm that is intended.
        responder.reply("Unknown command: #{message.command}")
      end
    end
  end

  Messaging::Receiver.new(QueueDefinitions::Agent_lifecycle) do |lifecycle_queue|
    lifecycle_queue.subscribe do |message, _responder|
      # Dispatch on the message class; unknown classes are only logged.
      case message
      when Smith::ACL::AgentDead             then dead(message)
      when Smith::ACL::AgentAcknowledgeStart then acknowledge_start(message)
      when Smith::ACL::AgentAcknowledgeStop  then acknowledge_stop(message)
      else
        logger.warn { "Unknown command received on #{QueueDefinitions::Agent_lifecycle.name} queue: #{message.state}" }
      end
    end
  end

  Messaging::Receiver.new(QueueDefinitions::Agent_keepalive) do |keepalive_queue|
    keepalive_queue.subscribe do |message, _responder|
      keep_alive(message)
    end
  end
end
start_monitoring()
click to toggle source
# File lib/smith/application/agency.rb, line 60
#
# Currently a no-op: the AgentMonitoring hookup below is commented out, so
# calling this does nothing and returns nil.
def start_monitoring
  # @agent_monitor = AgentMonitoring.new(@agent_processes)
  # @agent_monitor.start_monitoring
  nil
end
stop(&blk)
click to toggle source
Stop the agency. This will wait for one second to ensure that any messages are flushed.
# File lib/smith/application/agency.rb, line 67
#
# Stops the agency by delegating to Smith.stop(true), handing over the
# caller's block when one is given.
#
# @yield forwarded untouched to Smith.stop; may be omitted.
# @return whatever Smith.stop returns.
def stop(&blk)
  # Passing `&blk` when blk is nil passes no block at all, so a single call
  # covers both the with-block and without-block cases.
  Smith.stop(true, &blk)
end
Private Instance Methods
acknowledge_start(agent_data)
click to toggle source
# File lib/smith/application/agency.rb, line 77
#
# Handles a start acknowledgement: copies the details reported in
# +agent_data+ onto the cached agent with the same uuid, then calls
# acknowledge_start on that agent. The block is only run when
# agent_exists? finds the uuid.
def acknowledge_start(agent_data)
  agent_exists?(agent_data.uuid) do |process|
    # Fields are copied one by one, in this order, through the public
    # reader on agent_data and the public writer on the cached agent.
    [:pid, :started_at, :singleton, :monitor, :metadata].each do |field|
      process.public_send(:"#{field}=", agent_data.public_send(field))
    end

    process.acknowledge_start
  end
end
acknowledge_stop(agent_data)
click to toggle source
# File lib/smith/application/agency.rb, line 88
#
# Handles a stop acknowledgement: calls acknowledge_stop on the cached agent
# whose uuid matches +agent_data+, if agent_exists? finds one.
def acknowledge_stop(agent_data)
  agent_exists?(agent_data.uuid, &:acknowledge_stop)
end
agent_exists?(uuid, error_proc=->{}, &blk)
click to toggle source
# File lib/smith/application/agency.rb, line 113
#
# Looks +uuid+ up in @agent_processes and reacts to the outcome.
#
# @param uuid the key to look up in @agent_processes.
# @param error_proc [#call] invoked with no arguments when no agent is
#   found; defaults to a lambda that does nothing.
# @yield [agent] the cached agent, when one is found.
# @return the block's result when the agent is found and a block was given;
#   true when the agent is found and no block was given; otherwise the
#   result of +error_proc+ (nil for the default).
def agent_exists?(uuid, error_proc = -> {}, &blk)
  agent = @agent_processes[uuid]
  return error_proc.call unless agent

  # Without a block this used to fail on nil.call; answer as a plain
  # predicate instead.
  blk ? blk.call(agent) : true
end
dead(agent_data)
click to toggle source
# File lib/smith/application/agency.rb, line 94
#
# Handles an agent-dead message: for a cached agent matching the uuid in
# +agent_data+, calls no_process_running on it and logs a fatal message
# when that call returns a truthy value.
def dead(agent_data)
  agent_exists?(agent_data.uuid) do |process|
    logger.fatal { "Agent is dead: #{process.name}, UUID: #{process.uuid}, PID: #{process.pid}" } if process.no_process_running
  end
end
keep_alive(agent_data)
click to toggle source
# File lib/smith/application/agency.rb, line 102
#
# Handles a keepalive message: stamps the cached agent matching the uuid in
# +agent_data+ with the reported time, logs it at verbose level and saves
# the agent.
def keep_alive(agent_data)
  agent_exists?(agent_data.uuid) do |process|
    process.last_keep_alive = agent_data.time
    logger.verbose { "Agent keep alive: #{agent_data.uuid}: #{agent_data.time}" }

    # The keepalive does not go through the state machine, which is what
    # normally writes the agent state to disc, so persist it explicitly.
    process.save
  end
end