class Fluent::Plugin::SQLOutput::TableElement

TODO: Merge with SQLInput's TableElement.

Attributes

model[R]
pattern[R]

Public Class Methods

new(pattern, log, enable_fallback) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 54
# Builds a table element bound to a tag match pattern.
#
# @param pattern [String] pattern handed to Fluent::MatchPattern.create;
#   the resulting matcher is stored in @pattern
# @param log [Object] logger kept in @log and used by the import methods
# @param enable_fallback [Boolean] stored in @enable_fallback; read by
#   #import to decide whether to fall back to one-by-one import
def initialize(pattern, log, enable_fallback)
  super()
  # Plain collaborators first, then the compiled matcher.
  @log = log
  @enable_fallback = enable_fallback
  @pattern = Fluent::MatchPattern.create(pattern)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 61
# Applies the configuration, then prepares the record formatter.
#
# After the superclass has processed +conf+, the column mapping string in
# @column_mapping is parsed into @mapping (record key => column name) and
# @format_proc is set to a proc that projects an incoming record onto the
# mapped columns.
#
# @param conf [Object] configuration element, forwarded implicitly to super
def configure(conf)
  super

  @mapping = parse_column_mapping(@column_mapping)
  # The proc reads @mapping at call time and returns a fresh Hash whose
  # keys are column names and whose values come from the source record.
  @format_proc = proc do |record|
    @mapping.each_with_object({}) do |(source_key, column), row|
      row[column] = record[source_key]
    end
  end
end
import(chunk, output) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 91
# Turns every event of +chunk+ into a model instance and imports them in bulk.
#
# Events that cannot be converted into a model are logged and skipped.
# When the bulk import fails with a deterministic error, the records are
# either imported one by one (@enable_fallback truthy) or the error is
# logged and re-raised.
#
# @param chunk [Object] buffer chunk; must respond to +metadata.tag+ and
#   +msgpack_each+ yielding (time, data) pairs
# @param output [Object] must respond to +inject_values_to_record(tag, time, data)+
# @return [Object] whatever @model.import or #one_by_one_import returns
# @raise [ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError]
#   when the bulk import fails and fallback is disabled
def import(chunk, output)
  event_tag = chunk.metadata.tag
  rows = []
  chunk.msgpack_each do |time, data|
    begin
      data = output.inject_values_to_record(event_tag, time, data)
      rows.push(@model.new(@format_proc.call(data)))
    rescue => err
      # A single bad record must not abort the whole chunk.
      details = {error: err, table: @table, record: Yajl.dump(data)}
      @log.warn "Failed to create the model. Ignore a record:", details
    end
  end

  begin
    @model.import(rows)
  # Only deterministic errors are rescued here; every other exception
  # propagates so that the Fluentd retry mechanism can handle it.
  rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => err
    unless @enable_fallback
      @log.warn "Got deterministic error. Fallback is disabled", error: err
      raise err
    end

    @log.warn "Got deterministic error. Fallback to one-by-one import", error: err
    one_by_one_import(rows)
  end
end
init(base_model) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 74
# Creates the model class for @table and stores it in @model.
#
# The model is an anonymous subclass of +base_model+ whose table name is
# @table. It is then registered as a constant under +base_model+ (named
# after the singularized, camelized table name) and given a fixed
# +model_name+.
#
# @param base_model [Class] class to derive the table model from
#   (presumably an ActiveRecord base class - confirm against the caller)
# @return [Symbol] result of define_singleton_method (:model_name)
def init(base_model)
  # See SQLInput for more details of following code
  bound_table = @table
  @model = Class.new(base_model) do |klass|
    klass.table_name = bound_table
    # NOTE(review): presumably points the inheritance column at an unused
    # name so a real column is never treated as the STI discriminator - confirm.
    klass.inheritance_column = '_never_use_output_'
  end

  const_name = bound_table.singularize.camelize
  base_model.const_set(const_name, @model)
  naming = ActiveModel::Name.new(@model, nil, const_name)
  # The closure pins the precomputed name object for the anonymous class.
  @model.define_singleton_method(:model_name) { naming }

  # TODO: check column_names and table schema
  # @model.column_names
end
one_by_one_import(records) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 117
# Imports +records+ individually so that one bad record cannot fail the rest.
#
# A deterministic error on a record is logged and the record is dropped.
# Any other error is retried (with a 0.5 second pause) up to @num_retries
# times; after that the record is logged and dropped as well.
#
# @param records [Array] model instances to import one at a time
# @return [Array] the +records+ collection that was iterated
def one_by_one_import(records)
  records.each do |row|
    attempts = 0
    begin
      @model.import([row])
    rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => err
      @log.error "Got deterministic error again. Dump a record", error: err, record: row
    rescue => err
      attempts += 1
      if attempts <= @num_retries
        @log.warn "Failed to import a record: retry number = #{attempts}", error: err
        sleep 0.5
        retry
      end

      # Retry budget exhausted: give up on this record and move on.
      @log.error "Can't recover undeterministic error. Dump a record", error: err, record: row
    end
  end
end

Private Instance Methods

parse_column_mapping(column_mapping_conf) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 140
# Parses a column mapping string into a Hash of record key => column name.
#
# The string is a comma-separated list of entries, each either +key:column+
# or just +key+ (the column then has the same name as the key). Only the
# first ':' of an entry separates key from column. Whitespace around
# entries, keys and columns is ignored, blank entries are skipped, and an
# entry with an empty column (e.g. "key:") maps the key to itself.
#
# @param column_mapping_conf [String] e.g. "timestamp:created_at, host, msg:message"
# @return [Hash{String => String}] record key => destination column
def parse_column_mapping(column_mapping_conf)
  column_mapping_conf.split(',').each_with_object({}) do |column_map, mapping|
    entry = column_map.strip
    # "a,,b" or "a, ,b" would otherwise add a nil => nil mapping.
    next if entry.empty?

    # Trim both sides so "key : column" does not keep stray spaces.
    key, column = entry.split(':', 2).map(&:strip)
    mapping[key] = (column.nil? || column.empty?) ? key : column
  end
end