class Embulk::Filter::EvalFilterPlugin

Constants

VERSION

Public Class Methods

out_schema(out_columns, in_schema)
# File lib/embulk/filter/eval.rb, line 22
# Builds the output schema from a list of column names.
#
# Every name in +out_columns+ is looked up in +in_schema+ by column name and
# turned into a new Embulk::Column that copies the matched column's name,
# type and format, with its index set to the name's position in
# +out_columns+.
#
# @param out_columns [Array] names of the columns to emit, in output order
# @param in_schema [Enumerable] input columns responding to name/type/format
# @return the new column list, or +in_schema+ itself when no columns result
# @raise [NotFoundOutSchema] when a requested name has no match in +in_schema+
def self.out_schema(out_columns, in_schema)
  selected = out_columns.each_with_index.map do |column_name, position|
    matched = in_schema.find { |candidate| candidate.name == column_name }
    raise NotFoundOutSchema, "Not found output schema: `#{column_name}'" unless matched

    Embulk::Column.new(
      index: position,
      name: matched.name,
      type: matched.type,
      format: matched.format
    )
  end

  # An empty selection means "keep the input schema as it is".
  return in_schema if selected.empty?

  selected
end
transaction(config, in_schema) { |task, out_schema| ... }
# File lib/embulk/filter/eval.rb, line 10
# Reads the plugin configuration, derives the output schema and yields both
# to the caller's block.
#
# The task hash carries two entries, "eval_columns" and "out_columns", each
# read from +config+ as an array that defaults to an empty array.
#
# @param config [#param] configuration source
# @param in_schema input schema, passed on to +out_schema+
# @yield [task, out_schema] the task hash and the resolved output schema
# @return whatever the block returns
def self.transaction(config, in_schema, &control)
  eval_columns = config.param("eval_columns", :array, default: [])
  out_columns = config.param("out_columns", :array, default: [])

  task = {
    "eval_columns" => eval_columns,
    "out_columns" => out_columns
  }

  yield(task, out_schema(out_columns, in_schema))
end

Public Instance Methods

add(page)
# File lib/embulk/filter/eval.rb, line 42
# Evaluates the configured expressions against every record of +page+ and
# hands the resulting rows to +page_builder+.
#
# Each record is converted to a column-name => value hash by +hash_record+.
# For every column, the first entry of +@table+ that has the column name as
# a key supplies a Ruby source string. When that string is truthy the output
# value is whatever the string evaluates to; otherwise the input value is
# passed through unchanged.
#
# The expressions run with this method's binding, so they can read the
# locals +record+, +key+, +value+, +result+ and +source+. Those names are
# part of the runtime contract of the configured expressions and must not be
# renamed.
#
# When the task's "out_columns" is non-empty, the emitted row contains only
# those columns, in that order, so that it lines up with the schema built by
# +out_schema+. With no "out_columns", all columns are emitted in input order.
#
# A record whose processing raises a StandardError is skipped, and the error
# is reported through +warn+ instead of being discarded silently.
#
# @param page [#each] records to process
# @return the result of +page.each+
def add(page)
  # Hoisted: the projection is identical for every record of the page.
  out_columns = Array(task["out_columns"])

  page.each do |record|
    begin
      record = hash_record(record)

      result = {}

      record.each do |key, value|
        source = @table.find { |t| t.key?(key) }
        source &&= source[key]

        # SECURITY: eval executes arbitrary Ruby taken from "eval_columns".
        # Only use this plugin with configuration from a trusted source.
        result[key] = source ? eval(source) : value
      end

      row = out_columns.empty? ? result.values : result.values_at(*out_columns)
      page_builder.add(row)
    rescue StandardError => e
      # Best effort: keep processing the remaining records, but leave a trace.
      warn "EvalFilterPlugin: skipping record (#{e.class}: #{e.message})"
    end
  end
end
close()
# File lib/embulk/filter/eval.rb, line 39
# No-op: the method body is empty and it always returns nil.
def close; end
finish()
# File lib/embulk/filter/eval.rb, line 67
# Delegates to +page_builder.finish+ and returns its result.
def finish
  builder = page_builder
  builder.finish
end
hash_record(record)
# File lib/embulk/filter/eval.rb, line 71
# Pairs the input schema's column names with the values of +record+.
#
# Names and values are matched by position. A record shorter than the name
# list yields nil for the missing values; surplus values are ignored.
#
# @param record [Array] values of one record
# @return [Hash] column name => value
def hash_record(record)
  pairs = in_schema.names.zip(record)
  pairs.each_with_object({}) do |(column_name, column_value), row|
    row[column_name] = column_value
  end
end
init()
# File lib/embulk/filter/eval.rb, line 35
# Stores the task's "eval_columns" entry in +@table+, where +add+ looks up
# the expression for each column.
def init
  eval_columns = task["eval_columns"]
  @table = eval_columns
end