class Smith::Agent
Attributes
name[R]
pid[R]
uuid[R]
Public Class Methods
new(uuid)
click to toggle source
# File lib/smith/agent.rb, line 10 def initialize(uuid) @name = self.class.to_s @pid = $$ @uuid = uuid @factory = QueueFactory.new @signal_handler = SelfPipe.new(self) setup_control_queue @start_time = Time.now @state = :starting @on_stopping = proc {|completion| completion.succeed } @on_starting = proc {|completion| completion.succeed } @on_running = proc {|completion| completion.succeed } @on_exception = proc {} @on_starting_completion = EM::Completion.new.tap do |c| c.completion do |completion| acknowledge_start do @on_running.call(@on_running_completion) logger.info { "Agent started: #{name}, UUID: #{uuid}, PID: #{pid}" } end end end @on_running_completion = EM::Completion.new.tap do |c| c.completion do |completion| start_keep_alive setup_stats_queue @state = :running run end end @on_stopping_completion = EM::Completion.new.tap do |c| c.completion do |completion| acknowledge_stop do @state = :stopping Smith.stop end end end @on_starting.call(@on_starting_completion) end
options(opts)
click to toggle source
Options supported: :monitor, the agency will monitor the agent & if dies restart. :singleton, only every have one agent. If this is set to false
multiple agents are allowed.
# File lib/smith/agent.rb, line 113 def options(opts) opts.each do |k, v| Smith.config.agent[k] = v end end
Public Instance Methods
install_signal_handler(signal, position=:end, &blk)
click to toggle source
# File lib/smith/agent.rb, line 87 def install_signal_handler(signal, position=:end, &blk) @signal_handler.install_signal_handler(signal, position=:end, &blk) end
on_exception(&blk)
click to toggle source
The agent may hook into this if they want to do something on exception. It should be noted that, since an exception occured, the reactor will not be running at this point. Even if we restarted the reactor before calling this it would be a different reactor than existed when assigning the block so this would potentially lead to confusion. If the agent really needs the reactor to do something it can always restart the reactor itself.
@param blk [Block] This block will be passed the exception as an
argument.
# File lib/smith/agent.rb, line 78 def on_exception(&blk) @on_exception = blk end
on_running(&blk)
click to toggle source
# File lib/smith/agent.rb, line 64 def on_running(&blk) @on_running = blk end
on_stopping(&blk)
click to toggle source
# File lib/smith/agent.rb, line 60 def on_stopping(&blk) @on_stopping = blk end
receiver(queue_name, opts={}, &blk)
click to toggle source
# File lib/smith/agent.rb, line 100 def receiver(queue_name, opts={}, &blk) queues.receiver(queue_name, opts, &blk) end
run()
click to toggle source
Override this method to implement your own agent.
# File lib/smith/agent.rb, line 83 def run raise ArgumentError, "You must override this method" end
sender(queue_names, opts={}, &blk)
click to toggle source
# File lib/smith/agent.rb, line 104 def sender(queue_names, opts={}, &blk) Array(queue_names).each { |queue_name| queues.sender(queue_name, opts, &blk) } end
state()
click to toggle source
# File lib/smith/agent.rb, line 91 def state @state end
Private Instance Methods
__exception_handler(exception)
click to toggle source
# File lib/smith/agent.rb, line 218 def __exception_handler(exception) @on_exception.call(exception) end
acknowledge_start(&blk)
click to toggle source
# File lib/smith/agent.rb, line 172 def acknowledge_start(&blk) Messaging::Sender.new(QueueDefinitions::Agent_lifecycle) do |queue| payload = ACL::AgentAcknowledgeStart.new.tap do |p| p.uuid = uuid p.pid = $$ p.singleton = Smith.config.agent.singleton p.started_at = Time.now.to_i p.metadata = Smith.config.agent.metadata p.monitor = Smith.config.agent.monitor end queue.publish(payload) end end
acknowledge_stop(&blk)
click to toggle source
# File lib/smith/agent.rb, line 186 def acknowledge_stop(&blk) Messaging::Sender.new(QueueDefinitions::Agent_lifecycle) do |queue| message = {:state => 'acknowledge_stop', :pid => $$, :name => self.class.to_s} queue.publish(ACL::AgentAcknowledgeStop.new(:uuid => uuid), &blk) end end
control_queue_def()
click to toggle source
# File lib/smith/agent.rb, line 214 def control_queue_def @control_queue_def ||= QueueDefinitions::Agent_control.call(uuid) end
queues()
click to toggle source
# File lib/smith/agent.rb, line 210 def queues @factory end
setup_control_queue()
click to toggle source
# File lib/smith/agent.rb, line 122 def setup_control_queue logger.debug { "Setting up control queue: #{control_queue_def.denormalise}" } Messaging::Receiver.new(control_queue_def) do |receiver| receiver.subscribe do |payload| logger.debug { "Command received on agent control queue: #{payload.command} #{payload.options}" } case payload.command when 'object_count' object_count(payload.options.first.to_i).each{|o| logger.info{o}} when 'stop' @on_stopping.call(@on_stopping_completion) when 'log_level' begin level = payload.options.first logger.info { "Setting log level to #{level} for: #{name} (#{uuid})" } log_level(level) rescue ArgumentError => e logger.error { "Incorrect log level: #{level}" } end else logger.warn { "Unknown command: #{level} -> #{level.inspect}" } end end end end
setup_stats_queue()
click to toggle source
# File lib/smith/agent.rb, line 149 def setup_stats_queue Messaging::Sender.new(QueueDefinitions::Agent_stats) do |stats_queue| EventMachine.add_periodic_timer(2) do stats_queue.number_of_consumers do |consumers| if consumers > 0 payload = ACL::AgentStats.new.tap do |p| p.uuid = uuid p.agent_name = self.name p.pid = self.pid p.rss = (File.read("/proc/#{pid}/statm").split[1].to_i * 4) / 1024 # This assumes the page size is 4K & is MB p.up_time = (Time.now - @start_time).to_i queues.each_queue do |q| p.queues << ACL::AgentStats::QueueStats.new(:name => q.name, :type => q.class.to_s, :length => q.counter) end end stats_queue.publish(payload) end end end end end
start_keep_alive()
click to toggle source
# File lib/smith/agent.rb, line 193 def start_keep_alive if Smith.config.agent.monitor EventMachine::add_periodic_timer(1) do Messaging::Sender.new(QueueDefinitions::Agent_keepalive) do |queue| message = {:name => self.class.to_s, :uuid => uuid, :time => Time.now.to_i} queue.consumers do |consumers| if consumers > 0 queue.publish(ACL::AgentKeepalive, message) end end end end else logger.info { "Not initiating keep alive, agent is not being monitored: #{@name}" } end end