class Fluent::SQLInput
Constants
- SKIP_TABLE_REGEXP
Public Class Methods
desc(description)
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 28 def desc(description) end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 152 def configure(conf) super unless @state_file $log.warn "'state_file PATH' parameter is not set to a 'sql' source." $log.warn "this parameter is highly recommended to save the last rows to resume tailing." end @tables = conf.elements.select {|e| e.name == 'table' }.map {|e| te = TableElement.new te.configure(e) te } if config['all_tables'] @all_tables = true end end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 238 def shutdown @stop_flag = true $log.debug "Waiting for thread to finish" @thread.join end
start()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 175 def start @state_store = @state_file.nil? ? MemoryStateStore.new : StateStore.new(@state_file) config = { :adapter => @adapter, :host => @host, :port => @port, :database => @database, :username => @username, :password => @password, :socket => @socket, } # creates subclass of ActiveRecord::Base so that it can have different # database configuration from ActiveRecord::Base. @base_model = Class.new(ActiveRecord::Base) do # base model doesn't have corresponding phisical table self.abstract_class = true end # ActiveRecord requires the base_model to have a name. Here sets name # of an anonymous class by assigning it to a constant. In Ruby, class has # a name of a constant assigned first SQLInput.const_set("BaseModel_#{rand(1 << 31)}", @base_model) # Now base_model can have independent configuration from ActiveRecord::Base @base_model.establish_connection(config) if @all_tables # get list of tables from the database @tables = @base_model.connection.tables.map do |table_name| if table_name.match(SKIP_TABLE_REGEXP) # some tables such as "schema_migrations" should be ignored nil else te = TableElement.new te.configure({ 'table' => table_name, 'tag' => table_name, 'update_column' => nil, }) te end end.compact end # ignore tables if TableElement#init failed @tables.reject! do |te| begin te.init(@tag_prefix, @base_model, router) log.info "Selecting '#{te.table}' table" false rescue => e log.warn "Can't handle '#{te.table}' table. Ignoring.", :error => e.message, :error_class => e.class log.warn_backtrace e.backtrace true end end @stop_flag = false @thread = Thread.new(&method(:thread_main)) end
thread_main()
click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 244 def thread_main until @stop_flag sleep @select_interval begin conn = @base_model.connection conn.active? || conn.reconnect! rescue => e log.warn "can't connect to database. Reconnect at next try" next end @tables.each do |t| begin last_record = @state_store.last_records[t.table] @state_store.last_records[t.table] = t.emit_next_records(last_record, @select_limit) @state_store.update! rescue => e log.error "unexpected error", :error => e.message, :error_class => e.class log.error_backtrace e.backtrace end end end end