class Fluent::Plugin::OSSInput
OSSInput
class implementation
Constants
- DECOMPRESSOR_REGISTRY
- DEFAULT_PARSE_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 63
# Builds the plugin instance.
#
# Runs the superclass initializer first, then starts with no decompressor
# selected; #configure assigns the real one.
def initialize
  super
  @decompressor = nil
end
Private Class Methods
register_decompressor(name, decompressor)
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 343
# Adds a decompressor to DECOMPRESSOR_REGISTRY under the given name.
#
# @param name the key the decompressor is registered under
# @param decompressor the object handed to the registry for that key
# @return whatever DECOMPRESSOR_REGISTRY.register returns
def self.register_decompressor(name, decompressor)
  DECOMPRESSOR_REGISTRY.register(name, decompressor)
end
Public Instance Methods
check_bucket()
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 109
# Verifies that the configured bucket exists and caches a handle to it.
#
# Asks the OSS client (@oss) whether @bucket exists; when it does, the bucket
# object returned by the client is stored in @bucket_handler.
#
# @raise [RuntimeError] when the client reports that @bucket does not exist
# @return the bucket handle obtained from the client
def check_bucket
  bucket_found = @oss.bucket_exist?(@bucket)
  raise "The specified bucket does not exist: bucket = #{@bucket}" unless bucket_found

  @bucket_handler = @oss.get_bucket(@bucket)
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 75
# Validates the configuration and prepares the decompressor and parser.
#
# After the superclass has processed +conf+, this method:
# * rejects a missing OSS endpoint, MNS endpoint or MNS queue,
# * instantiates the decompressor registered for @store_as and configures it,
# * creates the parser from the first <parse> element, falling back to
#   DEFAULT_PARSE_TYPE,
# * rescales @flush_pause_milliseconds by 0.001.
#
# @param conf the plugin configuration passed to the superclass
# @raise [Fluent::ConfigError] when a required endpoint or queue is nil
def configure(conf)
  super

  raise Fluent::ConfigError, 'Invalid oss endpoint' if @endpoint.nil?
  raise Fluent::ConfigError, 'Invalid mns endpoint' if @mns.endpoint.nil?
  raise Fluent::ConfigError, 'Invalid mns queue' if @mns.queue.nil?

  decompressor_class = DECOMPRESSOR_REGISTRY.lookup(@store_as)
  @decompressor = decompressor_class.new(log: log)
  @decompressor.configure(conf)

  parse_section = conf.elements('parse').first
  @parser = parser_create(conf: parse_section, default_type: DEFAULT_PARSE_TYPE)

  # From here on the variable holds seconds (the unit Kernel#sleep expects),
  # even though its name still says milliseconds.
  @flush_pause_milliseconds = @flush_pause_milliseconds * 0.001
end
create_oss_client()
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 117
# Creates the Aliyun OSS client from the plugin's settings and stores it in @oss.
#
# The endpoint, credentials, CRC switches and timeouts are forwarded to the
# client unchanged.
#
# @return [Aliyun::OSS::Client] the newly created client
def create_oss_client
  client_options = {
    endpoint: @endpoint,
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret,
    download_crc_enable: @download_crc_enable,
    upload_crc_enable: @upload_crc_enable,
    open_timeout: @open_timeout,
    read_timeout: @read_timeout
  }

  @oss = Aliyun::OSS::Client.new(**client_options)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 93
# Declares that this plugin may be used when multiple workers are configured.
#
# @return [true] always
def multi_workers_ready?
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 129
# Stops the polling loop and then lets the superclass shut down.
#
# Clearing @running makes #run leave its loop at the next check of the flag.
def shutdown
  @running = false
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 97
# Starts the plugin.
#
# Steps, in order:
# 1. make sure @oss_sdk_log_dir ends with "/" and point the Aliyun SDK log
#    file into that directory,
# 2. create the OSS client unless one is already set,
# 3. verify the bucket exists,
# 4. run the superclass start,
# 5. mark the plugin as running and launch the :in_oss thread executing #run.
def start
  @oss_sdk_log_dir = "#{@oss_sdk_log_dir}/" unless @oss_sdk_log_dir.end_with?('/')
  sdk_log_path = @oss_sdk_log_dir + Aliyun::Common::Logging::DEFAULT_LOG_FILE
  Aliyun::Common::Logging.set_log_file(sdk_log_path)

  create_oss_client unless @oss
  check_bucket

  super

  # The flag must be set before the thread starts, because #run loops on it.
  @running = true
  thread_create(:in_oss, &method(:run))
end
Private Instance Methods
delete_message(queue, message)
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 224
# Issues a DELETE request for +message+ against the given MNS queue.
#
# @param queue name of the queue, interpolated into the request path
# @param message object responding to #receipt_handle; the handle is passed
#   as the ReceiptHandle request option
# @return the result of executing the MNS request
def delete_message(queue, message)
  receipt = { ReceiptHandle: message.receipt_handle }
  connection = {
    log: log,
    method: 'DELETE',
    endpoint: @mns.endpoint,
    path: "/queues/#{queue}/messages",
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret
  }

  request = Fluent::Plugin::MNS::Request.new(connection, {}, receipt)
  request.execute
end
get_objects(message)
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 211
# Extracts the OSS objects referenced by a queue message.
#
# The message body is Base64-decoded and parsed as JSON; each entry of its
# "events" list becomes one OSSObject built from the event name, the
# configured bucket, and the object's key, size and eTag.
#
# A payload without an "events" entry yields an empty list instead of failing
# with NoMethodError on nil.
#
# @param message object responding to #body (Base64-encoded JSON text)
# @return [Array<OSSObject>] one entry per event, in payload order
# @raise [JSON::ParserError] when the decoded body is not valid JSON
def get_objects(message)
  payload = JSON.parse(Base64.decode64(message.body))

  Array(payload['events']).map do |event|
    object = event['oss']['object']
    OSSObject.new(event['eventName'], @bucket, object['key'], object['size'], object['eTag'])
  end
end
process(message)
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 159
# Handles one MNS message: downloads every object it references, decompresses
# and parses the content, emits the resulting events, and finally deletes the
# message from the queue.
#
# Per object:
# * an object that no longer exists in the bucket is logged and skipped;
# * the object is buffered in a Tempfile when the decompressor asks for local
#   storage, otherwise in a StringIO;
# * a decompression failure is logged and the object is skipped;
# * the temporary file is always closed and unlinked, including when
#   decompression is skipped or a later step raises.
#
# @param message the MNS message wrapper passed on to #get_objects and
#   #delete_message
# @return the result of #delete_message
def process(message)
  get_objects(message).each do |object|
    key = object.key
    log.info "read object #{key}, size #{object.size} from OSS"

    unless @bucket_handler.object_exists?(key)
      log.warn "in_oss: object #{key} does not exist!"
      next
    end

    local = @decompressor.save_to_local?
    io = local ? Tempfile.new("chunk-#{@store_as}-in-") : StringIO.new
    begin
      io.binmode if local
      @bucket_handler.get_object(key) { |chunk| io.write(chunk) }
      io.rewind

      begin
        content = @decompressor.decompress(io)
      rescue StandardError => ex
        log.warn "#{ex}, skip object #{key}"
        next # the ensure clause below still removes the temp file
      end

      emit_content(content)
    ensure
      discard_tempfile(io, key) if local
    end
  end

  delete_message(@mns.queue, message)
end

# Parses +content+ line by line and emits the events under @tag, flushing a
# batch whenever it reaches @flush_batch_lines events and once more at the end.
def emit_content(content)
  es = Fluent::MultiEventStream.new

  content.each_line do |line|
    @parser.parse(line) { |time, record| es.add(time, record) }
    next if es.size < @flush_batch_lines

    router.emit_stream(@tag, es)
    es = Fluent::MultiEventStream.new
    # #configure has already rescaled this value to seconds.
    sleep(@flush_pause_milliseconds) if @flush_pause_milliseconds > 0
  end

  router.emit_stream(@tag, es)
end

# Closes and unlinks the temporary download file; a failure here is logged
# rather than raised so it cannot mask the outcome of processing the object.
def discard_tempfile(io, key)
  io.close(true)
rescue StandardError => ex
  log.warn "in_oss: failed to remove temporary file for object #{key}: #{ex}"
end
receive_message(queue, wait_seconds)
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 145
# Issues a GET request for the next message on the given MNS queue.
#
# @param queue name of the queue, interpolated into the request path
# @param wait_seconds when truthy, sent as the +waitseconds+ request option;
#   otherwise no request options are sent
# @return the result of executing the MNS request
def receive_message(queue, wait_seconds)
  query = wait_seconds ? { waitseconds: wait_seconds } : {}
  connection = {
    log: log,
    method: 'GET',
    endpoint: @mns.endpoint,
    path: "/queues/#{queue}/messages",
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret
  }

  request = Fluent::Plugin::MNS::Request.new(connection, {}, query)
  request.execute
end
run()
click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 136
# Polling loop executed by the :in_oss thread.
#
# While @running is set, each iteration logs the poll, requests one message
# from the MNS queue, processes it when one was returned, and then sleeps for
# the configured poll interval.
def run
  while @running
    log.info "start to poll message from MNS queue #{@mns.queue}"

    received = receive_message(@mns.queue, @mns.wait_seconds)
    unless received.nil?
      wrapped = Fluent::Plugin::MNS::Message.new(@mns.queue, received)
      process(wrapped)
    end

    sleep(@mns.poll_interval_seconds)
  end
end