class Embulk::Parser::MysqldumpTab

Constants

DUMMY_STRING
FIELDS_ENCLOSED_BY
FIELDS_ESCAPED_BY
FIELDS_TERMINATED_BY
LINES_TERMINATED_BY

Public Class Methods

transaction(config) { |task, columns| ... } click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 14
# Builds the parser task and the column schema from the plugin configuration
# and passes both to the caller's block.
#
# @param config plugin configuration; it is asked for the line decoder
#   settings and for a :columns array whose entries are read through the
#   "name", "type" and "format" keys
# @yield [task, columns] the task hash and the list of Column objects
# @return whatever the block returns
def self.transaction(config, &control)
  decoder_config = config.load_config(Java::LineDecoder::DecoderTask)
  decoder_source = DataSource.from_java(decoder_config.dump)

  # see https://github.com/treasure-data/embulk-input-jira/blob/master/lib/embulk/input/jira.rb#L22
  attributes = {}
  columns = []
  config.param(:columns, :array).each do |spec|
    column_name = spec["name"]
    column_type = spec["type"].to_sym
    attributes[column_name] = column_type
    columns << Column.new(nil, column_name, column_type, spec["format"])
  end

  task = {
    "decoder_task" => decoder_source,
    :attributes => attributes,
    :columns => columns
  }

  yield(task, columns)
end

Public Instance Methods

init() click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 37
# Restores the line decoder settings stored under the task's "decoder_task"
# entry and keeps them in @decoder_task for later use by #run.
def init
  decoder_config = task.param("decoder_task", :hash)
  @decoder_task = decoder_config.load_task(Java::LineDecoder::DecoderTask)
end
run(file_input) click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 42
# Reads every line of every input file, joins physical lines that end in an
# escaped newline back into one logical record, parses each record and adds
# it to the page builder.
#
# @param file_input input wrapper; its @java_file_input instance variable is
#   handed to Java::LineDecoder together with @decoder_task
# @return the result of page_builder.finish
def run(file_input)
  decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)

  # Only the escape character that ends the physical line stands for an
  # escaped newline. Escape characters earlier in the record (for example in
  # front of a separator) must reach parse_line untouched, so the pattern is
  # anchored to the end of the buffer.
  trailing_escape = /#{Regexp.escape(FIELDS_ESCAPED_BY)}\z/

  while decoder.nextFile
    buffer = ''.dup
    while (line = decoder.poll)
      buffer << line
      if in_column?(line)
        # Block form so the replacement text is inserted verbatim.
        buffer.sub!(trailing_escape) { LINES_TERMINATED_BY }
        next
      end
      page_builder.add(parse_line(buffer))
      buffer = ''.dup
    end

    # The file ended in the middle of a record: emit what was collected.
    page_builder.add(parse_line(buffer)) unless buffer.empty?
  end

  page_builder.finish
end

Private Instance Methods

cast_value(val, col) click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 92
# Converts one raw column value to the Ruby value matching the column type.
#
# @param val the raw value (a String from the split line, or nil when the
#   line had fewer fields than columns)
# @param col column description, read through the "type" and "format" keys
# @return the converted value; nil for blank booleans, blank timestamps and
#   timestamps that do not match the column format
# @raise [RuntimeError] when the column type is not one of string, long,
#   double, json, boolean or timestamp
def cast_value(val, col)
  type = col["type"]

  case type
  when "string", "json"
    val
  when "long"
    val.to_i
  when "double"
    val.to_f
  when "boolean"
    if kind_of_boolean?(val)
      val
    elsif val.nil? || (val.respond_to?(:empty?) && val.empty?)
      # The respond_to? guard keeps numbers and other objects without
      # #empty? from raising here, so the branches below stay reachable.
      nil
    elsif val.kind_of?(String)
      ["yes", "true", "1"].include?(val.downcase)
    elsif val.kind_of?(Numeric)
      !val.zero?
    else
      !!val
    end
  when "timestamp"
    if val.nil? || val.empty?
      nil
    else
      begin
        Time.strptime(val, col["format"])
      rescue ArgumentError
        # Best effort: an unparsable timestamp becomes nil instead of
        # aborting the whole run.
        nil
      end
    end
  else
    raise "Unsupported type #{type}"
  end
end
in_column?(line) click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 80
# Tells whether a physical line stops inside a column value, i.e. whether
# its final character is an escape character that escapes the newline.
#
# The run of escape characters at the end of the line is counted: an even
# run consists only of escaped escape characters, an odd run leaves one
# escape character over for the newline.
#
# @param line [String] one physical line without its terminator
# @return [Boolean] true when the record continues on the next line
def in_column?(line)
  # Without an escape character nothing can escape a newline.
  return false if FIELDS_ESCAPED_BY.empty?

  trailing = line[/(?:#{Regexp.escape(FIELDS_ESCAPED_BY)})+\z/]
  return false if trailing.nil?

  (trailing.length / FIELDS_ESCAPED_BY.length).odd?
end
kind_of_boolean?(val) click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 133
# Tells whether the value is already one of the two boolean objects.
#
# @param val any object
# @return [Boolean] true for true and false, false for everything else
def kind_of_boolean?(val)
  [TrueClass, FalseClass].any? { |boolean_class| val.kind_of?(boolean_class) }
end
make_record(arr) click to toggle source

See https://github.com/takumakanari/embulk-parser-json/blob/master/lib/embulk/parser/jsonpath.rb#L43

# File lib/embulk/parser/mysqldump_tab.rb, line 85
# Casts the raw field values of one row, position by position, using the
# column list stored in @task[:columns].
#
# @param arr [Array] raw field values; positions beyond its end are passed
#   to cast_value as nil
# @return [Array] one converted value per configured column
def make_record(arr)
  @task[:columns].each_with_index.map do |column, position|
    cast_value(arr[position], column)
  end
end
parse_line(line) click to toggle source
# File lib/embulk/parser/mysqldump_tab.rb, line 69
# Splits one logical record into its fields and converts them into a row.
#
# @param line [String] a complete record, without its line terminator
# @return the result of make_record for the split fields
def parse_line(line)
  escaped_separator = FIELDS_ESCAPED_BY + FIELDS_TERMINATED_BY

  # Hide escaped separators behind a placeholder so the split below only
  # sees real column boundaries. Plain-string patterns and block
  # replacements keep every constant literal, whatever characters it holds.
  masked = line.gsub(escaped_separator) { DUMMY_STRING }

  # The negative limit keeps trailing empty fields; without it an empty
  # last column would vanish and be indistinguishable from a missing one.
  cols = masked.split(FIELDS_TERMINATED_BY, -1)
  cols.map! { |item| item.gsub(DUMMY_STRING) { FIELDS_TERMINATED_BY } }

  make_record(cols)
end