class SqsGrep::Runner

Public Class Methods

new(config) click to toggle source
# File lib/sqs-grep/runner.rb, line 10
# Stores the configuration and fills in any AWS client it is missing.
#
# An SQS client is created unless config.sqs_client is already set. A Lambda
# client is created only when config.invoke_lambda is truthy and
# config.lambda_client is not already set. New clients are built from
# SqsGrep::Client.core_v2_options merged with config.client_options (the
# latter taking precedence on conflicting keys).
def initialize(config)
  @config = config

  # Evaluated lazily so the options are only computed when a client is
  # actually being constructed.
  client_settings = lambda do
    SqsGrep::Client.core_v2_options.merge(config.client_options)
  end

  @config.sqs_client ||= Aws::SQS::Client.new(client_settings.call)

  return unless @config.invoke_lambda

  @config.lambda_client ||= Aws::Lambda::Client.new(client_settings.call)
end

Public Instance Methods

run() click to toggle source

Sets $stdout.sync to true before reading any messages from the queue.

# File lib/sqs-grep/runner.rb, line 27
# Reads messages from the configured queue in batches of up to 10 and acts on
# every message whose body matches @config.pattern (or does not match, when
# @config.invert_match is set).
#
# For each selected message, in this order:
# * prints it (plain text immediately, or as JSON after the other actions
#   when @config.json_format is set);
# * re-sends its body to the @config.send_to queue, if configured;
# * invokes the @config.invoke_lambda function with the body as payload,
#   if configured;
# * deletes it from the source queue when @config.delete_matched is set.
#
# Processing stops when a receive call returns no messages, when a message id
# is seen a second time, or once @config.max_count messages were selected.
#
# Always returns 0.
def run
  source_name = @config.queue_name
  source_url = resolve_queue source_name
  lambda_name = @config.invoke_lambda
  forward_url = @config.send_to ? resolve_queue(@config.send_to) : nil

  $stdout.sync = true

  # Echoes an API response followed by a blank line, except in JSON mode
  # where the only output is the JSON document for the message.
  show_response = lambda do |response|
    unless @config.json_format
      p response
      puts ""
    end
  end

  known_ids = Set.new
  selected = 0

  loop do
    batch = @config.sqs_client.receive_message(
      queue_url: source_url,
      attribute_names: %w[ All ],
      max_number_of_messages: 10,
      visibility_timeout: @config.visibility_timeout,
      wait_time_seconds: @config.wait_time_seconds,
    ).messages

    break if batch.empty?

    batch.each do |msg|
      # Set#add? returns nil when the id was already present, which means the
      # queue has started handing back messages we have already looked at.
      unless known_ids.add?(msg.message_id)
        $stderr.puts "Already seen message #{msg.message_id} - bailing out in case we're looping"
        return 0
      end

      body_matches = !msg.body.match(@config.pattern).nil?
      next unless body_matches ^ @config.invert_match

      record = {
        queue_url: source_url,
        queue_name: source_name,
        message_id: msg.message_id,
        attributes: msg.attributes.to_h,
        body: msg.body,
      }

      unless @config.json_format
        puts "%s\t%s" % [ source_name, msg.message_id ]
        puts msg.attributes.inspect
        puts msg.body
        puts ""
      end

      if forward_url
        # FIXME? discards message attributes
        show_response.call(
          @config.sqs_client.send_message(
            queue_url: forward_url,
            message_body: msg.body,
          )
        )
      end

      if lambda_name
        show_response.call(
          @config.lambda_client.invoke(
            function_name: lambda_name,
            invocation_type: 'Event',
            payload: msg.body
          )
        )
      end

      if @config.delete_matched
        show_response.call(
          @config.sqs_client.delete_message(
            queue_url: source_url,
            receipt_handle: msg.receipt_handle,
          )
        )
      end

      puts JSON.pretty_generate(record) if @config.json_format

      selected += 1
      return 0 if @config.max_count && selected >= @config.max_count
    end
  end

  0
end

Private Instance Methods

resolve_queue(queue_name) click to toggle source
# File lib/sqs-grep/runner.rb, line 128
# Looks up the URL of the queue called +queue_name+.
#
# Lists queues whose name starts with +queue_name+ and returns the first URL
# whose final path segment equals +queue_name+ exactly.
#
# Raises RuntimeError when none of the listed URLs matches.
def resolve_queue(queue_name)
  listing = @config.sqs_client.list_queues(queue_name_prefix: queue_name)
  exact_url = listing.queue_urls.find { |candidate| File.basename(candidate) == queue_name }

  raise "Can't find queue named #{queue_name.inspect}" unless exact_url

  exact_url
end