class Pione::RuleEngine::FlowHandler

FlowHandler is a rule handler for flow elements.

Public Instance Methods

copy_data_into_domain(data, domain) click to toggle source

Copy the data tuple into the specified domain, write the copy, and return the new tuple.

@param data [DataTuple]

target data tuple

@param domain [String]

new domain of the copied data tuple

@return [DataTuple]

new data tuple with the domain or nil
# File lib/pione/rule-engine/flow-handler.rb, line 102
# Duplicate a data tuple under another domain and publish the duplicate.
#
# The given tuple is cloned, so the caller's object keeps its own domain.
#
# @param data [DataTuple, nil] tuple to copy; nil short-circuits the call
# @param domain [String] domain assigned to the copy
# @return [DataTuple, nil] the written copy, or nil when no data was given
def copy_data_into_domain(data, domain)
  return unless data

  copied = data.clone
  copied.domain = domain
  write(copied)
  copied
end
execute() click to toggle source

Start to process flow elements.

@return [Array] the outputs returned by find_outputs_from_space

# File lib/pione/rule-engine/flow-handler.rb, line 8
# Run the flow: restore existing data tuples, apply the flow rules, then
# collect, lift and validate the outputs.
#
# @return [Array] the outputs returned by find_outputs_from_space
def execute
  # Re-publish data tuples for files already in the domain location.
  restore_data_tuples_from_domain_location

  # Evaluate the flow context in the current environment and apply its rules.
  evaluated_flow = @rule_definition.flow_context.eval(@env)
  RuleApplication.new(self).apply(evaluated_flow.rules.pieces)

  # Gather the outputs, lift them, validate them, and hand them back.
  find_outputs_from_space.tap do |found|
    lift_output_data(found)
    validate_outputs(found)
  end
end
lift_output_data(outputs) click to toggle source

Lift output data from this domain to the parent domain.

@return [void]

# File lib/pione/rule-engine/flow-handler.rb, line 44
# Move output data to the locations given by make_output_location and
# announce each move with a lift tuple.
#
# An output is skipped when its current location already equals the target,
# when the same source location was lifted earlier in this call, or when the
# source does not exist.
#
# @param outputs [Array] possibly nested output tuples; nil entries are ignored
# @return [Array, nil] source locations that were lifted, or nil when the
#   caller id is unknown
def lift_output_data(outputs)
  # Without a caller id there is nowhere to lift to (root rule case).
  return if @caller_id.nil?

  outputs.flatten.compact.each_with_object([]) do |output, moved|
    source = output.location
    destination = make_output_location(output.name)

    next if destination == source || moved.include?(source)
    next unless source.exist?

    begin
      # Relocate the data from the source to the destination.
      source.turn(destination)
    rescue ArgumentError
      # Deliberately ignored: raised when source and destination are the same file.
    end

    # Keep the local file cache in step with the move.
    System::FileCache.sync(source, destination)
    # Publish the lift so that other agents can follow it.
    write(TupleSpace::LiftTuple.new(source, destination))
    # Remember the source so duplicates are not lifted twice.
    moved << source
  end
end
remove_data_from_domain(data, domain) click to toggle source

Remove the data from the domain.

# File lib/pione/rule-engine/flow-handler.rb, line 110
# Take the data tuple that has the given data's name in the given domain.
#
# @param data [DataTuple] tuple whose name identifies what to remove
# @param domain [String] domain to remove the data from
# @return [Object] whatever take! returns for the query tuple
def remove_data_from_domain(data, domain)
  query = TupleSpace::DataTuple.new(name: data.name, domain: domain)
  take!(query)
end
remove_finished_tuple(domain) click to toggle source

Remove finished tuple.

@param domain [String]

domain of the finished tuple

@return [void]

# File lib/pione/rule-engine/flow-handler.rb, line 90
# Take the finished tuple that belongs to the given domain.
#
# @param domain [String] domain of the finished tuple
# @return [Object] whatever take! returns for the query tuple
def remove_finished_tuple(domain)
  query = TupleSpace::FinishedTuple.new(domain: domain)
  take!(query)
end
restore_data_tuples_from_domain_location() click to toggle source

Restore data tuples from the domain location. This reads the files in the location and writes them as data tuples.

# File lib/pione/rule-engine/flow-handler.rb, line 30
# Write a data tuple for every non-dot file found in the domain location.
#
# Nothing happens when the domain location does not exist.
#
# @return [Object, nil] the result of iterating the file entries, or nil when
#   the location is missing
def restore_data_tuples_from_domain_location
  return unless @domain_location.exist?

  @domain_location.file_entries.each do |entry|
    # Dot files are not data; leave them alone.
    next if entry.basename[0] == "."

    write(TupleSpace::DataTuple.new(@domain_id, entry.basename, entry, entry.mtime))
  end
end
touch_data_in_domain(data, domain) click to toggle source
# File lib/pione/rule-engine/flow-handler.rb, line 114
# Write a copy of a data tuple in the given domain stamped with the current time.
#
# If read! finds a tuple with the same name in the domain, that tuple is the
# one copied; otherwise the given data is copied.
#
# @param data [DataTuple] tuple naming the data to touch
# @param domain [String] domain set on the written copy
# @return [Object] whatever write returns for the refreshed tuple
def touch_data_in_domain(data, domain)
  query = TupleSpace::DataTuple.new(name: data.name, domain: domain)
  base = read!(query) || data

  refreshed = base.clone
  refreshed.domain = domain
  refreshed.time = Time.now
  write(refreshed)
end
validate_outputs(outputs) click to toggle source

Validate outputs.

# File lib/pione/rule-engine/flow-handler.rb, line 73
# Check that every output condition of the rule is satisfied by the outputs.
#
# A condition is satisfied when it accepts nonexistence or when at least one
# output name matches it.
#
# @param outputs [Array] possibly nested output tuples; nil entries are ignored
# @return [Object] the result of iterating the rule's output conditions
# @raise [InvalidOutputError] when a condition is not satisfied
def validate_outputs(outputs)
  present = outputs.flatten.compact

  @rule_condition.outputs.each do |condition|
    evaluated = condition.eval(@env)
    # Both checks are evaluated up front, before deciding.
    optional = evaluated.accept_nonexistence?
    matched = present.any? { |output| evaluated.match?(output.name) }
    next if optional || matched

    raise InvalidOutputError.new(self, outputs)
  end
end