class Circuitry::Publisher
Constants
- CONNECTION_ERRORS
- DEFAULT_OPTIONS
Attributes
timeout[RW]
Public Class Methods
default_async_strategy()
click to toggle source
# File lib/circuitry/publisher.rb, line 57
#
# Returns the +async_strategy+ value held by Circuitry's publisher
# configuration.
def self.default_async_strategy
  publisher_settings = Circuitry.publisher_config
  publisher_settings.async_strategy
end
new(options = {})
click to toggle source
# File lib/circuitry/publisher.rb, line 36
#
# Builds a publisher. The caller's options are layered on top of
# DEFAULT_OPTIONS, and the resulting :async and :timeout entries are
# assigned through their writer methods (async first, then timeout).
#
# @param options [Hash] overrides for DEFAULT_OPTIONS; only the :async and
#   :timeout keys are read here
def initialize(options = {})
  settings = DEFAULT_OPTIONS.merge(options)

  self.async = settings[:async]
  self.timeout = settings[:timeout]
end
Public Instance Methods
publish(topic_name, object)
click to toggle source
# File lib/circuitry/publisher.rb, line 43
#
# Serializes +object+ with +to_json+ and hands it to +publish_message+,
# either directly or inside +process_asynchronously+ when +async?+ is truthy.
#
# @param topic_name [Object] name of the destination topic; must not be nil
# @param object [#to_json] payload to serialize; must not be nil
# @raise [ArgumentError] if +topic_name+ or +object+ is nil
# @raise [PublishError] if +can_publish?+ is falsy
# @return the result of +publish_message+ (synchronous path) or of
#   +process_asynchronously+ (asynchronous path)
def publish(topic_name, object)
  raise ArgumentError, 'topic_name cannot be nil' if topic_name.nil?
  raise ArgumentError, 'object cannot be nil' if object.nil?
  raise PublishError, 'AWS configuration is not set' unless can_publish?

  payload = object.to_json

  # Synchronous publishers finish here; only async ones defer the work.
  return publish_message(topic_name, payload) unless async?

  process_asynchronously { publish_message(topic_name, payload) }
end
Protected Instance Methods
publish_message(topic_name, message)
click to toggle source
# File lib/circuitry/publisher.rb, line 63
#
# Publishes an already-serialized message. The work is wrapped, from the
# outside in, by the publisher middleware, a +Timeout.timeout+ bounded by
# the +timeout+ attribute, and a +with_retries+ call configured to rescue
# CONNECTION_ERRORS.
#
# @param topic_name [Object] name passed to +Topic.find+
# @param message [Object] payload forwarded to +sns_publish+
def publish_message(topic_name, message)
  middleware.invoke(topic_name, message) do
    # TODO: Don't use ruby timeout.
    # http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
    Timeout.timeout(timeout) do
      logger.debug("Publishing message to #{topic_name}")

      # Passed to with_retries as :handler; logs a warning describing the
      # error it is given.
      on_retry = lambda do |error, attempt_number, _total_delay|
        logger.warn("Error publishing attempt ##{attempt_number}: #{error.class} (#{error.message}); retrying...")
      end

      retry_options = {
        max_tries: 3,
        handler: on_retry,
        rescue: CONNECTION_ERRORS,
        base_sleep_seconds: 0.05,
        max_sleep_seconds: 0.25
      }

      # The topic lookup sits inside the retried block, so it is repeated
      # on every attempt along with the publish call.
      with_retries(**retry_options) do
        sns_publish(topic: Topic.find(topic_name), message: message)
      end
    end
  end
end
Private Instance Methods
can_publish?()
click to toggle source
# File lib/circuitry/publisher.rb, line 97
#
# Reports whether the publisher configuration is sufficient to publish.
# When +use_iam_profile+ is set, the answer is always +true+; otherwise
# every value in +aws_options+ must be neither nil nor empty.
#
# @return [Boolean]
def can_publish?
  if Circuitry.publisher_config.use_iam_profile
    true
  else
    # A single nil or empty option value is enough to refuse publishing.
    Circuitry.publisher_config.aws_options.values.none? do |value|
      value.nil? || value.empty?
    end
  end
end
logger()
click to toggle source
# File lib/circuitry/publisher.rb, line 93
#
# Returns the logger held by Circuitry's publisher configuration.
def logger
  publisher_settings = Circuitry.publisher_config
  publisher_settings.logger
end
middleware()
click to toggle source
# File lib/circuitry/publisher.rb, line 105
#
# Returns the middleware object held by Circuitry's publisher configuration.
def middleware
  publisher_settings = Circuitry.publisher_config
  publisher_settings.middleware
end
sns_publish(topic:, message:)
click to toggle source
# File lib/circuitry/publisher.rb, line 86
#
# Sends +message+ to the ARN of +topic+ through the +sns+ client.
#
# @param topic [#arn] topic whose +arn+ is used as the publish target
# @param message [Object] payload passed through as +message:+
# @raise [SnsPublishError] when the client raises
#   Aws::SNS::Errors::InvalidParameter; the new error is built from the
#   topic, the message and the original exception
def sns_publish(topic:, message:)
  begin
    client = sns
    client.publish(topic_arn: topic.arn, message: message)
  rescue Aws::SNS::Errors::InvalidParameter => invalid_parameter
    raise SnsPublishError.new(topic: topic, message: message, exception: invalid_parameter)
  end
end