class Embulk::Parser::Ltsv
Public Class Methods
transaction(config) { |task, columns| ... }
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 7 def self.transaction(config, &control) parser_task = config.load_config(Java::LineDecoder::DecoderTask) task = { "decoder_task" => DataSource.from_java(parser_task.dump), "schema" => config.param("schema", :array), "null_value_pattern" => config.param("null_value_pattern", :string, default: nil), "null_empty_string" => config.param("null_empty_string", :bool, default: false), "delimiter" => config.param("delimiter", :string, default: "\t"), "label_delimiter" => config.param("label_delimiter", :string, default: ":") } columns = task["schema"].each_with_index.map do |c, i| Column.new(i, c["name"], c["type"].to_sym) end yield(task, columns) end
Public Instance Methods
init()
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 23 def init @delimiter = task["delimiter"] @label_delimiter = task["label_delimiter"] @null_value_pattern = task["null_value_pattern"] ? Regexp.new(task["null_value_pattern"]) : nil @null_empty_string = task["null_empty_string"] @decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask) end
run(file_input)
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 31 def run(file_input) decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task) while decoder.nextFile while line = decoder.poll begin array = line.split(@delimiter).map { |pair| pair.split(@label_delimiter, 2) } @page_builder.add(make_record(Hash[*array.flatten])) rescue => e puts "\n#{e.message}\n#{e.backtrace.join("\n")}" end end end page_builder.finish end
Private Instance Methods
convert_value(e, c)
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 57 def convert_value(e, c) v = convert_value_to_nil(e[c["name"]]) return nil if v.nil? case c["type"] when "string" v when "long" v.to_i when "double" v.to_f when "boolean" ["yes", "true", "1"].include?(v.downcase) when "timestamp" if v.empty? nil else c["time_format"] ? Time.strptime(v, c["time_format"]) : Time.parse(v) end else raise "Unsupported type #{c['type']}" end end
convert_value_to_nil(value)
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 80 def convert_value_to_nil(value) if value and @null_empty_string value = (value == '') ? nil : value end if value and @null_value_pattern value = match_regexp(@null_value_pattern, value) ? nil : value end value end
make_record(e)
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 51 def make_record(e) @task["schema"].map do |c| convert_value(e, c) end end
match_regexp(regexp, string)
click to toggle source
# File lib/embulk/parser/ltsv.rb, line 90 def match_regexp(regexp, string) begin return regexp.match(string) rescue ArgumentError => e raise e unless e.message.index("invalid byte sequence in".freeze).zero? log.info "invalid byte sequence is replaced in `#{string}`" string = string.scrub('?') retry end return true end