class Fluent::S3InputOutput
Attributes
s3[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3_input.rb, line 32 def initialize super require 'net/http' require 'oj' require 'aws-sdk' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3_input.rb, line 39 def configure(conf) super if @aws_key_id and @aws_sec_key @s3 = Aws::S3::Client.new( region: @aws_region, access_key_id: @aws_key_id, secret_access_key: @aws_sec_key, ) else @s3 = Aws::S3::Client.new( region: @aws_region, ) end end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_s3_input.rb, line 67 def emit(tag, es, chain) begin tag_parts = tag.split('.') es.each { |time, record| s3_bucket = record[s3_bucket_key] s3_key = record[s3_object_key_key] s3_key_ext = s3_key.split(".")[-1] resp = s3.get_object(bucket: s3_bucket, key: s3_key) if @gzip_exts.include?(s3_key_ext) input = Zlib::GzipReader.new(resp.body) elsif @zip_exts.include?(s3_key_ext) io = Zip::InputStream.new(resp.body) input = io.get_next_entry else input = resp.body end default_record = {} if @merge_record default_record = {}.merge(record) end s3_record = {} if @format == 'json' json_data=normalize_json input.read begin s3_record = Oj.load(json_data) rescue Oj::ParseError=>e $log.error e.to_s $log.error json_data end elsif @format == 'csv' data = input.read File.open("/tmp/s3debug", 'w') { |file| file.write(data) } s3_record=CSV.parse(data).to_json else raise "Unsupported format - #{@format}" end s3_record.each do |a_record| # parse the time from the record @time_keys.each do |time_key| if s3_record.include? time_key time=Time.strptime(a_record[time_key], @time_format).to_f $log.debug "Reset time for #{time_key}, Setting time to #{time}" break end end if @record_key == nil tmp_record=a_record.merge(default_record) new_record=tmp_record else new_record[record_key]=a_record end @remove_keys.each do |key_to_remove| new_record.delete(key_to_remove) end $log.debug "Emit - #{new_record}" router.emit(@tag, time, new_record) end } chain.next rescue StandardError => e $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}" end end
normalize_json(json)
click to toggle source
Allow JSON data in a couple of formats {} single event
- {},{}
-
array of events
{}n{}n{} concatenated events (flume)
# File lib/fluent/plugin/out_s3_input.rb, line 59 def normalize_json(json) if json[0] != "[" json=json.gsub /}\n{/,"},{" json="[#{json}]" end json end