class Circuitry::Subscriber
Constants
- CONNECTION_ERRORS
- DEFAULT_OPTIONS
Attributes
batch_size[RW]
lock[R]
queue[RW]
subscribed[RW]
timeout[RW]
wait_time[RW]
Public Class Methods
async_strategies()
click to toggle source
Calls superclass method
# File lib/circuitry/subscriber.rb, line 62
# Lists the async strategies a subscriber accepts: whatever the superclass
# method returns, minus :batch.
def self.async_strategies
  unsupported = %i[batch]
  super - unsupported
end
default_async_strategy()
click to toggle source
# File lib/circuitry/subscriber.rb, line 66
# Returns the async strategy configured on Circuitry.subscriber_config.
def self.default_async_strategy
  config = Circuitry.subscriber_config
  config.async_strategy
end
new(options = {})
click to toggle source
# File lib/circuitry/subscriber.rb, line 29
# Builds a subscriber that starts out unsubscribed.
#
# options - Hash of overrides layered on top of DEFAULT_OPTIONS. The keys read
#           here are :lock, :async, :timeout, :wait_time and :batch_size.
#
# The queue URL is looked up from the queue name in the subscriber config, and
# the SIGINT handler is installed as the last step.
def initialize(options = {})
  settings = DEFAULT_OPTIONS.merge(options)

  self.subscribed = false

  queue_name = Circuitry.subscriber_config.queue_name
  self.queue = Queue.find(queue_name).url

  # `send` is used so the setters are reachable regardless of visibility
  # (`lock=` is listed among the protected methods).
  %i[lock async timeout wait_time batch_size].each do |setting|
    send(:"#{setting}=", settings[setting])
  end

  trap_signals
end
Public Instance Methods
subscribe(&block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 42
# Subscribes to the configured queue and hands each received message to the
# given block until polling stops.
#
# Raises ArgumentError when no block is given.
# Raises SubscribeError when can_subscribe? is false, or when polling fails
# with one of CONNECTION_ERRORS (the original message is preserved).
def subscribe(&block)
  raise ArgumentError, 'block required' if block.nil?
  raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

  logger.info("Subscribing to queue: #{queue}")

  self.subscribed = true
  begin
    poll(&block)
  ensure
    # Reset the flag however polling ends; otherwise a failed poll would leave
    # subscribed? reporting true even though nothing is being consumed.
    self.subscribed = false
  end

  logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
  logger.error("Connection error to queue: #{queue}: #{e}")
  raise SubscribeError, e.message
end
subscribed?()
click to toggle source
# File lib/circuitry/subscriber.rb, line 58
# Predicate form of the `subscribed` attribute; returns its current value.
def subscribed?
  subscribed
end
Protected Instance Methods
lock=(value)
click to toggle source
# File lib/circuitry/subscriber.rb, line 75
# Sets the lock strategy.
#
# value - true  => the lock strategy from the subscriber config
#         false => a new Circuitry::Locks::NOOP instance
#         a Circuitry::Locks::Base instance => used as given
#
# Raises ArgumentError for any other value.
def lock=(value)
  @lock =
    case value
    when true
      Circuitry.subscriber_config.lock_strategy
    when false
      Circuitry::Locks::NOOP.new
    when Circuitry::Locks::Base
      value
    else
      raise ArgumentError, lock_value_error(value)
    end
end
Private Instance Methods
can_subscribe?()
click to toggle source
# File lib/circuitry/subscriber.rb, line 194
# Reports whether enough AWS configuration is present to subscribe.
#
# Returns true when the config uses an IAM profile; otherwise true only if no
# value in the configured AWS options is nil or empty.
def can_subscribe?
  return true if Circuitry.subscriber_config.use_iam_profile

  aws_values = Circuitry.subscriber_config.aws_options.values
  aws_values.none? { |setting| setting.nil? || setting.empty? }
end
delete_message(message)
click to toggle source
# File lib/circuitry/subscriber.rb, line 181
# Deletes the given message from the queue via the SQS client, identifying it
# by its receipt handle. Logs the removal at debug level first.
def delete_message(message)
  logger.debug("Removing message #{message.id} from queue")

  receipt = message.receipt_handle
  sqs.delete_message(queue_url: queue, receipt_handle: receipt)
end
error_handler()
click to toggle source
# File lib/circuitry/subscriber.rb, line 190
# Returns the error handler from the subscriber config. Callers in this class
# check it for nil before invoking it.
def error_handler
  config = Circuitry.subscriber_config
  config.error_handler
end
handle_message(message, &block)
click to toggle source
TODO: Don’t use Ruby’s Timeout module. See https://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
# File lib/circuitry/subscriber.rb, line 172
# Runs the caller's block with the message body and topic name, bounded by the
# subscriber's `timeout` setting.
#
# Any StandardError raised while handling (including a timeout) is logged and
# then re-raised to the caller.
def handle_message(message, &block)
  Timeout.timeout(timeout) { block.call(message.body, message.topic.name) }
rescue StandardError => error
  logger.error("Error handling message #{message.id}: #{error}")
  raise
end
handle_message_with_middleware(message, &block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 147
# Runs message handling inside the configured middleware chain.
#
# Inside the chain the message is handled first and deleted from the queue
# second, so a handler failure skips the delete.
def handle_message_with_middleware(message, &block)
  topic_name = message.topic.name
  body = message.body

  middleware.invoke(topic_name, body) do
    handle_message(message, &block)
    delete_message(message)
  end
end
lock_value_error(value)
click to toggle source
# File lib/circuitry/subscriber.rb, line 88
# Builds the ArgumentError message used by `lock=` for an unsupported value.
def lock_value_error(value)
  base_class = Circuitry::Locks::Base
  "Invalid value `#{value}`, must be one of `true`, `false`, or instance of `#{base_class}`"
end
logger()
click to toggle source
# File lib/circuitry/subscriber.rb, line 186
# Returns the logger from the subscriber config.
def logger
  config = Circuitry.subscriber_config
  config.logger
end
middleware()
click to toggle source
# File lib/circuitry/subscriber.rb, line 202
# Returns the middleware chain from the subscriber config.
def middleware
  config = Circuitry.subscriber_config
  config.middleware
end
poll(&block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 102
# Polls the queue with an Aws::SQS::QueuePoller and forwards every received
# batch to process_messages, flushing Circuitry after each batch.
#
# Polling is stopped by throwing :stop_polling from the before-request hook
# once subscribed? turns false. `skip_delete: true` is passed to the poller;
# in this class deletion is done explicitly by delete_message.
def poll(&block)
  poller = Aws::SQS::QueuePoller.new(queue, client: sqs)

  poller.before_request do |_stats|
    throw :stop_polling unless subscribed?
  end

  poller.poll(max_number_of_messages: batch_size, wait_time_seconds: wait_time, skip_delete: true) do |messages|
    # The poller may yield one message instead of an Array, so normalize once.
    # Wrap explicitly rather than with Kernel#Array, which would call #to_a on
    # a lone message (presumably a Struct-like object - confirm against the SDK).
    batch = messages.is_a?(Array) ? messages : [messages]
    process_messages(batch, &block)
    Circuitry.flush
  end
end
process_message(message, &block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 132
# Wraps the raw message in a Message and processes it under the lock.
#
# When the lock cannot be taken the message is logged as a duplicate and
# skipped. A StandardError raised along the way is logged and passed to the
# configured error handler (if one is set) instead of propagating.
def process_message(message, &block)
  # Rebind `message` on purpose: the rescue clause below reads `message.id`.
  message = Message.new(message)
  logger.debug("Processing message #{message.id}")

  handled = try_with_lock(message.id) { handle_message_with_middleware(message, &block) }

  unless handled
    logger.info("Ignoring duplicate message #{message.id}")
  end
rescue StandardError => error
  logger.error("Error processing message #{message.id}: #{error}")
  error_handler.call(error) if error_handler
end
process_messages(messages, &block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 116
# Dispatches a batch of messages to the asynchronous or synchronous
# processing path, depending on async?.
def process_messages(messages, &block)
  return process_messages_asynchronously(messages, &block) if async?

  process_messages_synchronously(messages, &block)
end
process_messages_asynchronously(messages, &block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 124
# Hands each message to process_asynchronously, which receives a block that
# processes that single message.
def process_messages_asynchronously(messages, &block)
  messages.each do |message|
    process_asynchronously do
      process_message(message, &block)
    end
  end
end
process_messages_synchronously(messages, &block)
click to toggle source
# File lib/circuitry/subscriber.rb, line 128
# Processes each message in order, one at a time, in the calling thread.
def process_messages_synchronously(messages, &block)
  messages.each do |message|
    process_message(message, &block)
  end
end
trap_signals()
click to toggle source
# File lib/circuitry/subscriber.rb, line 93
# Installs a SIGINT handler that clears the subscribed flag, which lets the
# poller stop on its next before-request check.
def trap_signals
  trap('SIGINT') do
    next unless subscribed?

    # Logged from a separate thread - presumably because logging from inside a
    # trap handler is restricted (mutex use in trap context); confirm.
    Thread.new { logger.info('Interrupt received, unsubscribing from queue...') }
    self.subscribed = false
  end
end
try_with_lock(id) { || ... }
click to toggle source
# File lib/circuitry/subscriber.rb, line 154
# Runs the given block under the lock for `id`.
#
# Returns false without yielding when the soft lock cannot be taken.
# Otherwise yields, upgrades to a hard lock on success and returns true.
# If the block raises a StandardError, the lock is released and the error
# is re-raised.
def try_with_lock(id)
  return false unless lock.soft_lock(id)

  begin
    yield
  rescue StandardError
    lock.unlock(id)
    raise
  end

  lock.hard_lock(id)
  true
end