class Pione::RuleEngine::RuleApplication

Public Class Methods

new(handler) click to toggle source
Calls superclass method
# File lib/pione/rule-engine/flow-handler.rb, line 175
# Build a rule application on top of the given handler.
#
# Besides the superclass setup, this prepares a data finder for this
# application's tuple space server and domain, and an empty cache that
# collects the finished tuples observed so far.
#
# @param handler the handler forwarded unchanged to the superclass
def initialize(handler)
  super
  @data_finder = DataFinder.new(tuple_space_server, domain_id)
  # cache of finished tuples already seen by this application
  @finished = Set.new
end

Public Instance Methods

apply(rules) click to toggle source

Apply input data to rules.

# File lib/pione/rule-engine/flow-handler.rb, line 182
# Apply input data to rules.
#
# Repeatedly looks up tasks for +rules+ and distributes them, all inside a
# profiling section. The loop ends when no task is found, or when a
# distribution round did not add anything to the finished tuple cache.
#
# @param rules the rules handed to find_tasks on every round
# @return the result of the closing user_message_end call
def apply(rules)
  user_message_begin("Rule Application: %s" % digest, 1)

  Util::Profiler.profile(Util::RuleApplicationProfileReport.new(digest)) do
    while (pending = find_tasks(rules))
      # remember how many finished tuples were known before this round
      known_before = @finished.size

      distribute_tasks(pending)

      # no new finished tuple means no progress was made, so stop looping
      break if known_before >= @finished.size
    end
  end

  user_message_end("Rule Application: %s" % digest, 1)
end
check_updatability(task) click to toggle source

Check updatability of the task and get update order.

# File lib/pione/rule-engine/flow-handler.rb, line 321
# Check updatability of the task and get its update order.
#
# Every output data combination found for the task is rated by
# UpdateCriteria.order. If no combination is found, the task's own
# environment is rated once with an empty output list. The first candidate
# rated :force is chosen, otherwise the first rated :weak.
#
# @param task the task to examine
# @return the task updated with the chosen order, environment and parameter
#   set, or nil when no candidate is rated :force or :weak
def check_updatability(task)
  # all data-null tuples of the task domain
  null_tuples = read_all(TupleSpace::DataNullTuple.new(domain: task.domain_id))

  candidates = []

  # rates one (environment, outputs) combination and records the result
  rate = lambda do |task_env, outputs|
    extra = {}

    bound_output = task_env.variable_get!(Lang::Variable.new("O"))
    if bound_output
      extra["OUTPUT"] = Lang::Variable.new("O")
      extra["O"] = bound_output
    end

    candidate_params = task.param_set.set(table: task.param_set.table.merge(extra))

    order = UpdateCriteria.order(task_env, task.rule_condition, task.inputs, outputs, null_tuples)
    candidates << [order, task_env, candidate_params]
  end

  # rate every output data combination; fall back to "no outputs" if none
  @data_finder.find(:output, task.rule_condition.outputs, task.env, &rate)
  rate.call(task.env, []) if candidates.empty?

  # :force candidates take precedence over :weak ones
  by_order = candidates.group_by(&:first)
  chosen = by_order[:force] || by_order[:weak]
  return nil unless chosen

  order, env, param_set = chosen.first

  # set up output variables; note that they are bound in task.env, while the
  # returned task carries the chosen candidate's environment
  output_var = Lang::Variable.new("O")
  task.env.variable_set(Lang::Variable.new("OUTPUT"), output_var)
  kseq = find_output_variables(task, Lang::KeyedSequence.new)
  task.env.variable_set(output_var, kseq)
  param_set = param_set.set(table: param_set.table.merge({"O" => kseq}))

  task.set(order: order, env: env, param_set: param_set)
end
distribute_tasks(tasks) click to toggle source

Distribute tasks.

# File lib/pione/rule-engine/flow-handler.rb, line 379
# Distribute tasks.
#
# Logs a "suspend" transition, publishes every task that needs publishing,
# waits for all tasks to complete, makes sure a foreground tuple exists for
# this domain and digest, and finally logs a "resume" transition.
#
# @param tasks the tasks to distribute
# @return the result of the closing user_message_end call
def distribute_tasks(tasks)
  process_log(make_task_process_record.merge(transition: "suspend"))
  process_log(make_rule_process_record.merge(transition: "suspend"))
  user_message_begin("Distribution: %s" % digest, 2)

  tasks.each do |task|
    tuple = task.make_tuple(domain_id)

    # nothing to publish: only leave a debug trace and move on
    unless need_to_publish_task?(task, tuple)
      Log::Debug.rule_engine "task %s canceled at %s" % [task.digest, digest]
      next
    end

    # drop the finished tuple and all data tuples of the task domain
    take!(TupleSpace::FinishedTuple.new(domain: task.domain_id))
    take_all!(TupleSpace::DataTuple.new(domain: task.domain_id))

    # copy input data from this domain into the task domain
    task.inputs.flatten.each do |input|
      copy_data_into_domain(input, task.domain_id)
    end

    # publish the task itself
    write(tuple)

    process_log(task.make_task_process_record.merge(transition: "schedule"))
    user_message(">>> %s".color(:yellow) % task.digest, 3, "", :blue)
  end

  # block until every distributed task has finished
  wait_task_completion(tasks)

  # write a foreground tuple unless one is already readable
  foreground = read!(TupleSpace::ForegroundTuple.new(domain_id, digest))
  write(TupleSpace::ForegroundTuple.new(domain_id, digest)) unless foreground

  process_log(make_rule_process_record.merge(transition: "resume"))
  process_log(make_task_process_record.merge(transition: "resume"))
  user_message_end("Distribution: %s" % digest, 2)
end
find_applicable_rules(rules) click to toggle source

Find applicable rules. A rule is applicable when every one of its input tickets exists in this domain.

# File lib/pione/rule-engine/flow-handler.rb, line 219
# Find applicable rules.
#
# A rule is kept when a ticket tuple can be read from this domain for every
# one of its input tickets. Rules without input tickets are always kept.
#
# @param rules the candidate rules
# @return the subset of +rules+ whose input tickets are all present
def find_applicable_rules(rules)
  rules.select do |rule|
    tickets = rule.input_tickets.pieces
    # stops at the first ticket that cannot be read
    tickets.all? { |ticket| read!(TupleSpace::TicketTuple.new(domain_id, ticket.name)) }
  end
end
find_output_variables(task, kseq) click to toggle source
# File lib/pione/rule-engine/flow-handler.rb, line 365
# Collect the evaluated output conditions of a task into a keyed sequence.
#
# Each output condition is evaluated in the task's environment and put into
# the sequence under its 1-based position. Conditions that raise
# Lang::UnboundError are skipped, leaving the sequence as it was.
#
# @param task the task whose rule condition outputs are evaluated
# @param kseq the keyed sequence to start from
# @return the sequence produced by the successive +put+ calls
def find_output_variables(task, kseq)
  task.rule_condition.outputs.each_with_index.inject(kseq) do |acc, (condition, index)|
    begin
      value = condition.eval(task.env)
      acc.put(Lang::IntegerSequence.of(index + 1), value)
    rescue Lang::UnboundError
      # keep what has been collected so far and continue with the next output
      acc
    end
  end
end
find_tasks(rules) click to toggle source

Find applicable and updatable rule applications.

# File lib/pione/rule-engine/flow-handler.rb, line 206
# Find applicable and updatable rule applications.
#
# @param rules the candidate rules
# @return the tasks made from the applicable rules, or nil (not an empty
#   collection) when there is none, so callers can use the result as a
#   loop condition
def find_tasks(rules)
  candidates = make_tasks(find_applicable_rules(rules))
  return nil if candidates.empty?

  candidates
end
find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set) click to toggle source

Handle parameter distribution. Rule parameters with the "each" modifier are distributed into separate tasks, one per element.

# File lib/pione/rule-engine/flow-handler.rb, line 278
# Build the updatable tasks of a rule for one rule condition.
#
# For every input data combination reported by the data finder, this
# extends the parameter set with the bound "I" and "*" variables, verifies
# the constraints of the rule condition, creates a task in its own
# generated domain and keeps it when check_updatability accepts it.
#
# @param env the environment used to search input data
# @param rule the rule being applied
# @param rule_definition the definition of +rule+
# @param rule_condition the evaluated rule condition
# @param param_set the base parameter set of the task
# @return [Array] the updatable tasks
# @raise [Lang::StructuralError] when a constraint does not evaluate to a
#   Lang::BooleanSequence
def find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set)
  found = []

  @data_finder.find(:input, rule_condition.inputs, env) do |task_env, inputs|
    # parameters contributed by the input data combination
    bindings = {}

    input_value = task_env.variable_get!(Lang::Variable.new("I"))
    if input_value
      bindings["INPUT"] = Lang::Variable.new(name: "I", package_id: rule.package_id)
      bindings["I"] = input_value
    end

    star_value = task_env.variable_get!(Lang::Variable.new("*"))
    bindings["*"] = star_value if star_value

    task_param_set = param_set.set(table: param_set.table.merge(bindings))

    # every constraint has to evaluate to a true boolean sequence
    satisfied = rule_condition.constraints.all? do |constraint|
      verdict = constraint.eval(task_env)
      unless verdict.is_a?(Lang::BooleanSequence)
        raise Lang::StructuralError.new(Lang::BooleanSequence, constraint.pos)
      end
      verdict.value
    end
    next unless satisfied

    # the task gets a domain generated from the rule, inputs and parameters
    task_domain_id = Util::DomainID.generate(rule.package_id, rule.name, inputs, task_param_set)
    task = Task.new(task_env, task_domain_id, rule, rule_definition, rule_condition, inputs, task_param_set)

    # keep the task only when it is updatable
    updatable = check_updatability(task)
    found << updatable if updatable
  end

  found
end
import_outputs_of_task(task, finished) click to toggle source

Import finished tuple’s outputs from the domain.

# File lib/pione/rule-engine/flow-handler.rb, line 475
# Import the outputs of a finished tuple into this domain.
#
# The i-th output of +finished+ is handled according to the operation of
# the i-th evaluated output condition of the task: :write copies the data
# into this domain, :remove removes it and :touch touches it.
#
# @param task the task that produced the outputs
# @param finished the finished tuple holding the outputs
# @raise [RuleExecutionError] when an output condition has any other operation
def import_outputs_of_task(task, finished)
  finished.outputs.each_with_index do |output, index|
    expr = task.rule_condition.outputs[index].eval(task.env)

    # an output is either a single item or an array of items
    items = output.kind_of?(Array) ? output : [output]

    case expr.operation
    when :write then items.each { |item| copy_data_into_domain(item, domain_id) }
    when :remove then items.each { |item| remove_data_from_domain(item, domain_id) }
    when :touch then items.each { |item| touch_data_in_domain(item, domain_id) }
    else
      # not expected to be reached
      raise RuleExecutionError.new(self)
    end
  end
end
lift_touch_tuple(task) click to toggle source

Lift effects of touch operations from the task domain to this domain.

# File lib/pione/rule-engine/flow-handler.rb, line 495
# Lift effects of touch operations from the task domain to this domain.
#
# For every touch tuple of the task domain whose data exists in this
# domain, the data tuple's time is advanced to the touch time (unless it is
# already later) and the touch tuple is rewritten into this domain. Touch
# tuples without matching data here are left alone.
#
# @param task the task whose domain is inspected
def lift_touch_tuple(task)
  touches = read_all(TupleSpace::TouchTuple.new(domain: task.domain_id))

  touches.each do |touch|
    target = read!(TupleSpace::DataTuple.new(name: touch.name, domain: domain_id))
    next unless target

    # update the data tuple's time, never moving it backwards
    unless target.time > touch.time
      target.time = touch.time
      write(target)
    end

    # lift the touch tuple itself into this domain
    touch.domain = domain_id
    write(touch)
  end
end
make_tasks(rules) click to toggle source

Make tasks from rules.

# File lib/pione/rule-engine/flow-handler.rb, line 229
# Make tasks from rules.
#
# Rules without parameter sets are evaluated once in a fresh layer of the
# plain environment with an empty parameter set. For rules with parameter
# sets, each set is merged with the default values of the rule condition,
# evaluated and expanded, and tasks are searched for every expanded set.
#
# @param rules the rules to make tasks from
# @return [Array] the tasks found for all rules, in rule order
def make_tasks(rules)
  collected = []

  rules.each do |rule|
    # fall back to the handler's package id if the rule's one is implicit
    rule = rule.set(package_id: package_id) unless rule.package_id

    rule_definition = env.rule_get(rule)
    param_sets = rule.param_sets.pieces

    # no explicit parameters: a single search with an empty parameter set
    if param_sets.empty?
      base_env = plain_env.layer
      condition = rule_definition.rule_condition_context.eval(base_env)
      found = find_tasks_by_rule_condition(
        base_env, rule, rule_definition, condition, Lang::ParameterSet.new
      )
      collected.concat(found.uniq)
      next
    end

    param_sets.each do |param_set|
      # environment used only to resolve default parameter values
      # NOTE(review): the return value of #set is discarded here and below;
      # this only has an effect if #set mutates its receiver -- confirm.
      seed_env = plain_env.layer.merge_param_set(param_set)
      seed_env.set(current_package_id: rule.package_id || env.current_package_id)

      seed_condition = rule_definition.rule_condition_context.eval(seed_env)
      with_defaults = param_set.merge_default_values(seed_condition)

      # handle parameter distribution
      with_defaults.eval(seed_env).expand do |expanded_param_set|
        # rebuild the environment from the expanded parameter set
        task_env = plain_env.layer.merge_param_set(expanded_param_set)
        task_env.set(current_package_id: rule.package_id || env.current_package_id)

        task_condition = rule_definition.rule_condition_context.eval(task_env)

        found = find_tasks_by_rule_condition(
          task_env, rule, rule_definition, task_condition, expanded_param_set
        )
        collected.concat(found.uniq)
      end
    end
  end

  collected
end
need_to_publish_task?(task, tuple) click to toggle source

Return true if we need to publish the task.

# File lib/pione/rule-engine/flow-handler.rb, line 425
# Return true if we need to publish the task.
#
# A task is not published when
# - its order is :weak and a succeeded finished tuple for its domain is
#   either cached or readable (a readable one is added to the cache unless
#   a tuple with the same uuid is already there),
# - the task tuple itself is already readable, or
# - a working tuple for the task domain is readable.
#
# @param task the task to examine
# @param tuple the task tuple that would be published
# @return [Boolean]
def need_to_publish_task?(task, tuple)
  # weak updates may reuse the result of an already finished task
  if task.order == :weak
    template = TupleSpace::FinishedTuple.new(domain: task.domain_id, status: :succeeded)
    return false if @finished.include?(template)

    reusable = read!(template)
    if reusable
      @finished << reusable if @finished.none? { |known| known.uuid == reusable.uuid }
      return false
    end
  end

  # the task exists in the space already
  return false if read!(tuple)

  # a readable working tuple means another worker is processing the task
  working = TupleSpace::WorkingTuple.new(domain: task.domain_id)
  read!(working) ? false : true
end
wait_task_completion(tasks) click to toggle source

Wait until tasks completed.

# File lib/pione/rule-engine/flow-handler.rb, line 451
# Wait until tasks completed.
#
# For each task in turn, this reads the finished tuple of the task domain,
# caches it unless a tuple with the same uuid is already cached, imports
# the task's outputs, lifts its touch tuples and writes one ticket tuple
# into this domain per output ticket of the task's rule.
#
# @param tasks the distributed tasks to wait for
def wait_task_completion(tasks)
  tasks.each do |task|
    # the finished tuple lives in the task domain, not in this one
    finished = read(TupleSpace::FinishedTuple.new(domain: task.domain_id))
    @finished << finished if @finished.none? { |known| known.uuid == finished.uuid }

    # task completion processing: outputs first, then touch effects
    import_outputs_of_task(task, finished)
    lift_touch_tuple(task)

    # publish output tickets
    task.rule.output_tickets.pieces.each do |ticket|
      write(TupleSpace::TicketTuple.new(domain_id, ticket.name))
    end
  end
end