class AggregateStreams::PositionStore

Public Class Methods

build(input_category, output_category, session: nil)
# File lib/aggregate_streams/position_store.rb, line 11
# Constructs a position store for the given pair of categories and
# configures its message store session dependency.
#
# input_category  - first positional argument forwarded to `new`.
# output_category - second positional argument forwarded to `new`.
# session         - optional session handed to
#                   MessageStore::Postgres::Session.configure (default: nil).
#
# Returns the newly built instance.
def self.build(input_category, output_category, session: nil)
  new(input_category, output_category).tap do |position_store|
    MessageStore::Postgres::Session.configure(position_store, session: session)
  end
end
get_sql_command()
# File lib/aggregate_streams/position_store.rb, line 39
# SQL text used by #get.
#
# The statement selects the 'causationMessageGlobalPosition' metadata value
# (aliased as "position") from the single row of `messages` with the highest
# global_position among rows whose stream category equals bind parameter $1
# and whose 'causationMessageStreamName' metadata category equals bind
# parameter $2.
#
# Returns the SQL String; surrounding whitespace is part of the literal.
def self.get_sql_command
  sql_text = %Q[
    SELECT
      metadata->>'causationMessageGlobalPosition' AS position
    FROM messages
    WHERE
      category(stream_name) = $1 AND
      category(metadata->>'causationMessageStreamName') = $2
    ORDER BY global_position DESC LIMIT 1
  ]

  sql_text
end

Public Instance Methods

get()
# File lib/aggregate_streams/position_store.rb, line 17
# Looks up the stored position for this output/input category pair.
#
# Runs the class-level SQL command through the session, binding
# output_category as the first parameter and input_category as the second.
#
# Returns nil when the query yields no rows; otherwise the "position" column
# of the first row converted with to_i, plus one (presumably the position
# following the last message already processed - confirm against callers).
def get
  logger.trace { "Get position (Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  query = self.class.get_sql_command
  bindings = [output_category, input_category]
  rows = session.execute(query, bindings)

  # An empty result set means no position is available yet.
  position = rows.ntuples.zero? ? nil : rows[0]['position'].to_i + 1

  logger.info { "Get position done (Position: #{position || '(none)'}, Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  position
end