class Pione::Agent::BasicAgent

BasicAgent is a super class for all PIONE agents.

Attributes

chain_threads[R]

Public Class Methods

agent_type() click to toggle source

Return the agent type.

# File lib/pione/agent/basic-agent.rb, line 104
def self.agent_type
  @agent_type
end
inherited(subclass) click to toggle source

class methods

# File lib/pione/agent/basic-agent.rb, line 91
def self.inherited(subclass)
  subclass.instance_variable_set(:@transitions, @transitions.clone)
  subclass.instance_variable_set(:@transition_chain, @transition_chain.clone)
  subclass.instance_variable_set(:@exception_handler, @exception_handler.clone)
end
new() click to toggle source
# File lib/pione/agent/basic-agent.rb, line 115
def initialize
  @chain_threads = ThreadGroup.new

  # for wait_until_before method
  @__wait_until_before_mutex__ = Mutex.new
  @__wait_until_before_cv__ = Hash.new {|h, k| h[k] = ConditionVariable.new}

  # for wait_until_after method
  @__wait_until_after_mutex__ = Mutex.new
  @__wait_until_after_cv__ = Hash.new {|h, k| h[k] = ConditionVariable.new}
end
set_agent_type(agent_type, klass=nil) click to toggle source

Set the agent type.

# File lib/pione/agent/basic-agent.rb, line 98
def self.set_agent_type(agent_type, klass=nil)
  @agent_type = agent_type
  Agent.set_agent(klass) if klass
end

Public Instance Methods

start() click to toggle source

Start the agent activity.

# File lib/pione/agent/basic-agent.rb, line 128
def start
  unless @chain_threads.list.empty?
    raise TerminationError.new(self, states)
  end

  # save current thread
  @__owner_thread__ = Thread.current

  # start a new chain thread
  @chain_threads.add(start_running(:init, [], AgentState.new, true))
  @chain_threads.enclose

  return self
end
start!() click to toggle source

Start the agent activity and wait the termination.

# File lib/pione/agent/basic-agent.rb, line 144
def start!
  start
  wait_until_terminated(nil)
end
states() click to toggle source

Return agent states.

# File lib/pione/agent/basic-agent.rb, line 193
def states
  @chain_threads.list.map {|th| th[:agent_state]}
end
terminate() click to toggle source

Terminate the agent activity.

# File lib/pione/agent/basic-agent.rb, line 234
def terminate
  state = nil

  Thread.new {
    # kill all chain threads
    @chain_threads.list.each do |thread|
      state = thread[:agent_state] # save last state
      unless thread == Thread.current
        thread.kill
        thread.join
      end
    end

    # fire "terminate" transtion
    begin
      Thread.current[:agent_state] = state || AgentState.new
      transit(:terminate, [])
    rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
      Log::Debug.warn("raised a connection error when we terminated", self, e)
    end
  }.join
end
terminated?() click to toggle source

Return true if the agent has been terminated.

# File lib/pione/agent/basic-agent.rb, line 258
def terminated?
  return (@chain_threads.list.empty? and @chain_threads.enclosed?)
end
transit(transition, transition_inputs) click to toggle source

Fire the transtion with inputs.

# File lib/pione/agent/basic-agent.rb, line 150
def transit(transition, transition_inputs)
  # wake up threads that wait by wait_until_before method
  if @__wait_until_before_cv__.has_key?(transition)
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].broadcast
    end
  end

  # mark current transition
  Thread.current[:agent_state] =
    AgentState.new(previous: Thread.current[:agent_state].previous, current: transition)

  # call transition
  result = call_transition_method(transition, transition_inputs)
  result = result.nil? ? [] : result
  result = result.is_a?(Array) ? result : [result]

  # unmark current transition and mark previous transition
  Thread.current[:agent_state] = AgentState.new(previous: transition, current: nil)

  # wake up threads that wait by wait_until_after method
  if @__wait_until_after_cv__.has_key?(transition)
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].broadcast
    end
  end

  return transition, result
rescue StandardError => e
  # error handling
  if error_transition = get_exception_handler(e)
    raise unless error_transition.is_a?(Symbol)
    return transit(error_transition, [e])
  else
    if @__owner_thread and @__owner_thread__.alive?
      @__owner_thread__.raise e
    else
      raise e
    end
  end
end
wait_until(transition, sec=10) click to toggle source
# File lib/pione/agent/basic-agent.rb, line 208
def wait_until(transition, sec=10)
  unless @chain_threads.list.any? {|th| th[:agent_state] and th[:agent_state].current == transition}
    wait_until_before(transition, sec)
  end
end
wait_until_after(transition, sec=10) click to toggle source

Sleep until after the agent fires the transition.

# File lib/pione/agent/basic-agent.rb, line 215
def wait_until_after(transition, sec=10)
  timeout(sec) do
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].wait(@__wait_until_after_mutex__)
    end
  end
rescue Timeout::Error => e
  raise TimeoutError.new(self, @chain_threads.list.map{|th| th[:agent_state]}, sec)
end
wait_until_before(transition, sec=10) click to toggle source

Sleep until before the agent fires the transition.

# File lib/pione/agent/basic-agent.rb, line 198
def wait_until_before(transition, sec=10)
  timeout(sec) do
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].wait(@__wait_until_before_mutex__)
    end
  end
rescue Timeout::Error
  raise TimeoutError.new(self, @chain_threads.list.map{|th| th[:agent_state]}, sec)
end
wait_until_terminated(sec=10) click to toggle source

Sleep caller thread until the agent is terminated.

# File lib/pione/agent/basic-agent.rb, line 226
def wait_until_terminated(sec=10)
  unless terminated?
    wait_until_after(:terminate, sec)
    @chain_threads.list.each {|thread| thread.join}
  end
end

Private Instance Methods

call_transition_method(transition, args) click to toggle source

Call the transition method.

# File lib/pione/agent/basic-agent.rb, line 298
def call_transition_method(transition, args)
  method = method("transit_to_#{transition}")
  method.call(*args[0,method.arity])
end
get_exception_handler(e) click to toggle source

Get transtion for the exception.

# File lib/pione/agent/basic-agent.rb, line 304
def get_exception_handler(e)
  table = self.class.exception_handler
  e.class.ancestors.each do |mod|
    return table[mod] if table.has_key?(mod)
  end
end
get_next_transitions(transition, result) click to toggle source

Get next transitions based on transition chain table with previous transition result.

# File lib/pione/agent/basic-agent.rb, line 312
def get_next_transitions(transition, result)
  next_transitions = self.class.transition_chain[transition]
  if next_transitions.is_a?(Proc)
    next_transitions = next_transitions.call(self, *result)
  end
  if next_transitions.nil? or (next_transitions == [])
    raise TerminationError.new(self, states)
  end
  return next_transitions
end
start_running(transition, result, state, free) click to toggle source

Start transition chain.

# File lib/pione/agent/basic-agent.rb, line 265
def start_running(transition, result, state, free)
  thread = free ? Util::FreeThreadGenerator.method(:generate) : Thread.method(:new)
  thread.call do
    begin
      Thread.current[:agent_state] = state
      while true
        # fire the transition
        # NOTE: transition name is maybe changed by the result of firing
        _transition, result, count = transit(transition, result)
        state = Thread.current[:agent_state]

        begin
          # go next transition
          next_transitions = get_next_transitions(_transition, result)
          transition, *branches = next_transitions
          # handle transition branches
          branches.each {|t| start_running(t, result, state, false)}
        rescue TerminationError
          break # end loop after terminate transition
        end
      end
    rescue Exception => e
      # throw the exception to owner thread
      if @__owner_thread__ and @__owner_thread__.alive?
        @__owner_thread__.raise e
      else
        raise e
      end
    end
  end
end