class Pione::Agent::TaskWorker

TaskWorker is an agent that processes tasks.

Attributes

execution_thread[R]
once[RW]
tuple_space[R]

instance methods

Public Class Methods

new(tuple_space, features, env=nil) click to toggle source

@param [Pione::TupleSpace::TupleSpaceServer] tuple_space

tuple space

@param [String] features

features that the task worker has

@param [Hash] option

@option option [] :env

an environment object

@option option [String] :session_id

session ID

@option option [URI] :request_from

URI that a client requested from
Calls superclass method
# File lib/pione/agent/task-worker.rb, line 26
# Create a task worker bound to a tuple space.
#
# @param tuple_space
#   tuple space handed to the superclass constructor and kept in @tuple_space
# @param features
#   features of this worker; used later when building the task tuple to take
# @param env
#   environment object; when nil (or false) it is obtained through
#   #get_environment instead
def initialize(tuple_space, features, env=nil)
  super(tuple_space)
  @tuple_space, @features = tuple_space, features
  # fall back to the environment published in the tuple space
  @env = env || get_environment
end

Public Instance Methods

get_environment() click to toggle source

Get an environment object from the tuple space.

# File lib/pione/agent/task-worker.rb, line 127
# Look up the "env" tuple and return the object it carries.
#
# @return the `obj` of the tuple returned by `read!`
# @raise [TupleSpaceError] when `read!` yields nothing for the env tuple
def get_environment
  env_tuple = read!(TupleSpace::EnvTuple.new)

  unless env_tuple
    raise TupleSpaceError.new("the tuple space is invalid because \"env\" tuple not found.")
  end

  env_tuple.obj
end
make_engine(task) click to toggle source

Make an engine from the task.

# File lib/pione/agent/task-worker.rb, line 136
# Build a rule engine for the given task.
#
# Combines this worker's context (tuple space, environment, request origin,
# session ID, client UI) with the task's own fields and passes the resulting
# hash to RuleEngine.make.
#
# @param task
#   task providing package_id, rule_name, inputs, param_set, domain_id and
#   caller_id
# @return whatever RuleEngine.make returns for these options
def make_engine(task)
  engine_options = {
    tuple_space: @tuple_space,
    env: @env,
    package_id: task.package_id,
    rule_name: task.rule_name,
    inputs: task.inputs,
    param_set: task.param_set,
    domain_id: task.domain_id,
    caller_id: task.caller_id,
    request_from: @request_from,
    session_id: @session_id,
    client_ui: @client_ui
  }

  RuleEngine.make(engine_options)
end
spawn_child_task_worker(task) click to toggle source

Spawn a child task worker. This method keeps creating child agents for as long as the rule execution thread is alive.

# File lib/pione/agent/task-worker.rb, line 156
# Keep a child task worker running while this worker's rule execution thread
# is asleep.
#
# Loops for as long as @execution_thread is alive. Whenever that thread's
# status is "sleep" and there is no live child agent, a new agent of the same
# class is created with `once = true`, the task's foreground tuple is taken,
# the child is started, and this method blocks until the child has terminated.
# After the loop the foreground tuple is written back unless one can already
# be read.
#
# @param task
#   the task being executed; only its domain_id and digest are used here, to
#   build the foreground tuple
def spawn_child_task_worker(task)
  child_agent = nil
  foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # child worker loop
  while @execution_thread.alive? do
    # Thread#status is "sleep" when the thread is sleeping or waiting on I/O.
    # NOTE(review): presumably the engine is then waiting for sub-task
    # results in the tuple space — confirm.
    if @execution_thread.status == "sleep"
      if child_agent.nil? or child_agent.terminated?
        # there is no active child agent: create one sharing this worker's
        # features and environment
        child_agent = self.class.new(tuple_space_server, @features, @env)
        # NOTE(review): `once` presumably limits the child to a single task —
        # confirm against the accessor's use elsewhere
        child_agent.once = true

        # make log record linking this (parent) agent to the child
        record = Log::CreateChildTaskWorkerProcessRecord.new.tap do |x|
          x.parent = uuid
          x.child = child_agent.uuid
        end

        # spawn child agent with logging
        with_process_log(record) do
          # turn background: remove this task's foreground tuple
          take!(foreground)
          # start child agent
          child_agent.start
        end

        # wait until the child agent completes the task
        child_agent.wait_until_terminated(nil)
      end
    else
      # execution thread is not asleep: poll again shortly
      sleep 0.1 # FIXME: replace this polling sleep with a more sophisticated mechanism
    end
  end

  # turn foreground: restore the foreground tuple if it is currently absent
  write(foreground) unless read!(foreground)
end
transit_to_connection_error(e) click to toggle source

Report the connection error.

# File lib/pione/agent/task-worker.rb, line 118
# Report that the connection to the tuple space was lost.
#
# Only emits a warning through Log::SystemLog; the exception itself is not
# inspected.
#
# @param e the error that triggered this transition (unused)
def transit_to_connection_error(e)
  message = "task worker agent was disconnected from tuple space unexpectedly, goes to termination."
  Log::SystemLog.warn(message)
end
transit_to_execute_task(task) click to toggle source

Execute the task.

# File lib/pione/agent/task-worker.rb, line 90
# Execute the task with a rule engine running in its own thread.
#
# The engine thread is stored in @execution_thread; when `engine.handle`
# returns a falsy value the worker's `terminate` is invoked from that thread.
# For rules whose type is "flow", child task workers are spawned while the
# engine runs. The method returns only after the engine thread has finished.
#
# @param task the task to execute
# @return the same task, handed on to the next transition
def transit_to_execute_task(task)
  rule_engine = make_engine(task)

  # run the engine in the background
  @execution_thread = Thread.new { rule_engine.handle || terminate }

  # flow rules get child workers while the engine thread is running
  spawn_child_task_worker(task) if rule_engine.rule_definition.rule_type == "flow"

  # block until the engine is done
  @execution_thread.join

  task
end
transit_to_finalize_task(task) click to toggle source

Finalize the task. This method turns the working flag off and puts the task in the background (removes its foreground flag).

# File lib/pione/agent/task-worker.rb, line 112
# Finalize the task by taking its flag tuples.
#
# Takes the working tuple first, then the foreground tuple, both keyed by the
# task's domain ID and digest.
#
# @param task the task that has finished
# @return the result of taking the foreground tuple
def transit_to_finalize_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  take!(working_flag)

  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)
  take!(foreground_flag)
end
transit_to_init() click to toggle source

transitions

Calls superclass method
# File lib/pione/agent/task-worker.rb, line 57
# Initial transition: cache tuple space attributes, then defer to the
# superclass.
#
# Reads the "request_from", "session_id" and "client_ui" attributes from the
# tuple space (in that order) into the instance variables of the same names.
#
# @return the result of the superclass implementation
def transit_to_init
  %w[request_from session_id client_ui].each do |attribute_name|
    instance_variable_set("@#{attribute_name}", @tuple_space.attribute(attribute_name))
  end

  super
end
transit_to_init_task(task) click to toggle source

Initialize the task.

# File lib/pione/agent/task-worker.rb, line 70
# Initialize the task by raising its working and foreground flags.
#
# @param task the task that was just taken
# @return the same task, handed on to the next transition
# @raise [Restart]
#   when a working tuple for the task can already be read, or when writing a
#   flag raises Rinda::RedundantTupleError; in both cases the task is discarded
def transit_to_init_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # the task is already being worked on, so discard it
  raise Restart.new if read!(working_flag)

  # mark the task as working and in the foreground
  write(working_flag)
  write(foreground_flag)

  task
rescue Rinda::RedundantTupleError
  raise Restart.new
end
transit_to_take_task() click to toggle source

Take a task and turn it to foreground.

# File lib/pione/agent/task-worker.rb, line 65
# Take a task tuple matching this worker's features.
#
# @return the result of `take` for a task tuple built from @features
def transit_to_take_task
  task_pattern = TupleSpace::TaskTuple.new(features: @features)
  take(task_pattern)
end