class Fluent::Plugin::Elb_LogInput

Constants

ACCESSLOG_REGEXP
LOGFILE_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_log.rb, line 32
# Validates the plugin settings once the superclass has handled +conf+.
#
# Static credentials (access_key_id / secret_access_key) are only required
# when has_iam_role? reports that no credentials are available otherwise.
# The last check calls s3bucket_is_ok?, so configuration fails when the
# bucket cannot be listed.
#
# Raises Fluent::ConfigError describing the first check that failed.
def configure(conf)
  super

  unless has_iam_role?
    raise Fluent::ConfigError, "access_key_id is required" if @access_key_id.nil?
    raise Fluent::ConfigError, "secret_access_key is required" if @secret_access_key.nil?
  end

  raise Fluent::ConfigError, "s3_bucketname is required" unless @s3_bucketname
  raise Fluent::ConfigError, "timestamp_file is required" unless @timestamp_file
  raise Fluent::ConfigError, "s3 bucket not found #{@s3_bucketname}" unless s3bucket_is_ok?
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_log.rb, line 44
# Starts the plugin: after the superclass start-up, makes sure the timestamp
# file and the buffer file exist (existing content is left untouched), then
# registers +input+ to run on a timer every @refresh_interval.
def start
  super

  # Opening with CREAT and closing right away acts as a "touch".
  [@timestamp_file, @buf_file].each do |state_path|
    File.open(state_path, File::RDWR | File::CREAT) {}
  end

  timer_execute(:in_elb_log, @refresh_interval, &method(:input))
end

Private Instance Methods

delete_file_from_s3(object_name) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 267
# Deletes the object named +object_name+ from @s3_bucketname.
#
# Best effort: any StandardError is logged at warn level (message and
# backtrace) and not re-raised.
def delete_file_from_s3(object_name)
  log.debug "deleting object from s3 name is #{object_name}"

  s3_client.delete_object(bucket: @s3_bucketname, key: object_name)
rescue => e
  log.warn "error occurred: #{e.message}, #{e.backtrace}"
end
emit_lines_from_buffer_file(record_common) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 277
# Reads @buf_file line by line and emits one event per line that matches
# ACCESSLOG_REGEXP.
#
# record_common - Hash of per-object fields merged into every record; its
#                 'key' entry is used in the "nomatch" log message.
#
# Non-matching lines are logged at info level and skipped. The event time is
# parsed from the line's :time capture; if that fails, Fluent::Engine.now is
# used. When @include_all_message is set, the raw line is added under
# "all_message". Any StandardError aborts the remaining lines and is logged
# at warn level instead of being raised.
def emit_lines_from_buffer_file(record_common)
  File.open(@buf_file, File::RDONLY) do |buffer|
    buffer.each_line do |raw_line|
      fields = ACCESSLOG_REGEXP.match(raw_line)
      if fields.nil?
        log.info "nomatch log found: #{raw_line} in #{record_common['key']}"
        next
      end

      fallback_time = Fluent::Engine.now
      event_time =
        begin
          Time.parse(fields[:time]).to_i
        rescue StandardError
          fallback_time
        end

      record = format_record(fields)
      record = record.merge("all_message" => raw_line) if @include_all_message

      router.emit(@tag, event_time, record_common.merge(record))
    end
  end
rescue => e
  log.warn "error occurred: #{e.message}"
end
format_record(item) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 306
# Builds the string-keyed record for one parsed access-log line.
#
# item - object indexed by Symbol field name (the caller passes the match
#        returned by ACCESSLOG_REGEXP.match).
#
# Values are copied as-is except:
# * "time": every "Z" is replaced with "+0000";
# * the three *_processing_time fields are converted with to_f;
# * received_bytes and sent_bytes are converted with to_i.
#
# Returns a Hash whose keys appear in the order listed in field_order.
def format_record(item)
  float_fields = %i[request_processing_time backend_processing_time response_processing_time]
  integer_fields = %i[received_bytes sent_bytes]
  field_order = %i[
    time elb client client_port backend backend_port
    request_processing_time backend_processing_time response_processing_time
    elb_status_code backend_status_code
    received_bytes sent_bytes
    request_method request_uri request_protocol
    user_agent ssl_cipher ssl_protocol type
    target_group_arn trace_id domain_name chosen_cert_arn
    matched_rule_priority request_creation_time actions_executed
    redirect_url error_reason
    option1 option2 option3
  ]

  field_order.each_with_object({}) do |field, record|
    captured = item[field]
    record[field.to_s] =
      if field == :time
        captured.gsub(/Z/, '+0000')
      elsif float_fields.include?(field)
        captured.to_f
      elsif integer_fields.include?(field)
        captured.to_i
      else
        captured
      end
  end
end
get_file_from_s3(object_name) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 249
# Downloads +object_name+ from @s3_bucketname into @buf_file.
#
# The object is fetched into a temporary file first. Names ending in ".gz"
# are decompressed with inflate; anything else is copied verbatim.
#
# The buffer file is emptied before the download starts. Without that, a
# failed download would leave the previous object's lines in @buf_file and
# the caller would emit them again under this object's metadata.
#
# Returns true when the buffer was filled, false when any StandardError
# occurred (the error is logged at warn level and not re-raised).
def get_file_from_s3(object_name)
  log.debug "getting object from s3 name is #{object_name}"

  # Drop whatever the previous object left behind.
  File.open(@buf_file, File::WRONLY | File::CREAT | File::TRUNC) {}

  Tempfile.create('fluent-elblog') do |tfile|
    s3_client.get_object(bucket: @s3_bucketname, key: object_name, response_target: tfile.path)

    if File.extname(object_name) == '.gz'
      inflate(tfile.path, @buf_file)
    else
      FileUtils.cp(tfile.path, @buf_file)
    end
  end

  true
rescue => e
  log.warn "error occurred: #{e.message}, #{e.backtrace}"
  false
end
get_object_keys(timestamp) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 185
# Lists the objects under @s3_prefix in @s3_bucketname, following
# continuation tokens until the listing is no longer truncated, and returns
# a descriptor for each object this node should process.
#
# timestamp - Integer unix time; only objects whose last_modified is
#             strictly greater are returned.
#
# An object is selected when all of the following hold:
# * SHA1(key) modulo @num_nodes equals @node_no;
# * the key matches LOGFILE_REGEXP;
# * it was modified after +timestamp+.
#
# Returns an Array of Hashes holding the key, the LOGFILE_REGEXP captures,
# the parsed :elb_timestamp_unixtime and :s3_last_modified_unixtime.
def get_object_keys(timestamp)
  listing_params = { bucket: @s3_bucketname, prefix: @s3_prefix }
  selected = []

  page = s3_client.list_objects_v2(**listing_params)

  loop do
    page.contents.each do |entry|
      modified_at = entry.last_modified.to_i
      name = entry.key

      owner_node = Digest::SHA1.hexdigest(name).to_i(16) % @num_nodes
      next if owner_node != @node_no

      parsed = LOGFILE_REGEXP.match(name)
      next unless modified_at > timestamp && parsed

      selected << {
        key: name,
        prefix: parsed[:prefix],
        account_id: parsed[:account_id],
        region: parsed[:region],
        logfile_date: parsed[:logfile_date],
        logfile_elb_name: parsed[:logfile_elb_name],
        elb_timestamp: parsed[:elb_timestamp],
        elb_ip_address: parsed[:elb_ip_address],
        logfile_hash: parsed[:logfile_hash],
        elb_timestamp_unixtime: Time.parse(parsed[:elb_timestamp]).to_i,
        s3_last_modified_unixtime: modified_at,
      }
    end

    return selected unless page.is_truncated

    page = s3_client.list_objects_v2(
      **listing_params,
      continuation_token: page.next_continuation_token
    )
  end
end
get_object_list(max_num) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 177
# Lists at most +max_num+ objects under @s3_prefix in @s3_bucketname and
# returns the list_objects response from the S3 client.
def get_object_list(max_num)
  request = {
    bucket: @s3_bucketname,
    max_keys: max_num,
    prefix: @s3_prefix
  }
  s3_client.list_objects(**request)
end
get_timestamp_file() click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 65
# Determines the unix time from which objects should be processed.
#
# The lower bound is @start_time (parsed, UTC) when configured, otherwise
# the epoch. If @timestamp_file is non-empty and holds a larger value, that
# value wins.
#
# Returns the Integer timestamp. On any StandardError the error is logged at
# warn level and the result of that log call is returned instead.
def get_timestamp_file
  floor = (@start_time ? Time.parse(@start_time).utc : Time.at(0)).to_i

  log.debug "timestamp file #{@timestamp_file} read"
  stored = File.open(@timestamp_file, File::RDONLY) do |file|
    # An empty file means nothing has been recorded yet.
    file.size > 0 ? file.read.to_i : floor
  end

  timestamp = [floor, stored].max
  log.debug "timestamp start at:#{Time.at(timestamp)}"
  timestamp
rescue => e
  log.warn "timestamp file get and parse error occurred: #{e.message}"
end
has_iam_role?() click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 56
# Reports whether an EC2 client built for @region has credentials without
# any being passed explicitly.
#
# Returns true when the client's config carries non-nil credentials and
# false otherwise. If building the client raises, the error is logged at
# warn level and false is returned explicitly, so the result never depends
# on what the logger call happens to return.
def has_iam_role?
  ec2 = Aws::EC2::Client.new(region: @region)
  !ec2.config.credentials.nil?
rescue => e
  log.warn "EC2 Client error occurred: #{e.message}"
  false
end
inflate(srcfile, dstfile) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 233
# Decompresses the gzip file +srcfile+ into +dstfile+ (created or truncated).
#
# A file may hold several gzip members back to back. After each member is
# read, GzipReader#unused gives the bytes it consumed beyond that member;
# the source position is rewound by that many bytes and a fresh reader
# handles the next member. A nil +unused+ means nothing is left.
def inflate(srcfile, dstfile)
  File.open(dstfile, File::WRONLY | File::CREAT | File::TRUNC) do |output|
    File.open(srcfile) do |compressed|
      loop do
        member = Zlib::GzipReader.new(compressed)
        output.write(member.read)
        leftover = member.unused
        # finish releases the reader but keeps the underlying file open.
        member.finish
        break if leftover.nil?

        compressed.pos -= leftover.length
      end
    end
  end
end
input() click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 128
# One polling cycle: finds the objects newer than the stored timestamp and,
# in sort_object_key order, handles them one at a time.
#
# For each object the file is fetched into the buffer, its lines are
# emitted with the object's metadata attached, the object's S3
# last-modified time is stored as the new timestamp, and - when @delete is
# set - the object is removed from S3.
#
# Any StandardError escaping the cycle is logged at warn level and not
# re-raised.
def input
  log.debug "start"
  since = get_timestamp_file()
  pending = sort_object_key(get_object_keys(since))

  log.info "processing #{pending.count} object(s)."

  # Descriptor entries copied (with String keys) into every emitted record.
  shared_fields = %i[
    account_id region logfile_date logfile_elb_name elb_ip_address
    logfile_hash elb_timestamp key prefix
    elb_timestamp_unixtime s3_last_modified_unixtime
  ]

  pending.each do |descriptor|
    record_common = shared_fields.each_with_object({}) do |field, record|
      record[field.to_s] = descriptor[field]
    end

    get_file_from_s3(descriptor[:key])
    emit_lines_from_buffer_file(record_common)

    put_timestamp_file(descriptor[:s3_last_modified_unixtime])

    delete_file_from_s3(descriptor[:key]) if @delete
  end
rescue => e
  log.warn "error occurred: #{e.message}"
end
put_timestamp_file(timestamp) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 86
# Stores +timestamp+ in @timestamp_file, replacing its previous content with
# the value's string form followed by a newline.
#
# Best effort: any StandardError is logged at warn level and not re-raised.
# The message names the write step; it previously repeated the reader's
# "get and parse" wording, which pointed at the wrong operation.
def put_timestamp_file(timestamp)
  log.debug "timestamp file #{@timestamp_file} write"
  File.open(@timestamp_file, File::WRONLY | File::CREAT | File::TRUNC) do |file|
    file.puts timestamp.to_s
  end
rescue => e
  log.warn "timestamp file write error occurred: #{e.message}"
end
s3_client() click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 97
# Builds a new Aws::S3::Client for @region on every call.
#
# Static credentials are passed only when both @access_key_id and
# @secret_access_key are set; @http_proxy is passed when set.
#
# Returns the client. If construction raises a StandardError, the error is
# logged at warn level and the result of that log call is returned instead.
def s3_client
  options = { region: @region }

  if @access_key_id && @secret_access_key
    options.update(access_key_id: @access_key_id, secret_access_key: @secret_access_key)
  end
  options[:http_proxy] = @http_proxy if @http_proxy

  log.debug "S3 client connect"
  Aws::S3::Client.new(options)
rescue => e
  log.warn "S3 Client error occurred: #{e.message}"
end
s3bucket_is_ok?() click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 116
# Checks that @s3_bucketname can be listed by requesting a single object.
#
# Returns true when get_object_list yields a non-nil response, false when it
# yields nil or raises (the error is logged at warn level).
def s3bucket_is_ok?
  log.debug "searching for bucket #{@s3_bucketname}"

  begin
    # One key is enough to prove the bucket is reachable.
    probe = get_object_list(1)
    !probe.nil?
  rescue => e
    log.warn "error occurred: #{e.message}"
    false
  end
end
sort_object_key(src_object_keys) click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 167
# Returns a new array of the object descriptors ordered by ascending
# :s3_last_modified_unixtime; the input array is not modified.
#
# If sorting raises a StandardError, the error is logged at warn level and
# the result of that log call is returned instead.
def sort_object_key(src_object_keys)
  src_object_keys.sort do |left, right|
    left[:s3_last_modified_unixtime] <=> right[:s3_last_modified_unixtime]
  end
rescue => e
  log.warn "error occurred: #{e.message}"
end