class LogStash::Inputs::S3Cloudtrail
Stream events from files in an S3 bucket.
Each line from each file generates an event. Files ending in `.gz` are handled as gzip'ed files.
Public Instance Methods
backup_to_bucket(object)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 113
# Copies a processed S3 object into the configured backup bucket.
#
# Nothing happens when @backup_to_bucket is nil. Otherwise the object is
# copied to @backup_bucket under its own key prefixed with
# @backup_add_prefix, and the source object is deleted when @delete is set.
#
# @param object [#key, #bucket_name, #delete] the S3 object to back up
def backup_to_bucket(object)
  return if @backup_to_bucket.nil?

  target = @backup_bucket.object("#{@backup_add_prefix}#{object.key}")
  target.copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")
  object.delete() if @delete
end
backup_to_dir(filename)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 124
# Copies a local file into the configured backup directory.
#
# Does nothing when @backup_to_dir is nil.
#
# @param filename [String] path of the local file to copy
def backup_to_dir(filename)
  FileUtils.cp(filename, @backup_to_dir) unless @backup_to_dir.nil?
end
list_new_files()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 95
# Lists the keys under @prefix in @s3bucket that still need processing.
#
# A key is kept when ignore_filename? rejects it as ignorable and
# sincedb.newer? accepts its last_modified value.
#
# @return [Array<String>] the kept keys, ordered by ascending last_modified
def list_new_files
  modified_at = {}

  @s3bucket.objects(:prefix => @prefix).each do |log|
    @logger.debug("S3Cloudtrail input: Found key", :key => log.key)

    next if ignore_filename?(log.key)
    next unless sincedb.newer?(log.last_modified)

    modified_at[log.key] = log.last_modified
    @logger.debug("S3Cloudtrail input: Adding to objects[]", :key => log.key)
    @logger.debug("objects[] length is: ", :length => modified_at.length)
  end

  modified_at.keys.sort { |left, right| modified_at[left] <=> modified_at[right] }
end
process_files(queue)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 131
# Processes every key returned by list_new_files, in order.
#
# The loop is abandoned as soon as stop? reports true.
#
# @param queue [Queue] passed through to process_log for each key
def process_files(queue)
  list_new_files.each do |key|
    break if stop?

    @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
    process_log(queue, key)
  end
end
register()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 59
# Prepares the input: loads its dependencies, resolves the source bucket and
# makes sure the backup bucket, backup directory and temporary directory
# exist.
#
# Sets @s3bucket, and @backup_bucket when @backup_to_bucket is configured.
def register
  # Loaded here rather than at file load time, as in the original code.
  require "fileutils"
  require "digest/md5"
  require "aws-sdk-resources"

  @logger.info("Registering s3cloudtrail input", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket})
    # NOTE(review): a HEAD request on a missing bucket may surface as
    # Aws::S3::Errors::NotFound rather than NoSuchBucket - confirm against the SDK.
    rescue Aws::S3::Errors::NoSuchBucket
      s3.create_bucket({ :bucket => @backup_to_bucket})
    end
  end

  unless @backup_to_dir.nil?
    # File.exist? replaces the deprecated File.exists?, which Ruby 3.2 removed.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 87
# Entry point of the input: records the calling thread in @current_thread
# (used later by #stop) and hands process_files to Stud.interval together
# with @interval.
#
# @param queue [Queue] passed through to process_files
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval) { process_files(queue) }
end
stop()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 145
# Asks Stud to stop the thread that is executing #run.
def stop
  # #stop is invoked from a different thread than #run, so the thread saved
  # by #run in @current_thread has to be handed to Stud.stop! explicitly.
  runner = @current_thread
  Stud.stop!(runner)
end
Private Instance Methods
delete_file_from_bucket(object)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 339
# Deletes the S3 object when @delete is set and no backup bucket is
# configured (@backup_to_bucket is nil).
#
# @param object [#delete] the S3 object that was processed
def delete_file_from_bucket(object)
  return unless @delete && @backup_to_bucket.nil?

  object.delete()
end
download_remote_file(remote_object, local_filename)
click to toggle source
Stream the remote file to the local disk
@param [S3Object] Reference to the remote S3 object to download @param [String] The temporary filename to stream to. @return [Boolean] True if the file was completely downloaded
# File lib/logstash/inputs/s3cloudtrail.rb, line 326
# Streams a remote S3 object into a local file.
#
# The local file is opened (and truncated) before stop? is checked, so an
# aborted download leaves an empty file behind for the caller to clean up.
#
# @param remote_object [#key, #get] the S3 object to download
# @param local_filename [String] path of the file to write to
# @return [Boolean] true when the download ran, false when stop? was set
def download_remote_file(remote_object, local_filename)
  @logger.debug("S3Cloudtrail input: Download remote file", :remote_key => remote_object.key, :local_filename => local_filename)

  File.open(local_filename, 'wb') do |target|
    return false if stop?

    remote_object.get(:response_target => target)
  end

  true
end
event_is_metadata?(event)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 199
# Tells whether an event carries only a "#Version: " or "#Fields: " header
# line in its "message" field.
#
# @param event [#get] the decoded event
# @return [Boolean] false when the event has no "message"
def event_is_metadata?(event)
  line = event.get("message")
  return false if line.nil?

  version_metadata?(line) || fields_metadata?(line)
end
fields_metadata?(line)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 211
# Tells whether a line is a "#Fields: " header line.
#
# @param line [String] the line to inspect
# @return [Boolean] true when the line begins with "#Fields: "
def fields_metadata?(line)
  marker = '#Fields: '
  line.start_with?(marker)
end
get_s3object()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 346
# Builds the S3 resource handle from aws_options_hash.
#
# @return [Aws::S3::Resource]
def get_s3object
  Aws::S3::Resource.new(aws_options_hash)
end
gzip?(filename)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 263
# Tells whether a filename denotes a gzip file, judged by its ".gz" suffix.
#
# @param filename [String] the filename to inspect
# @return [Boolean] true when the name ends with ".gz"
def gzip?(filename)
  suffix = '.gz'
  filename.end_with?(suffix)
end
ignore_filename?(filename)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 285
# Decides whether an S3 key must be skipped.
#
# A key is ignored when it equals @prefix, when it ends with "/", when it is
# one of this input's own backups (backups go to the source bucket and the key
# starts with @backup_add_prefix), or when it matches @exclude_pattern.
#
# @param filename [String] the S3 key to inspect
# @return [Boolean] true when the key must be skipped
def ignore_filename?(filename)
  return true if @prefix == filename
  return true if filename.end_with?("/")

  # The prefix is compared literally: interpolating it into a regexp would
  # treat characters such as "." or "+" in the prefix as metacharacters and
  # skip unrelated keys.
  if @backup_add_prefix && @backup_to_bucket == @bucket && filename.start_with?(@backup_add_prefix)
    return true
  end

  return false if @exclude_pattern.nil?

  !(filename =~ Regexp.new(@exclude_pattern)).nil?
end
process_local_log(queue, filename)
click to toggle source
Read the content of the local file
@param [Queue] Where to push the event @param [String] Which file to read from @return [Boolean] True if the file was completely read, false otherwise.
# File lib/logstash/inputs/s3cloudtrail.rb, line 159
# Reads a local file line by line, decodes each line with @codec and pushes
# the resulting events onto the queue.
#
# @param queue [Queue] where to push the events
# @param filename [String] which file to read from
# @return [Boolean] true if the file was completely read, false when stop?
#   became true part-way through
def process_local_log(queue, filename)
  @logger.debug('Processing file', :filename => filename)
  cloudfront_metadata = {}

  # Codecs currently work on bytes rather than streams, so decompression and
  # reading happen here and each line is handed to the codec as bytes.
  read_file(filename) do |line|
    if stop?
      @logger.warn("Logstash S3Cloudtrail input, stop reading in the middle of the file, we will read it again when logstash is started")
      return false
    end

    @codec.decode(line) do |event|
      # Assumption inherited from the cloudfront log format: the plain or line
      # codec is used and the "message" key holds the raw line. Events that are
      # only metadata are dropped (pre-1.5 plugin behaviour).
      #
      # The line has to pass through the codec first so unknown bytes are
      # replaced; matching on the raw bytes would otherwise fail with
      # `Error: invalid byte sequence in UTF-8'.
      if event_is_metadata?(event)
        @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
        update_metadata(cloudfront_metadata, event)
      else
        decorate(event)

        version = cloudfront_metadata[:cloudfront_version]
        fields = cloudfront_metadata[:cloudfront_fields]
        event.set("cloudfront_version", version) unless version.nil?
        event.set("cloudfront_fields", fields) unless fields.nil?

        queue << event
      end
    end
  end

  true
end
process_log(queue, key)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 302
# Downloads one S3 object into the temporary directory, turns it into events
# and, when it was read completely, runs the backup/delete steps and records
# the object's last_modified in the sincedb.
#
# The temporary file is removed on every path, including an interrupted read
# or an exception raised by one of the steps.
#
# @param queue [Queue] where process_local_log pushes the events
# @param key [String] key of the object in @s3bucket
def process_log(queue, key)
  object = @s3bucket.object(key)
  filename = File.join(temporary_directory, File.basename(key))

  begin
    return unless download_remote_file(object, filename)
    return unless process_local_log(queue, filename)

    lastmod = object.last_modified
    backup_to_bucket(object)
    backup_to_dir(filename)
    delete_file_from_bucket(object)
    sincedb.write(lastmod)
  ensure
    # force = true: a file that was never created must not raise here.
    FileUtils.remove_entry_secure(filename, true)
  end
end
read_file(filename, &block)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 229
# Feeds the lines of a local file to the given block, choosing the gzip or
# the plain reader according to gzip?(filename).
#
# @param filename [String] path of the file to read
# @yield [line] each line produced by the chosen reader
def read_file(filename, &block)
  return read_gzip_file(filename, block) if gzip?(filename)

  read_plain_file(filename, block)
end
read_gzip_file(filename, block)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 244
# Decompresses a gzip file and calls the block once per line.
#
# The whole file is read into memory and split on "\n", so the lines handed
# to the block carry no trailing newline.
#
# @param filename [String] path of the gzip file
# @param block [#call] receives each line
# @raise [Zlib::Error] re-raised after logging when decompression fails
def read_gzip_file(filename, block)
  Zlib::GzipReader.open(filename) do |decoder|
    decoder.read.split("\n").each { |line| block.call(line) }
  end
  # Streaming alternative kept from the original source:
  #Zlib::GzipReader.open(filename) do |decoder|
  #  decoder.each_line { |line| block.call(line) }
  #end
rescue Zlib::Error, Zlib::GzipFile::Error => e
  @logger.error("Gzip codec: We cannot uncompress the gzip file", :filename => filename)
  raise e
end
read_plain_file(filename, block)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 237
# Opens a file in binary mode and calls the block once per line; the line
# terminator is kept on each line.
#
# @param filename [String] path of the file to read
# @param block [Proc] receives each line
def read_plain_file(filename, block)
  File.open(filename, 'rb') { |io| io.each_line(&block) }
end
sincedb()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 268
# Returns the memoized SinceDB::File for this input.
#
# It is backed by @sincedb_path when one is configured, otherwise by the
# path generated by sincedb_file.
#
# @return [SinceDB::File]
def sincedb
  @sincedb ||= begin
    if @sincedb_path.nil?
      generated_path = sincedb_file
      @logger.info("Using default generated file for the sincedb", :filename => generated_path)
      SinceDB::File.new(generated_path)
    else
      @logger.info("Using the provided sincedb_path", :sincedb_path => @sincedb_path)
      SinceDB::File.new(@sincedb_path)
    end
  end
end
sincedb_file()
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 280
# Builds the default sincedb path: a ".sincedb_<md5>" file under $HOME, where
# the digest is taken over "<bucket>+<prefix>".
#
# @return [String] the generated path
def sincedb_file
  digest = Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")
  File.join(ENV["HOME"], ".sincedb_#{digest}")
end
update_metadata(metadata, event)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 216
# Stores the value of a "#Version: " or "#Fields: " header line in the
# metadata hash under :cloudfront_version or :cloudfront_fields.
#
# @param metadata [Hash] updated in place
# @param event [#get] event whose "message" holds the header line
def update_metadata(metadata, event)
  line = event.get('message').strip

  metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last if version_metadata?(line)
  metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last if fields_metadata?(line)
end
version_metadata?(line)
click to toggle source
# File lib/logstash/inputs/s3cloudtrail.rb, line 206
# Tells whether a line is a "#Version: " header line.
#
# @param line [String] the line to inspect
# @return [Boolean] true when the line begins with "#Version: "
def version_metadata?(line)
  marker = '#Version: '
  line.start_with?(marker)
end