class AggregateStreams::PositionStore
Public Class Methods
build(input_category, output_category, session: nil)
click to toggle source
# File lib/aggregate_streams/position_store.rb, line 11
#
# Constructs a position store for the given pair of categories and
# configures its session dependency.
#
# input_category  - passed as the first argument to the constructor.
# output_category - passed as the second argument to the constructor.
# session         - optional session handed to
#                   MessageStore::Postgres::Session.configure; defaults to
#                   nil (what configure does with nil is not visible here).
#
# Returns the newly constructed instance.
def self.build(input_category, output_category, session: nil)
  instance = new(input_category, output_category)

  MessageStore::Postgres::Session.configure(instance, session: session)

  instance
end
get_sql_command()
click to toggle source
# File lib/aggregate_streams/position_store.rb, line 39
#
# Returns the SQL text used to look up the most recently recorded position.
#
# The query reads the 'causationMessageGlobalPosition' metadata value
# (aliased as "position") from the single row of the messages table with the
# highest global_position, restricted to rows where:
#
#   $1 - equals category(stream_name)
#   $2 - equals category(metadata->>'causationMessageStreamName')
#
# Returns a String containing the parameterized query.
def self.get_sql_command
  %{
    SELECT
      metadata->>'causationMessageGlobalPosition' AS position
    FROM
      messages
    WHERE
      category(stream_name) = $1 AND
      category(metadata->>'causationMessageStreamName') = $2
    ORDER BY global_position DESC
    LIMIT 1
  }
end
Public Instance Methods
get()
click to toggle source
# File lib/aggregate_streams/position_store.rb, line 17
#
# Queries the session for the most recently recorded position for this
# store's output/input category pair.
#
# Returns the recorded position plus one as an Integer, or nil when the
# query yields no rows.
def get
  logger.trace { "Get position (Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  # Order matters: the output category binds to $1 and the input category
  # to $2 in the command returned by get_sql_command.
  parameter_values = [output_category, input_category]
  result = session.execute(self.class.get_sql_command, parameter_values)

  position = nil
  unless result.ntuples.zero?
    # The value is converted with to_i, so a missing (nil) value counts as 0.
    # NOTE(review): the +1 presumably makes the caller resume after the last
    # recorded message rather than re-read it - confirm against callers.
    position = result[0]['position'].to_i + 1
  end

  logger.info { "Get position done (Position: #{position || '(none)'}, Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  position
end