class LogStash::Outputs::GoogleCloudStorage

This output writes events to files on disk. You can use fields from the event as parts of the filename and/or path.

By default, this output writes one event per line in JSON format. You can customise the line format using the `line` codec, for example:

source,ruby

output {

file {
  path => ...
  codec => line { format => "custom format: %{message}"}
}

}

Constants

FIELD_REF

Attributes

failure_path[R]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 235
# Shuts the output down: stops the periodic flusher if one exists, then
# closes every cached file handle while holding the I/O mutex.
#
# A failure while closing one file is logged and does not stop the remaining
# files from being closed.
def close
  if !@flusher.nil?
    @flusher.stop
  end

  @io_mutex.synchronize do
    @logger.debug("Close: closing files")

    @files.each_pair do |file_path, handle|
      begin
        handle.close
        @logger.debug("Closed file #{file_path}", :fd => handle)
      rescue Exception => error
        # NOTE(review): rescuing Exception is broader than StandardError; it
        # is kept as-is so that shutdown behavior is unchanged.
        @logger.error("Exception while flushing and closing files.", :exception => error)
      end
    end
  end
end
multi_receive_encoded(events_and_encoded) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 207
# Writes a batch of already-encoded events to their destination files.
#
# events_and_encoded - an enumerable of [event, encoded] pairs.
#
# Events are first grouped by the path returned from event_path. Then, while
# holding the I/O mutex, each group is written to its file:
#   * when @write_behavior is "overwrite" the file is truncated and only the
#     last chunk of the group is written;
#   * otherwise every chunk is appended followed by a newline.
#
# When no flusher is running, the file is flushed and uploaded right away.
# The flusher's liveness is checked once per file so that the flush and the
# upload always happen together (previously it was checked twice, and a
# flusher state change between the checks could upload an unflushed file).
#
# Finally close_stale_files is given a chance to run.
def multi_receive_encoded(events_and_encoded)
  encoded_by_path = Hash.new { |h, k| h[k] = [] }

  events_and_encoded.each do |event, encoded|
    encoded_by_path[event_path(event)] << encoded
  end

  @io_mutex.synchronize do
    encoded_by_path.each do |path, chunks|
      fd = open(path)
      if @write_behavior == "overwrite"
        fd.truncate(0)
        fd.seek(0, IO::SEEK_SET)
        fd.write(chunks.last)
      else
        # append to the file
        chunks.each { |chunk| fd.write(chunk + "\n") }
      end

      # A live flusher takes care of flushing and uploading periodically.
      flusher_running = @flusher && @flusher.alive?
      unless flusher_running
        fd.flush
        upload_object(fd.path)
      end
    end

    close_stale_files
  end
end
register() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 119
# Prepares the plugin's runtime state: the open-file cache and its mutex,
# the expanded output path, the file root and failure-file path, the
# optional periodic flusher, the upload content type/encoding, and the
# stale-file cleanup bookkeeping.
#
# NOTE(review): this method does not call initialize_google_client; confirm
# that the client used by upload_object is set up elsewhere.
def register
  require "fileutils" # For mkdir_p

  @io_mutex = Mutex.new
  @files = {}

  @path = File.expand_path(path)
  validate_path

  # With a dynamic path the root is the static prefix of the path; otherwise
  # it is simply the directory of the configured path.
  @file_root = path_with_field_ref? ? extract_file_root : File.dirname(path)
  @failure_path = File.join(@file_root, @filename_failure)

  @flush_interval = @flush_interval.to_i
  @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) if @flush_interval > 0

  @content_type =
    if @gzip
      'application/gzip'
    else
      'text/plain'
    end
  @content_encoding =
    if @gzip_content_encoding
      'gzip'
    else
      'identity'
    end

  @last_stale_cleanup_cycle = Time.now
  @stale_cleanup_interval = 10
end

Private Instance Methods

cached?(path) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 324
# Tells whether a non-nil file handle is cached for +path+ in @files.
def cached?(path)
  return false unless @files.key?(path)

  !@files.fetch(path).nil?
end
close_stale_files() click to toggle source

Closes files that have not been written to recently. Runs every 10 seconds or so (it is triggered by incoming events, but if there are no events there is no point in closing files anyway).

# File lib/logstash/outputs/googlecloudstorage.rb, line 306
# Closes and forgets cached files that are not flagged as active, at most
# once per @stale_cleanup_interval seconds.
#
# After the sweep every remaining file is flagged inactive again, so a file
# that is not re-flagged before the next sweep will be closed by it.
def close_stale_files
  current_time = Time.now
  return if current_time - @last_stale_cleanup_cycle < @stale_cleanup_interval

  @logger.debug("Starting stale files cleanup cycle", :files => @files)
  stale = @files.reject { |_path, handle| handle.active }
  @logger.debug("%d stale files found" % stale.count, :inactive_files => stale)

  stale.each_pair do |stale_path, handle|
    @logger.info("Closing file %s" % stale_path)
    handle.close
    @files.delete(stale_path)
  end

  # mark all files as inactive, a call to write will mark them as active again
  @files.each_value { |handle| handle.active = false }
  @last_stale_cleanup_cycle = current_time
end
deleted?(path) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 329
# Tells whether nothing exists on disk at +path+.
def deleted?(path)
  return false if File.exist?(path)

  true
end
event_path(event) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 258
# Resolves the file an event must be written to.
#
# The path generated for the event is replaced by @failure_path when:
#   * the configured path is dynamic and the generated path falls outside the
#     file root (a warning is logged), or
#   * @create_if_deleted is off and nothing exists at the generated path.
#
# Returns the resolved path.
def event_path(event)
  candidate = generate_filepath(event)

  resolved =
    if path_with_field_ref? && !inside_file_root?(candidate)
      @logger.warn("GoogleCloudStorage: the event tried to write outside the files root, writing the event to the failure file",  :event => event, :filename => @failure_path)
      @failure_path
    elsif !@create_if_deleted && deleted?(candidate)
      @failure_path
    else
      candidate
    end

  @logger.debug("GoogleCloudStorage, writing event to file.", :filename => resolved)
  resolved
end
extract_file_root() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 282
# Returns the leading portion of the expanded configured path made of the
# segments that precede the first segment matching FIELD_REF.
def extract_file_root
  segments = File.expand_path(path).split(File::SEPARATOR)
  static_prefix = segments.take_while { |segment| (segment =~ FIELD_REF).nil? }
  static_prefix.join(File::SEPARATOR)
end
flush_pending_files() click to toggle source

The backbone of @flusher, our periodic flushing interval: flushes and uploads every open file.

# File lib/logstash/outputs/googlecloudstorage.rb, line 289
# Flushes every cached file and uploads it, holding the I/O mutex for the
# whole cycle.
#
# Any StandardError raised during the cycle is logged and swallowed, which
# also ends the cycle early for the files that were not reached yet.
def flush_pending_files
  begin
    @io_mutex.synchronize do
      @logger.debug("Starting flush cycle")

      @files.each_pair do |file_path, handle|
        @logger.debug("Flushing file", :path => file_path, :fd => handle)
        handle.flush
        upload_object(handle.path)
      end
    end
  rescue StandardError => error
    # squash exceptions caught while flushing after logging them
    @logger.error("Exception flushing files", :exception => error.message, :backtrace => error.backtrace)
  end
end
generate_filepath(event) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 272
# Builds the output path for +event+ by passing the configured @path
# template to the event's sprintf.
def generate_filepath(event)
  template = @path
  event.sprintf(template)
end
initialize_google_client() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 156
# Creates the Google API client, discovers the Storage v1 API and authorizes
# the client as the configured service account.
#
# Reads @key_path, @key_password and @service_account; sets @client and
# @storage.
def initialize_google_client
  require "google/api_client"
  require "openssl"

  @client = Google::APIClient.new(
    :application_name => 'Logstash Google Cloud Storage output plugin',
    :application_version => '0.1'
  )
  @storage = @client.discovered_api('storage', 'v1')

  signing_key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
  asserter = Google::APIClient::JWTAsserter.new(
    @service_account,
    'https://www.googleapis.com/auth/devstorage.read_write',
    signing_key
  )
  @client.authorization = asserter.authorize
end
inside_file_root?(log_path) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 252
# Tells whether +log_path+, once expanded, lies strictly below @file_root.
# The trailing "/" in the prefix keeps sibling directories that merely share
# a name prefix with the root from matching.
def inside_file_root?(log_path)
  root_prefix = "#{@file_root}/"
  File.expand_path(log_path).start_with?(root_prefix)
end
open(path) click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 334
# Returns the writer for +path+, reusing the cached one when possible and
# opening (and caching) a new one otherwise.
#
# path - the file to write to.
#
# Behavior:
#   * If something exists at +path+ and a writer is cached, that writer is
#     returned.
#   * If nothing exists at +path+ and @create_if_deleted is off, a cached
#     writer (if any) is returned as is.
#   * If nothing exists at +path+ and @create_if_deleted is on, the cached
#     writer is closed and discarded, then the file is created again.
#     (Previously the discarded writer was dropped without being closed,
#     which leaked its handle.)
#
# Missing parent directories are created, honoring @dir_mode unless it is
# -1; the file is opened in "a+" mode, honoring @file_mode unless it is -1.
# When gzip is enabled the handle is wrapped in a Zlib::GzipWriter.
#
# Returns the IOWriter now cached under +path+.
def open(path)
  # Check the disk once so that both decisions below see the same state.
  missing = deleted?(path)
  return @files[path] if !missing && cached?(path)

  if missing
    if @create_if_deleted
      @logger.debug("Required path was deleted, creating the file again", :path => path)
      stale = @files.delete(path)
      unless stale.nil?
        begin
          # Release the handle that still points at the deleted file.
          stale.close
        rescue StandardError => e
          @logger.warn("Failed to close the handle of a deleted file", :path => path, :exception => e)
        end
      end
    elsif cached?(path)
      return @files[path]
    end
  end

  @logger.info("Opening file", :path => path)

  dir = File.dirname(path)
  if !Dir.exist?(dir)
    @logger.info("Creating directory", :directory => dir)
    if @dir_mode != -1
      FileUtils.mkdir_p(dir, :mode => @dir_mode)
    else
      FileUtils.mkdir_p(dir)
    end
  end

  # work around a bug opening fifos (bug JRUBY-6280)
  stat = File.stat(path) rescue nil
  if stat && stat.ftype == "fifo"
    fd = java.io.FileWriter.new(java.io.File.new(path))
  else
    if @file_mode != -1
      fd = File.new(path, "a+", @file_mode)
    else
      fd = File.new(path, "a+")
    end
  end
  if gzip
    fd = Zlib::GzipWriter.new(fd)
  end
  @files[path] = IOWriter.new(fd)
end
path_with_field_ref?() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 277
# Tells whether the configured path contains a field reference.
# Returns the match offset (truthy) or nil.
def path_with_field_ref?
  FIELD_REF =~ path
end
root_directory() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 196
# Returns the first directory segment of @path. On Windows the first
# non-empty segment is the drive letter, so the second one is returned.
def root_directory
  segments = @path.split(File::SEPARATOR).reject(&:empty?)
  Gem.win_platform? ? segments[1] : segments.first
end
upload_object(filename) click to toggle source

Uploads a local file to the configured bucket.

# File lib/logstash/outputs/googlecloudstorage.rb, line 169
# Uploads a local file to the configured bucket with a multipart insert.
# The object is named after the file's basename.
#
# filename     - path of the local file to upload.
# max_attempts - how many times the upload is tried before giving up
#                (default 5). Attempts are separated by a one second pause.
#
# Every failed attempt is logged. Once the attempts are exhausted the failure
# is logged and nil is returned instead of retrying forever: callers invoke
# this method while holding the I/O mutex, so an unbounded retry on a
# permanent error would block all writes indefinitely.
def upload_object(filename, max_attempts = 5)
  attempts = 0
  begin
    attempts += 1
    @logger.debug("GCS: upload object.", :filename => filename)

    media = Google::APIClient::UploadIO.new(filename, @content_type)
    metadata_insert_result = @client.execute(:api_method => @storage.objects.insert,
                                             :parameters => {
                                               'uploadType' => 'multipart',
                                               'bucket' => @bucket,
                                               'contentEncoding' => @content_encoding,
                                               'name' => File.basename(filename)
                                             },
                                             :body_object => {contentType: @content_type},
                                             :media => media)
    contents = metadata_insert_result.data
    @logger.debug("GCS: multipart insert",
                  :object => contents.name,
                  :self_link => contents.self_link)
  rescue => e
    @logger.error("GCS: failed to upload file", :exception => e)
    if attempts < max_attempts
      sleep 1
      retry
    end
    @logger.error("GCS: giving up on upload after repeated failures",
                  :filename => filename,
                  :attempts => attempts)
    nil
  end
end
validate_path() click to toggle source
# File lib/logstash/outputs/googlecloudstorage.rb, line 149
# Rejects configurations whose path starts with a dynamic (field reference)
# segment.
#
# Raises LogStash::ConfigurationError when the root directory of the path
# matches FIELD_REF; returns nil otherwise.
def validate_path
  return unless root_directory =~ FIELD_REF

  @logger.error("GoogleCloudStorage: The starting part of the path should not be dynamic.", :path => @path)
  raise LogStash::ConfigurationError, "The starting part of the path should not be dynamic."
end