class Pione::RuleEngine::RuleApplication
Public Class Methods
new(handler)
click to toggle source
Calls superclass method
# File lib/pione/rule-engine/flow-handler.rb, line 175 def initialize(handler) super(handler) @data_finder = DataFinder.new(tuple_space_server, domain_id) @finished = Set.new # finished tuple cache end
Public Instance Methods
apply(rules)
click to toggle source
Apply input data to rules.
# File lib/pione/rule-engine/flow-handler.rb, line 182 def apply(rules) # start message user_message_begin("Rule Application: %s" % digest, 1) # with profile Util::Profiler.profile(Util::RuleApplicationProfileReport.new(digest)) do # rule application loop while tasks = find_tasks(rules) do # save previous finished tuples's number size = @finished.size # distribute tasks distribute_tasks(tasks) # check whether tasks have been processed break unless size < @finished.size end end # end message user_message_end("Rule Application: %s" % digest, 1) end
check_updatability(task)
click to toggle source
Check updatability of the task and get update order.
# File lib/pione/rule-engine/flow-handler.rb, line 321 def check_updatability(task) # read all tuples of data-null data_null_tuples = read_all(TupleSpace::DataNullTuple.new(domain: task.domain_id)) res = [] f = lambda do |task_env, outputs| # make parameter set for the task table = Hash.new if val_i = task_env.variable_get!(Lang::Variable.new("O")) table["OUTPUT"] = Lang::Variable.new("O") table["O"] = val_i end task_param_set = task.param_set.set(table: task.param_set.table.merge(table)) # check update criterias order = UpdateCriteria.order(task_env, task.rule_condition, task.inputs, outputs, data_null_tuples) res << [order, task_env, task_param_set] end # find output data combination @data_finder.find(:output, task.rule_condition.outputs, task.env, &f) f.call(task.env, []) if res.empty? # evaluate the result groups = res.group_by {|(order, _, _)| order} if f = groups[:force] or f = groups[:weak] order, env, param_set = f.first # setup output variables var_o = Lang::Variable.new("O") task.env.variable_set(Lang::Variable.new("OUTPUT"), var_o) kseq = find_output_variables(task, Lang::KeyedSequence.new) task.env.variable_set(var_o, kseq) param_set = param_set.set(table: param_set.table.merge({"O" => kseq})) return task.set(order: order, env: env, param_set: param_set) else return nil end end
distribute_tasks(tasks)
click to toggle source
Distribute tasks.
# File lib/pione/rule-engine/flow-handler.rb, line 379 def distribute_tasks(tasks) # log and message process_log(make_task_process_record.merge(transition: "suspend")) process_log(make_rule_process_record.merge(transition: "suspend")) user_message_begin("Distribution: %s" % digest, 2) # distribute tasks tasks.each do |task| tuple = task.make_tuple(domain_id) # publish tasks if need_to_publish_task?(task, tuple) # clear finished tuple and data tuples from the domain take!(TupleSpace::FinishedTuple.new(domain: task.domain_id)) take_all!(TupleSpace::DataTuple.new(domain: task.domain_id)) # copy input data from this domain to the task domain task.inputs.flatten.each {|input| copy_data_into_domain(input, task.domain_id)} # write the task write(tuple) # log and message process_log(task.make_task_process_record.merge(transition: "schedule")) user_message(">>> %s".color(:yellow) % task.digest, 3, "", :blue) else # cancel the task Log::Debug.rule_engine "task %s canceled at %s" % [task.digest, digest] end end # wait an end of distributed tasks wait_task_completion(tasks) # turn foreground if the task is background unless read!(TupleSpace::ForegroundTuple.new(domain_id, digest)) write(TupleSpace::ForegroundTuple.new(domain_id, digest)) end # log and message process_log(make_rule_process_record.merge(transition: "resume")) process_log(make_task_process_record.merge(transition: "resume")) user_message_end("Distribution: %s" % digest, 2) end
find_applicable_rules(rules)
click to toggle source
Find applicable rules. The criterion of applicable rule is that the rule satisfies ticket conditions or not.
# File lib/pione/rule-engine/flow-handler.rb, line 219 def find_applicable_rules(rules) # select rules which ticktes exist in this domain rules.select do |rule| rule.input_tickets.pieces.all? do |ticket| read!(TupleSpace::TicketTuple.new(domain_id, ticket.name)) end end end
find_output_variables(task, kseq)
click to toggle source
# File lib/pione/rule-engine/flow-handler.rb, line 365 def find_output_variables(task, kseq) _kseq = kseq task.rule_condition.outputs.each_with_index do |condition, i| begin data = condition.eval(task.env) _kseq = _kseq.put(Lang::IntegerSequence.of(i+1), data) rescue Lang::UnboundError next end end return _kseq end
find_tasks(rules)
click to toggle source
Find applicable and updatable rule applications.
# File lib/pione/rule-engine/flow-handler.rb, line 206 def find_tasks(rules) # select applicable rules applicable_rules = find_applicable_rules(rules) # make task tasks = make_tasks(applicable_rules) # be careful that returns nil when tasks are empty tasks.empty? ? nil : tasks end
find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set)
click to toggle source
Handle parameter distribution. Rule parameters with each modifier are distributed tasks by each element.
# File lib/pione/rule-engine/flow-handler.rb, line 278 def find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set) tasks = [] # find input data combinations @data_finder.find(:input, rule_condition.inputs, env) do |task_env, inputs| # make parameter set for the task table = Hash.new if val_i = task_env.variable_get!(Lang::Variable.new("I")) table["INPUT"] = Lang::Variable.new(name: "I", package_id: rule.package_id) table["I"] = val_i end if val_star = task_env.variable_get!(Lang::Variable.new("*")) table["*"] = val_star end task_param_set = param_set.set(table: param_set.table.merge(table)) # check constraint conditions next unless rule_condition.constraints.all? do |constraint| res = constraint.eval(task_env) if res.is_a?(Lang::BooleanSequence) res.value else raise Lang::StructuralError.new(Lang::BooleanSequence, constraint.pos) end end # make task domain_id = Util::DomainID.generate(rule.package_id, rule.name, inputs, task_param_set) task = Task.new(task_env, domain_id, rule, rule_definition, rule_condition, inputs, task_param_set) # check updatability if _task = check_updatability(task) tasks << _task end end return tasks end
import_outputs_of_task(task, finished)
click to toggle source
Import finished tuple’s outputs from the domain.
# File lib/pione/rule-engine/flow-handler.rb, line 475 def import_outputs_of_task(task, finished) finished.outputs.each_with_index do |output, i| data_expr = task.rule_condition.outputs[i].eval(task.env) output = [output] unless output.kind_of?(Array) case data_expr.operation when :write output.each {|o| copy_data_into_domain(o, domain_id)} when :remove output.each {|o| remove_data_from_domain(o, domain_id)} when :touch output.each {|o| touch_data_in_domain(o, domain_id)} else # here is unreached raise RuleExecutionError.new(self) end end end
lift_touch_tuple(task)
click to toggle source
Lift effects of touch operations from the task domain to this domain.
# File lib/pione/rule-engine/flow-handler.rb, line 495 def lift_touch_tuple(task) read_all(TupleSpace::TouchTuple.new(domain: task.domain_id)).each do |touch| if target = read!(TupleSpace::DataTuple.new(name: touch.name, domain: domain_id)) # update time of data tuple write(target.tap {|x| x.time = touch.time}) unless target.time > touch.time # lift touch tuple to upper domain write(touch.tap{|x| x.domain = domain_id}) end end end
make_tasks(rules)
click to toggle source
Make tasks from rules.
# File lib/pione/rule-engine/flow-handler.rb, line 229 def make_tasks(rules) rules.each_with_object([]) do |rule, tasks| # set handler's package id if rule's package id is implicit rule = rule.set(package_id: package_id) unless rule.package_id # get rule definition rule_definition = env.rule_get(rule) # handle parameter sequence pieces = rule.param_sets.pieces if not(pieces.empty?) pieces.each do |param_set| ### merge default parameter values #### # setup task's environment by parameter set _env = plain_env.layer.merge_param_set(param_set) _env.set(current_package_id: rule.package_id || env.current_package_id) # get task's condition rule_condition = rule_definition.rule_condition_context.eval(_env) # merge default values _param_set = param_set.merge_default_values(rule_condition) # handle parameter distribution _param_set.eval(_env).expand do |expanded_param_set| # rebuild environment by expanded param set _env = plain_env.layer.merge_param_set(expanded_param_set) _env.set(current_package_id: rule.package_id || env.current_package_id) # get task's condition rule_condition = rule_definition.rule_condition_context.eval(_env) tasks.concat find_tasks_by_rule_condition(_env, rule, rule_definition, rule_condition, expanded_param_set).uniq end end else _env = plain_env.layer # get task's condition rule_condition = rule_definition.rule_condition_context.eval(_env) this_tasks = find_tasks_by_rule_condition( _env, rule, rule_definition, rule_condition, Lang::ParameterSet.new ).uniq tasks.concat(this_tasks) end end end
need_to_publish_task?(task, tuple)
click to toggle source
Return true if we need to publish the task.
# File lib/pione/rule-engine/flow-handler.rb, line 425 def need_to_publish_task?(task, tuple) # reuse task finished result if order is weak update if task.order == :weak template = TupleSpace::FinishedTuple.new(domain: task.domain_id, status: :succeeded) if @finished.include?(template) return false end if finished = read!(template) unless @finished.any? {|t| t.uuid == finished.uuid} @finished << finished end return false end end # the task exists in space already, so we don't need to publish return false if read!(tuple) # another worker is working now, so we don't need to publish return false if read!(TupleSpace::WorkingTuple.new(domain: task.domain_id)) # we need to publish the task return true end
wait_task_completion(tasks)
click to toggle source
Wait until tasks completed.
# File lib/pione/rule-engine/flow-handler.rb, line 451 def wait_task_completion(tasks) tasks.each do |task| # wait to finish the distributed task, note that finished tuple is in # the task domain finished = read(TupleSpace::FinishedTuple.new(domain: task.domain_id)) unless @finished.any? {|t| t.uuid == finished.uuid} @finished << finished end ### task completion processing ### # copy write operation data tuple from the task domain to this domain import_outputs_of_task(task, finished) # touch tuple lift_touch_tuple(task) # publish output tickets task.rule.output_tickets.pieces.each do |piece| write(TupleSpace::TicketTuple.new(domain_id, piece.name)) end end end