class CycloneLariat::Middleware
Attributes
errors_notifier[R]
events_repo[R]
message_notifier[R]
Public Class Methods
new(dataset: nil, errors_notifier: nil, message_notifier: nil, repo: MessagesRepo)
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 9 def initialize(dataset: nil, errors_notifier: nil, message_notifier: nil, repo: MessagesRepo) @events_repo = repo.new(dataset) if dataset @message_notifier = message_notifier @errors_notifier = errors_notifier end
Public Instance Methods
call(_worker_instance, queue, _sqs_msg, body, &block)
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 15 def call(_worker_instance, queue, _sqs_msg, body, &block) msg = receive_message(body) message_notifier&.info 'Receive message', message: msg, queue: queue catch_standard_error(queue, msg) do event = Event.wrap(msg) store_in_dataset(event) do catch_business_error(event, &block) end end end
Private Instance Methods
catch_business_error(event) { || ... }
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 48 def catch_business_error(event) yield rescue LunaPark::Errors::Business => e errors_notifier&.error(e, event: event) event.client_error = e end
catch_standard_error(queue, msg) { || ... }
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 55 def catch_standard_error(queue, msg) yield rescue Exception => e errors_notifier&.error(e, queue: queue, message: msg) raise e end
receive_message(body)
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 33 def receive_message(body) body[:Message] ? JSON.parse(body[:Message], symbolize_names: true ) : body end
store_in_dataset(event) { || ... }
click to toggle source
# File lib/cyclone_lariat/middleware.rb, line 37 def store_in_dataset(event) return yield if events_repo.nil? existed = events_repo.find(uuid: event.uuid) return true if existed&.processed? events_repo.create(event) unless existed yield events_repo.processed! uuid: event.uuid, error: event.client_error end