class LogStash::Inputs::SQS

Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems such as AMQP, it uses a custom API and requires an AWS account. See aws.amazon.com/sqs/ for details on how SQS works, its pricing, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue

The “consumer” identity must have the following permissions on the queue:

* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy follows:

{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}

See aws.amazon.com/iam/ for more details on setting up AWS identities.
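
As a minimal sketch, the policy document can also be generated from the list of required permissions, which keeps the two in sync. The names `CONSUMER_ACTIONS` and `consumer_policy` are hypothetical helpers, not part of the plugin, and the ARN is the placeholder from the sample above.

```ruby
require "json"

# Permissions the "consumer" identity needs, as listed earlier in this page.
CONSUMER_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Build a policy document that allows those actions on a single queue.
# queue_arn is whatever ARN identifies your queue (placeholder used below).
def consumer_policy(queue_arn)
  {
    "Statement" => [
      {
        "Action"   => CONSUMER_ACTIONS,
        "Effect"   => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

policy_json = JSON.pretty_generate(
  consumer_policy("arn:aws:sqs:us-east-1:123456789012:Logstash")
)
puts policy_json
```

The printed JSON can be pasted into the IAM console or saved to a file for whatever tooling you use to manage policies.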

Constants

BACKOFF_FACTOR
BACKOFF_SLEEP_TIME
DEFAULT_POLLING_FREQUENCY
MAX_MESSAGES_TO_FETCH
MAX_TIME_BEFORE_GIVING_UP
SENT_TIMESTAMP
SQS_ATTRIBUTES

Attributes

poller[R]

Public Instance Methods

add_sqs_data(event, message)
# File lib/logstash/inputs/sqs.rb, line 124
def add_sqs_data(event, message)
  event.set(@id_field, message.message_id) if @id_field
  event.set(@md5_field, message.md5_of_body) if @md5_field
  event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
  event
end

handle_message(message, output_queue)
# File lib/logstash/inputs/sqs.rb, line 131
def handle_message(message, output_queue)
  @codec.decode(message.body) do |event|
    add_sqs_data(event, message)
    decorate(event)
    output_queue << event
  end
end
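
The flow in `add_sqs_data` and `handle_message` can be tried without AWS or Logstash. The sketch below is an approximation under stated assumptions: `FakeMessage`, `LineCodecStub`, and `handle` are hypothetical stand-ins, events are plain hashes instead of Logstash events, and the output queue is an array. It shows that one message body may decode into several events, each tagged with the message ID only when an ID field is configured.

```ruby
# Hypothetical stand-in for an SQS message; only the fields used here.
FakeMessage = Struct.new(:message_id, :md5_of_body, :body)

# Hypothetical codec that yields one event (a plain Hash) per input line.
class LineCodecStub
  def decode(data)
    data.each_line { |line| yield({ "message" => line.chomp }) }
  end
end

# Mirrors the shape of handle_message: decode the body, enrich each
# event with SQS metadata if configured, then push it downstream.
def handle(message, codec, output_queue, id_field: nil)
  codec.decode(message.body) do |event|
    event[id_field] = message.message_id if id_field
    output_queue << event
  end
end

output_queue = []
message = FakeMessage.new("msg-1", "abc123", "first line\nsecond line\n")
handle(message, LineCodecStub.new, output_queue, id_field: "sqs_id")
p output_queue
```

Calling `handle` without `id_field` leaves the events untouched, matching the `if @id_field` guard in `add_sqs_data`.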

polling_options()
# File lib/logstash/inputs/sqs.rb, line 116
def polling_options
  { 
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    :wait_time_seconds => @polling_frequency
  }
end

register()
# File lib/logstash/inputs/sqs.rb, line 100
def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue)

  setup_queue
end

run(output_queue)
# File lib/logstash/inputs/sqs.rb, line 139
def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?
      messages.each {|message| handle_message(message, output_queue) }
      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end

setup_queue()
# File lib/logstash/inputs/sqs.rb, line 107
def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
  queue_url = aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
  @logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end

Private Instance Methods

backoff_sleep(sleep_time)
# File lib/logstash/inputs/sqs.rb, line 170
def backoff_sleep(sleep_time)
  sleep(sleep_time)
  sleep_time > MAX_TIME_BEFORE_GIVING_UP ? sleep_time : sleep_time * BACKOFF_FACTOR
end
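
To see how the delay grows, the rule in `backoff_sleep` can be replayed without sleeping. The constant values below are illustrative assumptions only, since this page does not show the plugin's actual values, and `next_sleep_time` is a hypothetical helper.

```ruby
# Illustrative values only; the plugin's real constants are not shown here.
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Same growth rule as backoff_sleep, minus the actual sleep call.
def next_sleep_time(sleep_time)
  sleep_time > MAX_TIME_BEFORE_GIVING_UP ? sleep_time : sleep_time * BACKOFF_FACTOR
end

delays = [BACKOFF_SLEEP_TIME]
8.times { delays << next_sleep_time(delays.last) }
p delays
```

Because the cap is checked before multiplying, the delay stops growing only once it has already exceeded `MAX_TIME_BEFORE_GIVING_UP`; with these assumed values it settles at 64 rather than 60.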

convert_epoch_to_timestamp(time)
# File lib/logstash/inputs/sqs.rb, line 175
def convert_epoch_to_timestamp(time)
  LogStash::Timestamp.at(time.to_i / 1000)
end
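
SQS reports `SentTimestamp` as epoch milliseconds, so the integer division by 1000 above discards the sub-second part. The sketch below uses plain `Time` as a stand-in for `LogStash::Timestamp` (an assumption made so the example runs outside Logstash) and shows both the truncating conversion and one possible alternative that keeps the milliseconds.

```ruby
# SentTimestamp as SQS delivers it: epoch milliseconds in a string.
sent_timestamp = "1700000000123"

# Integer division by 1000, as in convert_epoch_to_timestamp,
# keeps whole seconds and drops the milliseconds.
seconds_only = Time.at(sent_timestamp.to_i / 1000).utc

# Alternative that preserves the milliseconds (Ruby 2.5 or later).
millis = sent_timestamp.to_i
with_millis = Time.at(millis / 1000, millis % 1000, :millisecond).utc

puts seconds_only.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
puts with_millis.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
```

Whether the extra precision matters depends on how the timestamp field is used downstream; the plugin's own method keeps second resolution.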

exception_details(e, sleep_time = nil)
# File lib/logstash/inputs/sqs.rb, line 179
def exception_details(e, sleep_time = nil)
  details = { :queue => @queue, :exception => e.class, :message => e.message }
  details[:code] = e.code if e.is_a?(Aws::SQS::Errors::ServiceError) && e.code
  details[:cause] = e.original_error if e.respond_to?(:original_error) && e.original_error # Seahorse::Client::NetworkingError
  details[:sleep_time] = sleep_time if sleep_time
  details[:backtrace] = e.backtrace if @logger.debug?
  details
end

run_with_backoff(&block)

Runs an AWS request inside a Ruby block, retrying with exponential backoff if the block raises Aws::SQS::Errors::ServiceError or Seahorse::Client::NetworkingError.

@param [Block] block Ruby code block to execute.

# File lib/logstash/inputs/sqs.rb, line 159
def run_with_backoff(&block)
  sleep_time = BACKOFF_SLEEP_TIME
  begin
    block.call
  rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
    @logger.warn("SQS error ... retrying with exponential backoff", exception_details(e, sleep_time))
    sleep_time = backoff_sleep(sleep_time)
    retry
  end
end
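
The retry pattern used by `run_with_backoff` can be exercised without the AWS SDK. This is a minimal sketch, not the plugin's implementation: `TransientError` stands in for the AWS error classes, `with_backoff` and its keyword arguments are hypothetical, and the sleeper is injectable so the example runs instantly.

```ruby
# Hypothetical error class standing in for the AWS service/network errors.
class TransientError < StandardError; end

# Run the block, retrying on TransientError with a growing delay.
# sleeper defaults to Kernel#sleep but can be replaced for testing.
def with_backoff(initial:, factor:, max:, sleeper: ->(s) { sleep(s) })
  sleep_time = initial
  begin
    yield
  rescue TransientError
    sleeper.call(sleep_time)
    # Same rule as backoff_sleep: stop growing once past the maximum.
    sleep_time *= factor unless sleep_time > max
    retry
  end
end

attempts = 0
slept = []

# Fail three times, then succeed; record delays instead of sleeping.
result = with_backoff(initial: 1, factor: 2, max: 60, sleeper: ->(s) { slept << s }) do
  attempts += 1
  raise TransientError, "simulated failure" if attempts < 4
  :ok
end

p result, attempts, slept
```

As in the original method, there is no retry limit: the block is retried until it succeeds or raises an error that is not rescued.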