class Fluent::S3InputOutput

Attributes

s3[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3_input.rb, line 32
# Runs the superclass initializer, then loads the libraries this plugin
# names here (net/http, oj, aws-sdk), in that order.
def initialize
  super
  %w[net/http oj aws-sdk].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3_input.rb, line 39
# Applies the configuration through the superclass and then builds the S3
# client stored in @s3.
#
# The client is always given the configured region. The access key id and
# secret are passed only when both @aws_key_id and @aws_sec_key are set;
# otherwise no explicit credentials are handed to the client.
#
# @param conf the plugin configuration, forwarded to the superclass
def configure(conf)
  super

  client_options = { region: @aws_region }
  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  end

  @s3 = Aws::S3::Client.new(**client_options)
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_s3_input.rb, line 67
# For every incoming event, downloads the S3 object the event points at,
# parses its contents and re-emits each parsed entry as its own event
# under @tag.
#
# The bucket and object key are read from the event record using
# s3_bucket_key and s3_object_key_key. Objects whose extension is listed in
# @gzip_exts or @zip_exts are decompressed before parsing. @format selects
# the parser ('json' or 'csv'); any other value raises.
#
# Any StandardError raised while processing is logged as a warning and
# swallowed; in that case chain.next is not called.
#
# @param tag   the incoming tag (not used; events are re-emitted with @tag)
# @param es    the event stream, yielding (time, record) pairs
# @param chain the output chain, advanced once the whole stream is processed
def emit(tag, es, chain)
  es.each do |time, record|
    s3_bucket = record[s3_bucket_key]
    s3_key = record[s3_object_key_key]
    s3_key_ext = s3_key.split('.')[-1]
    resp = s3.get_object(bucket: s3_bucket, key: s3_key)

    input =
      if @gzip_exts.include?(s3_key_ext)
        Zlib::GzipReader.new(resp.body)
      elsif @zip_exts.include?(s3_key_ext)
        # get_next_entry only positions the stream on the first entry; the
        # entry's data is read from the stream itself, not from the entry.
        zip_stream = Zip::InputStream.new(resp.body)
        zip_stream.get_next_entry
        zip_stream
      else
        resp.body
      end

    # Fields of the triggering event that are copied onto every emitted record.
    default_record = @merge_record ? {}.merge(record) : {}

    s3_records = []
    case @format
    when 'json'
      json_data = normalize_json(input.read)
      begin
        s3_records = Oj.load(json_data)
      rescue Oj::ParseError => e
        # An unparsable object is logged and yields no events.
        $log.error e.to_s
        $log.error json_data
      end
    when 'csv'
      # NOTE(review): rows are turned into hashes keyed by the first (header)
      # row, because the code below needs hash-like records - confirm that
      # the CSV objects really carry a header row.
      s3_records = CSV.parse(input.read, headers: true).map(&:to_h)
    else
      raise "Unsupported format - #{@format}"
    end

    s3_records.each do |a_record|
      # Start from the triggering event's time for every entry so that a
      # time parsed from one entry never leaks into the following ones.
      event_time = time
      time_key = @time_keys.find { |candidate| a_record.key?(candidate) }
      if time_key
        event_time = Time.strptime(a_record[time_key], @time_format).to_f
        $log.debug "Reset time for #{time_key}, Setting time to #{event_time}"
      end

      new_record =
        if @record_key.nil?
          # Values from the triggering event win over same-named parsed fields.
          a_record.merge(default_record)
        else
          # Nest the parsed entry under @record_key next to the merged fields.
          default_record.merge(@record_key => a_record)
        end

      @remove_keys.each { |key_to_remove| new_record.delete(key_to_remove) }

      $log.debug "Emit - #{new_record}"
      router.emit(@tag, event_time, new_record)
    end
  end
  chain.next
rescue StandardError => e
  $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}"
end
normalize_json(json) click to toggle source

Allow JSON data in several formats:

{} - a single event

{},{} or [{},{}] - an array of events

{}\n{}\n{} - newline-concatenated events (flume)

# File lib/fluent/plugin/out_s3_input.rb, line 59
# Rewrites a raw JSON payload so that it is always a JSON array of events.
#
# Accepted shapes:
# * a payload that already is an array (returned unchanged, even when the
#   opening bracket is preceded by whitespace)
# * a single object:              {}       => [{}]
# * comma separated objects:      {},{}    => [{},{}]
# * newline concatenated objects: {}\n{}   => [{},{}]
#   (CRLF line endings and blank or padded lines between objects included)
#
# NOTE(review): the newline join is purely textual, so a "}<newline>{"
# sequence inside a string value would be rewritten as well.
#
# @param json [String] the raw payload
# @return [String] JSON text representing an array
def normalize_json(json)
  return json if json.lstrip.start_with?('[')

  joined = json.gsub(/\}\s*\n\s*\{/, '},{')
  "[#{joined}]"
end