class LogStash::Outputs::Swift
INFORMATION:
This plugin batches and uploads Logstash events into OpenStack Swift.
Requirements:
- An OpenStack Swift container and access credentials (username, api_key, auth_url, project_name and domain_name)
- Write permission on the container
Swift outputs create temporary files in the OS's temporary directory; you can specify where to save them using the `temporary_directory` option.
Swift output files have the following format:
ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt
|=======
| ls.s3 | indicates the logstash plugin s3 |
| 312bc026-2f5d-49bc-ae9f-5940cf4ad9a6 | a new, random UUID per file |
| 2013-04-18T10.00 | the time, present whenever you specify time_file |
| tag_hello | the event's tag |
| part0 | the part number; if you specify size_file, more parts are generated whenever file.size > size_file |
|=======
When a file is full it is pushed to the container and then deleted from the temporary directory. If a file is empty, it is simply deleted; empty files are not pushed.
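The naming scheme described above can be illustrated with a minimal sketch. The helper `example_file_name` is hypothetical and is not part of the plugin; it only assembles the documented parts in the documented order.

```ruby
require "securerandom"

# Hypothetical helper (not the plugin's code): joins the parts of the
# documented file name. The uuid defaults to a fresh random value per call.
def example_file_name(time:, tag:, part:, uuid: SecureRandom.uuid)
  timestamp = time.strftime("%Y-%m-%dT%H.%M")
  "ls.s3.#{uuid}.#{timestamp}.tag_#{tag}.part#{part}.txt"
end

puts example_file_name(
  time: Time.utc(2013, 4, 18, 10, 0),
  tag: "hello",
  part: 0,
  uuid: "312bc026-2f5d-49bc-ae9f-5940cf4ad9a6"
)
# prints ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt
```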
Crash Recovery:
- This plugin will recover and upload temporary log files after a crash or abnormal termination when `restore` is set to true.
Note regarding time_file and size_file:
Both the time_file and size_file settings can trigger a log "file rotation". A log rotation pushes the current log "part" to the container and deletes it from local temporary storage.
- If you specify BOTH size_file and time_file, one file is created for each tag (if specified). When EITHER time_file minutes have elapsed OR the log file size > size_file, a log rotation is triggered.
- If you ONLY specify time_file but NOT size_file, one file is created for each tag (if specified). When time_file minutes have elapsed, a log rotation is triggered.
- If you ONLY specify size_file but NOT time_file, one file is created for each tag (if specified). When the size of the log file part > size_file, a log rotation is triggered.
- If NEITHER size_file nor time_file is specified, ONLY one file is created for each tag (if specified). WARNING: Since no log rotation is triggered, the upload will only occur when Logstash restarts. Note that `register` raises a LogStash::ConfigurationError when both settings are nil or both are 0.
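The rotation rules above can be sketched as a single predicate. This is an illustration only: `rotate?` here is a hypothetical stand-in, not the plugin's rotation policy classes. It assumes that the file size uses the same unit as size_file, that the age is measured in minutes, and that nil or 0 disables a trigger.

```ruby
# Hypothetical stand-in for the rotation decision described above.
# A trigger is considered disabled when its setting is nil or 0 (assumption).
def rotate?(file_size:, file_age_minutes:, size_file: nil, time_file: nil)
  by_size = size_file.to_i > 0 && file_size > size_file
  by_time = time_file.to_i > 0 && file_age_minutes >= time_file
  by_size || by_time
end

# Both settings: either condition triggers a rotation.
puts rotate?(file_size: 2048, file_age_minutes: 1, size_file: 1024, time_file: 5) # true
puts rotate?(file_size: 10, file_age_minutes: 5, size_file: 1024, time_file: 5)   # true
puts rotate?(file_size: 10, file_age_minutes: 1, size_file: 1024, time_file: 5)   # false
# Neither setting: no rotation is ever triggered.
puts rotate?(file_size: 10_000, file_age_minutes: 60)                             # false
```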
#### Usage:
This is an example of a Logstash config:

    output {
      swift {
        username     => "crazy_name"              # required
        api_key      => "crazy_key"               # required
        auth_url     => "https://auth_tokens"     # required
        project_name => "crazy project name"      # required
        domain_name  => "crazy domain name"       # required
        container    => "your_bucket_container"   # required
      }
    }
Constants
- CRASH_RECOVERY_THREADPOOL
- PERIODIC_CHECK_INTERVAL_IN_SECONDS
- PREFIX_KEY_NORMALIZE_CHARACTER
include LogStash::PluginMixins::AwsConfig::V2
Public Instance Methods
    # File lib/logstash/outputs/swift.rb, line 240
    def close
      stop_periodic_check if @rotation.needs_periodic?

      @logger.debug("Uploading current workspace")

      # The plugin has stopped receiving new events, but we still have
      # data on disk, so let's make sure it gets uploaded.
      # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is set to true)
      # will pick up the content in the temporary directory and upload it.
      # This will block the shutdown until all uploads are done or the user force quits.
      @file_repository.each_files do |file|
        upload_file(file)
      end

      @file_repository.shutdown

      @uploader.stop # wait until all the current uploads are complete
      @crash_uploader.stop if @restore # we might still have work to do for recovery, so wait until we are done
    end

    # File lib/logstash/outputs/swift.rb, line 216
    def multi_receive_encoded(events_and_encoded)
      puts 'milti rrecei'
      prefix_written_to = Set.new

      events_and_encoded.each do |event, encoded|
        prefix_key = normalize_key(event.sprintf(@prefix))
        prefix_written_to << prefix_key

        begin
          puts 'begin'
          @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
          # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
          # Log the error and rethrow it.
        rescue Errno::ENOSPC => e
          @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
          raise e
        end
      end

      puts 'done'
      # Groups IO calls to optimize fstat checks
      rotate_if_needed(prefix_written_to)
    end
    # File lib/logstash/outputs/swift.rb, line 260
    def normalize_key(prefix_key)
      prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
    end
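As a rough illustration of what `normalize_key` does, the sketch below replaces every character matched by a character-class regular expression with a single replacement character. The constants here are assumptions chosen for the example: the actual values of `PathValidator.matches_re` and `PREFIX_KEY_NORMALIZE_CHARACTER` are not shown in this documentation and may differ.

```ruby
# Assumed values for illustration only; the plugin's real constants may differ.
EXAMPLE_INVALID_CHARACTERS = '^`><'
EXAMPLE_NORMALIZE_CHARACTER = "_"
EXAMPLE_MATCHES_RE = /[#{Regexp.escape(EXAMPLE_INVALID_CHARACTERS)}]/

# Hypothetical counterpart of normalize_key using the assumed constants.
def example_normalize_key(prefix_key)
  prefix_key.gsub(EXAMPLE_MATCHES_RE, EXAMPLE_NORMALIZE_CHARACTER)
end

puts example_normalize_key("logs/<host>/2013") # prints logs/_host_/2013
```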
    # File lib/logstash/outputs/swift.rb, line 174
    def register
      # I've moved the validation of the items into custom classes
      # to prepare for the new config validation that will be part of the core so the core can
      # be moved easily.
      unless @prefix.empty?
        if !PathValidator.valid?(prefix)
          raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
        end
      end

      if !WritableDirectoryValidator.valid?(@temporary_directory)
        raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
      end

      if @validate_credentials_on_root_bucket && !WriteContainerPermissionValidator.new(@logger).valid?(container_resource)
        raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root container `#{@container}`, check your credentials or your permissions."
      end

      if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0
        raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0"
      end

      @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

      @rotation = rotation_strategy

      executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
                                                      :max_threads => @upload_workers_count,
                                                      :max_queue => @upload_queue_size,
                                                      :fallback_policy => :caller_runs })

      @uploader = Uploader.new(container_resource, @logger, executor)

      # Restoring from a crash will use a new threadpool to slowly recover.
      # New events should have more priority.
      restore_from_crash if @restore

      # If we need time-based rotation, we need to do a periodic check on the files
      # to take care of files that were not updated recently.
      start_periodic_check if @rotation.needs_periodic?
    end
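The time_file/size_file check in `register` relies on Ruby's operator precedence, where `&&` binds more tightly than `||`. The sketch below restates that boolean expression with explicit parentheses in a hypothetical helper, `rotation_settings_rejected?`, to show which combinations it rejects. It makes no claim about which values the plugin's config defaults can actually produce.

```ruby
# Hypothetical helper restating the condition from `register` with explicit grouping.
def rotation_settings_rejected?(time_file, size_file)
  (time_file.nil? && size_file.nil?) || (size_file == 0 && time_file == 0)
end

puts rotation_settings_rejected?(nil, nil) # true:  both unset
puts rotation_settings_rejected?(0, 0)     # true:  both zero
puts rotation_settings_rejected?(15, nil)  # false: time_file is set
puts rotation_settings_rejected?(nil, 0)   # false: a nil/0 mix is not caught by this expression
```

If mixed nil and 0 values can reach this check, they pass it even though neither setting is greater than 0.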
Private Instance Methods
    # File lib/logstash/outputs/swift.rb, line 341
    def clean_temporary_file(file)
      @logger.debug("Removing temporary file", :file => file.path)
      file.delete!
    end

    # File lib/logstash/outputs/swift.rb, line 282
    def container_resource
      Excon.defaults[:ciphers] = 'DEFAULT'
      service = Fog::Storage::OpenStack.new({
        openstack_username: @username,
        openstack_api_key: @api_key,
        openstack_auth_url: @auth_url,
        openstack_project_name: @project_name,
        openstack_domain_name: @domain_name,
        connection_options: {}
      })
      service.directories.get(@container)
    end
The recovery upload uses a separate uploader and thread pool with fewer resources allocated to it, but with an unbounded work queue, so it may take some time before all the older files are processed.
    # File lib/logstash/outputs/swift.rb, line 348
    def restore_from_crash
      @crash_uploader = Uploader.new(container_resource, @logger, CRASH_RECOVERY_THREADPOOL)

      temp_folder_path = Pathname.new(@temporary_directory)
      Dir.glob(::File.join(@temporary_directory, "**/*"))
        .select { |file| ::File.file?(file) }
        .each do |file|
        temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
        @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
        @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
      end
    end
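The file discovery step of `restore_from_crash` can be tried in isolation with the standard library alone. In the sketch below, `recoverable_files` is a hypothetical helper that repeats the same recursive glob and regular-file filter. The directory layout and file name are made up for the example, and no upload is performed.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper: lists every regular file under the temporary directory,
# skipping the directories that the recursive glob also returns.
def recoverable_files(temporary_directory)
  Dir.glob(File.join(temporary_directory, "**/*")).select { |path| File.file?(path) }
end

Dir.mktmpdir do |dir|
  # Made-up layout: one leftover file nested two directories deep.
  nested = File.join(dir, "some-uuid", "some-prefix")
  FileUtils.mkdir_p(nested)
  File.write(File.join(nested, "leftover.part0.txt"), "event\n")

  puts recoverable_files(dir).map { |path| File.basename(path) }.inspect
  # prints ["leftover.part0.txt"]
end
```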
    # File lib/logstash/outputs/swift.rb, line 296
    def rotate_if_needed(prefixes)
      puts 'rotate!'
      prefixes.each do |prefix|
        # Each file access is thread safe,
        # until the rotation is done then only
        # one thread has access to the resource.
        @file_repository.get_factory(prefix) do |factory|
          temp_file = factory.current

          if @rotation.rotate?(temp_file)
            @logger.debug("Rotate file",
                          :strategy => @rotation.class.name,
                          :key => temp_file.key,
                          :path => temp_file.path)

            upload_file(temp_file)
            factory.rotate!
          end
        end
      end
    end

    # File lib/logstash/outputs/swift.rb, line 330
    def rotation_strategy
      case @rotation_strategy
      when "size"
        SizeRotationPolicy.new(size_file)
      when "time"
        TimeRotationPolicy.new(time_file)
      when "size_and_time"
        SizeAndTimeRotationPolicy.new(size_file, time_file)
      end
    end
A background task periodically checks for stale files and rotates and uploads them if needed.
    # File lib/logstash/outputs/swift.rb, line 266
    def start_periodic_check
      @logger.debug("Start periodic rotation check")

      @periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
        @logger.debug("Periodic check for stale files")

        rotate_if_needed(@file_repository.keys)
      end

      @periodic_check.execute
    end

    # File lib/logstash/outputs/swift.rb, line 278
    def stop_periodic_check
      @periodic_check.shutdown
    end

    # File lib/logstash/outputs/swift.rb, line 318
    def upload_file(temp_file)
      puts 'upload file'
      @logger.debug("Queue for upload", :path => temp_file.path)

      # if the queue is full the calling thread will be used to upload
      temp_file.close # make sure the content is on disk
      if temp_file.size > 0
        @uploader.upload_async(temp_file,
                               :on_complete => method(:clean_temporary_file))
      end
    end