class LogStash::Inputs::GoogleCloudStorage
Generate a repeating message.
This plugin is intended only as an example.
Public Instance Methods
backup_to_bucket(filename)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 136
# Copies the object +filename+ from @bucket into @backup_to_bucket under the
# key "#{@backup_add_prefix}#{filename}". When @delete is set, it then removes
# the original object. Does nothing unless @backup_to_bucket is configured.
#
# @param filename [String] object name in the source bucket
def backup_to_bucket(filename)
  return if @backup_to_bucket.nil?

  backup_key = "#{@backup_add_prefix}#{filename}"
  @logger.debug("GCS input: bck_up object " + backup_key)
  copy = @client.execute(
    api_method: @gcs.objects.copy,
    parameters: { destinationBucket: @backup_to_bucket, destinationObject: backup_key,
                  sourceBucket: @bucket, sourceObject: filename }
  )

  # `execute` (unlike `execute!`) does not raise on HTTP errors. Without this
  # check, a failed copy would still be followed by a delete and the object lost.
  if copy.error?
    @logger.error("GCS input: failed to back up object", :object => filename, :status => copy.status)
    return
  end

  return unless @delete
  @client.execute(api_method: @gcs.objects.delete, parameters: { bucket: @bucket, object: filename })
end
backup_to_dir(filename)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 157
# Copies the local file +filename+ into @backup_to_dir, if one is configured.
#
# @param filename [String] path of the local (downloaded) file
def backup_to_dir(filename)
  return if @backup_to_dir.nil?
  FileUtils.cp(filename, @backup_to_dir)
end
list_new_files()
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 104
# Lists the objects in @bucket. Keeps those that are not ignored and whose
# `updated` timestamp is newer than the sincedb. Returns their names, oldest first.
#
# @return [Array<String>] object names sorted by `updated` ascending
def list_new_files
  @logger.debug("GCS input: Polling new objects from bucket #{@bucket}")
  log_files = {}
  page_token = nil

  # objects.list is paginated via nextPageToken. Follow every page so buckets
  # holding more objects than a single page are scanned completely.
  loop do
    parameters = { bucket: @bucket }
    parameters[:pageToken] = page_token if page_token
    page = @client.execute(api_method: @gcs.objects.list, parameters: parameters)

    # An empty bucket may come back without an `items` array.
    Array(page.data.items).each do |file|
      next if ignore_filename?(file.name)
      next unless sincedb.newer?(file.updated)

      log_files[file.name] = file.updated
      @logger.info("GCS input: Adding to objects[]", :name => file.name)
      @logger.debug("GCS input: objects[] length is: ", :length => log_files.length)
    end

    page_token = page.data['nextPageToken']
    break if page_token.nil? || page_token.empty?
  end

  log_files.keys.sort_by { |name| log_files[name] }
end
process_files(queue)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 165
# Processes every new object in the bucket, in order, until a stop is requested.
#
# @param queue [Queue] destination for decoded events
def process_files(queue)
  list_new_files.each do |filename|
    break if stop?

    @logger.debug("GCS input: processing ", :bucket => @bucket, :filename => filename)
    process_log(queue, filename)
  end
end
register()
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 64
# Plugin setup:
# - loads dependencies;
# - authorizes a Google API client with the service account's PKCS12 key;
# - discovers the Storage v1 API;
# - creates the backup and temporary directories.
def register
  @host = Socket.gethostname
  require "fileutils"
  require "digest/md5"
  require "google/api_client"
  require "openssl"

  @logger.debug("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile)

  # initialize_google_client
  @logger.info("\n===========================GCS INPUT PLUG-IN===========================")
  @client = Google::APIClient.new(:application_name => 'Logstash Google Cloud Storage input plugin',
                                  :application_version => '0.1')
  key = Google::APIClient::PKCS12.load_key(@keyfile, @key_password)
  service_account = Google::APIClient::JWTAsserter.new(@service_account,
                                                       'https://www.googleapis.com/auth/devstorage.read_write',
                                                       key)
  @client.authorization = service_account.authorize
  @gcs = @client.discovered_api('storage', 'v1')

  unless @backup_to_dir.nil?
    # File.exists? is deprecated and was removed in Ruby 3.2; File.exist? is the supported name.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 94
# Main plugin loop: calls process_files every @interval seconds via Stud.interval.
#
# @param queue [Queue] destination for decoded events
def run(queue)
  # Remember the running thread so #stop, which is called from another thread,
  # can interrupt this loop.
  @current_thread = Thread.current
  Stud.interval(@interval) { process_files(queue) }
end
stop()
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 181
# Interrupts the polling loop started by #run.
#
# #stop is invoked from a different thread than #run. The thread recorded in
# #run (@current_thread) therefore has to be handed to Stud.stop! explicitly.
def stop
  Stud.stop!(@current_thread)
end
Private Instance Methods
delete_file_from_bucket(object)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 387
# Deletes the object +object+ from @bucket when @delete is set and no backup
# bucket is configured. (When a backup bucket is set, backup_to_bucket
# performs the delete after copying.)
#
# @param object [String] the object's name (key) in @bucket
def delete_file_from_bucket(object)
  return unless @delete && @backup_to_bucket.nil?

  # The argument is the key String. `String#delete` requires arguments, so the
  # former `object.delete()` raised ArgumentError; delete through the API instead.
  result = @client.execute(api_method: @gcs.objects.delete, parameters: { bucket: @bucket, object: object })
  @logger.warn("GCS input: failed to delete object", :object => object, :status => result.status) if result.error?
end
download_remote_file(remote_object, local_filename)
click to toggle source
Stream the remote file to the local disk
@param [GCS_OBJECT] Data from GCS object to download @param [String] The Temporary filename to stream to. @return [Boolean] True if the file was completely downloaded
# File lib/logstash/inputs/google_cloud_storage.rb, line 375
# Writes the downloaded object body to +local_filename+ in binary mode.
#
# @param remote_object [String] object body as returned by the GCS media request
# @param local_filename [String] temporary file to write to
# @return [Boolean] true once the body has been written
def download_remote_file(remote_object, local_filename)
  # TODO: the whole body is held in memory; stream it to avoid memory issues.
  File.open(local_filename, 'wb') do |out|
    out.write(remote_object)
  end
  true
end
event_is_metadata?(event)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 236
# True when the event's "message" is a String holding a "#Version: " or
# "#Fields: " header line; false otherwise, including for non-String messages.
def event_is_metadata?(event)
  message = event.get("message")
  return false unless message.class == String

  version_metadata?(message) || fields_metadata?(message)
end
fields_metadata?(line)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 248
# True when +line+ begins with the "#Fields: " header marker.
def fields_metadata?(line)
  marker = '#Fields: '
  line.start_with?(marker)
end
gzip?(filename)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 299
# True when +filename+ has a ".gz" suffix.
def gzip?(filename)
  gz_suffix = '.gz'
  filename.end_with?(gz_suffix)
end
ignore_filename?(filename)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 321
# Decides whether an object should be skipped. An object is ignored when it:
# - equals @prefix;
# - is a "directory" placeholder (trailing "/");
# - is one of our own backups (same bucket, name starts with @backup_add_prefix);
# - or matches @exclude_pattern.
#
# @param filename [String] object name
# @return [Boolean]
def ignore_filename?(filename)
  return true if @prefix == filename
  return true if filename.end_with?("/")

  # Compare the backup prefix literally. Interpolating it into a regexp treated
  # characters such as '.', '+' or '(' as metacharacters and skipped unrelated objects.
  if @backup_add_prefix && @backup_to_bucket == @bucket && filename.start_with?(@backup_add_prefix)
    return true
  end

  return false if @exclude_pattern.nil?
  !!(filename =~ Regexp.new(@exclude_pattern))
end
process_local_log(queue, filename, key)
click to toggle source
Read the content of the local file
@param [Queue] Where to push the event @param [String] Which file to read from @return [Boolean] True if the file was completely read, false otherwise.
# File lib/logstash/inputs/google_cloud_storage.rb, line 196
# Reads a downloaded file line by line and pushes each decoded event onto
# +queue+. Every event is tagged with the object key and the host name.
#
# @param queue [Queue] where decoded events are pushed
# @param filename [String] local file to read
# @param key [String] object name the file was downloaded from
# @return [Boolean] true if the whole file was read, false if a stop interrupted it
def process_local_log(queue, filename, key)
  @logger.debug('GCS input: processing file ', :filename => filename)

  # Codecs operate on bytes rather than streams, so decompression and line
  # reading happen here and each raw line is handed to the codec.
  read_file(filename) do |raw_line|
    if stop?
      @logger.warn("Logstash GCS input, stop reading in the middle of the file, we will read it again when logstash is started")
      # `return` inside this block exits process_local_log itself.
      return false
    end

    @codec.decode(raw_line) do |event|
      # NOTE: the cloudfront-style metadata filtering (event_is_metadata? /
      # update_metadata) is disabled here; every decoded event is emitted.
      event.set("[@metadata][gcs]", { "key" => key })
      event.set("[@metadata][host]", @host)
      decorate(event)
      queue << event
    end
  end

  true
end
process_log(queue, key)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 338
# Processes a single object:
# 1. fetches its metadata and body;
# 2. writes the body to the temporary directory and pushes its events onto +queue+;
# 3. once the file has been fully read, backs up / deletes the object as
#    configured and records its `updated` time in the sincedb.
# The local temporary file is always removed.
#
# @param queue [Queue] where decoded events are pushed
# @param key [String] object name within @bucket
def process_log(queue, key)
  object = @client.execute(
    api_method: @gcs.objects.get,
    parameters: { bucket: @bucket, object: key }
  )
  # `execute` does not raise on HTTP errors; check results explicitly so that an
  # error response body is never written to disk and ingested as log lines.
  if object.error?
    @logger.warn("GCS input: failed to fetch object metadata, skipping", :object => key, :status => object.status)
    return
  end

  objectdata = @client.execute(
    api_method: @gcs.objects.get,
    parameters: { bucket: @bucket, object: key, alt: 'media' }
  )
  if objectdata.error?
    @logger.warn("GCS input: failed to download object, skipping", :object => key, :status => objectdata.status)
    return
  end

  filename = File.join(temporary_directory, File.basename(key))
  begin
    if download_remote_file(objectdata.body, filename) && process_local_log(queue, filename, key)
      lastmod = object.data['updated']
      backup_to_bucket(key)
      backup_to_dir(filename)
      delete_file_from_bucket(key)
      sincedb.write(lastmod)
    end
  ensure
    # Also clean up when reading was interrupted or raised; the object is
    # re-downloaded on the next run anyway.
    FileUtils.remove_entry_secure(filename, true)
  end
end
read_file(filename, &block)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 266
# Yields each line of +filename+. The file is decompressed first when its
# name ends in ".gz".
def read_file(filename, &block)
  gzip?(filename) ? read_gzip_file(filename, block) : read_plain_file(filename, block)
end
read_gzip_file(filename, block)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 281
# Reads a gzip file line by line and calls +block+ with each decompressed line.
# Files made of several concatenated gzip members are fully read.
#
# @param filename [String] path of the local .gz file
# @param block [Proc] called once per line
# @raise [Zlib::Error, Zlib::GzipFile::Error] if the data cannot be decompressed
def read_gzip_file(filename, block)
  # Details about multiple streams and the usage of unused from: http://code.activestate.com/lists/ruby-talk/11168/
  # Open in binary mode so the compressed bytes reach GzipReader without any
  # newline/encoding conversion (text mode differs on some platforms).
  File.open(filename, 'rb') do |zio|
    loop do
      io = Zlib::GzipReader.new(zio)
      io.each_line { |line| block.call(line) }
      unused = io.unused
      io.finish
      break if unused.nil?
      # GzipReader read past the end of this member; rewind to the next one.
      zio.pos -= unused.length
    end
  end
rescue Zlib::Error, Zlib::GzipFile::Error
  @logger.error("Gzip codec: We cannot uncompress the gzip file", :filename => filename)
  raise
end
read_plain_file(filename, block)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 274
# Calls +block+ with each line of the uncompressed file +filename+, read in binary mode.
def read_plain_file(filename, block)
  File.open(filename, 'rb') { |io| io.each_line(&block) }
end
sincedb()
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 304
# Lazily builds and memoizes the SinceDB::File. It uses @sincedb_path when
# configured and the generated default path (sincedb_file) otherwise.
def sincedb
  return @sincedb if @sincedb

  @sincedb =
    if @sincedb_path.nil?
      @logger.debug("Using default generated file for the sincedb", :filename => sincedb_file)
      SinceDB::File.new(sincedb_file)
    else
      @logger.debug("Using the provided sincedb_path", :sincedb_path => @sincedb_path)
      SinceDB::File.new(@sincedb_path)
    end
end
sincedb_file()
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 316
# Default sincedb path: "<home>/.sincedb_<md5 of 'bucket+prefix'>".
# Each bucket/prefix pair therefore tracks its own progress.
#
# @return [String]
def sincedb_file
  # Fall back to Dir.home when HOME is unset; File.join(nil, ...) would
  # otherwise raise an unhelpful TypeError.
  home = ENV["HOME"] || Dir.home
  File.join(home, ".sincedb_#{Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")}")
end
update_metadata(metadata, event)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 253
# Reads a "#Version: " or "#Fields: " header from the event's message and
# stores the header's value in +metadata+ (:cloudfront_version / :cloudfront_fields).
def update_metadata(metadata, event)
  header = event.get('message').strip
  metadata[:cloudfront_version] = header.split(/#Version: (.+)/).last if version_metadata?(header)
  metadata[:cloudfront_fields] = header.split(/#Fields: (.+)/).last if fields_metadata?(header)
end
version_metadata?(line)
click to toggle source
# File lib/logstash/inputs/google_cloud_storage.rb, line 243
# True when +line+ begins with the "#Version: " header marker.
def version_metadata?(line)
  marker = '#Version: '
  line.start_with?(marker)
end