class Nestene::Actor::AutonQueue

Public Class Methods

new()
# File lib/nestene/actor/auton_queue.rb, line 9
# Subscribes this object to the 'state_update' topic, naming
# :execute_next_step as the handler for those notifications.
def initialize
  subscribe 'state_update', :execute_next_step
end

Public Instance Methods

execute_next_step(topic, auton_id, state)
# File lib/nestene/actor/auton_queue.rb, line 14
# Executes the next queued step of an auton.
#
# Looks up the "storage:<auton_id>" actor and, if the stored state is :ready
# and its queue has pending steps, takes the first one, marks it as currently
# executing, rebuilds the auton instance from its serialized structure and
# invokes the step. Afterwards the step's callback (if any) is scheduled via
# the :nestene_core actor and the outcome is written back to storage.
#
# A step named '__terminate_this_auton' shuts the storage actor down instead
# of being invoked on the auton.
#
# When the step raises and the auton defines handle_exception, a
# handle_exception step is put at the front of the queue; if the failing step
# was handle_exception itself, or no handler exists, the queue is marked as
# failed.
#
# topic    - not used by this method.
# auton_id - id used to build the storage actor's registry name.
# state    - nothing is done when this is nil/false; the state that is
#            actually worked on is the one yielded by the storage actor.
def execute_next_step(topic, auton_id, state)
  return unless state

  storage_name = "storage:%s" % auton_id
  storage_actor = Celluloid::Actor[storage_name]
  return unless storage_actor

  executing = nil
  type = nil
  ser = nil

  # Claim the next step and snapshot what is needed to rebuild the auton.
  storage_actor.update do |stored|
    if stored
      if stored.state == :ready && !stored.queue.to_execute.empty?
        executing = ExecutingMethod.new(stored.queue.to_execute.shift)
        stored.queue.currently_executing = executing
        type = stored.type
        ser = stored.serialized
      end
    end
  end

  return unless executing

  # Step names are compared as strings so that a Symbol and a String naming
  # the same step are treated alike (this method itself produces String names
  # for "<callback>_error" steps and a Symbol name for :handle_exception).
  step_name = executing.name.to_s

  if step_name == '__terminate_this_auton'
    doomed = Celluloid::Actor[storage_name]
    doomed.async.shutdown if doomed
    return
  end

  instance = Nestene.class_from_string(type).from_structure(ser)
  instance.context = AutonContext.new(auton_id) if instance.respond_to?(:context=)

  # NOTE(review): Object#method also resolves private methods, so a queued
  # step name can reach private/Kernel methods - confirm names are trusted.
  future = Celluloid::Future.new do
    instance.method(executing.name).call(*executing.parameters)
  end

  result = exception = nil

  # NOTE(review): rescues Exception rather than StandardError; presumably so
  # that every failure of the step is recorded on the queue - confirm.
  begin
    result = future.value
  rescue Exception => e
    exception = e
  end

  executed = ExecutedMethod.new(executing)
  executed.result = result
  executed.error = exception if exception

  callback = executed.callback
  if callback
    core = Celluloid::Actor[:nestene_core]
    if exception
      core.async.schedule_step callback.auton_id, ("%s_error" % callback.name.to_s), [exception.class.name, exception.message]
    else
      # NOTE(review): result is passed as-is while the error branch passes an
      # Array - verify against schedule_step's expected parameters.
      core.async.schedule_step callback.auton_id, callback.name, result
    end
  end

  # The storage actor is looked up again by name; skip the write-back if it
  # is no longer registered, mirroring the guard at the top of the method.
  storage_actor = Celluloid::Actor[storage_name]
  return unless storage_actor

  storage_actor.update do |stored|
    if stored
      stored.queue.currently_executing = nil
      stored.serialized = instance.to_structure
      if exception
        # methods (public + protected) is kept instead of respond_to? so the
        # set of objects considered to have a handler does not change.
        if instance.methods.include?(:handle_exception) && step_name != 'handle_exception'
          handler = ScheduledMethod.new
          handler.name = :handle_exception
          handler.parameters = [exception.class.name, exception.message, executing.name, executing.parameters]
          handler.uuid = SecureRandom.uuid
          stored.queue.to_execute.unshift handler
        else
          stored.queue.failed = true
        end
      end
      stored.queue.add_executed executed
    end
  end
end