class LogStash::Inputs::SQS
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue.
The “consumer” identity must have the following permissions on the queue:
* `sqs:ChangeMessageVisibility`
* `sqs:ChangeMessageVisibilityBatch`
* `sqs:DeleteMessage`
* `sqs:DeleteMessageBatch`
* `sqs:GetQueueAttributes`
* `sqs:GetQueueUrl`
* `sqs:ListQueues`
* `sqs:ReceiveMessage`
Typically, you should set up an IAM policy, create a user and apply the IAM policy to the user. A sample policy is as follows:
{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
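Before attaching a policy, it can help to confirm that it grants every permission listed for the consumer identity. The following is a minimal sketch in plain Ruby, not part of the plugin: `missing_consumer_actions` is a hypothetical helper, and it only matches actions that are spelled out explicitly (wildcards such as `sqs:*` are not expanded).

```ruby
require "json"
require "set"

# Permissions this page lists for the "consumer" identity.
REQUIRED_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper (not part of the plugin): returns the required actions
# that no "Allow" statement in the policy grants explicitly.
def missing_consumer_actions(policy_json)
  statements = JSON.parse(policy_json)["Statement"]
  # A policy may carry a single statement object instead of an array.
  statements = [statements] if statements.is_a?(Hash)
  allowed = Array(statements)
    .select { |statement| statement["Effect"] == "Allow" }
    .flat_map { |statement| Array(statement["Action"]) }
    .to_set
  REQUIRED_ACTIONS.reject { |action| allowed.include?(action) }
end

# Example policy that grants only two of the required actions.
policy = JSON.generate(
  "Statement" => [
    {
      "Effect" => "Allow",
      "Action" => ["sqs:GetQueueUrl", "sqs:ReceiveMessage"],
      "Resource" => ["arn:aws:sqs:us-east-1:123456789012:Logstash"]
    }
  ]
)

puts missing_consumer_actions(policy)
```

An empty result means every listed permission is granted explicitly; anything printed is a permission the consumer would still lack.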
Constants
- BACKOFF_FACTOR
- BACKOFF_SLEEP_TIME
- DEFAULT_POLLING_FREQUENCY
- MAX_MESSAGES_TO_FETCH
- MAX_TIME_BEFORE_GIVING_UP
- SENT_TIMESTAMP
- SQS_ATTRIBUTES
Attributes
Public Instance Methods
# File lib/logstash/inputs/sqs.rb, line 124
def add_sqs_data(event, message)
  event.set(@id_field, message.message_id) if @id_field
  event.set(@md5_field, message.md5_of_body) if @md5_field
  event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
  event
end
# File lib/logstash/inputs/sqs.rb, line 131
def handle_message(message, output_queue)
  @codec.decode(message.body) do |event|
    add_sqs_data(event, message)
    decorate(event)
    output_queue << event
  end
end
# File lib/logstash/inputs/sqs.rb, line 116
def polling_options
  {
    :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
    :attribute_names => SQS_ATTRIBUTES,
    :wait_time_seconds => @polling_frequency
  }
end
# File lib/logstash/inputs/sqs.rb, line 100
def register
  require "aws-sdk"
  @logger.info("Registering SQS input", :queue => @queue)
  setup_queue
end
# File lib/logstash/inputs/sqs.rb, line 139
def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?
      messages.each {|message| handle_message(message, output_queue) }
      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end
# File lib/logstash/inputs/sqs.rb, line 107
def setup_queue
  aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
  queue_url = aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => aws_sqs_client)
rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
  @logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end
Private Instance Methods
# File lib/logstash/inputs/sqs.rb, line 170
def backoff_sleep(sleep_time)
  sleep(sleep_time)
  sleep_time > MAX_TIME_BEFORE_GIVING_UP ? sleep_time : sleep_time * BACKOFF_FACTOR
end
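To make the growth rule in backoff_sleep concrete, here is a small sketch that lists the delays it would produce. The constant values (1, 2 and 60) are assumptions chosen for illustration, since this page names the constants without showing their values; `next_sleep_time` and `backoff_schedule` are hypothetical helpers that mirror the rule without actually sleeping.

```ruby
# Assumed values for illustration only; the constants' real values are not
# shown on this page.
BACKOFF_SLEEP_TIME = 1
BACKOFF_FACTOR = 2
MAX_TIME_BEFORE_GIVING_UP = 60

# Same growth rule as backoff_sleep, minus the actual call to sleep.
def next_sleep_time(sleep_time)
  sleep_time > MAX_TIME_BEFORE_GIVING_UP ? sleep_time : sleep_time * BACKOFF_FACTOR
end

# Collects the first `count` delays a retry loop would wait.
def backoff_schedule(count)
  delays = []
  sleep_time = BACKOFF_SLEEP_TIME
  count.times do
    delays << sleep_time
    sleep_time = next_sleep_time(sleep_time)
  end
  delays
end

p backoff_schedule(9)
# prints [1, 2, 4, 8, 16, 32, 64, 64, 64]
```

Under these assumed values the delay stops growing at the first value that exceeds the maximum (64), not at the maximum itself, because the comparison is made before multiplying.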
# File lib/logstash/inputs/sqs.rb, line 175
def convert_epoch_to_timestamp(time)
  LogStash::Timestamp.at(time.to_i / 1000)
end
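SQS reports the SentTimestamp attribute as a string of epoch milliseconds, which is why the value is converted with to_i and divided by 1000. The sketch below shows the same arithmetic using plain Ruby `Time` as a stand-in for LogStash::Timestamp (an assumption made so it runs without Logstash); `epoch_millis_to_time` is a hypothetical name.

```ruby
# Plain Ruby Time stands in for LogStash::Timestamp here (an assumption so
# the sketch runs without Logstash).
def epoch_millis_to_time(value)
  # Integer division drops the millisecond remainder.
  Time.at(value.to_i / 1000).utc
end

sent = epoch_millis_to_time("1500000000123")
puts sent.to_i  # 1500000000 (the 123 ms remainder is discarded)
puts sent.year  # 2017
```

Because the division is done on integers, the resulting timestamp has whole-second precision.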
# File lib/logstash/inputs/sqs.rb, line 179
def exception_details(e, sleep_time = nil)
  details = { :queue => @queue, :exception => e.class, :message => e.message }
  details[:code] = e.code if e.is_a?(Aws::SQS::Errors::ServiceError) && e.code
  details[:cause] = e.original_error if e.respond_to?(:original_error) && e.original_error # Seahorse::Client::NetworkingError
  details[:sleep_time] = sleep_time if sleep_time
  details[:backtrace] = e.backtrace if @logger.debug?
  details
end
Runs an AWS request inside a Ruby block, retrying with exponential backoff if it raises a ServiceError or a NetworkingError.
@param [Block] block Ruby code block to execute.
# File lib/logstash/inputs/sqs.rb, line 159
def run_with_backoff(&block)
  sleep_time = BACKOFF_SLEEP_TIME
  begin
    block.call
  rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
    @logger.warn("SQS error ... retrying with exponential backoff", exception_details(e, sleep_time))
    sleep_time = backoff_sleep(sleep_time)
    retry
  end
end
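The begin/rescue/retry pattern used by run_with_backoff can be tried in isolation. The following sketch is a simplified stand-in, not the plugin's code: `TransientError` replaces the AWS error classes, `with_backoff` and its keyword defaults are hypothetical, and the sleeper is injected so the example finishes instantly.

```ruby
# Hypothetical stand-in for the AWS error classes rescued by the plugin.
class TransientError < StandardError; end

# Sketch of the begin/rescue/retry pattern. `sleeper` is injected so the
# example runs instantly; pass Kernel.method(:sleep) to really wait.
def with_backoff(initial: 1, factor: 2, max: 60, sleeper: ->(_seconds) {})
  sleep_time = initial
  begin
    yield
  rescue TransientError
    sleeper.call(sleep_time)
    # Grow the delay until it first exceeds `max`, then hold it there.
    sleep_time *= factor unless sleep_time > max
    retry
  end
end

attempts = 0
waits = []
result = with_backoff(sleeper: ->(seconds) { waits << seconds }) do
  attempts += 1
  raise TransientError, "simulated failure" if attempts < 4
  "ok after #{attempts} attempts"
end

puts result  # ok after 4 attempts
p waits      # [1, 2, 4]
```

As in run_with_backoff, the loop has no exit condition of its own: it retries until the block succeeds, and only the delay between attempts is bounded.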