class Fluent::Plugin::RdsPgsqlLogInput

Constants

LOG_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 20
# Validates the plugin settings and builds the RDS client.
#
# Raises Fluent::ConfigError when region, db_instance_identifier, pos_file,
# refresh_interval or tag is unset, and when has_iam_role? is falsy while
# access_key_id or secret_access_key is nil.
#
# A failure while constructing the RDS client is only logged as a warning;
# in that case @rds is left unassigned.
def configure(conf)
  super

  raise Fluent::ConfigError, "region is required" unless @region
  unless has_iam_role?
    # Explicit keys are mandatory only when no ambient credentials were found.
    raise Fluent::ConfigError, "access_key_id is required" if @access_key_id.nil?
    raise Fluent::ConfigError, "secret_access_key is required" if @secret_access_key.nil?
  end
  raise Fluent::ConfigError, "db_instance_identifier is required" unless @db_instance_identifier
  raise Fluent::ConfigError, "pos_file is required" unless @pos_file
  raise Fluent::ConfigError, "refresh_interval is required" unless @refresh_interval
  raise Fluent::ConfigError, "tag is required" unless @tag

  begin
    client_options = { :region => @region }
    # Static keys are passed only when both halves of the pair are present.
    if @access_key_id && @secret_access_key
      client_options[:access_key_id] = @access_key_id
      client_options[:secret_access_key] = @secret_access_key
    end
    @rds = Aws::RDS::Client.new(client_options)
  rescue => e
    log.warn "RDS Client error occurred: #{e.message}"
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 56
# Shutdown hook. This override adds no behaviour of its own and only
# delegates to the superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 47
# Start hook: makes sure the position file exists, then schedules the
# repeating :poll_logs timer that invokes #input every @refresh_interval.
def start
  super

  # pos file touch: open with CREAT so a missing file is created, then let
  # the block form close the handle right away.
  touch_flags = File::RDWR | File::CREAT
  File.open(@pos_file, touch_flags) { |_pos| }

  timer_execute(:poll_logs, @refresh_interval, repeat: true, &method(:input))
end

Private Instance Methods

event_time_of_row(record) click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 181
# Converts the "time" field of a parsed record into a Fluent::EventTime.
#
# record - Hash whose "time" entry is a timestamp string accepted by
#          Time.parse.
#
# Returns the value of Fluent::EventTime.from_time for the parsed time.
def event_time_of_row(record)
  Fluent::EventTime.from_time(Time.parse(record["time"]))
end
get_and_parse_posfile() click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 78
# Loads the polling state from @pos_file.
#
# The file is read line by line:
#   * a line made only of digits is taken as the last-written timestamp
#   * a "<log file name>\t<marker>" line records the marker for that file
#
# On success @pos_last_written_timestamp (0 when no digit line was found) and
# @pos_info (Hash of file name => marker) are replaced. Any error is logged
# as a warning and both instance variables keep their previous values.
def get_and_parse_posfile
  # get & parse pos file
  log.debug "pos file get start"

  latest_timestamp = 0
  markers = {}
  File.open(@pos_file, File::RDONLY) do |io|
    io.each_line do |entry|
      if (stamp = /^(\d+)$/.match(entry))
        latest_timestamp = stamp[1].to_i
        log.debug "pos_last_written_timestamp: #{latest_timestamp}"
      end

      if (pair = /^(.+)\t(.+)$/.match(entry))
        markers[pair[1]] = pair[2]
        log.debug "log_file: #{pair[1]}, marker: #{pair[2]}"
      end
    end
    # Publish the new state only after the whole file was read.
    @pos_last_written_timestamp = latest_timestamp
    @pos_info = markers
  end
rescue => e
  log.warn "pos file get and parse error occurred: #{e.message}"
end
get_logdata(logs) click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 165
# Collects the raw log lines from a download_db_log_file_portion response.
#
# logs - response object; it must expose context.params[:log_file_name] and
#        yield, via #each, objects responding to #marker and #log_file_data.
#
# For every yielded portion the marker is stored in @pos_info under the log
# file name, and the portion's data is split on "\n" and appended to the
# result.
#
# Returns an Array of line Strings (empty when nothing was downloaded). An
# error raised while iterating is logged as a warning and the lines gathered
# so far are returned.
def get_logdata(logs)
  log_file_name = logs.context.params[:log_file_name]
  raw_records = []
  begin
    # The block variable is deliberately not called `log`, so it cannot be
    # confused with the plugin's logger method used in the rescue below.
    logs.each do |portion|
      # save got line's marker
      @pos_info[log_file_name] = portion.marker

      # NOTE(review): a portion with no new data presumably can carry a nil
      # log_file_data - confirm against the SDK. to_s turns that into an
      # empty string instead of raising and aborting the remaining portions.
      raw_records.concat(portion.log_file_data.to_s.split("\n"))
    end
  rescue => e
    log.warn e.message
  end
  raw_records
end
get_logfile(log_files) click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 137
# Downloads and emits the new portion of every listed log file.
#
# log_files - enumerable of objects responding to #describe_db_log_files,
#             each item of which is read through [:last_written] and
#             [:log_file_name]. Anything that does not respond to #each
#             (for example the value get_logfile_list returns after a failed
#             API call) is ignored.
#
# Side effects: raises @pos_last_written_timestamp to the largest
# :last_written seen, calls @rds.download_db_log_file_portion starting at the
# marker stored in @pos_info ("0" for a file not seen before), and hands the
# resulting lines to parse_and_emit.
#
# Any error is logged as a warning and stops processing of the remaining
# files for this run.
def get_logfile(log_files)
  # The listing step has already logged its own failure; without this guard
  # a second, misleading "undefined method `each'" warning would follow.
  return unless log_files.respond_to?(:each)

  begin
    log_files.each do |log_file|
      log_file.describe_db_log_files.each do |item|
        # save maximum written timestamp value
        @pos_last_written_timestamp = item[:last_written] if @pos_last_written_timestamp < item[:last_written]

        # log file download
        log_file_name = item[:log_file_name]
        marker = @pos_info.fetch(log_file_name, "0")

        log.debug "download log from rds: log_file_name=#{log_file_name}, marker=#{marker}"
        logs = @rds.download_db_log_file_portion(
          db_instance_identifier: @db_instance_identifier,
          log_file_name: log_file_name,
          marker: marker,
        )
        raw_records = get_logdata(logs)

        #emit
        parse_and_emit(raw_records, log_file_name) unless raw_records.nil?
      end
    end
  rescue => e
    log.warn e.message
  end
end
get_logfile_list() click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 124
# Asks RDS for the log files of @db_instance_identifier written since
# @pos_last_written_timestamp, requesting at most 10 records per call.
#
# Returns whatever @rds.describe_db_log_files returns. When that call raises,
# the error is logged as a warning and the logger call's own return value is
# returned instead.
def get_logfile_list
  log.debug "get logfile-list from rds: db_instance_identifier=#{@db_instance_identifier}, pos_last_written_timestamp=#{@pos_last_written_timestamp}"
  request = {
    db_instance_identifier: @db_instance_identifier,
    file_last_written: @pos_last_written_timestamp,
    max_records: 10,
  }
  @rds.describe_db_log_files(**request)
rescue => e
  log.warn "RDS Client describe_db_log_files error occurred: #{e.message}"
end
has_iam_role?() click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 69
# Reports whether the AWS SDK resolves credentials for an EC2 client that is
# built with nothing but the region.
#
# NOTE(review): the name suggests an instance IAM role, but the check only
# looks at whether ec2.config.credentials is non-nil - confirm which
# credential sources the SDK consults here.
#
# Returns true when credentials were resolved, false otherwise. A failure
# while building or querying the client is logged as a warning and treated
# as "no credentials", so the result never depends on what the logger call
# happens to return.
def has_iam_role?
  ec2 = Aws::EC2::Client.new(region: @region)
  !ec2.config.credentials.nil?
rescue => e
  log.warn "EC2 Client error occurred: #{e.message}"
  false
end
input() click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 62
# One polling cycle: load the position file, list the log files, download
# and emit their new content, then write the position file back.
def input
  get_and_parse_posfile
  get_logfile(get_logfile_list)
  put_posfile
end
parse_and_emit(raw_records, log_file_name) click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 186
# Turns raw log lines into records and emits them under @tag.
#
# raw_records   - Array of line Strings.
# log_file_name - name stored in every record's "log_file_name" field.
#
# A line matching LOG_REGEXP starts a new record built from the regexp's
# named captures; a non-matching line is appended, after a newline, to the
# "message" of the record in progress (and dropped when there is none yet).
# A record is emitted when the next one starts, and the last one after the
# loop. Any error is logged as a warning and ends processing of this batch.
def parse_and_emit(raw_records, log_file_name)
  log.debug "raw_records.count: #{raw_records.count}"

  # Emits the record assembled so far, if there is one.
  flush = lambda do |entry|
    router.emit(@tag, event_time_of_row(entry), entry) unless entry.nil?
  end

  pending = nil
  raw_records.each do |line|
    log.debug "raw_record=#{line}"
    fields = LOG_REGEXP.match(line)

    if fields.nil?
      # combine chain of log
      pending["message"] << "\n" + line if pending
      next
    end

    # emit before record
    flush.call(pending)

    # set a record
    pending = {
      "time" => fields[:time],
      "host" => fields[:host],
      "user" => fields[:user],
      "database" => fields[:database],
      "pid" => fields[:pid],
      "message_level" => fields[:message_level],
      "message" => fields[:message],
      "log_file_name" => log_file_name,
    }
  end

  # emit last record
  flush.call(pending)
rescue => e
  log.warn e.message
end
put_posfile() click to toggle source
# File lib/fluent/plugin/in_rds_pgsql_log.rb, line 108
# Persists the polling state to @pos_file.
#
# The existing file is truncated, then @pos_last_written_timestamp is written
# on the first line, followed by one "<log file name>\t<marker>" line per
# entry of @pos_info. The file is opened without CREAT, so it must already
# exist. Any error is logged as a warning.
def put_posfile
  # pos file write
  log.debug "pos file write"
  File.open(@pos_file, File::WRONLY | File::TRUNC) do |out|
    out.puts @pos_last_written_timestamp.to_s

    @pos_info.each_pair do |name, position|
      out.puts "#{name}\t#{position}"
    end
  end
rescue => e
  log.warn "pos file write error occurred: #{e.message}"
end