class PubSubModelSync::MessagePublisher
Attributes
current_transaction[RW]
Public Class Methods
connector()
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 9
# Returns the connector used for publishing. A PubSubModelSync::Connector is
# built on first use and cached in @connector for later calls.
def connector
  return @connector if @connector

  @connector = PubSubModelSync::Connector.new
end
connector_publish(payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 82
# Hands +payload+ to the connector and then calls config.on_after_publish with it.
# The "Publishing" entry is logged only when config.debug is set; the
# "Published" entry is logged on every call.
# @param payload object forwarded to connector.publish and to the callback
def connector_publish(payload)
  if config.debug
    log("Publishing message #{[payload]}...")
  end
  connector.publish(payload)
  log("Published message: #{[payload]}")
  after_publish = config.on_after_publish
  after_publish.call(payload)
end
init_transaction(key, settings = {})
click to toggle source
Starts a new transaction. @param key (String|Nil) @return (Transaction)
# File lib/pub_sub_model_sync/message_publisher.rb, line 36
# Builds a PubSubModelSync::Transaction from +key+ and +settings+.
# When a transaction is already current, the new one is added to it via
# add_transaction; otherwise the new one becomes the current transaction.
# @param key (String|Nil)
# @param settings (Hash) passed through to Transaction.new
# @return (Transaction) the newly built transaction
def init_transaction(key, settings = {})
  PubSubModelSync::Transaction.new(key, settings).tap do |created|
    parent = current_transaction
    if parent
      parent.add_transaction(created)
    else
      self.current_transaction = created
    end
  end
end
publish(payload, &block)
click to toggle source
Similar to the :publish! method, but notifies the error via :on_error_publish instead of raising it. @return Payload
# File lib/pub_sub_model_sync/message_publisher.rb, line 92
# Non-raising wrapper around publish!: any StandardError is passed to
# notify_error together with the payload instead of propagating.
# @param payload forwarded to publish!
# @param block forwarded to publish!
# @return the result of publish!, or the result of notify_error on failure
def publish(payload, &block)
  begin
    publish!(payload, &block)
  rescue StandardError => error
    notify_error(error, payload)
  end
end
publish!(payload, &block)
click to toggle source
Publishes payload to pubsub. @param payload (PubSubModelSync::Payload) @return Payload
Raises an error if one occurs.
# File lib/pub_sub_model_sync/message_publisher.rb, line 73
# Stamps the payload with its ordering key, then either queues it on the
# current transaction or publishes it through the connector right away.
# Errors are not rescued here.
# @param payload (PubSubModelSync::Payload)
# @param block optional callback invoked (without arguments) after the payload was handled
# @return the payload, or nil when ensure_publish rejected it
def publish!(payload, &block)
  payload.headers[:ordering_key] = ordering_key_for(payload)
  if ensure_publish(payload)
    active = current_transaction
    if active
      active.add_payload(payload)
    else
      connector_publish(payload)
    end
    block.call if block
    payload
  end
end
publish_data(klass, data, action, headers: {})
click to toggle source
Publishes a class-level notification via pubsub. @refer PublisherConcern.ps_class_publish @return Payload
# File lib/pub_sub_model_sync/message_publisher.rb, line 49
# Wraps +data+ in a PubSubModelSync::Payload tagged with the class name,
# the action and mode :klass, then publishes it via publish.
# @param klass converted with to_s for the payload info
# @param data payload data handed to Payload.new
# @param action converted with to_sym for the payload info
# @param headers (Hash) handed to Payload.new
# @return the result of publish
def publish_data(klass, data, action, headers: {})
  attrs = { klass: klass.to_s, action: action.to_sym, mode: :klass }
  if config.debug
    log("Building payload for: #{attrs.inspect}")
  end
  built = PubSubModelSync::Payload.new(data, attrs, headers)
  define_transaction_key(built)
  publish(built)
end
publish_model(model, action, settings = {})
click to toggle source
@param model (ActiveRecord::Base,PubSubModelSync::PublisherConcern) @param action (Symbol,String @see PublisherConcern::ps_publish) @param settings (Hash @see PayloadBuilder.settings)
# File lib/pub_sub_model_sync/message_publisher.rb, line 60
# Builds the payload for +model+ through PubSubModelSync::PayloadBuilder and
# publishes it inside a transaction keyed by the payload's ordering key.
# model.ps_after_publish runs as the publish callback; nothing is published
# when ensure_model_publish returns a falsy value.
# @param model object passed to PayloadBuilder and asked for the ps_* hooks
# @param action passed to PayloadBuilder and to the ps_* hooks
# @param settings (Hash) passed to PayloadBuilder
def publish_model(model, action, settings = {})
  if config.debug
    log("Building payload for: #{[model, action].inspect}")
  end
  built = PubSubModelSync::PayloadBuilder.new(model, action, settings).call
  define_transaction_key(built)
  # the surrounding transaction catches and groups all :ps_before_publish syncs
  transaction(built.headers[:ordering_key]) do
    next unless ensure_model_publish(model, action, built)

    publish(built) { model.ps_after_publish(action, built) }
  end
end
transaction(key, settings = {}, &block)
click to toggle source
Groups all payloads with the same ordering_key so that subscribers process them in the same order they were published. Grouping by ordering_key allows us to enable multiple workers in our Pub/Sub service(s) and still guarantee that related payloads will be processed in the correct order, despite the multiple threads. This is thanks to the fact that Pub/Sub services will always send messages with the same `ordering_key` to the same worker/thread.
@see Transaction.new(...)
@param key (String|Nil) @param block (Yield) block to be executed
# File lib/pub_sub_model_sync/message_publisher.rb, line 22
# Runs +block+ inside a transaction created by init_transaction.
# On success the transaction is finished; on any StandardError it is rolled
# back and the error is re-raised. clean_publisher is always called last.
# @param key (String|Nil)
# @param settings (Hash) forwarded to init_transaction
# @param block (Yield) block to be executed
# @return the result of the transaction's finish call
def transaction(key, settings = {}, &block)
  t = init_transaction(key, settings)
  block.call
  t.finish
rescue
  # +t+ is still nil when init_transaction itself raised; safe navigation keeps
  # the original error from being masked by a NoMethodError on nil.
  t&.rollback
  raise
ensure
  t&.clean_publisher
end
Private Class Methods
define_transaction_key(payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 123
# Gives the current transaction the payload's ordering key when the
# transaction has no key yet. Does nothing when no transaction is current.
# @param payload object whose headers[:ordering_key] is used as fallback key
def define_transaction_key(payload)
  active = current_transaction
  return if active.nil?

  active.key ||= payload.headers[:ordering_key]
end
ensure_model_publish(model, action, payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 110
# Calls model.ps_before_publish and treats a :cancel result as a veto.
# The cancellation is logged only when config.debug is set.
# @return (Boolean) false when the model cancelled the publish, true otherwise
def ensure_model_publish(model, action, payload)
  cancelled = model.ps_before_publish(action, payload) == :cancel
  if config.debug && cancelled
    log("Publish cancelled by model.ps_before_publish: #{[payload]}")
  end
  !cancelled
end
ensure_publish(payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 100
# Calls config.on_before_publish with the payload and treats a :cancel
# result as a veto. The cancellation is logged only when config.debug is set.
# @return (Boolean) false when the callback cancelled the publish, true otherwise
def ensure_publish(payload)
  hook_result = config.on_before_publish.call(payload)
  cancelled = hook_result == :cancel
  if config.debug && cancelled
    log("Publish cancelled by config.on_before_publish: #{[payload]}")
  end
  !cancelled
end
notify_error(exception, payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 117
# Reports a publishing failure to config.on_error_publish and, unless that
# callback answers :skip_log, writes an :error log entry with the payload,
# the exception message and its backtrace.
# @param exception the rescued error
# @param payload the payload that failed to publish
def notify_error(exception, payload)
  details = [payload, exception.message, exception.backtrace]
  outcome = config.on_error_publish.call(exception, { payload: payload })
  if outcome != :skip_log
    log("Error publishing: #{details}", :error)
  end
end
ordering_key_for(payload)
click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 106
# Picks the ordering key for +payload+ by precedence:
# headers[:forced_ordering_key], then the current transaction's key,
# then headers[:ordering_key].
def ordering_key_for(payload)
  forced = payload.headers[:forced_ordering_key]
  return forced if forced

  transaction_key = current_transaction&.key
  return transaction_key if transaction_key

  payload.headers[:ordering_key]
end