class FluentPluginElbAccessLogInput

Constants

ACCESS_LOG_FIELDS
ELB_TYPES
USER_AGENT_SUFFIX

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_access_log.rb, line 90
# Validates the plugin settings and prepares the state used by later polling.
#
# Raises Fluent::ConfigError when @elb_type is not listed in ELB_TYPES or when
# @filter_operator is neither "and" nor "or". Touches the timestamp and
# history files, resolves @start_datetime, loads @history and compiles the
# configured filters into Regexp objects.
def configure(conf)
  super

  raise Fluent::ConfigError, "Invalid ELB type: #{@elb_type}" unless ELB_TYPES.include?(@elb_type)
  raise Fluent::ConfigError, "Invalid filter operator: #{@filter_operator}" unless %w(and or).include?(@filter_operator)

  [@tsfile_path, @histfile_path].each { |path| FileUtils.touch(path) }
  tsfile_start_datetime = parse_tsfile

  # A timestamp persisted in the tsfile always wins over the configured one.
  if tsfile_start_datetime
    log.warn("start_datetime(#{@start_datetime}) is set. but tsfile datetime(#{tsfile_start_datetime}) is used") if @start_datetime
    @start_datetime = tsfile_start_datetime
  elsif @start_datetime
    @start_datetime = Time.parse(@start_datetime).utc
  else
    @start_datetime = Time.now.utc
  end

  @history = load_history

  # Field name => pattern; both sides are normalized to String/Regexp.
  if @filter
    @filter = @filter.each_with_object({}) do |(field, pattern), compiled|
      compiled[field.to_s] = Regexp.new(pattern.to_s)
    end
  end

  @file_filter = Regexp.new(@file_filter) if @file_filter
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_access_log.rb, line 154
# Stops the polling loop and its worker thread, then runs the superclass
# shutdown.
#
# @loop and @thread are only assigned in #start, so each is torn down only
# when present; that way a shutdown without a completed start still reaches
# `super` instead of raising NoMethodError on nil.
def shutdown
  @loop.stop if @loop

  if @thread
    @thread.kill
    @thread.join
  end

  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_access_log.rb, line 126
# Starts periodic polling: builds the S3 client, registers a repeating timer
# on a fresh Coolio loop and runs that loop on a background thread.
def start
  super

  # Instantiate the client up front rather than on the first timer tick.
  client

  @loop = Coolio::Loop.new
  last_seen = @start_datetime

  poller = TimerWatcher.new(@interval, true, log) do
    fetched_until = fetch(last_seen)

    # Persist the position only when the fetch actually moved it forward.
    if fetched_until > last_seen
      save_timestamp(fetched_until)
      last_seen = fetched_until
    end

    # Keep at most @history_length entries, dropping the oldest first.
    if @history.length > @history_length
      excess = @history.length - @history_length
      @history.shift(excess)
    end

    save_history
  end

  @loop.attach(poller)
  @thread = Thread.new { run }
end

Private Instance Methods

client() click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 426
# Returns the memoized Aws::S3::Client, building it on first use.
#
# Explicit key/secret settings take precedence over a shared-credentials
# profile; when neither is configured no credential option is passed.
def client
  @client ||= begin
    options = {user_agent_suffix: USER_AGENT_SUFFIX}
    options[:region] = @region if @region
    options[:http_proxy] = @http_proxy if @http_proxy

    if @aws_key_id and @aws_sec_key
      options[:access_key_id] = @aws_key_id
      options[:secret_access_key] = @aws_sec_key
    elsif @profile
      profile_options = {profile_name: @profile}
      profile_options[:path] = @credentials_path if @credentials_path
      options[:credentials] = Aws::SharedCredentials.new(profile_options)
    end

    if @debug
      options[:logger] = Logger.new(log.out)
      options[:log_level] = :debug
      #options[:http_wire_trace] = true
    end

    Aws::S3::Client.new(options)
  end
end
emit_access_log(access_log) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 225
# Parses one downloaded access log and emits an event per parsed record.
#
# access_log - the lines of one log file; sampled first when
#              @sampling_interval is greater than 1.
#
# Each record is emitted under @tag with its 'timestamp' field converted to
# epoch seconds. A record whose timestamp cannot be used is logged and
# skipped so the rest of the file is still emitted.
#
# Returns the records produced by parse_log.
def emit_access_log(access_log)
  access_log = sampling(access_log) if @sampling_interval > 1

  parse_log(access_log).each do |record|
    begin
      time = Time.parse(record['timestamp'])
      router.emit(@tag, time.to_i, record)
    rescue ArgumentError, TypeError => e
      # ArgumentError: the text is not a parsable time.
      # TypeError: the timestamp is missing (Time.parse(nil)); without this
      # one such record would abort every remaining record in the file.
      @log.warn("#{e.message}: #{record}")
      @log.warn('A record that has bad timestamp is not emitted.')
    end
  end
end
fetch(timestamp) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 170
# Downloads and emits every not-yet-seen access log object found under the
# prefixes for `timestamp`.
#
# timestamp - Time of the last processed log file.
#
# An object is skipped when it does not match @file_filter, when its name
# does not end in .log or .log.gz, when its datetime is not newer than
# `timestamp - @buffer_sec`, or when its key is already in @history.
#
# Returns the datetime of the last file emitted, or `timestamp` when nothing
# was emitted.
def fetch(timestamp)
  last_timestamp = timestamp

  prefixes(timestamp).each do |prefix|
    client.list_objects_v2(bucket: @s3_bucket, prefix: prefix).each do |page|
      page.contents.each do |obj|
        next if @file_filter and obj.key !~ @file_filter

        # Split only the file name: the key also carries the S3 prefix, and an
        # underscore anywhere in that path would shift every field.
        # Field 4 is the log datetime, field 6 the trailing suffix.
        name_parts = File.basename(obj.key).split('_', 7)
        logfile_datetime = name_parts[4]
        logfile_suffix = name_parts[6]

        # Check the suffix before parsing the datetime so keys that are not
        # log files (too few fields, wrong extension) are skipped instead of
        # raising out of Time.parse.
        next if logfile_suffix !~ /\.log(\.gz)?\z/

        logfile_datetime = Time.parse(logfile_datetime)
        next if logfile_datetime <= (timestamp - @buffer_sec)
        next if @history.include?(obj.key)

        access_log = client.get_object(bucket: @s3_bucket, key: obj.key).body

        if obj.key.end_with?('.gz')
          begin
            access_log = MultipleFilesGzipReader.new(access_log)

            # check gzip format
            access_log.first
            access_log.rewind
          rescue Zlib::Error => e
            @log.warn("#{e.message}: #{access_log.inspect.slice(0, 64)}")
            next
          end
        else
          access_log = access_log.each_line
        end

        emit_access_log(access_log)
        last_timestamp = logfile_datetime
        @history.push(obj.key)
      end
    end
  end

  last_timestamp
end
load_history() click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 410
# Reads the history file and returns its newline-separated entries.
def load_history
  contents = File.read(@histfile_path)
  contents.split("\n")
end
parse_alb_line(line) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 327
# Splits one ALB access log line into its fields.
#
# The line is first parsed as space-separated CSV. If that raises, a manual
# fallback splits off the first twelve fields, takes the quoted request, and
# splits the remainder from the right into the seven trailing fields.
#
# Returns the array of fields, or nil when the CSV parser returns nil.
# NOTE(review): when the fallback itself fails, a warning is logged and the
# partially filled array is still returned.
def parse_alb_line(line)
  CSV.parse_line(line, col_sep: ' ')
rescue => csv_error
  fields = nil

  begin
    fields = line.split(' ', 13)

    # request
    request = (fields[12] ||= '')
    request.sub!(/\A"/, '')
    request.sub!(/"(.*)\z/, '')

    # Everything after the request's closing quote.
    trailer = Regexp.last_match(1).strip
    user_agent, ssl_cipher, ssl_protocol, target_group_arn, trace_id, domain_name, chosen_cert_arn = rsplit(trailer, ' ', 7)

    fields[13] = unquote(user_agent)
    fields[14] = ssl_cipher
    fields[15] = ssl_protocol
    fields[16] = target_group_arn
    fields[17] = unquote(trace_id)
    fields[18] = unquote(domain_name)
    fields[19] = unquote(chosen_cert_arn)
  rescue => fallback_error
    @log.warn("#{csv_error.message}: #{fallback_error.message}: #{line}")
  end

  fields
end
parse_clb_line(line) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 300
# Splits one Classic Load Balancer access log line into its fields.
#
# The line is first parsed as space-separated CSV. If that raises, a manual
# fallback splits off the first eleven fields, takes the quoted request, and
# splits the remainder from the right into user agent, cipher and protocol.
#
# Returns the array of fields, or nil when the CSV parser returns nil.
# NOTE(review): when the fallback itself fails, a warning is logged and the
# partially filled array is still returned.
def parse_clb_line(line)
  CSV.parse_line(line, col_sep: ' ')
rescue => csv_error
  fields = nil

  begin
    fields = line.split(' ', 12)

    # request
    request = (fields[11] ||= '')
    request.sub!(/\A"/, '')
    request.sub!(/"(.*)\z/, '')

    # Everything after the request's closing quote.
    trailer = Regexp.last_match(1).strip
    user_agent, ssl_cipher, ssl_protocol = rsplit(trailer, ' ', 3)

    fields[12] = unquote(user_agent)
    fields[13] = ssl_cipher
    fields[14] = ssl_protocol
  rescue => fallback_error
    @log.warn("#{csv_error.message}: #{fallback_error.message}: #{line}")
  end

  fields
end
parse_log(access_log) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 243
# Converts raw access log lines into record hashes.
#
# All lines are split into field arrays first (lines whose parser returns
# nil are dropped). Each array is then zipped with the field names configured
# for @elb_type and post-processed: address/port splitting, optional
# filtering, optional type casting and optional request parsing.
#
# Returns an Array of record Hashes. Each line is chomped in place.
def parse_log(access_log)
  rows = []

  access_log.each do |raw|
    raw.chomp!

    row =
      case @elb_type
      when 'clb' then parse_clb_line(raw)
      when 'alb' then parse_alb_line(raw)
      else raw
      end

    rows << row if row
  end

  field_spec = ACCESS_LOG_FIELDS.fetch(@elb_type)

  rows.each_with_object([]) do |row, records|
    record = Hash[field_spec.keys.zip(row)]

    split_address_port!(record, 'client')

    case @elb_type
    when 'clb' then split_address_port!(record, 'backend')
    when 'alb' then split_address_port!(record, 'target')
    end

    if @filter
      # One flag per filter entry: true when the field does NOT match.
      misses = @filter.map { |field, pattern| record[field] !~ pattern }
      # "or" drops a record only when every pattern misses; otherwise a
      # single miss is enough.
      next if @filter_operator == 'or' ? misses.all? : misses.any?
    end

    if @type_cast
      field_spec.each do |name, conv|
        next unless conv

        value = record[name]
        record[name] = value.send(conv) if value
      end
    end

    parse_request!(record) if @parse_request

    records << record
  end
end
parse_request!(record) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 376
# Expands record['request'] into separate method / uri / http_version keys
# and, when the URI parses, one key per URI component. Key segments are
# joined with @request_separator. Does nothing when the record has no request.
#
# A failure while parsing the URI is logged and leaves the component keys
# unset; the method / uri / http_version keys are written before that.
def parse_request!(record)
  request = record['request']
  return unless request

  key_prefix = "request#{@request_separator}"
  http_method, uri, http_version = request.split(' ', 3)

  record["#{key_prefix}method"] = http_method
  record["#{key_prefix}uri"] = uri
  record["#{key_prefix}http_version"] = http_version

  component_prefix = "#{key_prefix}uri#{@request_separator}"

  begin
    # `uri` is reused so the warning below prints the raw text when parsing
    # raised, and the parsed object when a later step raised.
    uri = Addressable::URI.parse(uri)
    return unless uri

    [:scheme, :user, :host, :port, :path, :query, :fragment].each do |component|
      value = uri.send(component)
      # Without type casting every field stays a String, including the port.
      value = value.to_s if component == :port && !@type_cast
      record["#{component_prefix}#{component}"] = value
    end
  rescue => e
    @log.warn("#{e.message}: #{uri}")
  end
end
parse_tsfile() click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 420
# Reads the timestamp file and returns its content as a UTC Time.
# Returns nil when the file cannot be read or does not hold a parsable time.
def parse_tsfile
  begin
    raw = File.read(@tsfile_path)
    Time.parse(raw).utc
  rescue StandardError
    nil
  end
end
prefixes(timestamp) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 216
# Builds the S3 key prefixes to scan for `timestamp`: the day before, the
# same day and the day after, each under the account/region log root
# (prepended with @s3_prefix when one is configured).
def prefixes(timestamp)
  root = "AWSLogs/#{@account_id}/elasticloadbalancing/#{@region}/"
  root = "#{@s3_prefix}/#{root}" if @s3_prefix

  one_day = 86400
  [-one_day, 0, one_day].map do |offset|
    root + (timestamp + offset).strftime('%Y/%m/%d/')
  end
end
rsplit(str, sep, n) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 452
# Splits `str` on `sep` starting from the right, producing at most `n`
# pieces; whatever is left on the left side becomes the first piece.
# The input string is not modified.
def rsplit(str, sep, n)
  remainder = str.dup
  pieces = []

  (n - 1).times do
    cut = remainder.rindex(sep)
    # No separator left: nothing further can be split off.
    break if cut.nil?

    pieces.unshift(remainder[(cut + sep.length)..-1])
    remainder = remainder[0...cut]
  end

  pieces.unshift(remainder)
end
run() click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 163
# Runs the event loop; an error raised out of it is logged together with its
# backtrace rather than propagated.
def run
  begin
    @loop.run
  rescue StandardError => error
    log.error(error.message)
    log.error_backtrace(error.backtrace)
  end
end
sampling(access_log) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 358
# Keeps every @sampling_interval-th line of the log, starting with the first.
def sampling(access_log)
  kept = []

  access_log.each_with_index do |line, index|
    kept << line if index % @sampling_interval == 0
  end

  kept
end
save_history() click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 414
# Overwrites the history file with the current @history entries, one per line.
#
# Uses File.open rather than Kernel#open: Kernel#open treats a path that
# starts with "|" as a command to run, which must never happen for a
# configured file path.
def save_history
  File.open(@histfile_path, 'w') do |histfile|
    histfile << @history.join("\n")
  end
end
save_timestamp(timestamp) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 404
# Overwrites the timestamp file with the string form of `timestamp`.
#
# Uses File.open rather than Kernel#open: Kernel#open treats a path that
# starts with "|" as a command to run, which must never happen for a
# configured file path.
def save_timestamp(timestamp)
  File.open(@tsfile_path, 'w') do |tsfile|
    tsfile << timestamp.to_s
  end
end
split_address_port!(record, prefix) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 362
# Rewrites the combined "<prefix>_port" field of `record` in place.
#
# With @split_addr_port set, the value is split at the first ":" into
# record[prefix] (address) and record["<prefix>_port"] (port). Otherwise the
# whole value is moved to record[prefix] and the "_port" key is removed.
# Records without the field are left untouched.
def split_address_port!(record, prefix)
  port_key = "#{prefix}_port"
  combined = record[port_key]
  return unless combined

  unless @split_addr_port
    record[prefix] = combined
    return record.delete(port_key)
  end

  host, port = combined.split(':', 2)
  record[prefix] = host
  record[port_key] = port
end
unquote(str) click to toggle source
# File lib/fluent/plugin/in_elb_access_log.rb, line 467
# Strips one leading and one trailing double quote from `str`.
# Returns nil for a nil or empty input.
def unquote(str)
  return nil if !str || str.empty?

  without_leading = str.sub(/\A"/, '')
  without_leading.sub(/"\z/, '')
end