class LogStash::Inputs::S3

Stream events from files in an S3 bucket.

Each line from each file generates an event. Files ending in `.gz` are handled as gzipped files.
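A minimal standalone sketch of that per-line model, using Ruby's stdlib Zlib rather than the Java streams the plugin itself uses under JRuby (filenames are hypothetical):

require "zlib"

def each_line(filename, &block)
  if filename.end_with?(".gz")  # the plugin matches via the configurable gzip_pattern
    Zlib::GzipReader.open(filename) { |gz| gz.each_line(&block) }
  else
    File.open(filename, "rb") { |f| f.each_line(&block) }
  end
end

each_line("access_log.gz") { |line| puts line }  # each line would become one event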

Constants

CredentialConfig

Public Instance Methods

backup_to_bucket(object)
# File lib/logstash/inputs/s3.rb, line 207
def backup_to_bucket(object)
  unless @backup_to_bucket.nil?
    backup_key = "#{@backup_add_prefix}#{object.key}"
    @backup_bucket.object(backup_key).copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")
    if @delete
      object.delete()
    end
  end
end
backup_to_dir(filename)
# File lib/logstash/inputs/s3.rb, line 217
def backup_to_dir(filename)
  unless @backup_to_dir.nil?
    FileUtils.cp(filename, @backup_to_dir)
  end
end
list_new_files()
# File lib/logstash/inputs/s3.rb, line 180
def list_new_files
  objects = {}
  found = false
  begin
    @s3bucket.objects(:prefix => @prefix).each do |log|
      found = true
      @logger.debug('Found key', :key => log.key)
      if ignore_filename?(log.key)
        @logger.debug('Ignoring', :key => log.key)
      elsif log.content_length <= 0
        @logger.debug('Object Zero Length', :key => log.key)
      elsif !sincedb.newer?(log.last_modified)
        @logger.debug('Object Not Modified', :key => log.key)
      elsif (log.storage_class == 'GLACIER' || log.storage_class == 'DEEP_ARCHIVE') && !file_restored?(log.object)
        @logger.debug('Object Archived to Glacier', :key => log.key)
      else
        objects[log.key] = log.last_modified
        @logger.debug("Added to objects[]", :key => log.key, :length => objects.length)
      end
    end
    @logger.info('No files found in bucket', :prefix => prefix) unless found
  rescue Aws::Errors::ServiceError => e
    @logger.error("Unable to list objects in bucket", :exception => e.class, :message => e.message, :backtrace => e.backtrace, :prefix => prefix)
  end
  objects.keys.sort {|a,b| objects[a] <=> objects[b]}
end
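
The method returns keys sorted by last-modified time, so older objects are processed first and the sincedb checkpoint advances monotonically. A standalone illustration of the sort idiom on the final line (keys and timestamps are hypothetical):

objects = {
  "logs/2021-01-02.log" => Time.utc(2021, 1, 2),
  "logs/2021-01-01.log" => Time.utc(2021, 1, 1),
}
objects.keys.sort { |a, b| objects[a] <=> objects[b] }
# => ["logs/2021-01-01.log", "logs/2021-01-02.log"]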
process_files(queue)
# File lib/logstash/inputs/s3.rb, line 223
def process_files(queue)
  objects = list_new_files

  objects.each do |key|
    if stop?
      break
    else
      process_log(queue, key)
    end
  end
end
register()
# File lib/logstash/inputs/s3.rb, line 142
def register
  require "fileutils"
  require "digest/md5"

  @logger.info("Registering", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket})
    rescue Aws::S3::Errors::NoSuchBucket
      s3.create_bucket({ :bucket => @backup_to_bucket})
    end
  end

  unless @backup_to_dir.nil?
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)

  if !@watch_for_new_files && original_params.include?('interval')
    logger.warn("`watch_for_new_files` has been disabled; `interval` directive will be ignored.")
  end
end
run(queue)
# File lib/logstash/inputs/s3.rb, line 172
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval) do
    process_files(queue)
    stop unless @watch_for_new_files
  end
end
stop()
# File lib/logstash/inputs/s3.rb, line 235
def stop
  # @current_thread is initialized in the `#run` method.
  # This variable is needed because `#stop` is called from a different
  # thread than `#run`, requiring us to pass an explicit thread to `Stud.stop!`.
  Stud.stop!(@current_thread)
end
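
A minimal sketch of how Stud.interval and Stud.stop! cooperate across threads, assuming a hypothetical 5-second interval:

require "stud/interval"

worker = Thread.new do
  Stud.interval(5) { puts "polling bucket..." }  # stand-in for process_files(queue)
end

sleep 11
Stud.stop!(worker)  # asks the interval loop to stop and wakes the thread
worker.join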

Private Instance Methods

aws_options_hash()
# File lib/logstash/inputs/s3.rb, line 467
def aws_options_hash
  opts = {}

  if @access_key_id.is_a?(NilClass) ^ @secret_access_key.is_a?(NilClass)
    @logger.warn("Likely config error: Only one of access_key_id or secret_access_key was provided but not both.")
  end

  credential_config = CredentialConfig.new(@access_key_id, @secret_access_key, @session_token, @profile, 0, 1, @region)
  @credentials = Aws::CredentialProviderChain.new(credential_config).resolve

  opts[:credentials] = @credentials

  opts[:http_proxy] = @proxy_uri if @proxy_uri

  if self.respond_to?(:aws_service_endpoint)
    # used by CloudWatch to basically do the same as below (returns { region: region })
    opts.merge!(self.aws_service_endpoint(@region))
  else
    # NOTE: setting :region works with the aws sdk (resolves correct endpoint)
    opts[:region] = @region
  end

  if !@endpoint.is_a?(NilClass)
    opts[:endpoint] = @endpoint
  end

  return opts
end
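
The XOR on the two nil checks warns only when exactly one half of a static credential pair is configured, which is almost always a mistake:

nil.is_a?(NilClass) ^ "key".is_a?(NilClass)  # => true  (one of the pair missing: warn)
nil.is_a?(NilClass) ^ nil.is_a?(NilClass)    # => false (neither set: use the provider chain)
"id".is_a?(NilClass) ^ "key".is_a?(NilClass) # => false (both set: fine)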
delete_file_from_bucket(object)
# File lib/logstash/inputs/s3.rb, line 461
def delete_file_from_bucket(object)
  if @delete and @backup_to_bucket.nil?
    object.delete()
  end
end
download_remote_file(remote_object, local_filename)

Stream the remote file to the local disk.

@param [S3Object] remote_object Reference to the remote S3 object to download
@param [String] local_filename The temporary filename to stream to
@return [Boolean] True if the file was completely downloaded

# File lib/logstash/inputs/s3.rb, line 446
def download_remote_file(remote_object, local_filename)
  completed = false
  @logger.debug("Downloading remote file", :remote_key => remote_object.key, :local_filename => local_filename)
  File.open(local_filename, 'wb') do |s3file|
    return completed if stop?
    begin
      remote_object.get(:response_target => s3file)
      completed = true
    rescue Aws::Errors::ServiceError => e
      @logger.warn("Unable to download remote file", :exception => e.class, :message => e.message, :remote_key => remote_object.key)
    end
  end
  completed
end
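
A standalone sketch of the same streaming download with aws-sdk-s3; :response_target writes the body to the open file instead of buffering it in memory (bucket, key, and region are hypothetical):

require "aws-sdk-s3"

object = Aws::S3::Resource.new(region: "us-east-1")
                          .bucket("my-bucket")
                          .object("logs/app.log")

File.open("/tmp/app.log", "wb") do |file|
  object.get(response_target: file)  # streams directly to disk
end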
event_is_metadata?(event)
# File lib/logstash/inputs/s3.rb, line 301
def event_is_metadata?(event)
  return false unless event.get("message").class == String
  line = event.get("message")
  version_metadata?(line) || fields_metadata?(line)
end
fields_metadata?(line)
# File lib/logstash/inputs/s3.rb, line 311
def fields_metadata?(line)
  line.start_with?('#Fields: ')
end
file_restored?(object)
# File lib/logstash/inputs/s3.rb, line 501
def file_restored?(object)
  begin
    restore = object.data.restore
    if restore && restore.match(/ongoing-request\s?=\s?["']false["']/)
      if restore = restore.match(/expiry-date\s?=\s?["'](.*?)["']/)
        expiry_date = DateTime.parse(restore[1])
        return true if DateTime.now < expiry_date # restored
      else
        @logger.debug("No expiry-date header for restore request: #{object.data.restore}")
        return nil # no expiry-date found for ongoing request
      end
    end
  rescue => e
    @logger.debug("Could not determine Glacier restore status", :exception => e.class, :message => e.message)
  end
  return false
end
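
The regexps parse S3's x-amz-restore value. A sketch against a representative header (the value below is illustrative):

require "date"

restore = 'ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"'

restore.match(/ongoing-request\s?=\s?["']false["']/)          # truthy: the restore has finished
expiry = restore.match(/expiry-date\s?=\s?["'](.*?)["']/)[1]  # => "Fri, 21 Dec 2012 00:00:00 GMT"
DateTime.now < DateTime.parse(expiry)                         # the restored copy is still readable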
get_s3object()
# File lib/logstash/inputs/s3.rb, line 496
def get_s3object
  options = symbolized_settings.merge(aws_options_hash || {})
  Aws::S3::Resource.new(options)
end
gzip?(filename)
# File lib/logstash/inputs/s3.rb, line 360
def gzip?(filename)
  Regexp.new(@gzip_pattern).match(filename)
end
ignore_filename?(filename)
# File lib/logstash/inputs/s3.rb, line 406
def ignore_filename?(filename)
  if @prefix == filename
    return true
  elsif filename.end_with?("/")
    return true
  elsif (@backup_add_prefix && @backup_to_bucket == @bucket && filename =~ /^#{backup_add_prefix}/)
    return true
  elsif @exclude_pattern.nil?
    return false
  elsif filename =~ Regexp.new(@exclude_pattern)
    return true
  else
    return false
  end
end
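
Some illustrative decisions, assuming a hypothetical configuration with prefix => "logs/" and exclude_pattern => "\.tmp$":

ignore_filename?("logs/")          # => true  (the prefix itself)
ignore_filename?("logs/archive/")  # => true  (directory placeholder key)
ignore_filename?("logs/app.tmp")   # => true  (matches exclude_pattern)
ignore_filename?("logs/app.log")   # => false (will be processed)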
process_local_log(queue, filename, object)

Read the content of the local file

@param [Queue] queue Where to push the events
@param [String] filename Which file to read from
@param [S3Object] object Source S3 object
@return [Boolean] True if the file was completely read, false otherwise

# File lib/logstash/inputs/s3.rb, line 250
def process_local_log(queue, filename, object)
  @logger.debug('Processing file', :filename => filename)
  metadata = {}
  # Currently codecs operate on bytes instead of streams,
  # so all IO work (decompression, reading) needs to happen in the
  # input itself, and the result is sent as bytes to the codecs.
  read_file(filename) do |line|
    if stop?
      @logger.warn("Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started")
      return false
    end

    @codec.decode(line) do |event|
      # We are making an assumption concerning the CloudFront
      # log format: the user will use the plain or the line codec,
      # and the message key will represent the actual line content.
      # If the event is only metadata, the event will be dropped.
      # This was the behavior of the pre-1.5 plugin.
      #
      # The line needs to go through the codecs to replace
      # unknown bytes in the log stream before doing a regexp match, or
      # you will get an `Error: invalid byte sequence in UTF-8`.
      if event_is_metadata?(event)
        @logger.debug('Event is metadata, updating the current cloudfront metadata', :event => event)
        update_metadata(metadata, event)
      else
        decorate(event)

        event.set("cloudfront_version", metadata[:cloudfront_version]) unless metadata[:cloudfront_version].nil?
        event.set("cloudfront_fields", metadata[:cloudfront_fields]) unless metadata[:cloudfront_fields].nil?

        if @include_object_properties
          event.set("[@metadata][s3]", object.data.to_h)
        else
          event.set("[@metadata][s3]", {})
        end

        event.set("[@metadata][s3][key]", object.key)

        queue << event
      end
    end
  end
  # Ensure any stateful codecs (such as multiline) are flushed to the queue.
  @codec.flush do |event|
    queue << event
  end

  return true
end
process_log(queue, key)
# File lib/logstash/inputs/s3.rb, line 422
def process_log(queue, key)
  @logger.debug("Processing", :bucket => @bucket, :key => key)
  object = @s3bucket.object(key)

  filename = File.join(temporary_directory, File.basename(key))
  if download_remote_file(object, filename)
    if process_local_log(queue, filename, object)
      lastmod = object.last_modified
      backup_to_bucket(object)
      backup_to_dir(filename)
      delete_file_from_bucket(object)
      FileUtils.remove_entry_secure(filename, true)
      sincedb.write(lastmod)
    end
  else
    FileUtils.remove_entry_secure(filename, true)
  end
end
read_file(filename, &block)
# File lib/logstash/inputs/s3.rb, line 327
def read_file(filename, &block)
  if gzip?(filename)
    read_gzip_file(filename, block)
  else
    read_plain_file(filename, block)
  end
rescue => e
  # skip any broken file
  @logger.error("Failed to read file, processing skipped", :exception => e.class, :message => e.message, :filename => filename)
end
read_gzip_file(filename, block)
# File lib/logstash/inputs/s3.rb, line 344
def read_gzip_file(filename, block)
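  # FileInputStream, GZIPInputStream, InputStreamReader and BufferedReader
  # are java.io / java.util.zip classes; Logstash runs on JRuby, so the
  # plugin can use Java streams for gzip decoding directly.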
  file_stream = FileInputStream.new(filename)
  gzip_stream = GZIPInputStream.new(file_stream)
  decoder = InputStreamReader.new(gzip_stream, "UTF-8")
  buffered = BufferedReader.new(decoder)

  while (line = buffered.readLine())
    block.call(line)
  end
ensure
  buffered.close unless buffered.nil?
  decoder.close unless decoder.nil?
  gzip_stream.close unless gzip_stream.nil?
  file_stream.close unless file_stream.nil?
end
read_plain_file(filename, block)
# File lib/logstash/inputs/s3.rb, line 338
def read_plain_file(filename, block)
  File.open(filename, 'rb') do |file|
    file.each(&block)
  end
end
sincedb()
# File lib/logstash/inputs/s3.rb, line 364
def sincedb
  @sincedb ||= if @sincedb_path.nil?
                  @logger.info("Using default generated file for the sincedb", :filename => sincedb_file)
                  SinceDB::File.new(sincedb_file)
                else
                  @logger.info("Using the provided sincedb_path", :sincedb_path => @sincedb_path)
                  SinceDB::File.new(@sincedb_path)
                end
end
sincedb_file()
# File lib/logstash/inputs/s3.rb, line 374
def sincedb_file
  digest = Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")
  dir = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "s3")
  FileUtils::mkdir_p(dir)
  path = File.join(dir, "sincedb_#{digest}")

  # Migrate old default sincedb path to new one.
  if ENV["HOME"]
    # This is the old file path including the old digest mechanism.
    # It remains as a way to automatically upgrade users with the old default ($HOME)
    # to the new default (path.data)
    old = File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@bucket}+#{@prefix}"))
    if File.exist?(old)
      logger.info("Migrating old sincedb in $HOME to {path.data}")
      FileUtils.mv(old, path)
    end
  end

  path
end
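
A sketch of the generated path, assuming path.data is /var/lib/logstash and a hypothetical bucket and prefix:

require "digest/md5"

digest = Digest::MD5.hexdigest("my-bucket+logs/")
File.join("/var/lib/logstash", "plugins", "inputs", "s3", "sincedb_#{digest}")
# => "/var/lib/logstash/plugins/inputs/s3/sincedb_<32-char-hex-digest>"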
symbolize(hash)
# File lib/logstash/inputs/s3.rb, line 399
def symbolize(hash)
  return hash unless hash.is_a?(Hash)
  symbolized = {}
  hash.each { |key, value| symbolized[key.to_sym] = symbolize(value) }
  symbolized
end
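
Keys are symbolized recursively, so nested additional_settings reach the SDK in the form it expects (the settings below are hypothetical):

symbolize({ "force_path_style" => true, "http" => { "open_timeout" => 15 } })
# => { :force_path_style => true, :http => { :open_timeout => 15 } }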
symbolized_settings()
# File lib/logstash/inputs/s3.rb, line 395
def symbolized_settings
  @symbolized_settings ||= symbolize(@additional_settings)
end
update_metadata(metadata, event)
# File lib/logstash/inputs/s3.rb, line 315
def update_metadata(metadata, event)
  line = event.get('message').strip

  if version_metadata?(line)
    metadata[:cloudfront_version] = line.split(/#Version: (.+)/).last
  end

  if fields_metadata?(line)
    metadata[:cloudfront_fields] = line.split(/#Fields: (.+)/).last
  end
end
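
CloudFront access logs begin with two header lines; the capturing group in split keeps only the text after the label:

"#Version: 1.0".split(/#Version: (.+)/).last
# => "1.0"
"#Fields: date time x-edge-location sc-bytes".split(/#Fields: (.+)/).last
# => "date time x-edge-location sc-bytes"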
version_metadata?(line)
# File lib/logstash/inputs/s3.rb, line 307
def version_metadata?(line)
  line.start_with?('#Version: ')
end