class Embulk::Guess::CsvGuessPlugin
Constants
- COMMENT_LINE_MARKER_CANDIDATES
- DELIMITER_CANDIDATES
- ESCAPE_CANDIDATES
- MAX_SKIP_LINES
- NO_SKIP_DETECT_LINES
- NULL_STRING_CANDIDATES
- QUOTE_CANDIDATES
Public Instance Methods
guess_lines(config, sample_lines)
click to toggle source
# File lib/embulk/guess/csv.rb, line 35 def guess_lines(config, sample_lines) return {} unless config.fetch("parser", {}).fetch("type", "csv") == "csv" parser_config = config["parser"] || {} if parser_config["type"] == "csv" && parser_config["delimiter"] delim = parser_config["delimiter"] else delim = guess_delimiter(sample_lines) unless delim # assuming single column CSV delim = DELIMITER_CANDIDATES.first end end parser_guessed = DataSource.new.merge(parser_config).merge({"type" => "csv", "delimiter" => delim}) unless parser_guessed.has_key?("quote") quote = guess_quote(sample_lines, delim) unless quote if !guess_force_no_quote(sample_lines, delim, '"') # assuming CSV follows RFC for quoting quote = '"' else # disable quoting (set null) end end parser_guessed["quote"] = quote end parser_guessed["quote"] = '"' if parser_guessed["quote"] == '' # setting '' is not allowed any more. this line converts obsoleted config syntax to explicit syntax. unless parser_guessed.has_key?("escape") if quote = parser_guessed["quote"] escape = guess_escape(sample_lines, delim, quote) unless escape if quote == '"' # assuming this CSV follows RFC for escaping escape = '"' else # disable escaping (set null) end end parser_guessed["escape"] = escape else # escape does nothing if quote is disabled end end unless parser_guessed.has_key?("null_string") null_string = guess_null_string(sample_lines, delim) parser_guessed["null_string"] = null_string if null_string # don't even set null_string to avoid confusion of null and 'null' in YAML format end # guessing skip_header_lines should be before guessing guess_comment_line_marker # because lines supplied to CsvTokenizer already don't include skipped header lines. # skipping empty lines is also disabled here because skipping header lines is done by # CsvParser which doesn't skip empty lines automatically sample_records = split_lines(parser_guessed, false, sample_lines, delim, {}) skip_header_lines = guess_skip_header_lines(sample_records) sample_lines = sample_lines[skip_header_lines..-1] sample_records = sample_records[skip_header_lines..-1] unless parser_guessed.has_key?("comment_line_marker") comment_line_marker, sample_lines = guess_comment_line_marker(sample_lines, delim, parser_guessed["quote"], parser_guessed["null_string"]) if comment_line_marker parser_guessed["comment_line_marker"] = comment_line_marker end end sample_records = split_lines(parser_guessed, true, sample_lines, delim, {}) # It should fail if CSV parser cannot parse sample_lines. if sample_records.nil? || sample_records.empty? return {} end if sample_lines.size == 1 # The file contains only 1 line. Assume that there are no header line. header_line = false column_types = SchemaGuess.types_from_array_records(sample_records[0, 1]) unless parser_guessed.has_key?("trim_if_not_quoted") sample_records_trimmed = split_lines(parser_guessed, true, sample_lines, delim, {"trim_if_not_quoted" => true}) column_types_trimmed = SchemaGuess.types_from_array_records(sample_records_trimmed) if column_types != column_types_trimmed parser_guessed["trim_if_not_quoted"] = true column_types = column_types_trimmed else parser_guessed["trim_if_not_quoted"] = false end end else # The file contains more than 1 line. If guessed first line's column types are all strings or boolean, and the types are # different from the other lines, assume that the first line is column names. first_types = SchemaGuess.types_from_array_records(sample_records[0, 1]) other_types = SchemaGuess.types_from_array_records(sample_records[1..-1] || []) unless parser_guessed.has_key?("trim_if_not_quoted") sample_records_trimmed = split_lines(parser_guessed, true, sample_lines, delim, {"trim_if_not_quoted" => true}) other_types_trimmed = SchemaGuess.types_from_array_records(sample_records_trimmed[1..-1] || []) if other_types != other_types_trimmed parser_guessed["trim_if_not_quoted"] = true other_types = other_types_trimmed else parser_guessed["trim_if_not_quoted"] = false end end header_line = (first_types != other_types && first_types.all? {|t| ["string", "boolean"].include?(t) }) || guess_string_header_line(sample_records) column_types = other_types end if column_types.empty? # TODO here is making the guessing failed if the file doesn't contain any columns. However, # this may not be convenient for users. return {} end if header_line parser_guessed["skip_header_lines"] = skip_header_lines + 1 else parser_guessed["skip_header_lines"] = skip_header_lines end parser_guessed["allow_extra_columns"] = false unless parser_guessed.has_key?("allow_extra_columns") parser_guessed["allow_optional_columns"] = false unless parser_guessed.has_key?("allow_optional_columns") if header_line column_names = sample_records.first.map(&:strip) else column_names = (0..column_types.size).to_a.map {|i| "c#{i}" } end schema = [] column_names.zip(column_types).each do |name,type| if name && type schema << new_column(name, type) end end parser_guessed["columns"] = schema return {"parser" => parser_guessed} end
new_column(name, type)
click to toggle source
# File lib/embulk/guess/csv.rb, line 180 def new_column(name, type) if type.is_a?(SchemaGuess::TimestampTypeMatch) {"name" => name, "type" => type, "format" => type.format} else {"name" => name, "type" => type} end end
Private Instance Methods
array_avg(array)
click to toggle source
# File lib/embulk/guess/csv.rb, line 359 def array_avg(array) array.inject(0.0) {|r,i| r += i } / array.size end
array_standard_deviation(array)
click to toggle source
# File lib/embulk/guess/csv.rb, line 368 def array_standard_deviation(array) Math.sqrt(array_variance(array)) end
array_sum(array)
click to toggle source
# File lib/embulk/guess/csv.rb, line 355 def array_sum(array) array.inject(0) {|r,i| r += i } end
array_variance(array)
click to toggle source
# File lib/embulk/guess/csv.rb, line 363 def array_variance(array) avg = array_avg(array) array.inject(0.0) {|r,i| r += (i - avg) ** 2 } / array.size end
guess_comment_line_marker(sample_lines, delim, quote, null_string)
click to toggle source
# File lib/embulk/guess/csv.rb, line 317 def guess_comment_line_marker(sample_lines, delim, quote, null_string) exclude = [] exclude << /^#{Regexp.escape(quote)}/ if quote && !quote.empty? exclude << /^#{Regexp.escape(null_string)}(?:#{Regexp.escape(delim)}|$)/ if null_string guessed = COMMENT_LINE_MARKER_CANDIDATES.map do |str| regexp = /^#{Regexp.quote(str)}/ unmatch_lines = sample_lines.reject do |line| exclude.all? {|ex| line !~ ex } && line =~ regexp end match_count = sample_lines.size - unmatch_lines.size [str, match_count, unmatch_lines] end.select {|str,match_count,unmatch_lines| match_count > 0 }.sort_by {|str,match_count,unmatch_lines| -match_count } str, match_count, unmatch_lines = guessed.first if str return str, unmatch_lines else return nil, sample_lines end end
guess_delimiter(sample_lines)
click to toggle source
# File lib/embulk/guess/csv.rb, line 228 def guess_delimiter(sample_lines) delim_weights = DELIMITER_CANDIDATES.map do |d| counts = sample_lines.map {|line| line.count(d) } total = array_sum(counts) if total > 0 stddev = array_standard_deviation(counts) stddev = 0.000000001 if stddev == 0.0 weight = total / stddev [d, weight] else [nil, 0] end end delim, weight = *delim_weights.sort_by {|d,weight| weight }.last if delim != nil && weight > 1 return delim else return nil end end
guess_escape(sample_lines, delim, quote)
click to toggle source
# File lib/embulk/guess/csv.rb, line 284 def guess_escape(sample_lines, delim, quote) guessed = ESCAPE_CANDIDATES.map do |str| regexp = /#{Regexp.quote(str)}(?:#{Regexp.quote(delim)}|#{Regexp.quote(quote)})/ counts = sample_lines.map {|line| line.scan(regexp).count } count = counts.inject(0) {|r,c| r + c } [str, count] end.select {|str,count| count > 0 }.sort_by {|str,count| -count } found = guessed.first return found ? found[0] : nil end
guess_force_no_quote(sample_lines, delim, quote_candidate)
click to toggle source
# File lib/embulk/guess/csv.rb, line 275 def guess_force_no_quote(sample_lines, delim, quote_candidate) delim_regexp = Regexp.escape(delim) q_regexp = Regexp.escape(quote_candidate) sample_lines.any? do |line| # quoting character appear at the middle of a non-quoted value line =~ /(?:\A|#{delim_regexp})\s*[^#{q_regexp}]+#{q_regexp}/ end end
guess_null_string(sample_lines, delim)
click to toggle source
# File lib/embulk/guess/csv.rb, line 295 def guess_null_string(sample_lines, delim) guessed = NULL_STRING_CANDIDATES.map do |str| regexp = /(?:^|#{Regexp.quote(delim)})#{Regexp.quote(str)}(?:$|#{Regexp.quote(delim)})/ counts = sample_lines.map {|line| line.scan(regexp).count } count = counts.inject(0) {|r,c| r + c } [str, count] end.select {|str,count| count > 0 }.sort_by {|str,count| -count } found_str, found_count = guessed.first return found_str ? found_str : nil end
guess_quote(sample_lines, delim)
click to toggle source
# File lib/embulk/guess/csv.rb, line 250 def guess_quote(sample_lines, delim) delim_regexp = Regexp.escape(delim) quote_weights = QUOTE_CANDIDATES.map do |q| weights = sample_lines.map do |line| q_regexp = Regexp.escape(q) count = line.count(q) if count > 0 weight = count weight += line.scan(/(?:\A|#{delim_regexp})\s*#{q_regexp}(?:(?!#{q_regexp}).)*\s*#{q_regexp}(?:$|#{delim_regexp})/).size * 20 weight += line.scan(/(?:\A|#{delim_regexp})\s*#{q_regexp}(?:(?!#{delim_regexp}).)*\s*#{q_regexp}(?:$|#{delim_regexp})/).size * 40 weight else nil end end.compact weights.empty? ? 0 : array_avg(weights) end quote, weight = QUOTE_CANDIDATES.zip(quote_weights).sort_by {|q,w| w }.last if weight >= 10.0 return quote else return nil end end
guess_skip_header_lines(sample_records)
click to toggle source
# File lib/embulk/guess/csv.rb, line 306 def guess_skip_header_lines(sample_records) counts = sample_records.map {|records| records.size } (1..[MAX_SKIP_LINES, counts.length - 1].min).each do |i| check_row_count = counts[i-1] if counts[i, NO_SKIP_DETECT_LINES].all? {|c| c <= check_row_count } return i - 1 end end return 0 end
guess_string_header_line(sample_records)
click to toggle source
# File lib/embulk/guess/csv.rb, line 339 def guess_string_header_line(sample_records) first = sample_records.first first.count.times do |column_index| lengths = sample_records.map {|row| row[column_index] }.compact.map {|v| v.to_s.size } if lengths.size > 1 if array_variance(lengths[1..-1]) <= 0.2 avg = array_avg(lengths[1..-1]) if avg == 0.0 ? lengths[0] > 1 : (avg - lengths[0]).abs / avg > 0.7 return true end end end end return false end
split_lines(parser_config, skip_empty_lines, sample_lines, delim, extra_config)
click to toggle source
# File lib/embulk/guess/csv.rb, line 190 def split_lines(parser_config, skip_empty_lines, sample_lines, delim, extra_config) null_string = parser_config["null_string"] config = parser_config.merge(extra_config).merge({"charset" => "UTF-8", "columns" => []}) parser_task = config.load_config(org.embulk.standards.CsvParserPlugin::PluginTask) data = sample_lines.map {|line| line.force_encoding('UTF-8') }.join(parser_task.getNewline.getString.encode('UTF-8')) sample = Buffer.from_ruby_string(data) decoder = Java::LineDecoder.new(Java::ListFileInput.new([[sample.to_java]]), parser_task) tokenizer = org.embulk.standards.CsvTokenizer.new(decoder, parser_task) rows = [] while tokenizer.nextFile while tokenizer.nextRecord(skip_empty_lines) begin columns = [] while true begin column = tokenizer.nextColumn quoted = tokenizer.wasQuotedColumn if null_string && !quoted && column == null_string column = nil end columns << column rescue org.embulk.standards.CsvTokenizer::TooFewColumnsException rows << columns break end end rescue org.embulk.standards.CsvTokenizer::InvalidValueException # TODO warning tokenizer.skipCurrentLine end end end return rows rescue # TODO warning if fallback to this ad-hoc implementation sample_lines.map {|line| line.split(delim) } end