class Pione::RuleEngine::BasicHandler

BasicHandler is a base class for rule handlers.

Attributes

base_location[R]
caller_id[R]
digest[R]
domain_id[R]
domain_location[R]
dry_run[R]
env[R]
inputs[R]
outputs[R]
package_id[R]
param_set[R]
plain_env[R]
rule_condition[R]
rule_definition[R]
rule_name[R]

Public Class Methods

new(param) click to toggle source

Create a new handler for a rule.

@param [Hash] param

see `RuleEngine.make`
# File lib/pione/rule-engine/basic-handler.rb, line 28
  # Create a new handler for a rule.
  #
  # @param param [Hash]
  #   handler parameters; this method reads the keys :tuple_space, :env,
  #   :param_set, :package_id, :rule_name, :rule_definition, :inputs,
  #   :domain_id, :caller_id, :request_from, :session_id and :client_ui
  def initialize(param)
    ### set tuple space server
    set_tuple_space(param[:tuple_space])

    ### set information
    # NOTE: @package_id has to be assigned before #setup_env is called,
    # because #setup_env reads it through the package_id reader when it sets
    # current_package_id on the new environment layer.
    @package_id = param[:package_id]
    @plain_env = param[:env]
    @param_set = param[:param_set]
    @env = setup_env(@plain_env, @param_set)
    @rule_name = param[:rule_name]
    @rule_definition = param[:rule_definition]
    # needs @rule_definition and @env, so it must stay below them
    @rule_condition = eval_rule_condition
    @inputs = param[:inputs]
    @outputs = []
    @digest = Util::TaskDigest.generate(@package_id, @rule_name, @inputs, @param_set)
    @base_location = read!(TupleSpace::BaseLocationTuple.any).location
    # dry-run falls back to false when reading the dry-run tuple fails
    @dry_run =
      begin
        read!(TupleSpace::DryRunTuple.any).availability
      rescue
        false
      end
    @domain_id = param[:domain_id]
    # needs @base_location (and the package ID for non-root domains)
    @domain_location = make_location("", @domain_id)
    @caller_id = param[:caller_id]
    @request_from = param[:request_from]
    @session_id = param[:session_id]
    @client_ui = param[:client_ui]
  end

  # Handle the rule: log the start, execute the rule, publish the outputs
  # together with a finished tuple, and log the completion.
  #
  # Any exception raised on the way is reported as a user message and turned
  # into a "terminate" command tuple.
  #
  # @return [Boolean]
  #   true if rule execution has succeeded, or false
  def handle
    # make rule and task process log
    process_log(make_task_process_record.merge(transition: "start"))
    process_log(make_rule_process_record.merge(transition: "start"))

    # show begin messages
    user_message(@digest, 0, "==>")
    debug_message("caller: %s" % @caller_id)

    unless @domain_id == "root"
      # save domain log
      Log::DomainLog.new(self).save

      # save a domain dump file
      # (@working_directory is not assigned in the code visible here;
      # presumably a subclass sets it -- TODO confirm)
      domain_dump_location = @working_directory ? @working_directory.location : @domain_location
      System::DomainDump.new(env.dumpable).write(domain_dump_location)
    end

    # execute the rule
    outputs = execute

    # publish outputs and finished
    begin
      outputs.flatten.compact.each {|output| write(output)}
      write(TupleSpace::FinishedTuple.new(@domain_id, Util::UUID.generate, :succeeded, outputs))
    rescue Rinda::RedundantTupleError
      # a redundant tuple downgrades the finished status to :error
      write(TupleSpace::FinishedTuple.new(@domain_id, Util::UUID.generate, :error, outputs))
    end

    # show end message
    show_outputs(outputs)
    user_message(@digest, 0, "<==")

    # put rule and task process log
    process_log(make_rule_process_record.merge(transition: "complete"))
    process_log(make_task_process_record.merge(transition: "complete"))

    return true

  rescue Object => e
    # NOTE(review): every exception class descends from Object, so this
    # clause catches all exceptions, not only StandardError.
    user_message("ERROR: " + e.message, 0, "info", :red)
    status = System::Status.error(message: e.message, exception: e)
    write(TupleSpace::CommandTuple.new(name: "terminate", args: [status]))
    return false
  end

  # Execute the rule. This base implementation only raises; concrete
  # handlers are expected to override it.
  #
  # @return [Object]
  #   #handle treats the result as the rule outputs
  # @raise [NotImplementedError]
  #   always, in this base class
  def execute
    raise NotImplementedError, "#{self.class}#execute is not implemented"
  end

  # Evaluate the rule condition context of the rule definition in the
  # handler's environment.
  #
  # @return [Object]
  #   the evaluated rule condition
  def eval_rule_condition
    # NOTE: a disabled experiment used to follow the evaluation here:
    #   if @rule_definition.kind_of?(Lang::EmptyRuleDefinition)
    #     rule_condition.outputs.each do |output|
    #       output.set(operation: :touch) if output.operation == :write
    #     end
    #   end
    # It stays disabled; the evaluated condition is returned untouched.
    @rule_definition.rule_condition_context.eval(@env)
  end

  # Make a location from a data name and a domain ID.
  #
  # @param name [String]
  #   data name
  # @param domain_id [String]
  #   "root", or a domain ID whose ":"-separated fields are the package ID,
  #   the rule name and the task ID
  # @return [BasicLocation]
  #   the location below the base location
  def make_location(name, domain_id)
    # data of the root domain lives in the output directory
    return @base_location + "./output/%s" % name if domain_id == "root"

    # Every path component comes from the given domain ID. The package ID
    # must not be taken from the handler itself: the domain may belong to
    # another package (e.g. the caller's domain).
    domain_package_id, domain_rule_name, domain_task_id = domain_id.split(":")
    relative_path = "./.%s/%s/%s/%s" % [domain_package_id, domain_rule_name, domain_task_id, name]

    @base_location + relative_path
  end

  # Build the location of the output data +name+ inside the caller's domain.
  #
  # @param name [String]
  #   data name
  # @return [BasicLocation, nil]
  #   nil when the handler has no caller ID
  def make_output_location(name)
    # FIXME: maybe we should not lift output here
    caller_domain = @caller_id

    # the caller ID names the parent domain (or the root domain)
    caller_domain.nil? ? nil : make_location(name, caller_domain)
  end

  # Build a data tuple for the first element of a data expression.
  #
  # @param data_expr [#first]
  #   data expression; only the name of its first element is used
  # @return [TupleSpace::DataTuple]
  #   a tuple in this handler's domain, without a time
  def make_output_tuple(data_expr)
    output_name = data_expr.first.name
    TupleSpace::DataTuple.new(
      name: output_name,
      domain: @domain_id,
      location: make_output_location(output_name),
      time: nil
    )
  end

  # Prepare the environment used by this handler. A new layer is put on top
  # of the plain package environment, so the handler can operate on it
  # safely.
  #
  # @param env [#layer]
  #   plain package environment
  # @param param_set [Object]
  #   parameter set merged into the new layer
  # @return [Object]
  #   whatever #merge_param_set returns -- presumably the layered
  #   environment, since #initialize stores the result in @env; TODO confirm
  def setup_env(env, param_set)
    layered_env = env.layer
    layered_env.set(current_package_id: package_id)

    ### system environment (disabled)
    # ENV.each do |key, value|
    #   @variable_table.set(Variable.new("ENV_" + key), PioneString.new(value))
    # end

    # merging comes last because its result is the method's return value
    layered_env.merge_param_set(param_set)
  end

  # Find outputs from the domain space.
  #
  # All data tuples of the handler's domain are read once; for every output
  # condition the matching tuples are collected, a touch operation is
  # applied, and a data null tuple is written if needed.
  #
  # @return [Array]
  #   entry i holds the tuples found for output condition i
  def find_outputs_from_space
    domain_tuples = read_all(TupleSpace::DataTuple.new(domain: @domain_id))

    found = []
    @rule_condition.outputs.each_with_index do |condition, position|
      evaluated = condition.eval(@env)

      # FIXME: :each is collected exactly like :all for now
      if [:all, :each].include?(evaluated.distribution)
        found[position] = domain_tuples.select { |tuple| evaluated.match(tuple.name) }
      end

      # a touch operation replaces the collected tuples when it yields some
      touched = apply_touch_operation(evaluated, found[position])
      found[position] = touched if touched

      # write data null if needed
      write_data_null(evaluated, found[position], position)
    end

    found
  end

  # Apply touch operation.
  #
  # The operation applies when the evaluated condition's operation is
  # :touch, or when this handler is an EmptyHandler and the operation is
  # :write.
  #
  # @param condition [#eval]
  #   output condition, evaluated against the handler's environment
  # @param tuples [Array]
  #   tuples already found for the condition
  # @return [Array, nil]
  #   the created or refreshed tuples, or nil when no touch applies
  def apply_touch_operation(condition, tuples)
    evaluated = condition.eval(@env)

    touching = evaluated.operation == :touch ||
               (kind_of?(EmptyHandler) && evaluated.operation == :write)
    return unless touching

    # nothing exists yet: create the data; otherwise only refresh the time
    tuples.empty? ? create_data_by_touch_operation(evaluated) : update_time_by_touch_operation(tuples)
  end

  # Create the data of a touch operation in the handler's domain.
  #
  # An empty file is created at the domain location unless one exists, then
  # a touch tuple and a data tuple with the same time are written.
  #
  # @param condition [#pieces]
  #   evaluated output condition
  # @return [Array]
  #   one-element array holding the written data tuple
  def create_data_by_touch_operation(condition)
    # NOTE: touch operation applies first piece of data sequence now
    touched_name = condition.pieces.first.pattern
    target = @domain_location + touched_name

    # make sure a (possibly empty) file is there
    target.create("") unless target.exist?

    touched_at = Time.now

    # FIXME: write a touch tuple
    write(TupleSpace::TouchTuple.new(name: touched_name, domain: @domain_id, time: touched_at))

    # FIXME: create an output data tuple
    created = TupleSpace::DataTuple.new(name: touched_name, domain: @domain_id, location: target, time: touched_at)
    write(created)

    [created]
  end

  # Refresh the time of tuples hit by a touch operation.
  #
  # Each tuple is taken from the tuple space by name and domain. When the
  # take yields a truthy value, a touch tuple and a data tuple carrying the
  # current time are written and the new data tuple takes the old one's
  # place in the result; otherwise the original tuple is kept.
  #
  # @param tuples [Array]
  #   data tuples to refresh
  # @return [Array]
  #   refreshed (or untouched) tuples, in the same order
  def update_time_by_touch_operation(tuples)
    tuples.map do |stale|
      if take!(TupleSpace::DataTuple.new(name: stale.name, domain: @domain_id))
        touched_at = Time.now
        fresh = TupleSpace::DataTuple.new(name: stale.name, domain: @domain_id, location: stale.location, time: touched_at)
        write(TupleSpace::TouchTuple.new(name: stale.name, domain: @domain_id, time: touched_at))
        write(fresh)
        fresh
      else
        stale
      end
    end
  end

  # Write a data null tuple if the output condition accepts nonexistence.
  #
  # @param output [#accept_nonexistence?]
  #   evaluated output condition
  # @param tuples [Array, nil]
  #   tuples found for the condition
  # @param i [Integer]
  #   position of the output condition
  # @return [Object, nil]
  #   the result of #write, or nil when nothing is written
  def write_data_null(output, tuples, i)
    # NOTE(review): only nil counts as "no data" here; an empty array does
    # not trigger the null tuple -- confirm this is intended.
    return unless output.accept_nonexistence? && tuples.nil?

    write(TupleSpace::DataNullTuple.new(domain: @domain_id, position: i))
  end

  # Build rule process record.
  #
  # @return [Log::RuleProcessRecord]
  #   a record named "&package_id:rule_name"; its caller field is filled
  #   only when the handler has a caller ID
  def make_rule_process_record
    Log::RuleProcessRecord.new.tap do |record|
      record.name = "&%s:%s" % [@package_id, @rule_name]
      record.rule_type = @rule_definition.rule_type
      # The caller is kept in @caller_id (assigned by #initialize).
      if @caller_id
        # only the first two ":"-separated fields of the caller ID are used
        caller_package_id, caller_rule_name = @caller_id.split(":")
        record.caller = "&%s:%s" % [caller_package_id, caller_rule_name]
      end
    end
  end

  # Build task process record.
  #
  # @return [Log::TaskProcessRecord]
  #   a record named after the task digest, carrying the package ID, rule
  #   name, rule type, comma-joined input names and textized parameters
  def make_task_process_record
    record = Log::TaskProcessRecord.new
    record.name = @digest
    record.package_id = @package_id
    record.rule_name = @rule_name
    record.rule_type = @rule_definition.rule_type

    # inputs may be nested, so flatten them before collecting the names
    input_names = @inputs.flatten.map(&:name)
    record.inputs = input_names.join(",")

    record.parameters = @param_set.textize
    record
  end

  # Publish output data tuples.
  #
  # NOTE: the method body is currently empty, so nothing is published here
  # and the rescue clause below cannot be reached. #handle writes the output
  # tuples itself.
  #
  # @param outputs [Object]
  #   unused
  # @return [nil]
  def publish_outputs(outputs)
    # output data
  rescue Rinda::RedundantTupleError
    write("finished")
  end

  # Show output tuples as debug messages. This method is used for debugging
  # only.
  #
  # @param outputs [Array<Array>, nil]
  #   output tuples grouped by output position
  # @return [Object]
  #   +outputs+ itself, or the result of the "no outputs" message
  def show_outputs(outputs)
    debug_message("Result of %s:" % @digest)
    return debug_message("no outputs", 1) unless outputs

    # one line per tuple, tagged with [output position, index in the group]
    outputs.each_with_index do |group, row|
      group.each_with_index do |tuple, column|
        debug_message("[%s,%s] %s" % [row, column, tuple.name], 1)
      end
    end
  end

end

Public Instance Methods

apply_touch_operation(condition, tuples) click to toggle source

Apply touch operation.

# File lib/pione/rule-engine/basic-handler.rb, line 207
# Apply touch operation.
#
# The operation applies when the evaluated condition's operation is :touch,
# or when this handler is an EmptyHandler and the operation is :write.
#
# @param condition [#eval]
#   output condition, evaluated against the handler's environment
# @param tuples [Array]
#   tuples already found for the condition
# @return [Array, nil]
#   the created or refreshed tuples, or nil when no touch applies
def apply_touch_operation(condition, tuples)
  evaluated = condition.eval(@env)

  touching = evaluated.operation == :touch ||
             (kind_of?(EmptyHandler) && evaluated.operation == :write)
  return unless touching

  # nothing exists yet: create the data; otherwise only refresh the time
  tuples.empty? ? create_data_by_touch_operation(evaluated) : update_time_by_touch_operation(tuples)
end
create_data_by_touch_operation(condition) click to toggle source
# File lib/pione/rule-engine/basic-handler.rb, line 218
# Create the data of a touch operation in the handler's domain.
#
# An empty file is created at the domain location unless one exists, then a
# touch tuple and a data tuple with the same time are written.
#
# @param condition [#pieces]
#   evaluated output condition
# @return [Array]
#   one-element array holding the written data tuple
def create_data_by_touch_operation(condition)
  # NOTE: touch operation applies first piece of data sequence now
  touched_name = condition.pieces.first.pattern
  target = @domain_location + touched_name

  # make sure a (possibly empty) file is there
  target.create("") unless target.exist?

  touched_at = Time.now

  # FIXME: write a touch tuple
  write(TupleSpace::TouchTuple.new(name: touched_name, domain: @domain_id, time: touched_at))

  # FIXME: create an output data tuple
  created = TupleSpace::DataTuple.new(name: touched_name, domain: @domain_id, location: target, time: touched_at)
  write(created)

  [created]
end
eval_rule_condition() click to toggle source
# File lib/pione/rule-engine/basic-handler.rb, line 108
# Evaluate the rule condition context of the rule definition in the
# handler's environment.
#
# @return [Object]
#   the evaluated rule condition
def eval_rule_condition
  # NOTE: a disabled experiment used to follow the evaluation here:
  #   if @rule_definition.kind_of?(Lang::EmptyRuleDefinition)
  #     rule_condition.outputs.each do |output|
  #       output.set(operation: :touch) if output.operation == :write
  #     end
  #   end
  # It stays disabled; the evaluated condition is returned untouched.
  @rule_definition.rule_condition_context.eval(@env)
end
execute() click to toggle source

Executes the rule.

# File lib/pione/rule-engine/basic-handler.rb, line 104
# Execute the rule. This base implementation only raises; concrete handlers
# are expected to override it.
#
# @return [Object]
#   #handle treats the result as the rule outputs
# @raise [NotImplementedError]
#   always, in this base class
def execute
  raise NotImplementedError, "#{self.class}#execute is not implemented"
end
find_outputs_from_space() click to toggle source

Find outputs from the domain space.

@return [Array] the tuples found for each output condition, indexed by output position

# File lib/pione/rule-engine/basic-handler.rb, line 180
# Find outputs from the domain space.
#
# All data tuples of the handler's domain are read once; for every output
# condition the matching tuples are collected, a touch operation is applied,
# and a data null tuple is written if needed.
#
# @return [Array]
#   entry i holds the tuples found for output condition i
def find_outputs_from_space
  domain_tuples = read_all(TupleSpace::DataTuple.new(domain: @domain_id))

  found = []
  @rule_condition.outputs.each_with_index do |condition, position|
    evaluated = condition.eval(@env)

    # FIXME: :each is collected exactly like :all for now
    if [:all, :each].include?(evaluated.distribution)
      found[position] = domain_tuples.select { |tuple| evaluated.match(tuple.name) }
    end

    # a touch operation replaces the collected tuples when it yields some
    touched = apply_touch_operation(evaluated, found[position])
    found[position] = touched if touched

    # write data null if needed
    write_data_null(evaluated, found[position], position)
  end

  found
end
handle() click to toggle source

Handle the rule and report whether its execution succeeded.

@return [Boolean]

true if rule execution has succeeded, or false
# File lib/pione/rule-engine/basic-handler.rb, line 57
# Handle the rule: log the start, execute the rule, publish the outputs
# together with a finished tuple, and log the completion.
#
# Any exception raised on the way is reported as a user message and turned
# into a "terminate" command tuple.
#
# @return [Boolean]
#   true if rule execution has succeeded, or false
def handle
  # make rule and task process log
  process_log(make_task_process_record.merge(transition: "start"))
  process_log(make_rule_process_record.merge(transition: "start"))

  # show begin messages
  user_message(@digest, 0, "==>")
  debug_message("caller: %s" % @caller_id)

  unless @domain_id == "root"
    # save domain log
    Log::DomainLog.new(self).save

    # save a domain dump file
    # (@working_directory is not assigned in the code visible here;
    # presumably a subclass sets it -- TODO confirm)
    domain_dump_location = @working_directory ? @working_directory.location : @domain_location
    System::DomainDump.new(env.dumpable).write(domain_dump_location)
  end

  # execute the rule
  outputs = execute

  # publish outputs and finished
  begin
    outputs.flatten.compact.each {|output| write(output)}
    write(TupleSpace::FinishedTuple.new(@domain_id, Util::UUID.generate, :succeeded, outputs))
  rescue Rinda::RedundantTupleError
    # a redundant tuple downgrades the finished status to :error
    write(TupleSpace::FinishedTuple.new(@domain_id, Util::UUID.generate, :error, outputs))
  end

  # show end message
  show_outputs(outputs)
  user_message(@digest, 0, "<==")

  # put rule and task process log
  process_log(make_rule_process_record.merge(transition: "complete"))
  process_log(make_task_process_record.merge(transition: "complete"))

  return true

rescue Object => e
  # NOTE(review): every exception class descends from Object, so this clause
  # catches all exceptions, not only StandardError.
  user_message("ERROR: " + e.message, 0, "info", :red)
  status = System::Status.error(message: e.message, exception: e)
  write(TupleSpace::CommandTuple.new(name: "terminate", args: [status]))
  return false
end
make_location(name, domain_id) click to toggle source

Make location by data name and the domain.

@param name [String]

data name

@param domain_id [String]

domain of the data

@return [BasicLocation]

the location
# File lib/pione/rule-engine/basic-handler.rb, line 131
# Make a location from a data name and a domain ID.
#
# @param name [String]
#   data name
# @param domain_id [String]
#   "root", or a domain ID whose ":"-separated fields are the package ID,
#   the rule name and the task ID
# @return [BasicLocation]
#   the location below the base location
def make_location(name, domain_id)
  # data of the root domain lives in the output directory
  return @base_location + "./output/%s" % name if domain_id == "root"

  # Every path component comes from the given domain ID. The package ID must
  # not be taken from the handler itself: the domain may belong to another
  # package (e.g. the caller's domain).
  domain_package_id, domain_rule_name, domain_task_id = domain_id.split(":")
  relative_path = "./.%s/%s/%s/%s" % [domain_package_id, domain_rule_name, domain_task_id, name]

  @base_location + relative_path
end
make_output_location(name) click to toggle source

Make output data location by the name.

# File lib/pione/rule-engine/basic-handler.rb, line 145
# Build the location of the output data +name+ inside the caller's domain.
#
# @param name [String]
#   data name
# @return [BasicLocation, nil]
#   nil when the handler has no caller ID
def make_output_location(name)
  # FIXME: maybe we should not lift output here
  caller_domain = @caller_id

  # the caller ID names the parent domain (or the root domain)
  caller_domain.nil? ? nil : make_location(name, caller_domain)
end
make_output_tuple(data_expr) click to toggle source

Make an output tuple from the name of the first element of the data expression.

# File lib/pione/rule-engine/basic-handler.rb, line 154
# Build a data tuple for the first element of a data expression.
#
# @param data_expr [#first]
#   data expression; only the name of its first element is used
# @return [TupleSpace::DataTuple]
#   a tuple in this handler's domain, without a time
def make_output_tuple(data_expr)
  output_name = data_expr.first.name
  TupleSpace::DataTuple.new(
    name: output_name,
    domain: @domain_id,
    location: make_output_location(output_name),
    time: nil
  )
end
make_rule_process_record() click to toggle source

Build rule process record.

# File lib/pione/rule-engine/basic-handler.rb, line 254
# Build rule process record.
#
# @return [Log::RuleProcessRecord]
#   a record named "&package_id:rule_name"; its caller field is filled only
#   when the handler has a caller ID
def make_rule_process_record
  Log::RuleProcessRecord.new.tap do |record|
    record.name = "&%s:%s" % [@package_id, @rule_name]
    record.rule_type = @rule_definition.rule_type
    # The caller is kept in @caller_id (assigned by #initialize).
    if @caller_id
      # only the first two ":"-separated fields of the caller ID are used
      caller_package_id, caller_rule_name = @caller_id.split(":")
      record.caller = "&%s:%s" % [caller_package_id, caller_rule_name]
    end
  end
end
make_task_process_record() click to toggle source
# File lib/pione/rule-engine/basic-handler.rb, line 265
# Build task process record.
#
# @return [Log::TaskProcessRecord]
#   a record named after the task digest, carrying the package ID, rule
#   name, rule type, comma-joined input names and textized parameters
def make_task_process_record
  record = Log::TaskProcessRecord.new
  record.name = @digest
  record.package_id = @package_id
  record.rule_name = @rule_name
  record.rule_type = @rule_definition.rule_type

  # inputs may be nested, so flatten them before collecting the names
  input_names = @inputs.flatten.map(&:name)
  record.inputs = input_names.join(",")

  record.parameters = @param_set.textize
  record
end
publish_outputs(outputs) click to toggle source

Publish output data tuples.

# File lib/pione/rule-engine/basic-handler.rb, line 277
# Publish output data tuples.
#
# NOTE: the method body is currently empty, so nothing is published here and
# the rescue clause below cannot be reached. #handle writes the output
# tuples itself.
#
# @param outputs [Object]
#   unused
# @return [nil]
def publish_outputs(outputs)
  # output data
rescue Rinda::RedundantTupleError
  write("finished")
end
setup_env(env, param_set) click to toggle source

Set up the handler’s environment. We make a new environment by putting a new layer on top of the plain package environment, so we can perform any operation on it safely.

# File lib/pione/rule-engine/basic-handler.rb, line 163
# Prepare the environment used by this handler. A new layer is put on top of
# the plain package environment, so the handler can operate on it safely.
#
# @param env [#layer]
#   plain package environment
# @param param_set [Object]
#   parameter set merged into the new layer
# @return [Object]
#   whatever #merge_param_set returns -- presumably the layered environment,
#   since #initialize stores the result in @env; TODO confirm
def setup_env(env, param_set)
  layered_env = env.layer
  layered_env.set(current_package_id: package_id)

  ### system environment (disabled)
  # ENV.each do |key, value|
  #   @variable_table.set(Variable.new("ENV_" + key), PioneString.new(value))
  # end

  # merging comes last because its result is the method's return value
  layered_env.merge_param_set(param_set)
end
show_outputs(outputs) click to toggle source

Show output tuples as message. This method is used for debugging only.

# File lib/pione/rule-engine/basic-handler.rb, line 284
# Show output tuples as debug messages. This method is used for debugging
# only.
#
# @param outputs [Array<Array>, nil]
#   output tuples grouped by output position
# @return [Object]
#   +outputs+ itself, or the result of the "no outputs" message
def show_outputs(outputs)
  debug_message("Result of %s:" % @digest)
  return debug_message("no outputs", 1) unless outputs

  # one line per tuple, tagged with [output position, index in the group]
  outputs.each_with_index do |group, row|
    group.each_with_index do |tuple, column|
      debug_message("[%s,%s] %s" % [row, column, tuple.name], 1)
    end
  end
end
update_time_by_touch_operation(tuples) click to toggle source
# File lib/pione/rule-engine/basic-handler.rb, line 233
# Refresh the time of tuples hit by a touch operation.
#
# Each tuple is taken from the tuple space by name and domain. When the take
# yields a truthy value, a touch tuple and a data tuple carrying the current
# time are written and the new data tuple takes the old one's place in the
# result; otherwise the original tuple is kept.
#
# @param tuples [Array]
#   data tuples to refresh
# @return [Array]
#   refreshed (or untouched) tuples, in the same order
def update_time_by_touch_operation(tuples)
  tuples.map do |stale|
    if take!(TupleSpace::DataTuple.new(name: stale.name, domain: @domain_id))
      touched_at = Time.now
      fresh = TupleSpace::DataTuple.new(name: stale.name, domain: @domain_id, location: stale.location, time: touched_at)
      write(TupleSpace::TouchTuple.new(name: stale.name, domain: @domain_id, time: touched_at))
      write(fresh)
      fresh
    else
      stale
    end
  end
end
write_data_null(output, tuples, i) click to toggle source

Write a data null tuple if the output condition accepts nonexistence.

# File lib/pione/rule-engine/basic-handler.rb, line 247
# Write a data null tuple if the output condition accepts nonexistence.
#
# @param output [#accept_nonexistence?]
#   evaluated output condition
# @param tuples [Array, nil]
#   tuples found for the condition
# @param i [Integer]
#   position of the output condition
# @return [Object, nil]
#   the result of #write, or nil when nothing is written
def write_data_null(output, tuples, i)
  # NOTE(review): only nil counts as "no data" here; an empty array does not
  # trigger the null tuple -- confirm this is intended.
  return unless output.accept_nonexistence? && tuples.nil?

  write(TupleSpace::DataNullTuple.new(domain: @domain_id, position: i))
end