class Fluent::Plugin::SQSInput
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 52
# Returns the SQS client used by this plugin, creating it on first use and
# reusing the same instance afterwards. The client is built with
# `stub_responses: @stub_responses`.
def client
  return @client if @client

  @client = Aws::SQS::Client.new(stub_responses: @stub_responses)
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 28
# Applies the plugin configuration via the superclass, then checks the
# mandatory settings and installs the AWS credentials/region.
#
# Raises Fluent::ConfigError when `tag` or `sqs_url` is not set.
# Replaces `Aws.config` with the access key, secret key and region held by
# this plugin instance.
def configure(conf)
  super

  raise Fluent::ConfigError, "tag configuration key is mandatory" if @tag.nil?
  raise Fluent::ConfigError, "sqs_url configuration key is mandatory" if @sqs_url.nil?

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end
parse_message(message)
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 95
# Builds the event record (a Hash with String keys) for one received message.
#
# 'body' is always present: the message body as a String, or the result of
# prase_json_string when @parse_body_as_json is true. 'receipt_handle',
# 'message_id', 'md5_of_body' and 'queue_url' are added only when the matching
# @add_* flag is exactly true. When @attribute_name_to_extract is non-blank,
# the string value of that message attribute is stored under that same name.
#
# A message that does not carry the configured attribute yields an empty
# string for it instead of raising NoMethodError, so a single message without
# the attribute can no longer abort processing of the received batch.
#
# message - object responding to body, receipt_handle, message_id,
#           md5_of_body, queue_url and message_attributes.
#
# Returns the record Hash.
def parse_message(message)
  body = message.body.to_s
  record = {}
  # Flags are compared against `true` on purpose: only a real boolean true
  # enables a field, exactly as before.
  record['body'] = @parse_body_as_json == true ? prase_json_string(body) : body

  record['receipt_handle'] = message.receipt_handle.to_s if @add_receipt_handle == true
  record['message_id'] = message.message_id.to_s if @add_message_id == true
  record['md5_of_body'] = message.md5_of_body.to_s if @add_md5_of_body == true
  record['queue_url'] = message.queue_url.to_s if @add_queue_url == true

  unless @attribute_name_to_extract.to_s.strip.empty?
    # The attribute map may lack the key (or be nil); fall back to ''.
    attribute = (message.message_attributes || {})[@attribute_name_to_extract]
    record[@attribute_name_to_extract] = attribute ? attribute.string_value.to_s : ''
  end

  record
end
prase_json_string(jsonStr)
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 83
# Best-effort JSON decoding: returns the parsed value when jsonStr is valid
# JSON, otherwise returns jsonStr itself unchanged.
def prase_json_string(jsonStr)
  JSON.parse(jsonStr)
rescue StandardError
  # Not parseable as JSON; hand the input back untouched.
  jsonStr
end
queue()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 56
# Returns the queue resource for @sqs_url, built from `client` on first use
# and reused on later calls.
def queue
  return @queue if @queue

  sqs = Aws::SQS::Resource.new(client: client)
  @queue = sqs.queue(@sqs_url)
end
run()
click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 64
# Receives one batch of messages from the queue and emits one event per
# message under @tag.
#
# Each message is emitted first and deleted from the queue (when
# @delete_message is set) only after the emit succeeded. Deleting before
# emitting would lose the message for good whenever the emit raises; with
# this order a failed message stays on the queue.
#
# Any StandardError raised while receiving, parsing, emitting or deleting is
# logged and swallowed, which ends processing of the current batch.
def run
  messages = queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout,
    attribute_names: ['All'], # Receive all available built-in message attributes.
    message_attribute_names: ['All'] # Receive any custom message attributes.
  )

  messages.each do |message|
    record = parse_message(message)

    router.emit(@tag, Fluent::Engine.now, record)

    # Acknowledge only what has actually been handed to the router.
    message.delete if @delete_message
  end
rescue => ex
  log.error 'failed to emit or receive', error: ex.to_s, error_class: ex.class
  log.warn_backtrace ex.backtrace
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 60
# Shutdown hook: adds nothing of its own and simply delegates to the
# superclass implementation.
def shutdown
  super()
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 46
# Start hook: runs the superclass start, then registers `run` with
# timer_execute under the name :in_sqs_run_periodic_timer, using
# @receive_interval as the interval argument.
def start
  super

  periodic_run = method(:run)
  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &periodic_run)
end