class MultipleMan::ModelPublisher
Attributes
options[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/multiple_man/model_publisher.rb, line 6
# Builds a publisher configured with the given options.
#
# The options are converted with +with_indifferent_access+ before being
# stored through the +options+ writer.
#
# @param options [Hash] publisher options (defaults to an empty hash)
def initialize(options = {})
  indifferent_options = options.with_indifferent_access
  self.options = indifferent_options
end
Public Instance Methods
publish(records, operation=:create)
click to toggle source
# File lib/multiple_man/model_publisher.rb, line 10
# Publishes one message per record over a connection obtained from
# +Connection.connect+.
#
# Does nothing when +MultipleMan.configuration.enabled+ is falsy. The whole
# run is instrumented as 'multiple_man.publish_messages' and each record as
# 'multiple_man.publish_message'.
#
# Any StandardError raised while publishing is wrapped in a ProducerError
# (with the inspected records as payload) and handed to +MultipleMan.error+
# with +reraise: false+, so ordinary publishing failures are reported
# without propagating from here.
#
# @param records a single record or a collection of records (see +all_records+)
# @param operation [Symbol] the operation being announced (defaults to +:create+)
def publish(records, operation = :create)
  return unless MultipleMan.configuration.enabled

  Connection.connect do |connection|
    ActiveSupport::Notifications.instrument('multiple_man.publish_messages') do
      all_records(records) do |record|
        ActiveSupport::Notifications.instrument('multiple_man.publish_message') do
          push_record(connection, record, operation)
        end
      end
    end
  end
rescue StandardError => ex
  # Deliberately StandardError rather than Exception: signals (Interrupt),
  # SystemExit and NoMemoryError must keep propagating so the process can
  # shut down instead of being silently reported as a publishing failure.
  err = ProducerError.new(reason: ex, payload: records.inspect)
  MultipleMan.error(err, reraise: false)
end
Private Instance Methods
all_records(records) { |records| ... }
click to toggle source
# File lib/multiple_man/model_publisher.rb, line 40
# Yields every record contained in +records+ to the given block.
#
# * If +records+ responds to +find_each+, iterates through it with a
#   batch size of 100 (presumably an ActiveRecord relation - confirm
#   against callers).
# * Otherwise, if it responds to +each+, iterates with +each+.
# * Otherwise treats +records+ as a single record and yields it once.
#
# @param records a batch-iterable, an enumerable, or a single record
# @yield [record] each individual record
def all_records(records, &block)
  return records.find_each(batch_size: 100, &block) if records.respond_to?(:find_each)
  return records.each(&block) if records.respond_to?(:each)

  yield records
end
push_record(connection, record, operation)
click to toggle source
# File lib/multiple_man/model_publisher.rb, line 31
# Publishes a single record on the connection's topic.
#
# Builds a PayloadGenerator from the record, operation and this publisher's
# options, derives the routing key from the generator's type and the
# operation, logs both at debug level, then publishes the payload.
#
# @param connection object exposing +topic+, whose +publish+ receives the payload
# @param record the record to publish
# @param operation the operation being announced for the record
def push_record(connection, record, operation)
  generator = PayloadGenerator.new(record, operation, options)
  key = RoutingKey.new(generator.type, operation).to_s

  MultipleMan.logger.debug(" Record Data: #{generator} | Routing Key: #{key}")

  connection.topic.publish(generator.payload, routing_key: key)
end