class Fluent::Plugin::SQSInput

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 52
def client
  @client ||= Aws::SQS::Client.new(stub_responses: @stub_responses)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 28
def configure(conf)
  super

  if @tag == nil
    raise Fluent::ConfigError, "tag configuration key is mandatory"
  end

  if @sqs_url == nil
    raise Fluent::ConfigError, "sqs_url configuration key is mandatory"
  end

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end
parse_message(message) click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 95
def parse_message(message)
  record = {}

  if @parse_body_as_json == true
    record['body'] = prase_json_string(message.body.to_s)
  else
    record['body'] = message.body.to_s
  end

  if @add_receipt_handle == true
    record['receipt_handle'] = message.receipt_handle.to_s
  end

  if @add_message_id == true
    record['message_id'] = message.message_id.to_s
  end

  if @add_md5_of_body == true
    record['md5_of_body'] = message.md5_of_body.to_s
  end

  if @add_queue_url == true
    record['queue_url'] = message.queue_url.to_s
  end

  if @attribute_name_to_extract.to_s.strip.length > 0
    record[@attribute_name_to_extract] = message.message_attributes[@attribute_name_to_extract].string_value.to_s
  end
  
  record
end
prase_json_string(jsonStr) click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 83
def prase_json_string(jsonStr)
  response = jsonStr

  begin
    response = JSON.parse(jsonStr)
  rescue => ex
    # unable to pase json str (str is not a valid json)
  end

  response
end
queue() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 56
def queue
  @queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url)
end
run() click to toggle source
# File lib/fluent/plugin/in_sqs.rb, line 64
def run
  queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout,
    attribute_names: ['All'], # Receive all available built-in message attributes.
    message_attribute_names: ['All'] # Receive any custom message attributes.
  ).each do |message|
    record = parse_message(message)

    message.delete if @delete_message

    router.emit(@tag, Fluent::Engine.now, record)
  end
rescue => ex
  log.error 'failed to emit or receive', error: ex.to_s, error_class: ex.class
  log.warn_backtrace ex.backtrace
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 60
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqs.rb, line 46
def start
  super

  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &method(:run))
end