class Circuitry::Publisher

Constants

CONNECTION_ERRORS
DEFAULT_OPTIONS

Attributes

timeout[RW]

Public Class Methods

default_async_strategy() click to toggle source
# File lib/circuitry/publisher.rb, line 57
# Default strategy for asynchronous publishing.
#
# @return [Object] whatever `async_strategy` is set to on
#   `Circuitry.publisher_config`
def self.default_async_strategy
  publisher_settings = Circuitry.publisher_config
  publisher_settings.async_strategy
end
new(options = {}) click to toggle source
# File lib/circuitry/publisher.rb, line 36
# Builds a publisher from the given options layered over DEFAULT_OPTIONS.
#
# @param options [Hash] overrides for DEFAULT_OPTIONS; only the `:async` and
#   `:timeout` entries of the merged hash are read here
def initialize(options = {})
  settings = DEFAULT_OPTIONS.merge(options)
  async_setting, timeout_setting = settings.values_at(:async, :timeout)

  # Assign through the writers (async first, then timeout) so any logic they
  # carry still runs.
  self.async = async_setting
  self.timeout = timeout_setting
end

Public Instance Methods

publish(topic_name, object) click to toggle source
# File lib/circuitry/publisher.rb, line 43
# Serializes +object+ to JSON and publishes it to the named topic, either
# inline or through the asynchronous path depending on `async?`.
#
# @param topic_name [Object] name of the destination topic; must not be nil
# @param object [#to_json] payload to serialize; must not be nil
# @return [Object] the result of `publish_message` when synchronous, or of
#   `process_asynchronously` when asynchronous
# @raise [ArgumentError] if +topic_name+ or +object+ is nil
# @raise [PublishError] if `can_publish?` is false
def publish(topic_name, object)
  raise ArgumentError, 'topic_name cannot be nil' if topic_name.nil?
  raise ArgumentError, 'object cannot be nil' if object.nil?
  raise PublishError, 'AWS configuration is not set' unless can_publish?

  # Serialize once, before choosing the delivery path.
  payload = object.to_json
  return publish_message(topic_name, payload) unless async?

  process_asynchronously { publish_message(topic_name, payload) }
end

Protected Instance Methods

publish_message(topic_name, message) click to toggle source
# File lib/circuitry/publisher.rb, line 63
# Delivers an already-serialized message to the named topic.
#
# The work runs inside the configured middleware, under `Timeout.timeout`
# with this publisher's `timeout`, and the topic lookup plus SNS publish are
# wrapped in `with_retries` for errors listed in CONNECTION_ERRORS.
#
# @param topic_name [Object] name passed to `Topic.find` and to the middleware
# @param message [Object] serialized payload passed to `sns_publish`
# @return [Object] whatever `middleware.invoke` returns
def publish_message(topic_name, message)
  middleware.invoke(topic_name, message) do
    # TODO: Don't use ruby timeout.
    # http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
    Timeout.timeout(timeout) do
      logger.debug("Publishing message to #{topic_name}")

      # Logs a warning for each failure reported to the retry handler.
      on_retry = lambda do |error, attempt_number, _total_delay|
        logger.warn("Error publishing attempt ##{attempt_number}: #{error.class} (#{error.message}); retrying...")
      end

      retry_options = {
        max_tries: 3,
        handler: on_retry,
        rescue: CONNECTION_ERRORS,
        base_sleep_seconds: 0.05,
        max_sleep_seconds: 0.25
      }

      # The topic lookup sits inside the retried block, so it is repeated on
      # every attempt.
      with_retries(**retry_options) do
        sns_publish(topic: Topic.find(topic_name), message: message)
      end
    end
  end
end

Private Instance Methods

can_publish?() click to toggle source
# File lib/circuitry/publisher.rb, line 97
# Whether the publisher configuration is complete enough to publish.
#
# @return [Boolean] true when `use_iam_profile` is set on the publisher
#   config; otherwise true only if no value in `aws_options` is nil or empty
def can_publish?
  config = Circuitry.publisher_config
  return true if config.use_iam_profile

  config.aws_options.values.none? { |value| value.nil? || value.empty? }
end
logger() click to toggle source
# File lib/circuitry/publisher.rb, line 93
# Logger used for publish diagnostics.
#
# @return [Object] the `logger` set on `Circuitry.publisher_config`
def logger
  publisher_settings = Circuitry.publisher_config
  publisher_settings.logger
end
middleware() click to toggle source
# File lib/circuitry/publisher.rb, line 105
# Middleware that wraps each publish.
#
# @return [Object] the `middleware` set on `Circuitry.publisher_config`
def middleware
  publisher_settings = Circuitry.publisher_config
  publisher_settings.middleware
end
sns_publish(topic:, message:) click to toggle source
# File lib/circuitry/publisher.rb, line 86
def sns_publish(topic:, message:)
  sns.publish(topic_arn: topic.arn, message: message)

rescue Aws::SNS::Errors::InvalidParameter => ex
  raise SnsPublishError.new(topic: topic, message:  message, exception: ex)
end