class PubSubModelSync::MessagePublisher

Attributes

current_transaction[RW]

Public Class Methods

connector() click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 9
def connector
  @connector ||= PubSubModelSync::Connector.new
end
connector_publish(payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 82
def connector_publish(payload)
  log("Publishing message #{[payload]}...") if config.debug
  connector.publish(payload)
  log("Published message: #{[payload]}")
  config.on_after_publish.call(payload)
end
init_transaction(key, settings = {}) click to toggle source

Starts a new transaction @param key (String|Nil) @return (Transaction)

# File lib/pub_sub_model_sync/message_publisher.rb, line 36
def init_transaction(key, settings = {})
  new_transaction = PubSubModelSync::Transaction.new(key, settings)
  if current_transaction
    current_transaction.add_transaction(new_transaction)
  else
    self.current_transaction = new_transaction
  end
  new_transaction
end
publish(payload, &block) click to toggle source

Similar to :publish! method Notifies error via :on_error_publish instead of raising error @return Payload

# File lib/pub_sub_model_sync/message_publisher.rb, line 92
def publish(payload, &block)
  publish!(payload, &block)
rescue => e
  notify_error(e, payload)
end
publish!(payload, &block) click to toggle source

Publishes payload to pubsub @param payload (PubSubModelSync::Payload) @return Payload Raises error if exist

# File lib/pub_sub_model_sync/message_publisher.rb, line 73
def publish!(payload, &block)
  payload.headers[:ordering_key] = ordering_key_for(payload)
  return unless ensure_publish(payload)

  current_transaction ? current_transaction.add_payload(payload) : connector_publish(payload)
  block&.call
  payload
end
publish_data(klass, data, action, headers: {}) click to toggle source

Publishes a class level notification via pubsub @refer PublisherConcern.ps_class_publish @return Payload

# File lib/pub_sub_model_sync/message_publisher.rb, line 49
def publish_data(klass, data, action, headers: {})
  info = { klass: klass.to_s, action: action.to_sym, mode: :klass }
  log("Building payload for: #{info.inspect}") if config.debug
  payload = PubSubModelSync::Payload.new(data, info, headers)
  define_transaction_key(payload)
  publish(payload)
end
publish_model(model, action, settings = {}) click to toggle source

@param model (ActiveRecord::Base,PubSubModelSync::PublisherConcern) @param action (Symbol,String @see PublishConcern::ps_publish) @param settings (Hash @see PayloadBuilder.settings)

# File lib/pub_sub_model_sync/message_publisher.rb, line 60
def publish_model(model, action, settings = {})
  log("Building payload for: #{[model, action].inspect}") if config.debug
  payload = PubSubModelSync::PayloadBuilder.new(model, action, settings).call
  define_transaction_key(payload)
  transaction(payload.headers[:ordering_key]) do # catch and group all :ps_before_publish syncs
    publish(payload) { model.ps_after_publish(action, payload) } if ensure_model_publish(model, action, payload)
  end
end
transaction(key, settings = {}, &block) click to toggle source

Permits to group all payloads with the same ordering_key and be processed in the same order

they are published by the subscribers. Grouping by ordering_key allows us to enable
multiple workers in our Pub/Sub service(s), and still guarantee that related payloads will
be processed in the correct order, despite of the multiple threads. This thanks to the fact
that Pub/Sub services will always send messages with the same `ordering_key` into the same
worker/thread.

@see Transaction.new(...) @param key (String|Nil) @param block (Yield) block to be executed

# File lib/pub_sub_model_sync/message_publisher.rb, line 22
def transaction(key, settings = {}, &block)
  t = init_transaction(key, settings)
  block.call
  t.finish
rescue
  t.rollback
  raise
ensure
  t.clean_publisher
end

Private Class Methods

define_transaction_key(payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 123
def define_transaction_key(payload)
  current_transaction&.key ||= payload.headers[:ordering_key]
end
ensure_model_publish(model, action, payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 110
def ensure_model_publish(model, action, payload)
  res_before = model.ps_before_publish(action, payload)
  cancelled = res_before == :cancel
  log("Publish cancelled by model.ps_before_publish: #{[payload]}") if config.debug && cancelled
  !cancelled
end
ensure_publish(payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 100
def ensure_publish(payload)
  cancelled = config.on_before_publish.call(payload) == :cancel
  log("Publish cancelled by config.on_before_publish: #{[payload]}") if config.debug && cancelled
  !cancelled
end
notify_error(exception, payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 117
def notify_error(exception, payload)
  info = [payload, exception.message, exception.backtrace]
  res = config.on_error_publish.call(exception, { payload: payload })
  log("Error publishing: #{info}", :error) if res != :skip_log
end
ordering_key_for(payload) click to toggle source
# File lib/pub_sub_model_sync/message_publisher.rb, line 106
def ordering_key_for(payload)
  payload.headers[:forced_ordering_key] || current_transaction&.key || payload.headers[:ordering_key]
end