class Fluent::Plugin::BosOutput
Constants
- DEFAULT_FORMAT_TYPE
- DEFAULT_LINE_FORMAT_TYPE
Attributes
formatter[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bos.rb, line 62 def configure(conf) compat_parameters_convert(conf, :inject, :formatter) super @formatter = formatter_create end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 142 def format(tag, time, record) record = inject_values_to_record(tag, time, record) @formatter.format(tag, time, record).chomp + "\n" end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 56 def multi_workers_ready? true end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 52 def prefer_buffered_processing true end
send_tmpfile()
click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 89 def send_tmpfile while thread_current_running? if @part_number <= 100 if @queue.size > 1 to_send = @queue.pop tmpfile = to_send["file"] tmpfile.rewind s = tmpfile.read tmpfile.close tmpfile.unlink response = @client.upload_part( @bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_number,s.size ,{}) do |buf_writer| buf_writer << s end @part_list << { "partNumber" => @part_number, "eTag" => response['etag'] } @part_number += 1 elsif @queue.size == 1 to_send = @current_file if Time.now - to_send["time"] > 900 @queue.pop tmpfile = to_send["file"] tmpfile.rewind s = tmpfile.read tmpfile.close tmpfile.unlink response = @client.upload_part( @bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_number,s.size ,{}) do |buf_writer| buf_writer << s end @part_list << { "partNumber" => @part_number, "eTag" => response['etag'] } @client.complete_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_list) @part_suffix += 1 @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"] @part_list = [] @part_number = 1 end end elsif @client.complete_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_list) @part_suffix += 1 @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"] @part_list = [] @part_number = 1 end end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bos.rb, line 69 def start credentials = Auth::BceCredentials.new( @bce_key_id, @bce_sec_key ) bosconf = BceClientConfiguration.new( credentials, @bos_endpoint ) @client = Services::BosClient.new(bosconf) @part_suffix = 1 @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"] @part_number=1 @part_list=[] @queue = Queue.new super thread_create(:send_tmpfile_to_bos, &method(:send_tmpfile)) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 147 def write(chunk) if @current_tmpfile.nil? || @current_tmpfile.path.nil? || @current_tmpfile.size >= 5*1024*1024 @current_tmpfile = Tempfile.new("bostmpfile") @current_file = { "time" => Time.now, "file" => @current_tmpfile } @queue.push(@current_file) end ss = chunk.read @current_tmpfile.write(ss) @current_tmpfile.size end