class LogStash::Outputs::GoogleCloudStorage
Summary: plugin to upload log events to Google Cloud Storage (GCS), rolling files based on the date pattern provided as a configuration setting. Events are written to files locally and, once a file is closed, this plugin uploads it to the configured bucket.
For more info on Google Cloud Storage, please go to: cloud.google.com/products/cloud-storage
In order to use this plugin, a Google service account must be used. For more information, please refer to: developers.google.com/storage/docs/authentication#service_accounts
Recommendation: experiment with the settings depending on how much log data you generate, so the uploader can keep up with the generated logs. Enabling gzip output reduces both network traffic during upload and storage costs.
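The date_pattern setting is a strftime format string: the plugin starts a new file whenever the formatted timestamp changes, so a coarser pattern yields fewer, larger files. A minimal sketch of how such a pattern buckets events (illustrative only, not the plugin's internal code):

```ruby
require "time"

# With the default date_pattern "%Y-%m-%dT%H:00", all events inside the
# same hour format to the same stamp and therefore land in the same
# file; the next hour starts a new one.
pattern = "%Y-%m-%dT%H:00"

early     = Time.utc(2023, 5, 1, 14, 5).strftime(pattern)
late      = Time.utc(2023, 5, 1, 14, 55).strftime(pattern)
next_hour = Time.utc(2023, 5, 1, 15, 0).strftime(pattern)

puts early               # => "2023-05-01T14:00"
puts early == late       # same bucket, same file
puts early == next_hour  # new bucket, the file rolls
```

A pattern such as "%Y-%m-%d" would roll once per day instead; pick a granularity that keeps individual files at a manageable size for your event volume.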
USAGE: This is an example of logstash config:

output {
  google_cloud_storage {
    bucket => "my_bucket"                                    (required)
    key_path => "/path/to/privatekey.p12"                    (required)
    key_password => "notasecret"                             (optional)
    service_account => "1234@developer.gserviceaccount.com"  (required)
    temp_directory => "/tmp/logstash-gcs"                    (optional)
    log_file_prefix => "logstash_gcs"                        (optional)
    max_file_size_kbytes => 1024                             (optional)
    output_format => "plain"                                 (optional)
    date_pattern => "%Y-%m-%dT%H:00"                         (optional)
    flush_interval_secs => 2                                 (optional)
    gzip => false                                            (optional)
    gzip_content_encoding => false                           (optional)
    uploader_interval_secs => 60                             (optional)
    upload_synchronous => false                              (optional)
  }
}
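The output_format setting controls how each event is serialized before it is written: "json" dumps the event's hash as one JSON object per line, while "plain" writes the event's string form. A rough sketch of the difference, using Ruby's stdlib JSON in place of LogStash::Json (an assumption for illustration; the event hash below is a hypothetical example):

```ruby
require "json"

# Hypothetical stand-in for a logstash event's hash representation.
event = { "message" => "GET /index.html 200", "host" => "web-1" }

# output_format => "json": one JSON object per line, machine-parseable.
json_line = JSON.generate(event)
puts json_line  # => {"message":"GET /index.html 200","host":"web-1"}

# output_format => "plain": the event's string form (here approximated
# with inspect), convenient for humans but harder to post-process.
plain_line = event.inspect
puts plain_line
```

"json" is usually the better choice when the files will later be loaded into BigQuery or another structured store.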
Improvements TODO list:
- Support logstash event variables to determine the filename.
- Turn the Google API code into a Plugin Mixin (like AwsConfig).
- There is no recover method, so if logstash or the plugin crashes, files may not be uploaded to GCS.
- Allow the user to configure the file name.
Public Instance Methods
Stops the plugin: kills the uploader thread, force-rotates the current log so any remaining data is queued for upload, and shuts down the worker pool.

# File lib/logstash/outputs/google_cloud_storage.rb, line 184
def close
  @logger.debug('Stopping the plugin, uploading the remaining files.')
  Thread.kill(@uploader_thread) unless @uploader_thread.nil?

  # Force rotate the log. If it contains data it will be submitted
  # to the work pool and will be uploaded before the plugin stops.
  @log_rotater.rotate_log!
  @workers.stop!
end
Method called for each log event. It writes the event to the current output file, flushing according to the configured flush interval.
# File lib/logstash/outputs/google_cloud_storage.rb, line 171
def receive(event)
  @logger.debug('Received event', :event => event)

  if @output_format == 'json'
    message = LogStash::Json.dump(event.to_hash)
  else
    message = event.to_s
  end

  @log_rotater.writeln(message)
end
Registers the plugin: sets up the worker pool, temporary directory, path factory, log rotater, Google client, and the uploader thread.

# File lib/logstash/outputs/google_cloud_storage.rb, line 152
def register
  @logger.debug('Registering Google Cloud Storage plugin')

  @workers = LogStash::Outputs::Gcs::WorkerPool.new(@max_concurrent_uploads, @upload_synchronous)
  initialize_temp_directory
  initialize_path_factory
  initialize_log_rotater
  initialize_google_client
  start_uploader

  @content_type = @gzip ? 'application/gzip' : 'text/plain'
  @content_encoding = @gzip_content_encoding ? 'gzip' : 'identity'
end
Private Instance Methods
Initializes the Google API client, instantiating the client and authorizing access with the service account key.
# File lib/logstash/outputs/google_cloud_storage.rb, line 235
def initialize_google_client
  require "google/api_client"
  require "openssl"

  @client = Google::APIClient.new(:application_name => 'Logstash Google Cloud Storage output plugin',
                                  :application_version => '0.1')
  @storage = @client.discovered_api('storage', 'v1')

  key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
  service_account = Google::APIClient::JWTAsserter.new(@service_account,
                                                       'https://www.googleapis.com/auth/devstorage.read_write',
                                                       key)
  @client.authorization = service_account.authorize
end
# File lib/logstash/outputs/google_cloud_storage.rb, line 292
def initialize_log_rotater
  max_file_size = @max_file_size_kbytes * 1024
  @log_rotater = LogStash::Outputs::Gcs::LogRotate.new(@path_factory, max_file_size, @gzip,
                                                       @flush_interval_secs, @gzip_content_encoding)

  @log_rotater.on_rotate do |filename|
    @logger.info("Rotated out file: #{filename}")
    @workers.post do
      upload_and_delete(filename)
    end
  end
end
# File lib/logstash/outputs/google_cloud_storage.rb, line 212
def initialize_path_factory
  @path_factory = LogStash::Outputs::Gcs::PathFactoryBuilder.build do |builder|
    builder.set_directory @temp_directory
    builder.set_prefix @log_file_prefix
    builder.set_include_host @include_hostname
    builder.set_date_pattern @date_pattern
    builder.set_include_part(@max_file_size_kbytes > 0)
    builder.set_include_uuid @include_uuid
    builder.set_is_gzipped @gzip
  end
end
Creates the temporary directory if it does not exist. When no directory is configured, one with a random suffix is generated via Stud::Temporary.

# File lib/logstash/outputs/google_cloud_storage.rb, line 200
def initialize_temp_directory
  require "stud/temporary"

  if @temp_directory.empty?
    @temp_directory = Stud::Temporary.directory('logstash-gcs')
  end

  FileUtils.mkdir_p(@temp_directory) unless File.directory?(@temp_directory)
  @logger.info("Using temporary directory: #{@temp_directory}")
end
start_uploader

Starts a background thread that periodically sends flush events through the log rotater, triggering time-based flushes and rotations.

# File lib/logstash/outputs/google_cloud_storage.rb, line 225
def start_uploader
  @uploader_thread = Thread.new do
    Stud.interval(@uploader_interval_secs) do
      @log_rotater.writeln(nil)
    end
  end
end
Uploads the file to GCS if it contains data, then deletes the local temporary copy.

# File lib/logstash/outputs/google_cloud_storage.rb, line 279
def upload_and_delete(filename)
  file_size = File.stat(filename).size

  if file_size > 0
    upload_object(filename)
  else
    @logger.debug('File size is zero, skip upload.', :filename => filename)
  end

  @logger.debug('Delete local temporary file', :filename => filename)
  File.delete(filename)
end
Uploads a local file to the configured bucket.
# File lib/logstash/outputs/google_cloud_storage.rb, line 253
def upload_object(filename)
  begin
    @logger.debug("GCS: upload object.", :filename => filename)

    media = Google::APIClient::UploadIO.new(filename, @content_type)
    metadata_insert_result = @client.execute(
      :api_method => @storage.objects.insert,
      :parameters => {
        'uploadType' => 'multipart',
        'bucket' => @bucket,
        'contentEncoding' => @content_encoding,
        'name' => File.basename(filename)
      },
      :body_object => { contentType: @content_type },
      :media => media)
    contents = metadata_insert_result.data
    @logger.debug("GCS: multipart insert",
                  :object => contents.name,
                  :self_link => contents.self_link)
  rescue => e
    @logger.error("GCS: failed to upload file", :exception => e)
    # TODO(rdc): limit retries?
    sleep 1
    retry
  end
end