class Embulk::Filter::Pherialize

Public Class Methods

transaction(config, in_schema) { |task, out_schema| ... }
# File lib/embulk/filter/pherialize.rb, line 9
# Builds the filter task from +config+ and derives the output schema.
#
# config    - responds to #param(name, type, default:); three settings are
#             read: 'serialized_column' (no default supplied),
#             'extract_fields' (default []) and 'drop_serialized_column'
#             (default false).
# in_schema - collection of columns (each with #index and #name) that also
#             responds to #names.
#
# The output schema is the input columns ordered by index (minus the
# serialized column when 'drop_serialized_column' is set), renumbered from 0,
# followed by one new column per 'extract_fields' entry. An extracted field
# whose name already exists in the input schema is emitted with a leading "_".
#
# Yields (task, out_schema) and returns whatever the block returns. The block
# is invoked through yield; &control is kept only for signature compatibility.
def self.transaction(config, in_schema, &control)
  task = {
    'serialized_column'      => config.param('serialized_column', :string),
    'extract_fields'         => config.param('extract_fields', :array, default: []),
    'drop_serialized_column' => config.param('drop_serialized_column', :bool, default: false),
  }

  kept = in_schema.sort_by(&:index)
  if task['drop_serialized_column']
    kept = kept.reject { |col| col.name == task['serialized_column'] }
  end

  # Renumber copies rather than the caller's column objects, so in_schema
  # keeps its original indexes.
  out_schema = kept.each_with_index.map do |col, position|
    col.dup.tap { |copy| copy.index = position }
  end

  existing_names = in_schema.names
  offset = out_schema.size
  task['extract_fields'].each_with_index do |field, i|
    name = existing_names.include?(field['name']) ? "_#{field['name']}" : field['name']
    out_schema << Column.new(offset + i, name, field['type'].to_sym)
  end

  yield(task, out_schema)
end

Public Instance Methods

add(page)
# File lib/embulk/filter/pherialize.rb, line 41
# Extracts the configured fields from each record's serialized column and
# passes the extended record to page_builder.
#
# page - responds to #schema (columns with #name and #index) and #each,
#        yielding one array-like record per row.
#
# For every record the serialized value is read (or removed from the record
# when @drop_serialized_column is set), unserialized with PHP.unserialize, and
# the value found under each @extract_fields name is appended to the record in
# configuration order.
def add(page)
  target = page.schema.find { |s| s.name == @serialized_column }
  page.each do |record|
    serialized = @drop_serialized_column ? record.delete_at(target.index) : record[target.index]
    # A null column value has nothing to unserialize: emit nil for every
    # extracted field instead of handing nil to PHP.unserialize.
    data = serialized.nil? ? {} : PHP.unserialize(serialized)
    extracted = @extract_fields.map { |f| data[f['name']] }
    page_builder.add(record + extracted)
  end
end
close()
# File lib/embulk/filter/pherialize.rb, line 38
# Intentionally empty: this method performs no work and returns nil.
def close; end
finish()
# File lib/embulk/filter/pherialize.rb, line 51
# Delegates to page_builder.finish and returns its result.
def finish
  builder = page_builder
  builder.finish
end
init()
# File lib/embulk/filter/pherialize.rb, line 32
# Copies the three task settings into instance variables:
# 'serialized_column', 'extract_fields' and 'drop_serialized_column'.
def init
  @serialized_column, @extract_fields = task['serialized_column'], task['extract_fields']
  @drop_serialized_column = task['drop_serialized_column']
end