class Fluent::Plugin::Elb_LogInput
Constants
- ACCESSLOG_REGEXP
- LOGFILE_REGEXP
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_log.rb, line 32 def configure(conf) super if !has_iam_role? raise Fluent::ConfigError.new("access_key_id is required") if @access_key_id.nil? raise Fluent::ConfigError.new("secret_access_key is required") if @secret_access_key.nil? end raise Fluent::ConfigError.new("s3_bucketname is required") unless @s3_bucketname raise Fluent::ConfigError.new("timestamp_file is required") unless @timestamp_file raise Fluent::ConfigError.new("s3 bucket not found #{@s3_bucketname}") unless s3bucket_is_ok? end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elb_log.rb, line 44 def start super # files touch File.open(@timestamp_file, File::RDWR|File::CREAT).close File.open(@buf_file, File::RDWR|File::CREAT).close timer_execute(:in_elb_log, @refresh_interval, &method(:input)) end
Private Instance Methods
delete_file_from_s3(object_name)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 267 def delete_file_from_s3(object_name) begin log.debug "deleting object from s3 name is #{object_name}" s3_client.delete_object(bucket: @s3_bucketname, key: object_name) rescue => e log.warn "error occurred: #{e.message}, #{e.backtrace}" end end
emit_lines_from_buffer_file(record_common)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 277 def emit_lines_from_buffer_file(record_common) begin # emit per line File.open(@buf_file, File::RDONLY) do |file| file.each_line do |line| line_match = ACCESSLOG_REGEXP.match(line) unless line_match log.info "nomatch log found: #{line} in #{record_common['key']}" next end now = Fluent::Engine.now time = Time.parse(line_match[:time]).to_i rescue now router.emit( @tag, time, record_common .merge(format_record(line_match) .merge(@include_all_message ? {"all_message" => line} : {}) ) ) end end rescue => e log.warn "error occurred: #{e.message}" end end
format_record(item)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 306 def format_record(item) { "time" => item[:time].gsub(/Z/, '+0000'), "elb" => item[:elb], "client" => item[:client], "client_port" => item[:client_port], "backend" => item[:backend], "backend_port" => item[:backend_port], "request_processing_time" => item[:request_processing_time].to_f, "backend_processing_time" => item[:backend_processing_time].to_f, "response_processing_time" => item[:response_processing_time].to_f, "elb_status_code" => item[:elb_status_code], "backend_status_code" => item[:backend_status_code], "received_bytes" => item[:received_bytes].to_i, "sent_bytes" => item[:sent_bytes].to_i, "request_method" => item[:request_method], "request_uri" => item[:request_uri], "request_protocol" => item[:request_protocol], "user_agent" => item[:user_agent], "ssl_cipher" => item[:ssl_cipher], "ssl_protocol" => item[:ssl_protocol], "type" => item[:type], "target_group_arn" => item[:target_group_arn], "trace_id" => item[:trace_id], "domain_name" => item[:domain_name], "chosen_cert_arn" => item[:chosen_cert_arn], "matched_rule_priority" => item[:matched_rule_priority], "request_creation_time" => item[:request_creation_time], "actions_executed" => item[:actions_executed], "redirect_url" => item[:redirect_url], "error_reason" => item[:error_reason], "option1" => item[:option1], "option2" => item[:option2], "option3" => item[:option3] } end
get_file_from_s3(object_name)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 249 def get_file_from_s3(object_name) begin log.debug "getting object from s3 name is #{object_name}" Tempfile.create('fluent-elblog') do |tfile| s3_client.get_object(bucket: @s3_bucketname, key: object_name, response_target: tfile.path) if File.extname(object_name) != '.gz' FileUtils.cp(tfile.path, @buf_file) else inflate(tfile.path, @buf_file) end end rescue => e log.warn "error occurred: #{e.message}, #{e.backtrace}" end end
get_object_keys(timestamp)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 185 def get_object_keys(timestamp) object_keys = [] resp = s3_client.list_objects_v2( bucket: @s3_bucketname, prefix: @s3_prefix ) loop do resp.contents.each do |content| s3_last_modified_unixtime = content.last_modified.to_i object_key = content.key node_no = Digest::SHA1.hexdigest(object_key).to_i(16) % @num_nodes next unless node_no == @node_no matches = LOGFILE_REGEXP.match(object_key) if s3_last_modified_unixtime > timestamp and matches object_keys << { key: object_key, prefix: matches[:prefix], account_id: matches[:account_id], region: matches[:region], logfile_date: matches[:logfile_date], logfile_elb_name: matches[:logfile_elb_name], elb_timestamp: matches[:elb_timestamp], elb_ip_address: matches[:elb_ip_address], logfile_hash: matches[:logfile_hash], elb_timestamp_unixtime: Time.parse(matches[:elb_timestamp]).to_i, s3_last_modified_unixtime: s3_last_modified_unixtime, } end end if !resp.is_truncated return object_keys end resp = s3_client.list_objects_v2( bucket: @s3_bucketname, prefix: @s3_prefix, continuation_token: resp.next_continuation_token ) end return object_keys end
get_object_list(max_num)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 177 def get_object_list(max_num) s3_client.list_objects( bucket: @s3_bucketname, max_keys: max_num, prefix: @s3_prefix ) end
get_timestamp_file()
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 65 def get_timestamp_file begin # get timestamp last proc start_time = @start_time ? Time.parse(@start_time).utc : Time.at(0) timestamp = start_time.to_i log.debug "timestamp file #{@timestamp_file} read" File.open(@timestamp_file, File::RDONLY) do |file| if file.size > 0 timestamp_from_file = file.read.to_i if timestamp_from_file > timestamp timestamp = timestamp_from_file end end end log.debug "timestamp start at:" + Time.at(timestamp).to_s return timestamp rescue => e log.warn "timestamp file get and parse error occurred: #{e.message}" end end
has_iam_role?()
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 56 def has_iam_role? begin ec2 = Aws::EC2::Client.new(region: @region) !ec2.config.credentials.nil? rescue => e log.warn "EC2 Client error occurred: #{e.message}" end end
inflate(srcfile, dstfile)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 233 def inflate(srcfile, dstfile) File.open(dstfile, File::WRONLY|File::CREAT|File::TRUNC) do |bfile| File.open(srcfile) do |file| zio = file loop do io = Zlib::GzipReader.new zio bfile.write io.read unused = io.unused io.finish break if unused.nil? zio.pos -= unused.length end end end end
input()
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 128 def input begin log.debug "start" timestamp = get_timestamp_file() object_keys = get_object_keys(timestamp) object_keys = sort_object_key(object_keys) log.info "processing #{object_keys.count} object(s)." object_keys.each do |object_key| record_common = { "account_id" => object_key[:account_id], "region" => object_key[:region], "logfile_date" => object_key[:logfile_date], "logfile_elb_name" => object_key[:logfile_elb_name], "elb_ip_address" => object_key[:elb_ip_address], "logfile_hash" => object_key[:logfile_hash], "elb_timestamp" => object_key[:elb_timestamp], "key" => object_key[:key], "prefix" => object_key[:prefix], "elb_timestamp_unixtime" => object_key[:elb_timestamp_unixtime], "s3_last_modified_unixtime" => object_key[:s3_last_modified_unixtime], } get_file_from_s3(object_key[:key]) emit_lines_from_buffer_file(record_common) put_timestamp_file(object_key[:s3_last_modified_unixtime]) if @delete delete_file_from_s3(object_key[:key]) end end rescue => e log.warn "error occurred: #{e.message}" end end
put_timestamp_file(timestamp)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 86 def put_timestamp_file(timestamp) begin log.debug "timestamp file #{@timestamp_file} write" File.open(@timestamp_file, File::WRONLY|File::CREAT|File::TRUNC) do |file| file.puts timestamp.to_s end rescue => e log.warn "timestamp file get and parse error occurred: #{e.message}" end end
s3_client()
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 97 def s3_client begin options = { region: @region, } if @access_key_id && @secret_access_key options[:access_key_id] = @access_key_id options[:secret_access_key] = @secret_access_key end if @http_proxy options[:http_proxy] = @http_proxy end log.debug "S3 client connect" Aws::S3::Client.new(options) rescue => e log.warn "S3 Client error occurred: #{e.message}" end end
s3bucket_is_ok?()
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 116 def s3bucket_is_ok? log.debug "searching for bucket #{@s3_bucketname}" begin # try get one !(get_object_list(1).nil?) rescue => e log.warn "error occurred: #{e.message}" false end end
sort_object_key(src_object_keys)
click to toggle source
# File lib/fluent/plugin/in_elb_log.rb, line 167 def sort_object_key(src_object_keys) begin src_object_keys.sort do |a, b| a[:s3_last_modified_unixtime] <=> b[:s3_last_modified_unixtime] end rescue => e log.warn "error occurred: #{e.message}" end end