class Fluent::Plugin::BosOutput

Constants

DEFAULT_FORMAT_TYPE
DEFAULT_LINE_FORMAT_TYPE

Attributes

formatter[RW]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bos.rb, line 62
def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter) 
  super
  
  @formatter = formatter_create
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 142
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, record).chomp + "\n"
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 56
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 52
def prefer_buffered_processing
  true
end
send_tmpfile() click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 89
def send_tmpfile
  while thread_current_running?
    if @part_number <= 100
        if @queue.size > 1
            to_send = @queue.pop
            tmpfile = to_send["file"]
            tmpfile.rewind
            s = tmpfile.read
            tmpfile.close
            tmpfile.unlink
            response = @client.upload_part(
                @bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_number,s.size ,{})  do |buf_writer|
                       buf_writer << s
            end
            @part_list << {
              "partNumber" => @part_number,
              "eTag" => response['etag']
            }
            @part_number += 1 
        elsif @queue.size == 1
             to_send = @current_file
             if Time.now - to_send["time"] > 900
               @queue.pop
               tmpfile = to_send["file"]
               tmpfile.rewind
               s = tmpfile.read
               tmpfile.close
               tmpfile.unlink
               response = @client.upload_part(
                   @bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_number,s.size ,{})  do |buf_writer|
                       buf_writer << s
               end
               @part_list << {
                 "partNumber" => @part_number,
                  "eTag" => response['etag']
               }
              @client.complete_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_list)
              @part_suffix += 1
              @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"]
              @part_list = []
              @part_number = 1
             end
        end 
    elsif 
       @client.complete_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s, @upload_id, @part_list)
       @part_suffix += 1
       @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"]
       @part_list = []
       @part_number = 1
    end 
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bos.rb, line 69
def start 
  credentials = Auth::BceCredentials.new(
      @bce_key_id,
      @bce_sec_key
  )

  bosconf = BceClientConfiguration.new(
      credentials,
      @bos_endpoint
  )
  @client = Services::BosClient.new(bosconf)
  @part_suffix = 1
  @upload_id = @client.initiate_multipart_upload(@bos_bucket, @part_prefix+@part_suffix.to_s)["uploadId"]
  @part_number=1
  @part_list=[]
  @queue = Queue.new
  super
  thread_create(:send_tmpfile_to_bos, &method(:send_tmpfile))
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_bos.rb, line 147
def write(chunk)
    if @current_tmpfile.nil? || @current_tmpfile.path.nil? || @current_tmpfile.size >= 5*1024*1024
    @current_tmpfile = Tempfile.new("bostmpfile")
    @current_file = {
      "time" => Time.now,
      "file" => @current_tmpfile
    }
    @queue.push(@current_file)
  end
  ss = chunk.read
  @current_tmpfile.write(ss)
  @current_tmpfile.size
end