class Fluent::Plugin::OSSInput

OSSInput class implementation

Constants

DECOMPRESSOR_REGISTRY
DEFAULT_PARSE_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 63
def initialize
  super
  @decompressor = nil
end

Private Class Methods

register_decompressor(name, decompressor) click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 343
def self.register_decompressor(name, decompressor)
  DECOMPRESSOR_REGISTRY.register(name, decompressor)
end

Public Instance Methods

check_bucket() click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 109
def check_bucket
  unless @oss.bucket_exist?(@bucket)
    raise "The specified bucket does not exist: bucket = #{@bucket}"
  end

  @bucket_handler = @oss.get_bucket(@bucket)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 75
def configure(conf)
  super

  raise Fluent::ConfigError, 'Invalid oss endpoint' if @endpoint.nil?

  raise Fluent::ConfigError, 'Invalid mns endpoint' if @mns.endpoint.nil?

  raise Fluent::ConfigError, 'Invalid mns queue' if @mns.queue.nil?

  @decompressor = DECOMPRESSOR_REGISTRY.lookup(@store_as).new(log: log)
  @decompressor.configure(conf)

  parser_config = conf.elements('parse').first
  @parser = parser_create(conf: parser_config, default_type: DEFAULT_PARSE_TYPE)

  @flush_pause_milliseconds *= 0.001
end
create_oss_client() click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 117
def create_oss_client
  @oss = Aliyun::OSS::Client.new(
    endpoint: @endpoint,
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret,
    download_crc_enable: @download_crc_enable,
    upload_crc_enable: @upload_crc_enable,
    open_timeout: @open_timeout,
    read_timeout: @read_timeout
  )
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 93
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 129
def shutdown
  @running = false
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_oss.rb, line 97
def start
  @oss_sdk_log_dir += '/' unless @oss_sdk_log_dir.end_with?('/')
  Aliyun::Common::Logging.set_log_file(@oss_sdk_log_dir + Aliyun::Common::Logging::DEFAULT_LOG_FILE)
  create_oss_client unless @oss

  check_bucket
  super

  @running = true
  thread_create(:in_oss, &method(:run))
end

Private Instance Methods

delete_message(queue, message) click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 224
def delete_message(queue, message)
  request_opts = { ReceiptHandle: message.receipt_handle }
  opts = {
    log: log,
    method: 'DELETE',
    endpoint: @mns.endpoint,
    path: "/queues/#{queue}/messages",
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret
  }
  Fluent::Plugin::MNS::Request.new(opts, {}, request_opts).execute
end
get_objects(message) click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 211
def get_objects(message)
  objects = []
  events = JSON.parse(Base64.decode64(message.body))['events']
  events.each do |event|
    objects.push(OSSObject.new(event['eventName'],
                               @bucket,
                               event['oss']['object']['key'],
                               event['oss']['object']['size'],
                               event['oss']['object']['eTag']))
  end
  objects
end
process(message) click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 159
def process(message)
  objects = get_objects(message)
  objects.each do |object|
    key = object.key
    log.info "read object #{key}, size #{object.size} from OSS"

    if @bucket_handler.object_exists?(key)
      if @decompressor.save_to_local?
        io = Tempfile.new('chunk-' + @store_as + '-in-')
        io.binmode
        @bucket_handler.get_object(key) do |chunk|
          io.write(chunk)
        end
      else
        io = StringIO.new
        @bucket_handler.get_object(key) do |chunk|
          io << chunk
        end
      end

      io.rewind

      begin
        content = @decompressor.decompress(io)
      rescue StandardError => ex
        log.warn "#{ex}, skip object #{key}"
        next
      end

      es = Fluent::MultiEventStream.new
      content.each_line do |line|
        @parser.parse(line) do |time, record|
          es.add(time, record)
        end

        if es.size >= @flush_batch_lines
          router.emit_stream(@tag, es)
          es = Fluent::MultiEventStream.new
          if @flush_pause_milliseconds > 0
            sleep(@flush_pause_milliseconds)
          end
        end
      end
      router.emit_stream(@tag, es)
      io.close(true) rescue nil if @decompressor.save_to_local?
    else
      log.warn "in_oss: object #{key} does not exist!"
    end
  end
  delete_message(@mns.queue, message)
end
receive_message(queue, wait_seconds) click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 145
def receive_message(queue, wait_seconds)
  request_opts = {}
  request_opts = { waitseconds: wait_seconds } if wait_seconds
  opts = {
    log: log,
    method: 'GET',
    endpoint: @mns.endpoint,
    path: "/queues/#{queue}/messages",
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret
  }
  Fluent::Plugin::MNS::Request.new(opts, {}, request_opts).execute
end
run() click to toggle source
# File lib/fluent/plugin/in_oss.rb, line 136
def run
  while @running
    log.info "start to poll message from MNS queue #{@mns.queue}"
    message = receive_message(@mns.queue, @mns.wait_seconds)
    process(Fluent::Plugin::MNS::Message.new(@mns.queue, message)) unless message.nil?
    sleep(@mns.poll_interval_seconds)
  end
end