class LogStash::Inputs::GoogleCloudStorage

Read objects from a Google Cloud Storage bucket and emit each line as a Logstash event.

This plugin is intended only as an example.

Public Instance Methods

backup_to_bucket(filename) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 136
# Copy a processed object into @backup_to_bucket (name prefixed with
# @backup_add_prefix) and, when @delete is set, remove the original from
# @bucket. Does nothing when no backup bucket is configured.
#
# The source object is only deleted after a successful copy; if the copy
# request reports an error the original is kept so no data is lost.
#
# @param filename [String] name of the object in @bucket
def backup_to_bucket(filename)
  return if @backup_to_bucket.nil?

  backup_key = "#{@backup_add_prefix}#{filename}"
  @logger.debug("GCS input: bck_up object " + backup_key)

  copy = @client.execute(
    api_method: @gcs.objects.copy,
    parameters: { destinationBucket: @backup_to_bucket, destinationObject: backup_key,
                  sourceBucket: @bucket, sourceObject: filename }
  )

  if copy.error?
    @logger.error("GCS input: backup copy failed, keeping source object",
                  :object => filename, :error => copy.error_message)
    return
  end

  return unless @delete

  @client.execute(
    api_method: @gcs.objects.delete,
    parameters: { bucket: @bucket, object: filename }
  )
end
backup_to_dir(filename) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 157
# Copy a downloaded local file into @backup_to_dir; a no-op when no
# backup directory is configured.
#
# @param filename [String] path of the local temporary file
def backup_to_dir(filename)
  return if @backup_to_dir.nil?

  FileUtils.cp(filename, @backup_to_dir)
end
list_new_files() click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 104
# List the objects in @bucket that should be processed: not rejected by
# ignore_filename? and updated after the last sincedb checkpoint.
#
# The GCS list API returns results in pages, so this follows
# nextPageToken until the listing is exhausted. A page without `items`
# (e.g. an empty bucket) is treated as empty.
#
# @return [Array<String>] object names, oldest `updated` first
def list_new_files
  @logger.debug("GCS input: Polling new objects from bucket " + @bucket)

  log_files = {}
  page_token = nil

  loop do
    parameters = { bucket: @bucket }
    parameters[:pageToken] = page_token if page_token
    objects = @client.execute(api_method: @gcs.objects.list, parameters: parameters)

    (objects.data.items || []).each do |file|
      next if ignore_filename?(file.name)
      next unless sincedb.newer?(file.updated)

      log_files[file.name] = file.updated
      @logger.info("GCS input: Adding to objects[]", :name => file.name)
      @logger.debug("GCS input: objects[] length is: ", :length => log_files.length)
    end

    page_token = objects.data['nextPageToken']
    break if page_token.nil? || page_token.empty?
  end

  log_files.keys.sort_by { |name| log_files[name] }
end
process_files(queue) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 165
# Handle every object reported by list_new_files, in order. As soon as a
# stop has been requested the loop is abandoned (and nil is returned);
# otherwise the list of names is returned.
#
# @param queue [Queue] where decoded events are pushed
def process_files(queue)
  list_new_files.each do |name|
    break if stop?

    @logger.debug("GCS input: processing ", :bucket => @bucket, :filename => name)
    process_log(queue, name)
  end
end
register() click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 64
# Plugin setup: load dependencies, authenticate to GCS with the
# service-account PKCS12 key, and create the local backup and temporary
# directories if they are missing.
def register
  @host = Socket.gethostname

  require "fileutils"
  require "digest/md5"
  require "google/api_client"
  require "openssl"

  @logger.debug("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile)
  @logger.info("\n===========================GCS INPUT PLUG-IN===========================")

  # Authenticate with the service-account key and load the Storage v1 API.
  @client = Google::APIClient.new(:application_name =>
                                      'Logstash Google Cloud Storage input plugin',
                                  :application_version => '0.1')

  key = Google::APIClient::PKCS12.load_key(@keyfile, @key_password)
  service_account = Google::APIClient::JWTAsserter.new(@service_account,
                                                       'https://www.googleapis.com/auth/devstorage.read_write',
                                                       key)
  @client.authorization = service_account.authorize
  @gcs = @client.discovered_api('storage', 'v1')

  # File.exist? (not the removed File.exists? alias) keeps this working on Ruby >= 3.2.
  unless @backup_to_dir.nil? || File.exist?(@backup_to_dir)
    Dir.mkdir(@backup_to_dir, 0700)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end
run(queue) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 94
# Main loop: poll the bucket every @interval seconds via Stud.interval.
# The running thread is recorded first so that #stop, which is invoked
# from a different thread, knows which thread to interrupt.
#
# @param queue [Queue] where decoded events are pushed
def run(queue)
  @current_thread = Thread.current
  period = @interval
  Stud.interval(period) { process_files(queue) }
end
stop() click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 181
def stop
  # #stop is called from a different thread than #run, so Stud.stop! must be
  # told explicitly which thread to stop; #run stores it in @current_thread.
  worker = @current_thread
  Stud.stop!(worker)
end

Private Instance Methods

delete_file_from_bucket(object) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 387
# Delete a processed object from @bucket when @delete is set and no backup
# bucket is configured (backup_to_bucket performs its own delete after
# copying in that case).
#
# The caller passes the object *name* (a String), so deletion goes through
# the GCS API; calling `object.delete()` on a String would raise
# ArgumentError.
#
# @param object [String] name of the object in @bucket
def delete_file_from_bucket(object)
  return unless @delete && @backup_to_bucket.nil?

  @client.execute(
    api_method: @gcs.objects.delete,
    parameters: { bucket: @bucket, object: object }
  )
end
download_remote_file(remote_object, local_filename) click to toggle source

Write the downloaded remote object content to the local disk.

@param [GCS_OBJECT] remote_object Data from the GCS object to download.
@param [String] local_filename The temporary filename to write to.
@return [Boolean] True if the file was completely downloaded.

# File lib/logstash/inputs/google_cloud_storage.rb, line 375
# Write a downloaded object body to a local file.
#
# @param remote_object [String] object content returned by the GCS media request
# @param local_filename [String] temporary path to write to
# @return [Boolean] true once the content has been written; false (after
#   logging) if the file could not be written, so the caller can clean up
#   and retry the object on a later poll
def download_remote_file(remote_object, local_filename)
  # TODO: the whole object body is held in memory before being written;
  # writing it in chunks would avoid memory pressure on large objects.
  File.open(local_filename, 'wb') { |file| file.write(remote_object) }
  true
rescue IOError, SystemCallError => e
  @logger.error("GCS input: failed to write downloaded object to disk",
                :filename => local_filename, :error => e.message)
  false
end
event_is_metadata?(event) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 236
# True when the event's "message" is a String header line (as recognised
# by version_metadata? or fields_metadata?); false for any other message,
# including non-String values.
def event_is_metadata?(event)
  message = event.get("message")
  return false unless message.instance_of?(String)

  version_metadata?(message) || fields_metadata?(message)
end
fields_metadata?(line) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 248
# True when +line+ begins with the "#Fields: " header marker.
def fields_metadata?(line)
  header = '#Fields: '
  line.start_with?(header)
end
gzip?(filename) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 299
# True when +filename+ ends in ".gz" (case-sensitive).
def gzip?(filename)
  extension = '.gz'
  filename.end_with?(extension)
end
ignore_filename?(filename) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 321
# Decide whether a listed object should be skipped.
#
# Skipped: the object named exactly @prefix; names ending in "/"
# (typically folder placeholders); backups written into the same bucket
# (names starting with @backup_add_prefix when @backup_to_bucket equals
# @bucket); and names matching the @exclude_pattern regexp.
#
# @param filename [String] object name
# @return [Boolean]
def ignore_filename?(filename)
  return true if @prefix == filename
  return true if filename.end_with?("/")
  # Literal prefix test: interpolating the prefix into a regexp would treat
  # characters such as "." or "+" as metacharacters, and `^` would also
  # match after embedded newlines.
  return true if @backup_add_prefix && @backup_to_bucket == @bucket && filename.start_with?(@backup_add_prefix)
  return false if @exclude_pattern.nil?

  filename =~ Regexp.new(@exclude_pattern) ? true : false
end
process_local_log(queue, filename, key) click to toggle source

Read the content of the local file

@param [Queue] queue Where to push the events.
@param [String] filename Which local file to read from.
@param [String] key The GCS object name the file was downloaded from.
@return [Boolean] True if the file was completely read, false otherwise.

# File lib/logstash/inputs/google_cloud_storage.rb, line 196
# Read a downloaded file line by line, decode each line with the configured
# codec, and push the resulting events onto +queue+.
#
# @param queue [Queue] where decoded events are pushed
# @param filename [String] local path of the downloaded object
# @param key [String] GCS object name, stored in [@metadata][gcs][key]
# @return [Boolean] true if the whole file was read; false if a stop was
#   requested part-way through (the file is read again on the next start)
def process_local_log(queue, filename, key)
  @logger.debug('GCS input: processing file ', :filename => filename)
  # Codecs operate on bytes rather than streams, so decompression and line
  # splitting happen here and only raw lines are handed to the codec.
  read_file(filename) do |line|
    if stop?
      @logger.warn("Logstash GCS input, stop reading in the middle of the file, we will read it again when logstash is started")
      # `return` inside this block exits process_local_log itself.
      return false
    end

    @codec.decode(line) do |event|
      # NOTE(review): metadata-line filtering (event_is_metadata? /
      # update_metadata) is disabled; every decoded event is enqueued.
      event.set("[@metadata][gcs]", { "key" => key })
      event.set("[@metadata][host]", @host)
      decorate(event)
      queue << event
    end
  end

  true
end
process_log(queue, key) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 338
# Download one object to the temporary directory and emit its lines.
#
# When every line was read, the object is handed to backup_to_bucket,
# backup_to_dir and delete_file_from_bucket, and the sincedb checkpoint is
# advanced to the object's `updated` time. The temporary copy is always
# removed, including when reading is interrupted by a stop request or
# raises, so aborted downloads do not accumulate on disk.
#
# @param queue [Queue] where decoded events are pushed
# @param key [String] name of the object in @bucket
def process_log(queue, key)
  # Object metadata (provides the `updated` checkpoint) ...
  object = @client.execute(
    api_method: @gcs.objects.get,
    parameters: { bucket: @bucket, object: key }
  )
  # ... and the object content itself.
  objectdata = @client.execute(
    api_method: @gcs.objects.get,
    parameters: { bucket: @bucket, object: key, alt: 'media' }
  )

  filename = File.join(temporary_directory, File.basename(key))

  begin
    return unless download_remote_file(objectdata.body, filename)
    return unless process_local_log(queue, filename, key)

    lastmod = object.data['updated']
    backup_to_bucket(key)
    backup_to_dir(filename)
    delete_file_from_bucket(key)
    sincedb.write(lastmod)
  ensure
    FileUtils.remove_entry_secure(filename, true)
  end
end
read_file(filename, &block) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 266
# Feed each line of a local file to the given block, transparently
# decompressing it when gzip? says it is a gzip file.
def read_file(filename, &block)
  gzip?(filename) ? read_gzip_file(filename, block) : read_plain_file(filename, block)
end
read_gzip_file(filename, block) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 281
# Decompress a local gzip file, including files made of several
# concatenated gzip members, passing each line to +block+.
#
# @param filename [String] path of the local .gz file
# @param block [Proc] called once per decompressed line
# @raise [Zlib::Error, Zlib::GzipFile::Error] if the data cannot be
#   decompressed (logged, then re-raised)
def read_gzip_file(filename, block)
  # Details about multiple streams and the usage of unused from: http://code.activestate.com/lists/ruby-talk/11168/
  # Binary mode: compressed bytes must not go through text-mode newline or
  # encoding conversion.
  File.open(filename, 'rb') do |zio|
    while true do
      io = Zlib::GzipReader.new(zio)
      io.each_line { |line| block.call(line) }
      unused = io.unused
      io.finish
      break if unused.nil?
      # GzipReader buffered past the end of this member; rewind to the start
      # of the next member in the stream.
      zio.pos -= unused.length
    end
  end
rescue Zlib::Error, Zlib::GzipFile::Error => e
  @logger.error("Gzip codec: We cannot uncompress the gzip file", :filename => filename, :error => e.message)
  raise
end
read_plain_file(filename, block) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 274
# Pass each line of an uncompressed local file to +block+. The file is
# opened in binary mode ('rb').
def read_plain_file(filename, block)
  File.open(filename, 'rb') do |io|
    io.each_line { |line| block.call(line) }
  end
end
sincedb() click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 304
# Lazily build, then reuse, the SinceDB::File checkpoint store: at
# @sincedb_path when configured, otherwise at the generated sincedb_file.
def sincedb
  return @sincedb if @sincedb

  @sincedb =
    if @sincedb_path.nil?
      @logger.debug("Using default generated file for the sincedb", :filename => sincedb_file)
      SinceDB::File.new(sincedb_file)
    else
      @logger.debug("Using the provided sincedb_path",
                    :sincedb_path => @sincedb_path)
      SinceDB::File.new(@sincedb_path)
    end
end
sincedb_file() click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 316
# Default sincedb location: a file in the home directory whose name is the
# MD5 of "<bucket>+<prefix>", so each bucket/prefix pair keeps its own
# checkpoint. Uses $HOME, falling back to Dir.home when HOME is unset
# (File.join would otherwise raise TypeError on nil).
def sincedb_file
  home = ENV.fetch("HOME") { Dir.home }
  digest = Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")
  File.join(home, ".sincedb_#{digest}")
end
update_metadata(metadata, event) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 253
# Record a "#Version: " or "#Fields: " header value from the event's
# (whitespace-stripped) message into +metadata+ under :cloudfront_version
# or :cloudfront_fields respectively.
def update_metadata(metadata, event)
  stripped = event.get('message').strip

  metadata[:cloudfront_version] = stripped.split(/#Version: (.+)/).last if version_metadata?(stripped)
  metadata[:cloudfront_fields] = stripped.split(/#Fields: (.+)/).last if fields_metadata?(stripped)
end
version_metadata?(line) click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 243
# True when +line+ begins with the "#Version: " header marker.
def version_metadata?(line)
  header = '#Version: '
  line.start_with?(header)
end