class AggregateStreams::Aggregation

Public Instance Methods

processed?(message) click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 24
def processed?(message)
  message_category = Messaging::StreamName.get_category(message.stream_name)

  sequence = sequence(message_category)
  return false if sequence.nil?

  sequence >= message.global_position
end
record_processed(message) click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 15
def record_processed(message)
  causation_stream_name = message.metadata.fetch(:causation_message_stream_name)
  causation_global_position = message.metadata.fetch(:causation_message_global_position)

  causation_category = Messaging::StreamName.get_category(causation_stream_name)

  set_sequence(causation_category, causation_global_position)
end
sequence(category) click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 11
def sequence(category)
  sequences[category]
end
set_sequence(category, sequence) click to toggle source
# File lib/aggregate_streams/aggregation.rb, line 7
def set_sequence(category, sequence)
  sequences[category] = sequence
end