class Qurd::Listener

Provide an interface for interacting with configured queues and AWS.

Attributes

aws_credentials[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
message[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
name[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
queues[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
region[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
visibility_timeout[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]
wait_time[R]

@!attribute aws_credentials [r]

The AWS credentials for the account
@return [Aws::Credentials]

@!attribute message [r]

The message received
@return [Qurd::Message]

@!attribute name [r]

The name of the executor
@return [String]

@!attribute queues [r]

An array of AWS SQS URLs for the account
@return [Array<String>]

@!attribute region [r]

The AWS region for the account
@return [String]

@!attribute visibility_timeout [r]

@return [String]

@!attribute wait_time [r]

@return [String]

Public Class Methods

new(attrs = {}) click to toggle source

@param [Hash] attrs @option attrs [Aws::Credentials] :aws_credentials @option attrs [String] :name @option attrs [Array<String|Regexp>] :queues An array of SQS names

and Regexps

@option attrs [String] :region @option attrs [String] :visibility_timeout @option attrs [String] :wait_time

# File lib/qurd/listener.rb, line 44
# Build a listener from a hash of settings, resolve its queue list and then
# configure the resolved queues.
# @param [Hash] attrs
# @option attrs [Aws::Credentials] :aws_credentials
# @option attrs [String] :name
# @option attrs [Array<String|Regexp>] :queues SQS names and Regexps
# @option attrs [String] :region
# @option attrs [String] :visibility_timeout
# @option attrs [String] :wait_time
def initialize(attrs = {})
  # Plain settings are copied as-is into the instance variable of the same
  # name.
  %i[aws_credentials name region visibility_timeout wait_time].each do |key|
    instance_variable_set("@#{key}", attrs[key])
  end

  # Queue resolution runs only after the settings above are stored; the
  # resolved queues are then configured.
  @queues = convert_queues(attrs[:queues])
  configure_queues
end

Public Instance Methods

inspect() click to toggle source

@private

# File lib/qurd/listener.rb, line 104
# @private
# Short identifying string: the object id in hexadecimal followed by the
# listener name.
# @return [String]
def inspect
  template = '<Qurd::Listener:%x name:%s>'
  format(template, object_id, name)
end
listen() click to toggle source

Create one thread per queue, receive messages from it and process each message received @return [Array<Thread>]

# File lib/qurd/listener.rb, line 76
# Create one thread per queue. Each thread loops forever: it receives a batch
# of messages from its queue, hands the batch to process_messages, and waits
# up to qurd_configuration.listen_timeout for every worker thread of that
# batch. Workers that do not finish in time are added to the thread_timeouts
# counter; AWS service errors are counted and logged, and the loop continues.
# A statistics thread is appended to the returned list.
# @return [Array<Thread>] the queue threads followed by the stats thread
def listen
  threads = queue_threads do |qurl, _context|
    loop do
      begin
        msgs = aws_client(:SQS).receive_message(
          queue_url: qurl,
          wait_time_seconds: wait_time,
          visibility_timeout: visibility_timeout
        )
        # Deliberately NOT named `threads`: that name is a local of the
        # enclosing method which this block closes over, so assigning it here
        # would make every queue thread share (and overwrite) one variable.
        workers = process_messages(qurl, msgs)
        # Thread#join returns nil when the timeout elapses first.
        finished = workers.map do |worker|
          worker.join(qurd_configuration.listen_timeout)
        end.compact
        timed_out = workers.count - finished.count
        unless timed_out.zero?
          lock_counter { @counter.thread_timeouts += timed_out }
          qurd_logger.warn('Some threads timed out')
        end
      rescue Aws::Errors::ServiceError => e
        lock_counter { @counter.aws_service_errors += 1 }
        qurd_logger.error("Aws raised #{e}")
      end
    end
  end
  threads << stats_thread
end
queue_threads() { |url, ctx| ... } click to toggle source

Create a thread for each queue URL, each with a logging context built from the listener name and the queue name. @param [Proc] _block the proc each thread should run @yieldparam [String] url the url of the queue @yieldparam [Cabin::Context] ctx the logging context

# File lib/qurd/listener.rb, line 59
# Start one thread per entry in +queues+ and yield the queue URL and a logging
# context to the given block inside each thread. The context is obtained from
# qurd_config.get_context using the listener name and the last path segment
# of the URL.
#
# The mutex and the counters are created once per call, before any thread is
# started, so every thread of the batch sees the same lock and the same
# counters (and they exist even when there are no queues).
# @param [Proc] _block the proc each thread should run
# @yieldparam [String] url the url of the queue
# @yieldparam ctx the logging context returned by get_context
# @return [Array<Thread>] one thread per queue, in +queues+ order
def queue_threads(&_block)
  @mutex = Mutex.new
  @counter = Hashie::Mash.new({thread_timeouts: 0, aws_service_errors: 0, successes: 0, failures: 0, messages: 0})

  queues.map do |qurl|
    qurd_logger.debug("Creating thread for #{qurl}")
    # The URL is passed as a thread argument so each thread gets its own copy
    # of the reference.
    Thread.new(qurl) do |url|
      ctx = qurd_config.get_context(name: name, queue_name: url[/([^\/]+)$/])
      qurd_logger.debug('Thread running')
      yield url, ctx
    end
  end
end

Private Instance Methods

configure_queues() click to toggle source
# File lib/qurd/listener.rb, line 147
# Run the queue-configuration threads and wait for each of them, for at most
# qurd_configuration.sqs_set_attributes_timeout per thread. If any thread has
# not finished in that time, report it through qurd_logger!.
def configure_queues
  workers = configure_queues_threads
  # Thread#join yields nil for a thread that is still running at the deadline.
  finished = workers.map do |worker|
    worker.join(qurd_configuration.sqs_set_attributes_timeout)
  end.compact
  return if finished.count == workers.count

  qurd_logger! 'One or more threads timed out'
end
configure_queues_threads() click to toggle source
# File lib/qurd/listener.rb, line 157
# Start one thread per queue (via queue_threads) that applies this listener's
# wait_time and visibility_timeout to the queue with set_queue_attributes.
#
# A queue that does not exist is logged and its thread ends quietly; any other
# SQS service error is logged and re-raised inside the thread.
# @return [Array<Thread>] the threads created by queue_threads
def configure_queues_threads
  queue_threads do |q, _context|
    qurd_logger.debug("Setting wait_time:#{wait_time} " \
                      "visibility_timeout:#{visibility_timeout} #{q}")
    begin
      aws_client(:SQS).set_queue_attributes(
        queue_url: q,
        attributes: {
          ReceiveMessageWaitTimeSeconds: wait_time,
          VisibilityTimeout: visibility_timeout
        }
      )
    # NOTE(review): constant path kept as found; confirm it resolves with the
    # AWS SDK version in use.
    rescue Aws::SQS::Errors::ServiceError::QueueDoesNotExist => e
      qurd_logger.error("SQS raised #{e}")
      # Thread.exit ends the *current* thread. Thread has no class-level
      # `terminate` (only the instance method), so the previous call raised
      # NoMethodError instead of stopping the thread cleanly.
      Thread.exit
    rescue Aws::SQS::Errors::ServiceError => e
      qurd_logger.error("SQS raised #{e}")
      raise
    end
  end
end
convert_queues(queues) click to toggle source

Convert regexes and strings to queue URLs @param [Array<String>] queues An array of queues to monitor @return [Array<String>] SQS URLs

# File lib/qurd/listener.rb, line 236
# Convert queue names and regexes to queue URLs.
#
# Each entry may be a queue name, a regex written as a string that starts
# with '/' (handed to parse_regex), or an already compiled Regexp (handed
# straight to queue_url). +nil+ is treated as an empty list and a single
# value as a one-element list. Entries that resolve to nothing are dropped.
# @param [Array<String, Regexp>, nil] queues the queues to monitor
# @return [Array<String>] SQS URLs
def convert_queues(queues)
  Array(queues).flat_map do |q|
    if q.is_a?(Regexp)
      # Regexp has no #[], so it must not reach the leading-slash test below.
      queue_url(q)
    elsif q[0] == '/'
      parse_regex(q)
    else
      queue_url(q)
    end
  end.compact
end
lock_counter() { || ... } click to toggle source
# File lib/qurd/listener.rb, line 119
# Run the given block while holding @mutex and return the block's value.
# If the block raises ThreadError, the lock is released for 0.1s through
# Mutex#sleep, the error is logged at debug level and the block is run again.
# NOTE(review): the retry has no upper bound.
# @yield the work to perform under the lock
def lock_counter(&_block)
  @mutex.synchronize do
    begin
      yield
    rescue ThreadError => thread_error
      @mutex.sleep(0.1)
      qurd_logger.debug("ThreadError: #{thread_error}")
      retry
    end
  end
end
modifier2int(str) click to toggle source
# File lib/qurd/listener.rb, line 194
# Translate a string of regex modifier letters into the integer option mask
# accepted by Regexp.new. Letters are OR-ed together, so repeats are harmless.
# Any letter other than i, m, x, o or e is reported through qurd_logger!.
# @param [String] str modifier letters, e.g. "im"
# @return [Integer] the combined option bits (0 for an empty string)
def modifier2int(str)
  bits_for = {
    'i' => Regexp::IGNORECASE,
    'm' => Regexp::MULTILINE,
    'x' => Regexp::EXTENDED,
    'o' => 0, # contributes no option bit
    'e' => 16 # NOTE(review): presumably the bit behind the /e literal flag - confirm
  }
  str.each_byte.inject(0) do |mask, byte|
    letter = byte.chr
    mask | bits_for.fetch(letter) { qurd_logger! "Unknown regex modifier #{letter}" }
  end
end
parse_regex(r) click to toggle source

Convert a regex string to a regex, including modifiers, and look up the queue URLs it matches @param [String] r String form of the regex @return [Array<String>] URLs of the queues matching the compiled regex @example With modifier

parse_regex("/foo/i")
# File lib/qurd/listener.rb, line 184
# Compile a regex written as a string ("/body/" or "/body/flags") and look up
# the queues it matches with queue_url. Flags are translated by modifier2int.
# @param [String] r string form of the regex, e.g. "/foo/i"
# @return whatever queue_url returns for the compiled Regexp
# @raise [ArgumentError] if +r+ is not of the form /body/flags
# @example With modifier
#   parse_regex("/foo/i")
def parse_regex(r)
  # /foo/ or /foo/i
  m = r.match %r{\A/(.*)/([a-z]*)\Z}mx
  # Without this guard a malformed string (e.g. no closing slash) fails below
  # with an unhelpful NoMethodError on nil.
  raise ArgumentError, "Invalid regex string #{r.inspect}" if m.nil?

  qurd_logger.debug("Found re: #{m[0]} 1: #{m[1]} 2: #{m[2]}")
  args = modifier2int(m[2])
  regex = Regexp.new(m[1], args)
  qurd_logger.debug("Compiled regex #{regex}")
  queue_url regex
end
process_messages(qurl, msgs) click to toggle source
# File lib/qurd/listener.rb, line 131
# Add the size of the batch to the messages counter, then start one thread
# per message. Each thread runs a Processor for its message and afterwards
# bumps either the failures or the successes counter, depending on
# message.failed? of the processor.
# @param [String] qurl the queue URL the batch came from
# @param msgs the receive result; only its +messages+ list is used
# @return [Array<Thread>] one thread per message
def process_messages(qurl, msgs)
  lock_counter { @counter.messages += msgs.messages.count }
  msgs.messages.map do |msg|
    Thread.new(msg) do |received|
      qurd_logger.debug("Found message #{msg}")
      processor = Processor.new(self, received, name, qurl)
      processor.process
      lock_counter do
        if processor.message.failed?
          @counter.failures += 1
        else
          @counter.successes += 1
        end
      end
    end
  end
end
queue_url(name) click to toggle source

Find the SQS URL for a named queue or a regex @overload queue_url(name)

@param [String] name The AWS SQS name
@return [String] AWS SQS URL

@overload queue_url(name)

@param [Regexp] name regex of a queue name
@return [Array<String>]
# File lib/qurd/listener.rb, line 217
# Find the SQS URL(s) for a queue name or a regex. The account's queue list
# is fetched once and memoized in @sqs_queues.
# @overload queue_url(name)
#   @param [String] name compared with the last path segment of each URL
#   @return [String, nil] the first matching URL, or nil
# @overload queue_url(name)
#   @param [Regexp] name matched against each URL
#   @return [Array<String>] every matching URL (possibly empty)
# SQS service errors are logged and re-raised.
def queue_url(name)
  @sqs_queues ||= aws_client(:SQS).list_queues.queue_urls

  # Anything that responds to #upcase is treated as a plain queue name.
  found = if name.respond_to?(:upcase)
            @sqs_queues.find { |candidate| candidate[/([^\/]+$)/] == name }
          else
            @sqs_queues.select { |candidate| candidate =~ name }
          end

  qurd_logger.debug("Queue #{name} found '#{found}'")
  nothing_found = found.nil? || found.empty?
  qurd_logger.warn("No queue found for '#{name}'") if nothing_found
  found
rescue Aws::SQS::Errors::ServiceError => sqs_error
  qurd_logger.error("SQS raised #{sqs_error}")
  raise
end
stats_thread() click to toggle source
# File lib/qurd/listener.rb, line 110
# Start a background thread that loops forever: sleep for
# qurd_configuration.stats_interval, then log the counters at info level
# while holding the counter lock.
# @return [Thread]
def stats_thread
  report = proc { lock_counter { qurd_logger.info("STATS", @counter) } }
  Thread.new do
    loop do
      sleep(qurd_configuration.stats_interval)
      report.call
    end
  end
end