class MultipleMan::ModelPublisher

Attributes

options[RW]

Public Class Methods

new(options = {})
# File lib/multiple_man/model_publisher.rb, line 6
# Builds a publisher from a hash of options.
#
# options - Hash of publisher settings; it is converted with
#           +with_indifferent_access+ before being stored through the
#           +options+ writer.
def initialize(options = {})
  indifferent_options = options.with_indifferent_access
  self.options = indifferent_options
end

Public Instance Methods

publish(records, operation=:create)
# File lib/multiple_man/model_publisher.rb, line 10
# Publishes +records+ for the given +operation+.
#
# Does nothing when MultipleMan.configuration.enabled is falsy. Otherwise a
# connection is obtained from Connection.connect and every record yielded by
# +all_records+ is handed to +push_record+. The whole run is instrumented as
# 'multiple_man.publish_messages' and each record as
# 'multiple_man.publish_message'.
#
# records   - a single record or a collection of records (see +all_records+).
# operation - the operation being announced (defaults to :create).
#
# Any StandardError raised while publishing is wrapped in a ProducerError
# (with the inspected records as payload) and passed to MultipleMan.error
# with reraise: false, so it is not raised from this method. Exceptions
# outside StandardError (Interrupt, SignalException, SystemExit,
# NoMemoryError, ...) are deliberately NOT rescued, so process shutdown and
# fatal conditions are never swallowed.
def publish(records, operation=:create)
  return unless MultipleMan.configuration.enabled

  Connection.connect do |connection|
    ActiveSupport::Notifications.instrument('multiple_man.publish_messages') do
      all_records(records) do |record|
        ActiveSupport::Notifications.instrument('multiple_man.publish_message') do
          push_record(connection, record, operation)
        end
      end
    end
  end
rescue StandardError => ex
  # Only ordinary runtime failures are reported; signals and exit requests
  # must keep propagating.
  err = ProducerError.new(reason: ex, payload: records.inspect)
  MultipleMan.error(err, reraise: false)
end

Private Instance Methods

all_records(records) { |record| ... }
# File lib/multiple_man/model_publisher.rb, line 40
# Yields every record contained in +records+ to the given block.
#
# Dispatch is by capability, checked in this order:
# * responds to +find_each+ -> iterated with find_each(batch_size: 100)
# * responds to +each+      -> iterated with each
# * anything else           -> treated as a single record and yielded as-is
def all_records(records, &block)
  return records.find_each(batch_size: 100, &block) if records.respond_to?(:find_each)
  return records.each(&block) if records.respond_to?(:each)

  yield records
end
push_record(connection, record, operation)
# File lib/multiple_man/model_publisher.rb, line 31
# Publishes a single record on the connection's topic.
#
# A PayloadGenerator is built from the record, the operation and this
# publisher's options; a RoutingKey string is derived from the generator's
# type and the operation. Both are written to the debug log before the
# generator's payload is published with that routing key.
#
# connection - object exposing +topic+, whose result receives +publish+.
# record     - the record to publish.
# operation  - the operation being announced.
def push_record(connection, record, operation)
  generator = PayloadGenerator.new(record, operation, options)
  key = RoutingKey.new(generator.type, operation).to_s

  MultipleMan.logger.debug("  Record Data: #{generator} | Routing Key: #{key}")

  exchange = connection.topic
  exchange.publish(generator.payload, routing_key: key)
end