class Fluent::Plugin::SQSPollInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 15
# Hands the plugin configuration straight to the superclass implementation;
# this plugin adds no configuration handling of its own here.
#
# @param conf the configuration object supplied by the caller
def configure(conf)
  super(conf)
end
poll() click to toggle source
# File lib/fluent/plugin/in_sqs_poll.rb, line 33
# Polls the SQS queue at @sqs_url and emits one event per received message
# through +router+, tagged with @tag and stamped with the current time.
#
# The loop ends when @terminate becomes truthy: it is checked before every
# receive request and again after each delivered batch, and the poller is
# stopped with +throw :stop_polling+.
#
# Reads @sqs_url, @aws_access_key, @aws_secret_key, @max_number_of_messages,
# @tag and @terminate. Updates the process-wide +Aws.config+.
def poll
  # The region is the second dot-separated component of the queue URL,
  # e.g. "https://sqs.us-east-1.amazonaws.com/..." => "us-east-1".
  region = @sqs_url.split('.')[1]
  Aws.config.update(region: region)

  # Explicit credentials are only installed when both halves are configured.
  if @aws_access_key && @aws_secret_key
    Aws.config.update(
      credentials: Aws::Credentials.new(@aws_access_key, @aws_secret_key)
    )
  end

  poller = Aws::SQS::QueuePoller.new(@sqs_url)

  poller.before_request do |_stats|
    throw :stop_polling if @terminate
  end

  poller.poll(max_number_of_messages: @max_number_of_messages) do |messages|
    # Per the Aws::SQS::QueuePoller documentation the block receives a single
    # message (not an array) when max_number_of_messages is 1. Wrap it so the
    # loop below never iterates over the fields of a lone message struct.
    batch = messages.is_a?(Array) ? messages : [messages]

    batch.each do |msg|
      begin
        router.emit(@tag, Time.now.to_i,
          {
            'body' => msg.body,
            'handle' => msg.receipt_handle,
            'id' => msg.message_id,
            'md5' => msg.md5_of_body,
            'sqs_receive_count' => msg.attributes['ApproximateReceiveCount'],
          }
        )
      rescue StandardError => e
        # Only ordinary errors are logged and skipped; Interrupt, SystemExit,
        # NoMemoryError and similar must propagate instead of being swallowed.
        $log.error("SQS exception", error: e.to_s, error_class: e.class.to_s)
        $log.warn_backtrace(e.backtrace)
      end
    end
    throw :stop_polling if @terminate
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 26
# Runs the superclass shutdown, then asks the polling thread to stop and
# waits for it to finish.
#
# Setting @terminate makes +poll+ leave its loop at the next check; the join
# then blocks until the thread has actually exited.
def shutdown
  super

  @terminate = true
  # @thread is only assigned by #start; guard so that shutting down a plugin
  # that was never (successfully) started does not raise NoMethodError.
  @thread.join if @thread
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs_poll.rb, line 19
# Runs the superclass start, clears the termination flag and launches the
# background thread that executes #poll.
def start
  super

  @terminate = false
  @thread = Thread.new { poll }
end