class Fluent::Plugin::SQLInput

Constants

SKIP_TABLE_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sql.rb, line 162
# Validates the plugin configuration and collects the configured tables.
#
# After delegating to the superclass, this warns (without failing) when no
# state file is set, builds one TableElement for every child element of
# +conf+ whose name is 'table', and switches on all-tables mode when the
# 'all_tables' key is present in the configuration.
#
# @param conf the configuration object passed to the plugin; its +elements+
#   are scanned for entries named 'table'
def configure(conf)
  super

  unless @state_file
    # Use the plugin-scoped logger (as #start and #thread_main do) rather
    # than the process-global $log.
    log.warn "'state_file PATH' parameter is not set to a 'sql' source."
    log.warn "this parameter is highly recommended to save the last rows to resume tailing."
  end

  @tables = conf.elements.select { |e| e.name == 'table' }.map do |e|
    te = TableElement.new
    te.configure(e)
    te
  end

  # NOTE(review): any truthy value enables this mode -- if the raw config
  # stores strings, "false" would also enable it. Confirm how 'all_tables'
  # is parsed.
  @all_tables = true if config['all_tables']
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 249
# Asks the polling thread to stop and waits for it to exit.
#
# Sets @stop_flag, which #thread_main checks at the top of each loop
# iteration, and then joins the thread.
def shutdown
  @stop_flag = true
  log.debug "Waiting for thread to finish"
  # @thread is only assigned as the last step of #start; if #start never ran
  # or raised before that point there is nothing to join.
  @thread.join if @thread
end
start() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 185
# Prepares the state store and database connection, resolves the list of
# tables to poll, and launches the background polling thread.
def start
  @state_store =
    if @state_file.nil?
      MemoryStateStore.new
    else
      StateStore.new(@state_file)
    end

  connection_settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket,
    schema_search_path: @schema_search_path,
  }

  # A dedicated ActiveRecord::Base subclass lets this plugin use its own
  # database configuration. It is abstract because no physical table
  # corresponds to it.
  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }

  # ActiveRecord needs the model class to be named. An anonymous class takes
  # the name of the first constant it is assigned to.
  SQLInput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)

  # From here on the base model is configured independently of
  # ActiveRecord::Base.
  @base_model.establish_connection(connection_settings)

  if @all_tables
    # Ask the database for its tables, leaving out the ones matching
    # SKIP_TABLE_REGEXP (e.g. "schema_migrations").
    table_names = @base_model.connection.tables
    @tables = table_names.each_with_object([]) do |table_name, selected|
      next if table_name.match(SKIP_TABLE_REGEXP)

      element = TableElement.new
      element.configure({
        'table' => table_name,
        'tag' => table_name,
        'update_column' => nil,
      })
      selected << element
    end
  end

  # Drop every table whose TableElement#init raises.
  @tables.reject! do |element|
    unusable = false
    begin
      element.init(@tag_prefix, @base_model, router, log)
      log.info "Selecting '#{element.table}' table"
    rescue => e
      log.warn "Can't handle '#{element.table}' table. Ignoring.", error: e
      log.warn_backtrace e.backtrace
      unusable = true
    end
    unusable
  end

  @stop_flag = false
  @thread = Thread.new(&method(:thread_main))
end
thread_main() click to toggle source
# File lib/fluent/plugin/in_sql.rb, line 255
# Polling loop executed by the background thread created in #start.
#
# Until @stop_flag becomes true: sleep for @select_interval, make sure the
# database connection is usable, then for every table emit the next batch of
# records (at most @select_limit, starting after the last stored record) and
# persist the returned position in the state store.
def thread_main
  until @stop_flag
    sleep @select_interval

    begin
      conn = @base_model.connection
      conn.active? || conn.reconnect!
    rescue => e
      # Pass the exception along so the cause of the failure is not lost.
      log.warn "can't connect to database. Reconnect at next try", error: e
      next
    end

    @tables.each do |t|
      begin
        last_record = @state_store.last_records[t.table]
        @state_store.last_records[t.table] = t.emit_next_records(last_record, @select_limit)
        @state_store.update!
      rescue => e
        # A failure on one table must not stop polling of the others.
        log.error "unexpected error", error: e
        log.error_backtrace e.backtrace
      end
    end
  end
end