class Embulk::Parser::Flexml
Public Class Methods
transaction(config) { |task, columns| ... }
click to toggle source
# File lib/embulk/parser/flexml.rb, line 10
# Builds the parser task and the output columns from the plugin config and
# hands both to the given block.
#
# The task carries:
# * +:schema+ - a Hash mapping each column name to its schema entry; entries
#   whose "type" is "timestamp" always carry "format" and "timezone" keys
#   (nil when the config omits them).
# * +:root+   - the "root" config parameter (a string).
#
# NOTE: the task schema is keyed by column name, so a later entry with a
# duplicate name replaces the earlier one there, while +columns+ still
# contains one Column per configured entry.
#
# @param config [#param] plugin configuration; read via param(name, type).
# @yield [task, columns] the task Hash and the Array of Column objects.
# @return [Object] whatever the block returns.
def self.transaction(config, &control)
  schema = config.param("schema", :array)

  schema_serialized = schema.each_with_object({}) do |column, memo|
    # Build a copy for timestamp columns instead of merging into the config
    # entry itself, so the caller's schema array is left untouched.
    entry =
      if column["type"] == "timestamp"
        column.merge("format" => column["format"], "timezone" => column["timezone"])
      else
        column
      end
    memo[column["name"]] = entry
  end

  task = {
    :schema => schema_serialized,
    :root => config.param("root", :string)
  }

  columns = schema.each_with_index.map do |c, i|
    Column.new(i, c["name"], c["type"].to_sym)
  end

  yield(task, columns)
end
Public Instance Methods
convert(val, config)
click to toggle source
# File lib/embulk/parser/flexml.rb, line 67
# Converts a raw value extracted from the XML into the Ruby value for a column.
#
# @param val [String, nil] raw element text or attribute value; nil is
#   treated as an empty string.
# @param config [Hash] column settings; reads "type" and, for timestamps,
#   "format" (a Time.strptime format) and the optional "timezone".
# @return [String, Integer, Float, Boolean, Time, nil] the converted value;
#   nil when a timestamp value is empty.
# @raise [ArgumentError] if "timezone" is set but Time.zone_offset cannot
#   resolve it.
# @raise [RuntimeError] if config["type"] is not one of the supported types.
def convert(val, config)
  v = val.nil? ? "" : val
  case config["type"]
  when "string"
    v
  when "long"
    v.to_i
  when "double"
    v.to_f
  when "boolean"
    ["yes", "true", "1"].include?(v.downcase)
  when "timestamp"
    return nil if v.empty?

    dest = Time.strptime(v, config["format"])
    return dest.utc if config["timezone"].nil?

    zone_offset = Time.zone_offset(config["timezone"])
    raise ArgumentError, "Unknown timezone '#{config["timezone"]}'" if zone_offset.nil?

    # Capture the parsed offset first: Time#localtime changes dest in place.
    utc_offset = dest.utc_offset
    # Re-express the time at the configured offset, then shift the instant so
    # the parsed wall-clock reading is kept under that offset.
    dest.localtime(zone_offset) + utc_offset - zone_offset
  else
    raise "Unsupported type '#{config["type"]}'"
  end
end
run(file_input)
click to toggle source
# File lib/embulk/parser/flexml.rb, line 32
# Parses every file supplied by +file_input+ as XML and adds one row to
# @page_builder per element matched by @task[:root].
#
# For each column in @task[:schema] the value is taken from the matched
# element itself, or from the first node matching the column's "xpath" when
# one is configured. The column's "attribute" (when configured) selects an
# attribute of that node, otherwise its text is used. The raw value is passed
# through #convert; a column whose node is not found yields nil.
#
# A file that fails with a StandardError is logged and skipped (rows already
# added from that file are kept). Non-StandardError exceptions such as
# Interrupt or SystemExit are deliberately not swallowed.
#
# @param file_input [#next_file] source of files; each file must respond to
#   +read+, and +next_file+ returns a falsy value when exhausted.
# @return [Object] the result of @page_builder.finish.
def run(file_input)
  while file = file_input.next_file
    begin
      doc = Document.new(file.read)
      doc.elements.each(@task[:root]) do |element|
        values = @task[:schema].map do |_name, column|
          node = column.has_key?("xpath") ? XPath.first(element, column["xpath"]) : element
          next nil if node.nil?

          raw = column.has_key?("attribute") ? node.attributes[column["attribute"]] : node.text
          convert(raw, column)
        end
        @page_builder.add(values)
      end
    rescue StandardError => error
      Embulk.logger.error "Failed to parse xml: #{error.message}"
    end
  end
  @page_builder.finish
end