class Fluent::Plugin::SQLOutput

Attributes

tables[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 151
# Constructs the plugin instance. No state is set up here; the method only
# delegates to the superclass initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 155
# Reads the plugin configuration and builds the table routing state.
#
# Every child element of +conf+ named 'table' becomes a TableElement. An
# element whose argument is empty is the default table (if several are
# declared, a warning is logged and the last one wins); all others are
# appended to @tables in declaration order.
#
# @param conf the configuration element handed to this plugin; it is read via
#   conf['remove_tag_prefix'] and conf.elements
# @raise [Fluent::ConfigError] when no default <table> element is declared
def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  if (prefix = conf['remove_tag_prefix'])
    # \A anchors at the very beginning of the tag. '^' would additionally
    # match after every newline, so the prefix could be stripped mid-string.
    @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.select { |e| e.name == 'table' }.each do |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      # Use the plugin logger (not the global $log) like every other message
      # emitted by this plugin.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  end

  if @pool < @buffer_config.flush_thread_count
    log.warn "connection pool size is smaller than buffer's flush_thread_count. Recommend to increase pool value", :pool => @pool, :flush_thread_count => @buffer_config.flush_thread_count
  end

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 222
# Always returns true.
# NOTE(review): presumably the Fluentd output-plugin hook declaring that the
# formatted chunk payload is msgpack binary — confirm against the Fluentd
# output plugin API.
def formatted_to_msgpack_binary
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 218
# Shutdown hook. Performs no plugin-specific cleanup; it only delegates to the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 188
# Start hook: after the superclass start, opens the database connection and
# initialises every configured table.
#
# Steps, in order:
# 1. Collect the connection settings from the plugin's instance variables.
# 2. Create an abstract ActiveRecord::Base subclass and register it on
#    SQLOutput under a randomly suffixed "BaseModel_<n>" constant.
# 3. Establish the connection on ActiveRecord::Base.
# 4. Run init_table for each entry of @tables, dropping those for which it
#    returns true (initialisation failed), then for the default table.
def start
  super

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
    schema_search_path: @schema_search_path,
    pool: @pool,
    timeout: @timeout,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  model_const_name = "BaseModel_#{rand(1 << 31)}"
  SQLOutput.const_set(model_const_name, @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # init_table returns true on failure, so failed tables are removed here.
  @tables.delete_if { |te| init_table(te, @base_model) }
  init_table(@default_table, @base_model)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 226
# Writes one buffer chunk to the database.
#
# The chunk's tag (after format_tag) is tested against each entry of @tables
# in order; the first table whose pattern matches imports the chunk. When no
# table matches, the default table imports it. The whole operation runs inside
# a connection checked out from ActiveRecord::Base's connection pool.
#
# @param chunk the buffer chunk; chunk.metadata.tag is used for routing
# @return whatever the selected table's import(chunk, self) returns
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    # The tag does not depend on the table being tested, so format it once
    # instead of once per configured table.
    tag = format_tag(chunk.metadata.tag)
    target = @tables.find { |table| table.pattern.match(tag) } || @default_table
    target.import(chunk, self)
  end
end

Private Instance Methods

format_tag(tag) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 253
# Strips the configured tag prefix from +tag+.
#
# @param tag the event tag; may be nil
# @return +tag+ unchanged when it is nil/false or when no @remove_tag_prefix
#   pattern is set; otherwise a copy with every match of the pattern removed
def format_tag(tag)
  return tag unless tag && @remove_tag_prefix

  tag.gsub(@remove_tag_prefix, '')
end
init_table(te, base_model) click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 241
# Initialises one table element against +base_model+.
#
# Any StandardError raised by te.init is logged as a warning together with
# its backtrace, and is not re-raised.
#
# @param te the table element; must respond to init(base_model) and table
# @param base_model the model class passed through to te.init
# @return [Boolean] false when initialisation succeeded, true when it failed
#   (the value is used by callers as a "reject this table" flag)
def init_table(te, base_model)
  te.init(base_model)
  log.info "Selecting '#{te.table}' table"
  false
rescue => err
  log.warn "Can't handle '#{te.table}' table. Ignoring.", error: err
  log.warn_backtrace err.backtrace
  true
end