class PubSubModelSync::MessageProcessor
Attributes
payload[RW]
Public Class Methods
new(payload, klass = nil, action = nil)
click to toggle source
@param payload (Payload
): payload to be delivered @Deprecated: def initialize(data, klass, action)
# File lib/pub_sub_model_sync/message_processor.rb, line 9 def initialize(payload, klass = nil, action = nil) @payload = payload return if @payload.is_a?(Payload) # support for deprecated log('Deprecated: Use Payload instead of new(data, klass, action)') @payload = PubSubModelSync::Payload.new(payload, { klass: klass, action: action }) end
Public Instance Methods
process()
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 25 def process retries ||= 0 process! rescue => e retry_process?(e, retries += 1) ? retry : notify_error(e) end
process!()
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 18 def process! subscribers = filter_subscribers payload_info = { klass: payload.klass, action: payload.action, mode: payload.mode } log("No subscribers found for #{payload_info}", :warn) if config.debug && subscribers.empty? subscribers.each(&method(:run_subscriber)) end
Private Instance Methods
filter_subscribers()
click to toggle source
@return (Array<PubSubModelSync::Subscriber>)
# File lib/pub_sub_model_sync/message_processor.rb, line 78 def filter_subscribers config.subscribers.select do |subscriber| subscriber.from_klass == payload.klass && subscriber.action == payload.action && payload.mode == subscriber.mode end end
lost_db_connection?(error)
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 57 def lost_db_connection?(error) connection_lost_classes = %w[ActiveRecord::ConnectionTimeoutError PG::UnableToSend] connection_lost_classes.include?(error.class.name) || error.message.match?(/lost connection/i) end
notify_error(error)
click to toggle source
@param error (StandardError)
# File lib/pub_sub_model_sync/message_processor.rb, line 51 def notify_error(error) info = [payload, error.message, error.backtrace] res = config.on_error_processing.call(error, { payload: payload }) log("Error processing message: #{info}", :error) if res != :skip_log end
processable?(subscriber)
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 44 def processable?(subscriber) cancel = config.on_before_processing.call(payload, { subscriber: subscriber }) == :cancel log("process message cancelled: #{payload}") if cancel && config.debug !cancel end
retry_process?(error, retries)
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 62 def retry_process?(error, retries) # rubocop:disable Metrics/MethodLength error_payload = [payload, error.message, error.backtrace] return false unless lost_db_connection?(error) if retries <= 5 sleep(retries) log("Error processing message: (retrying #{retries}/5): #{error_payload}", :error) ActiveRecord::Base.connection.reconnect! rescue nil # rubocop:disable Style/RescueModifier true else log("Retried 5 times and error persists, exiting...: #{error_payload}", :error) Process.exit!(true) end end
run_subscriber(subscriber)
click to toggle source
# File lib/pub_sub_model_sync/message_processor.rb, line 34 def run_subscriber(subscriber) processor = PubSubModelSync::RunSubscriber.new(subscriber, payload) return unless processable?(subscriber) log("Processing message #{[subscriber, payload]}...") if config.debug processor.call res = config.on_success_processing.call(payload, { subscriber: subscriber }) log "processed message with: #{payload.inspect}" if res != :skip_log end