class PubSubModelSync::MessageProcessor

Attributes

payload[RW]

Public Class Methods

new(payload, klass = nil, action = nil) click to toggle source

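@param payload (Payload): payload to be delivered. @deprecated The legacy signature initialize(data, klass, action) is still accepted, but it logs a deprecation message and wraps the arguments in a Payload.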

# File lib/pub_sub_model_sync/message_processor.rb, line 9
# Stores the payload to be processed.
#
# When the first argument is not a Payload, the deprecated
# (data, klass, action) form is assumed: a deprecation message is logged and
# the arguments are wrapped in a new PubSubModelSync::Payload.
#
# @param payload [Payload, Object] payload instance, or raw data (deprecated form)
# @param klass [Object, nil] only used by the deprecated form
# @param action [Object, nil] only used by the deprecated form
def initialize(payload, klass = nil, action = nil)
  @payload = payload
  unless @payload.is_a?(Payload)
    # support for deprecated
    log('Deprecated: Use Payload instead of new(data, klass, action)')
    legacy_settings = { klass: klass, action: action }
    @payload = PubSubModelSync::Payload.new(payload, legacy_settings)
  end
end

Public Instance Methods

process() click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 25
# Runs process! and handles any StandardError it raises.
#
# After each failure the attempt counter is incremented and passed to
# retry_process?; a truthy answer re-runs the method body, otherwise the
# error is handed to notify_error.
#
# @return [Object] result of process!, or of notify_error when not retried
def process
  attempts ||= 0 # keeps its value across `retry`
  process!
rescue StandardError => e
  attempts += 1
  if retry_process?(e, attempts)
    retry
  else
    notify_error(e)
  end
end
process!() click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 18
# Delivers the payload to every subscriber returned by filter_subscribers.
#
# Logs a warning when debug is enabled and no subscriber matched. Errors
# raised while running a subscriber are not rescued here.
#
# @return [Array] the matched subscribers
def process!
  matched = filter_subscribers
  details = { klass: payload.klass, action: payload.action, mode: payload.mode }
  if config.debug && matched.empty?
    log("No subscribers found for #{details}", :warn)
  end
  matched.each { |subscriber| run_subscriber(subscriber) }
end

Private Instance Methods

filter_subscribers() click to toggle source

@return (Array<PubSubModelSync::Subscriber>)

# File lib/pub_sub_model_sync/message_processor.rb, line 78
# Selects the configured subscribers whose from_klass, action and mode all
# equal the payload's klass, action and mode.
#
# @return [Array<PubSubModelSync::Subscriber>]
def filter_subscribers
  config.subscribers.select do |candidate|
    next false unless candidate.from_klass == payload.klass
    next false unless candidate.action == payload.action

    payload.mode == candidate.mode
  end
end
lost_db_connection?(error) click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 57
# Tells whether the error looks like a lost database connection: either its
# class name is one of the listed connection error classes, or its message
# contains "lost connection" (case-insensitive).
#
# @param error (StandardError)
# @return [Boolean]
def lost_db_connection?(error)
  known_class_names = %w[ActiveRecord::ConnectionTimeoutError PG::UnableToSend]
  return true if known_class_names.include?(error.class.name)

  error.message.match?(/lost connection/i)
end
notify_error(error) click to toggle source

@param error (StandardError)

# File lib/pub_sub_model_sync/message_processor.rb, line 51
# Reports a processing error through the configured on_error_processing
# callback and logs it, unless the callback returns :skip_log.
#
# @param error (StandardError)
# @return [Object, nil] result of log, or nil when logging was skipped
def notify_error(error)
  details = [payload, error.message, error.backtrace]
  outcome = config.on_error_processing.call(error, { payload: payload })
  return if outcome == :skip_log

  log("Error processing message: #{details}", :error)
end
processable?(subscriber) click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 44
# Asks the configured on_before_processing callback whether the payload may
# be delivered to the given subscriber. A :cancel answer vetoes delivery and
# is logged when debug is enabled.
#
# @param subscriber [Object] subscriber about to receive the payload
# @return [Boolean] false when the callback answered :cancel, true otherwise
def processable?(subscriber)
  verdict = config.on_before_processing.call(payload, { subscriber: subscriber })
  return true unless verdict == :cancel

  log("process message cancelled: #{payload}") if config.debug
  false
end
retry_process?(error, retries) click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 62
# Decides whether a failed processing attempt should be retried.
#
# Only errors recognised by lost_db_connection? are retried. For up to five
# attempts it sleeps `retries` seconds, logs the failure, makes a best-effort
# database reconnect and answers true. Past five attempts it logs and
# terminates the process via Process.exit!(true).
#
# @param error (StandardError)
# @param retries [Integer] number of the attempt that just failed (1-based)
# @return [Boolean] true to retry, false when the error is not retryable
def retry_process?(error, retries) # rubocop:disable Metrics/MethodLength
  details = [payload, error.message, error.backtrace]
  return false unless lost_db_connection?(error)

  if retries > 5
    log("Retried 5 times and error persists, exiting...: #{details}", :error)
    Process.exit!(true)
  else
    sleep(retries)
    log("Error processing message: (retrying #{retries}/5): #{details}", :error)
    begin
      ActiveRecord::Base.connection.reconnect!
    rescue StandardError
      nil # reconnect is best-effort; failures are deliberately ignored
    end
    true
  end
end
run_subscriber(subscriber) click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 34
# Delivers the payload to a single subscriber.
#
# A PubSubModelSync::RunSubscriber is built first; it is only called when
# processable? allows it. Afterwards the on_success_processing callback runs
# and the success is logged unless that callback returns :skip_log.
#
# @param subscriber [Object] subscriber that should receive the payload
# @return [Object, nil] result of the final log call, or nil when delivery
#   was cancelled or logging was skipped
def run_subscriber(subscriber)
  runner = PubSubModelSync::RunSubscriber.new(subscriber, payload)
  if processable?(subscriber)
    log("Processing message #{[subscriber, payload]}...") if config.debug
    runner.call
    outcome = config.on_success_processing.call(payload, { subscriber: subscriber })
    log("processed message with: #{payload.inspect}") unless outcome == :skip_log
  end
end