class LogStash::Outputs::Swift::Uploader
Constants
- DEFAULT_THREADPOOL
- TIME_BEFORE_RETRYING_SECONDS
Attributes
container[R]
logger[R]
upload_options[R]
Public Class Methods
new(container, logger, threadpool = DEFAULT_THREADPOOL)
click to toggle source
# File lib/logstash/outputs/swift/uploader.rb, line 19 def initialize(container, logger, threadpool = DEFAULT_THREADPOOL) @container = container @workers_pool = threadpool @logger = logger end
Public Instance Methods
stop()
click to toggle source
# File lib/logstash/outputs/swift/uploader.rb, line 57 def stop @workers_pool.shutdown @workers_pool.wait_for_termination(nil) # block until its done end
upload(file, options = {})
click to toggle source
# File lib/logstash/outputs/swift/uploader.rb, line 32 def upload(file, options = {}) puts 'in uploader. upload' upload_options = options.fetch(:upload_options, {}) begin container.files.create(key: file.key, body: ::File.read(::File.expand_path(file.path))) puts 'dpone' rescue Errno::ENOENT => e logger.error("File doesn't exist! Unrecoverable error.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) rescue => e # When we get here it usually mean that S3 tried to do some retry by himself (default is 3) # When the retry limit is reached or another error happen we will wait and retry. # # Thread might be stuck here, but I think its better than losing anything # its either a transient errors or something bad really happened. logger.error("Uploading failed, retrying.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) sleep TIME_BEFORE_RETRYING_SECONDS retry end options[:on_complete].call(file) unless options[:on_complete].nil? rescue => e logger.error("An error occured in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) raise e # reraise it since we don't deal with it now end
upload_async(file, options = {})
click to toggle source
# File lib/logstash/outputs/swift/uploader.rb, line 25 def upload_async(file, options = {}) @workers_pool.post do LogStash::Util.set_thread_name("Swift output uploader, file: #{file.path}") upload(file, options) end end