class AggregateStreams::Aggregation
Public Instance Methods
processed?(message)
click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 24 def processed?(message) message_category = Messaging::StreamName.get_category(message.stream_name) sequence = sequence(message_category) return false if sequence.nil? sequence >= message.global_position end
record_processed(message)
click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 15 def record_processed(message) causation_stream_name = message.metadata.fetch(:causation_message_stream_name) causation_global_position = message.metadata.fetch(:causation_message_global_position) causation_category = Messaging::StreamName.get_category(causation_stream_name) set_sequence(causation_category, causation_global_position) end
sequence(category)
click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 11 def sequence(category) sequences[category] end
set_sequence(category, sequence)
click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 7 def set_sequence(category, sequence) sequences[category] = sequence end