class Smith::Agent

Attributes

name[R]
pid[R]
uuid[R]

Public Class Methods

new(uuid) click to toggle source
# File lib/smith/agent.rb, line 10
# Builds the agent and kicks off its start-up sequence.
#
# Records the agent's name (the class name), the current process id and the
# given uuid, creates the queue factory and signal handler, subscribes to the
# control queue, and then wires three EM::Completion objects that chain the
# starting -> running -> stopping phases together.
#
# NOTE(review): the blocks registered with +c.completion+ are presumably run
# when the corresponding completion is resolved — confirm against the
# EM::Completion documentation.
#
# @param uuid [Object] identifier stored in @uuid and used for the control
#   queue definition and lifecycle messages.
def initialize(uuid)
  @name = self.class.to_s
  # $$ is the id of the current process.
  @pid = $$
  @uuid = uuid

  @factory = QueueFactory.new

  @signal_handler = SelfPipe.new(self)

  # Subscribe to the control queue before anything else is started.
  setup_control_queue

  # Used by setup_stats_queue to compute the reported up time.
  @start_time = Time.now

  @state = :starting

  # Default hooks: each one simply succeeds the completion it is handed.
  # They can be replaced through on_stopping / on_running / on_exception.
  @on_stopping = proc {|completion| completion.succeed }
  @on_starting = proc {|completion| completion.succeed }
  @on_running = proc {|completion| completion.succeed }
  @on_exception = proc {}

  # Starting phase: acknowledge the start, handing over a block that invokes
  # the @on_running hook with the running completion and logs the start.
  @on_starting_completion = EM::Completion.new.tap do |c|
    c.completion do |completion|
      acknowledge_start do
        @on_running.call(@on_running_completion)
        logger.info { "Agent started: #{name}, UUID: #{uuid}, PID: #{pid}" }
      end
    end
  end

  # Running phase: start the keep-alive and stats publishers, mark the agent
  # as :running and hand control to the agent's own #run implementation.
  @on_running_completion = EM::Completion.new.tap do |c|
    c.completion do |completion|
      start_keep_alive
      setup_stats_queue
      @state = :running
      run
    end
  end

  # Stopping phase: acknowledge the stop; the state only becomes :stopping
  # (and Smith.stop is only called) inside the block given to acknowledge_stop.
  @on_stopping_completion = EM::Completion.new.tap do |c|
    c.completion do |completion|
      acknowledge_stop do
        @state = :stopping
        Smith.stop
      end
    end
  end

  # Everything is wired up: fire the starting hook to begin the chain.
  @on_starting.call(@on_starting_completion)
end
options(opts) click to toggle source

Options supported: :monitor, the agency will monitor the agent and restart it if it dies. :singleton, only ever have one agent; if this is set to false, multiple agents are allowed.
# File lib/smith/agent.rb, line 113
# Copies every key/value pair of +opts+ into the agent section of the Smith
# configuration (Smith.config.agent).
#
# @param opts [Hash] option names mapped to their values.
# @return [Hash] the +opts+ collection that was passed in.
def options(opts)
  opts.each { |key, value| Smith.config.agent[key] = value }
end

Public Instance Methods

install_signal_handler(signal, position=:end, &blk) click to toggle source
# File lib/smith/agent.rb, line 87
# Registers a handler block for +signal+ by delegating to the agent's signal
# handler object (@signal_handler).
#
# @param signal [Object] the signal to handle, passed through unchanged.
# @param position [Symbol] where the handler is placed relative to existing
#   ones; defaults to :end. The caller's value is forwarded as given.
# @param blk [Block] the handler to install.
def install_signal_handler(signal, position=:end, &blk)
  # Forward the caller's position. Writing `position=:end` in the call would
  # reassign the local and always pass :end.
  @signal_handler.install_signal_handler(signal, position, &blk)
end
on_exception(&blk) click to toggle source

The agent may hook into this if they want to do something on exception. It should be noted that, since an exception occurred, the reactor will not be running at this point. Even if we restarted the reactor before calling this it would be a different reactor than existed when assigning the block so this would potentially lead to confusion. If the agent really needs the reactor to do something it can always restart the reactor itself.

@param blk [Block] This block will be passed the exception as an argument.
# File lib/smith/agent.rb, line 78
# Registers the block that __exception_handler calls with the exception.
#
# @param blk [Block] receives the exception as its only argument.
# @return [Proc] the stored handler.
# @raise [ArgumentError] if no block is given. Without this guard the handler
#   would be replaced by nil and the failure would only surface later, as a
#   NoMethodError while an exception is being handled.
def on_exception(&blk)
  raise ArgumentError, "on_exception requires a block" unless blk

  @on_exception = blk
end
on_running(&blk) click to toggle source
# File lib/smith/agent.rb, line 64
# Registers the hook that is called with the running completion once the
# agent's start has been acknowledged.
#
# @param blk [Block] receives the running completion object.
# @return [Proc] the stored hook.
# @raise [ArgumentError] if no block is given. Without this guard the hook
#   would be replaced by nil and start-up would later fail with a
#   NoMethodError when the hook is invoked.
def on_running(&blk)
  raise ArgumentError, "on_running requires a block" unless blk

  @on_running = blk
end
on_stopping(&blk) click to toggle source
# File lib/smith/agent.rb, line 60
# Registers the hook that is called with the stopping completion when a
# 'stop' command arrives on the agent's control queue.
#
# @param blk [Block] receives the stopping completion object.
# @return [Proc] the stored hook.
# @raise [ArgumentError] if no block is given. Without this guard the hook
#   would be replaced by nil and a later 'stop' command would fail with a
#   NoMethodError.
def on_stopping(&blk)
  raise ArgumentError, "on_stopping requires a block" unless blk

  @on_stopping = blk
end
receiver(queue_name, opts={}, &blk) click to toggle source
# File lib/smith/agent.rb, line 100
# Asks the agent's queue factory for a receiver on +queue_name+.
#
# @param queue_name [Object] name of the queue, passed through unchanged.
# @param opts [Hash] options forwarded to the factory.
# @param blk [Block] forwarded to the factory's +receiver+ call.
# @return whatever the factory's +receiver+ call returns.
def receiver(queue_name, opts={}, &blk)
  factory = queues
  factory.receiver(queue_name, opts, &blk)
end
run() click to toggle source

Override this method to implement your own agent.

# File lib/smith/agent.rb, line 83
# Entry point of the agent's own work; called once the agent has reached the
# running phase. Concrete agents are expected to override it.
#
# @raise [ArgumentError] always, in this base implementation.
def run
  raise(ArgumentError, "You must override this method")
end
sender(queue_names, opts={}, &blk) click to toggle source
# File lib/smith/agent.rb, line 104
# Asks the agent's queue factory for a sender on each of the given queues.
#
# @param queue_names [Object, Array] a single queue name or a list of them;
#   coerced with Kernel#Array.
# @param opts [Hash] options forwarded to the factory for every queue.
# @param blk [Block] forwarded to each of the factory's +sender+ calls.
# @return [Array] the list of queue names that was iterated.
def sender(queue_names, opts={}, &blk)
  Array(queue_names).each do |target|
    queues.sender(target, opts, &blk)
  end
end
state() click to toggle source
# File lib/smith/agent.rb, line 91
# Returns the agent's current lifecycle state as stored in @state.
# Elsewhere in this class the value is set to :starting, :running and
# :stopping.
def state
  @state
end

Private Instance Methods

__exception_handler(exception) click to toggle source
# File lib/smith/agent.rb, line 218
# Hands +exception+ to the handler registered through on_exception.
#
# @param exception [Exception] the exception to report.
# @return whatever the registered handler returns.
def __exception_handler(exception)
  handler = @on_exception
  handler.call(exception)
end
acknowledge_start(&blk) click to toggle source
# File lib/smith/agent.rb, line 172
# Publishes an AgentAcknowledgeStart message on the agent lifecycle queue.
#
# The payload carries the agent's uuid, the current process id, the start
# time (epoch seconds) and the singleton / metadata / monitor settings read
# from Smith.config.agent.
#
# @param blk [Block] forwarded to +publish+, in the same way acknowledge_stop
#   forwards its block. Previously it was accepted but dropped, so the block
#   the caller supplied was never handed on.
def acknowledge_start(&blk)
  Messaging::Sender.new(QueueDefinitions::Agent_lifecycle) do |queue|
    agent_config = Smith.config.agent

    payload = ACL::AgentAcknowledgeStart.new.tap do |ack|
      ack.uuid = uuid
      ack.pid = $$
      ack.singleton = agent_config.singleton
      ack.started_at = Time.now.to_i
      ack.metadata = agent_config.metadata
      ack.monitor = agent_config.monitor
    end

    queue.publish(payload, &blk)
  end
end
acknowledge_stop(&blk) click to toggle source
# File lib/smith/agent.rb, line 186
# Publishes an AgentAcknowledgeStop message, carrying the agent's uuid, on
# the agent lifecycle queue.
#
# @param blk [Block] forwarded to +publish+.
def acknowledge_stop(&blk)
  Messaging::Sender.new(QueueDefinitions::Agent_lifecycle) do |queue|
    queue.publish(ACL::AgentAcknowledgeStop.new(:uuid => uuid), &blk)
  end
end
control_queue_def() click to toggle source
# File lib/smith/agent.rb, line 214
# Returns the queue definition of this agent's control queue.
#
# The definition is built from the agent's uuid on first use and cached in
# @control_queue_def for later calls.
def control_queue_def
  return @control_queue_def if @control_queue_def

  @control_queue_def = QueueDefinitions::Agent_control.call(uuid)
end
queues() click to toggle source
# File lib/smith/agent.rb, line 210
# Returns the object stored in @factory (the QueueFactory created in
# initialize), through which receiver and sender obtain their queues.
def queues
  @factory
end
setup_control_queue() click to toggle source
# File lib/smith/agent.rb, line 122
# Subscribes to the agent's control queue and dispatches incoming commands.
#
# Commands handled:
# * 'object_count' - logs each entry returned by object_count, called with
#   the first option converted to an integer.
# * 'stop'         - invokes the @on_stopping hook with the stopping
#   completion.
# * 'log_level'    - sets the log level to the first option; an
#   ArgumentError raised while doing so is logged as an incorrect level.
# * anything else  - logged as an unknown command together with its options.
def setup_control_queue
  logger.debug { "Setting up control queue: #{control_queue_def.denormalise}" }

  Messaging::Receiver.new(control_queue_def) do |receiver|
    receiver.subscribe do |payload|
      logger.debug { "Command received on agent control queue: #{payload.command} #{payload.options}" }

      case payload.command
      when 'object_count'
        object_count(payload.options.first.to_i).each { |o| logger.info { o } }
      when 'stop'
        @on_stopping.call(@on_stopping_completion)
      when 'log_level'
        begin
          level = payload.options.first
          logger.info { "Setting log level to #{level} for: #{name} (#{uuid})" }
          log_level(level)
        rescue ArgumentError
          logger.error { "Incorrect log level: #{level}" }
        end
      else
        # Report the command that was actually received. `level` is only
        # assigned in the 'log_level' branch and is always nil here.
        logger.warn { "Unknown command: #{payload.command} -> #{payload.options.inspect}" }
      end
    end
  end
end
setup_stats_queue() click to toggle source
# File lib/smith/agent.rb, line 149
# Publishes agent statistics on the agent stats queue every two seconds.
#
# On each tick the number of consumers on the stats queue is requested, and a
# payload is only built and published when there is at least one. The payload
# holds the uuid, agent name, pid, resident set size, up time in seconds and
# one QueueStats entry (name, class name, counter) per queue yielded by
# queues.each_queue.
def setup_stats_queue
  Messaging::Sender.new(QueueDefinitions::Agent_stats) do |stats_queue|
    EventMachine.add_periodic_timer(2) do
      stats_queue.number_of_consumers do |consumers|
        next unless consumers > 0

        stats = ACL::AgentStats.new
        stats.uuid = uuid
        stats.agent_name = self.name
        stats.pid = self.pid

        # Second field of /proc/<pid>/statm is the resident page count.
        # This assumes the page size is 4K & the result is in MB.
        resident_pages = File.read("/proc/#{pid}/statm").split[1].to_i
        stats.rss = (resident_pages * 4) / 1024

        stats.up_time = (Time.now - @start_time).to_i

        queues.each_queue do |queue|
          stats.queues << ACL::AgentStats::QueueStats.new(
            :name => queue.name,
            :type => queue.class.to_s,
            :length => queue.counter
          )
        end

        stats_queue.publish(stats)
      end
    end
  end
end
start_keep_alive() click to toggle source
# File lib/smith/agent.rb, line 193
# Starts the once-a-second keep-alive publisher when the agent is monitored
# (Smith.config.agent.monitor); otherwise only logs that no keep alive is
# initiated.
#
# On every tick a sender for the keep-alive queue is created and, when that
# queue reports at least one consumer, a message with the agent's class name,
# uuid and the current epoch time is published.
def start_keep_alive
  unless Smith.config.agent.monitor
    return logger.info { "Not initiating keep alive, agent is not being monitored: #{@name}" }
  end

  EventMachine::add_periodic_timer(1) do
    Messaging::Sender.new(QueueDefinitions::Agent_keepalive) do |queue|
      heartbeat = {:name => self.class.to_s, :uuid => uuid, :time => Time.now.to_i}

      queue.consumers do |count|
        queue.publish(ACL::AgentKeepalive, heartbeat) if count > 0
      end
    end
  end
end