class Circuitry::Subscriber

Constants

CONNECTION_ERRORS
DEFAULT_OPTIONS

Attributes

batch_size[RW]
lock[R]
queue[RW]
subscribed[RW]
timeout[RW]
wait_time[RW]

Public Class Methods

async_strategies() click to toggle source
Calls superclass method
# File lib/circuitry/subscriber.rb, line 62
def self.async_strategies
  super - [:batch]
end
default_async_strategy() click to toggle source
# File lib/circuitry/subscriber.rb, line 66
def self.default_async_strategy
  Circuitry.subscriber_config.async_strategy
end
new(options = {}) click to toggle source
# File lib/circuitry/subscriber.rb, line 29
def initialize(options = {})
  options = DEFAULT_OPTIONS.merge(options)

  self.subscribed = false
  self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url

  %i[lock async timeout wait_time batch_size].each do |sym|
    send(:"#{sym}=", options[sym])
  end

  trap_signals
end

Public Instance Methods

subscribe(&block) click to toggle source
# File lib/circuitry/subscriber.rb, line 42
def subscribe(&block)
  raise ArgumentError, 'block required' if block.nil?
  raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

  logger.info("Subscribing to queue: #{queue}")

  self.subscribed = true
  poll(&block)
  self.subscribed = false

  logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
  logger.error("Connection error to queue: #{queue}: #{e}")
  raise SubscribeError, e.message
end
subscribed?() click to toggle source
# File lib/circuitry/subscriber.rb, line 58
def subscribed?
  subscribed
end

Protected Instance Methods

lock=(value) click to toggle source
# File lib/circuitry/subscriber.rb, line 75
def lock=(value)
  value = case value
          when true then Circuitry.subscriber_config.lock_strategy
          when false then Circuitry::Locks::NOOP.new
          when Circuitry::Locks::Base then value
          else raise ArgumentError, lock_value_error(value)
          end

  @lock = value
end

Private Instance Methods

can_subscribe?() click to toggle source
# File lib/circuitry/subscriber.rb, line 194
def can_subscribe?
  return true if Circuitry.subscriber_config.use_iam_profile

  Circuitry.subscriber_config.aws_options.values.all? do |value|
    !value.nil? && !value.empty?
  end
end
delete_message(message) click to toggle source
# File lib/circuitry/subscriber.rb, line 181
def delete_message(message)
  logger.debug("Removing message #{message.id} from queue")
  sqs.delete_message(queue_url: queue, receipt_handle: message.receipt_handle)
end
error_handler() click to toggle source
# File lib/circuitry/subscriber.rb, line 190
def error_handler
  Circuitry.subscriber_config.error_handler
end
handle_message(message, &block) click to toggle source

TODO: Don’t use ruby timeout. www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/

# File lib/circuitry/subscriber.rb, line 172
def handle_message(message, &block)
  Timeout.timeout(timeout) do
    block.call(message.body, message.topic.name)
  end
rescue => e
  logger.error("Error handling message #{message.id}: #{e}")
  raise e
end
handle_message_with_middleware(message, &block) click to toggle source
# File lib/circuitry/subscriber.rb, line 147
def handle_message_with_middleware(message, &block)
  middleware.invoke(message.topic.name, message.body) do
    handle_message(message, &block)
    delete_message(message)
  end
end
lock_value_error(value) click to toggle source
# File lib/circuitry/subscriber.rb, line 88
def lock_value_error(value)
  opts = Circuitry::Locks::Base
  "Invalid value `#{value}`, must be one of `true`, `false`, or instance of `#{opts}`"
end
logger() click to toggle source
# File lib/circuitry/subscriber.rb, line 186
def logger
  Circuitry.subscriber_config.logger
end
middleware() click to toggle source
# File lib/circuitry/subscriber.rb, line 202
def middleware
  Circuitry.subscriber_config.middleware
end
poll(&block) click to toggle source
# File lib/circuitry/subscriber.rb, line 102
def poll(&block)
  poller = Aws::SQS::QueuePoller.new(queue, client: sqs)

  poller.before_request do |_stats|
    throw :stop_polling unless subscribed?
  end

  poller.poll(max_number_of_messages: batch_size, wait_time_seconds: wait_time, skip_delete: true) do |messages|
    messages = [messages] unless messages.is_a?(Array)
    process_messages(Array(messages), &block)
    Circuitry.flush
  end
end
process_message(message, &block) click to toggle source
# File lib/circuitry/subscriber.rb, line 132
def process_message(message, &block)
  message = Message.new(message)

  logger.debug("Processing message #{message.id}")

  handled = try_with_lock(message.id) do
    handle_message_with_middleware(message, &block)
  end

  logger.info("Ignoring duplicate message #{message.id}") unless handled
rescue => e
  logger.error("Error processing message #{message.id}: #{e}")
  error_handler.call(e) if error_handler
end
process_messages(messages, &block) click to toggle source
# File lib/circuitry/subscriber.rb, line 116
def process_messages(messages, &block)
  if async?
    process_messages_asynchronously(messages, &block)
  else
    process_messages_synchronously(messages, &block)
  end
end
process_messages_asynchronously(messages, &block) click to toggle source
# File lib/circuitry/subscriber.rb, line 124
def process_messages_asynchronously(messages, &block)
  messages.each { |message| process_asynchronously { process_message(message, &block) } }
end
process_messages_synchronously(messages, &block) click to toggle source
# File lib/circuitry/subscriber.rb, line 128
def process_messages_synchronously(messages, &block)
  messages.each { |message| process_message(message, &block) }
end
trap_signals() click to toggle source
# File lib/circuitry/subscriber.rb, line 93
def trap_signals
  trap('SIGINT') do
    if subscribed?
      Thread.new { logger.info('Interrupt received, unsubscribing from queue...') }
      self.subscribed = false
    end
  end
end
try_with_lock(id) { || ... } click to toggle source
# File lib/circuitry/subscriber.rb, line 154
def try_with_lock(id)
  if lock.soft_lock(id)
    begin
      yield
    rescue => e
      lock.unlock(id)
      raise e
    end

    lock.hard_lock(id)
    true
  else
    false
  end
end