class AggregateStreams::Handle

Constants

TransformError

Public Instance Methods

assure_message_data(message_data) click to toggle source
# File lib/aggregate_streams/handle.rb, line 105
def assure_message_data(message_data)
  unless message_data.instance_of?(MessageStore::MessageData::Write)
    raise TransformError, "Not an instance of MessageData::Write"
  end
end
configure(session: nil, settings:) click to toggle source
# File lib/aggregate_streams/handle.rb, line 20
def configure(session: nil, settings:)
  settings.set(self)

  writer_session = self.writer_session
  writer_session ||= session

  Store.configure(self, category: category, session: writer_session, snapshot_interval: snapshot_interval)

  MessageStore::Postgres::Write.configure(self, session: writer_session)
end
handle(message_data) click to toggle source
# File lib/aggregate_streams/handle.rb, line 31
def handle(message_data)
  logger.trace { "Handling message (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }

  Retry.(MessageStore::ExpectedVersion::Error, millisecond_intervals: [0, 10, 100, 1000]) do
    stream_id = Messaging::StreamName.get_id(message_data.stream_name)

    aggregation, version = store.fetch(stream_id, include: :version)

    if aggregation.processed?(message_data)
      logger.info(tag: :ignored) { "Message already handled (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }
      return
    end

    raw_input_data = Messaging::Message::Transform::MessageData.read(message_data)
    input_metadata = Messaging::Message::Metadata.build(raw_input_data[:metadata])

    output_metadata = raw_metadata(input_metadata)

    write_message_data = MessageStore::MessageData::Write.new

    SetAttributes.(write_message_data, message_data, copy: [:type, :data])

    write_message_data.metadata = output_metadata

    input_category = Messaging::StreamName.get_category(message_data.stream_name)
    write_message_data = transform(write_message_data, input_category)

    if write_message_data
      assure_message_data(write_message_data)
    else
      logger.info(tag: :ignored) { "Message ignored (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }
      return
    end

    stream_name = stream_name(stream_id)
    write.(write_message_data, stream_name, expected_version: version)

    logger.info do
      message_type = message_data.type
      unless write_message_data.type == message_type
        message_type = "#{write_message_data.type} ← #{message_type}"
      end

      "Message copied (Message Type: #{message_type}, Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})"
    end
  end
end
raw_metadata(metadata) click to toggle source
# File lib/aggregate_streams/handle.rb, line 79
def raw_metadata(metadata)
  output_metadata = Messaging::Message::Metadata.build

  output_metadata.follow(metadata)

  output_metadata = output_metadata.to_h

  output_metadata.delete(:local_properties)

  if output_metadata[:properties].empty?
    output_metadata.delete(:properties)
  end

  output_metadata.delete_if { |_, v| v.nil? }

  output_metadata
end
transform(write_message_data, stream_name) click to toggle source
# File lib/aggregate_streams/handle.rb, line 97
def transform(write_message_data, stream_name)
  if transform_action.nil?
    write_message_data
  else
    transform_action.(write_message_data, stream_name)
  end
end