class Fluent::Plugin::SQLOutput
Attributes
tables[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 151
# Constructs the output plugin. No state is set up here; construction is
# handed straight to the superclass.
def initialize
  super
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 155
# Applies the plugin configuration and builds the table routing state.
#
# Every <table> section in +conf+ is turned into a TableElement. A section
# with an empty argument becomes the default table; every other section is
# appended to +@tables+ in configuration order.
#
# conf - configuration element; read via conf['remove_tag_prefix'] and
#        conf.elements.
#
# Raises Fluent::ConfigError when no default <table> section was found.
def configure(conf)
  compat_parameters_convert(conf, :inject, :buffer)

  super

  if (remove_tag_prefix = conf['remove_tag_prefix'])
    # \A anchors at the very beginning of the tag. '^' is a per-line anchor
    # and would also match right after an embedded newline.
    @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(remove_tag_prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.select { |e| e.name == 'table' }.each do |e|
    te = TableElement.new(e.arg, log, @enable_fallback)
    te.configure(e)
    if e.arg.empty?
      # Use the plugin-scoped logger, like every other log call in this
      # class, instead of the global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      # A later default definition replaces an earlier one.
      @default_table = te
    else
      @tables << te
    end
  end

  if @pool < @buffer_config.flush_thread_count
    log.warn "connection pool size is smaller than buffer's flush_thread_count. Recommend to increase pool value", :pool => @pool, :flush_thread_count => @buffer_config.flush_thread_count
  end

  if @default_table.nil?
    raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
  end
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 222
# Always returns true.
# NOTE(review): presumably a hook telling the output framework that the
# buffered chunks are msgpack-encoded -- confirm against the superclass.
def formatted_to_msgpack_binary
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 218
# Shuts the plugin down. Nothing plugin-specific is released here; the call
# is passed on to the superclass unchanged.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sql.rb, line 188
# Starts the plugin: opens the database connection and initializes the
# configured tables.
#
# An anonymous abstract ActiveRecord base class is created and registered
# under a randomly suffixed constant on SQLOutput, then handed to each table
# as its base model. Tables whose initialization fails are dropped from
# +@tables+.
def start
  super

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
    schema_search_path: @schema_search_path,
    pool: @pool,
    timeout: @timeout,
  }

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }
  # Give the anonymous class a constant name.
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)
  ActiveRecord::Base.establish_connection(connection_settings)

  # init_table returns true on failure, so reject! discards exactly the
  # tables whose TableElement#init raised.
  @tables.reject! { |te| init_table(te, @base_model) }
  # The default table is initialized too, but is kept even if that fails.
  init_table(@default_table, @base_model)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 226
# Writes one buffer chunk to the table that matches the chunk's tag.
#
# The tag is passed through format_tag, then the first table in +@tables+
# whose pattern matches receives the chunk. If none matches, the chunk goes
# to +@default_table+. Everything runs inside a pooled connection.
#
# chunk - buffer chunk; its tag is read from chunk.metadata.tag.
#
# Returns whatever the selected table's #import returns.
def write(chunk)
  ActiveRecord::Base.connection_pool.with_connection do
    # The formatted tag is the same for every table, so compute it once
    # instead of once per candidate.
    tag = format_tag(chunk.metadata.tag)
    target = @tables.find { |table| table.pattern.match(tag) } || @default_table
    target.import(chunk, self)
  end
end
Private Instance Methods
format_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 253
# Strips the configured prefix from +tag+.
#
# tag - the tag string, or nil.
#
# Returns +tag+ unchanged when it is nil or no +@remove_tag_prefix+ pattern
# is set; otherwise a copy of +tag+ with the first match of the pattern
# removed.
def format_tag(tag)
  return tag unless tag && @remove_tag_prefix

  # A prefix is removed once, so use sub. gsub would rescan the whole string
  # and, with a line-anchored pattern, strip again after every newline.
  tag.sub(@remove_tag_prefix, '')
end
init_table(te, base_model)
click to toggle source
# File lib/fluent/plugin/out_sql.rb, line 241
# Initializes one table element against the given base model.
#
# te         - table element; must respond to #init and #table.
# base_model - model class passed through to te.init.
#
# Returns false when initialization succeeded and true when it raised. On
# failure the error and its backtrace are logged as warnings and the error
# is not re-raised.
def init_table(te, base_model)
  te.init(base_model)
  log.info "Selecting '#{te.table}' table"
  false
rescue => err
  log.warn "Can't handle '#{te.table}' table. Ignoring.", error: err
  log.warn_backtrace err.backtrace
  true
end