module MessageStore::Postgres::Get

Public Class Methods

build(stream_name, **args) click to toggle source
# File lib/message_store/postgres/get.rb, line 31
# Builds a getter for +stream_name+.
#
# The concrete class is chosen by +specialization+; the stream name and
# every keyword argument are forwarded unchanged to that class's +build+.
# Returns whatever the chosen class's +build+ returns.
def self.build(stream_name, **args)
  specialization(stream_name).build(stream_name, **args)
end
call(stream_name, **args) click to toggle source
# File lib/message_store/postgres/get.rb, line 48
# One-shot convenience: builds a getter and invokes it immediately.
#
# The +:position+ keyword (nil when absent) is removed from +args+ and
# passed to the built instance's +call+; all remaining keyword arguments
# go to +build+. Returns the result of invoking the instance.
def self.call(stream_name, **args)
  starting_position = args.delete(:position)

  build(stream_name, **args).call(starting_position)
end
configure(receiver, stream_name, **args) click to toggle source
# File lib/message_store/postgres/get.rb, line 36
# Builds a getter and assigns it to an attribute of +receiver+.
#
# The attribute is named by the +:attr_name+ keyword and defaults to
# +:get+; the value is assigned through the receiver's public
# "<attr_name>=" writer. Remaining keyword arguments are passed to +build+.
# Returns whatever the writer call returns.
def self.configure(receiver, stream_name, **args)
  attr_name = args.delete(:attr_name) || :get

  getter = build(stream_name, **args)
  receiver.public_send("#{attr_name}=", getter)
end
error_message(pg_error) click to toggle source
# File lib/message_store/postgres/get.rb, line 128
# Extracts a clean message from an error object.
#
# Every occurrence of the literal "ERROR:" is removed from
# +pg_error.message+, then surrounding whitespace is stripped.
def self.error_message(pg_error)
  raw_message = pg_error.message

  raw_message.gsub('ERROR:', '').strip
end
included(cls) click to toggle source
# File lib/message_store/postgres/get.rb, line 4
# Hook run when this module is included into +cls+.
#
# Evaluated in the context of +cls+, it:
# - includes MessageStore::Get,
# - prepends Call and then BatchSize,
# - declares the +session+ dependency,
# - declares the abstract methods an including class is expected to supply,
# - declares the virtual hooks +specialize_error+ and +assure+.
def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    prepend Call
    prepend BatchSize

    dependency :session, Session

    # Declared one at a time, in this order.
    %i[
      stream_name
      sql_command
      parameters
      parameter_values
      last_position
      log_text
    ].each do |method_name|
      abstract method_name
    end

    %i[specialize_error assure].each do |method_name|
      virtual method_name
    end
  end
end
message_data(record) click to toggle source
# File lib/message_store/postgres/get.rb, line 103
# Converts one raw result record into message data.
#
# The record's 'data', 'metadata' and 'time' entries are replaced in place
# (in that order) with their converted values, and the updated record is
# then handed to MessageData::Read.build, whose result is returned.
def self.message_data(record)
  deserialized_data = Get::Deserialize.data(record['data'])
  record['data'] = deserialized_data

  deserialized_metadata = Get::Deserialize.metadata(record['metadata'])
  record['metadata'] = deserialized_metadata

  utc_time = Get::Time.utc_coerced(record['time'])
  record['time'] = utc_time

  MessageData::Read.build(record)
end
specialization(stream_name) click to toggle source
# File lib/message_store/postgres/get.rb, line 132
# Chooses the implementation for +stream_name+: Category when
# StreamName.category? reports the name as a category, Stream otherwise.
def self.specialization(stream_name)
  return Category if StreamName.category?(stream_name)

  Stream
end

Public Instance Methods

configure(session: nil) click to toggle source
# File lib/message_store/postgres/get.rb, line 44
# Configures this instance's session by delegating to Session.configure,
# passing along the optional +session+ keyword (nil when not given).
def configure(session: nil)
  options = { session: session }

  Session.configure(self, **options)
end
convert(result) click to toggle source
# File lib/message_store/postgres/get.rb, line 91
# Maps every record of +result+ through Get.message_data and returns the
# resulting collection, logging the record count before (trace) and the
# converted count after (debug).
def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  converted = result.map { |row| Get.message_data(row) }

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{converted.length})" }

  converted
end
get_result(stream_name, position) click to toggle source
# File lib/message_store/postgres/get.rb, line 75
# Executes the SQL command for +stream_name+ and +position+ and returns
# the raw result.
#
# Parameter values come from +parameter_values+ and the statement from
# +sql_command+; both are passed to +session.execute+. A
# PG::RaiseException raised during execution is handed to +raise_error+;
# any other exception propagates untouched. Trace and debug log entries
# are written before and after the query.
def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }

  values = parameter_values(stream_name, position)

  begin
    result = session.execute(sql_command, values)
  rescue PG::RaiseException => pg_error
    raise_error(pg_error)
  end

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" }

  result
end
raise_error(pg_error) click to toggle source
# File lib/message_store/postgres/get.rb, line 111
# Raises the most specific error available for +pg_error+.
#
# The cleaned message (Get.error_message) is offered first to
# Condition.error and, when that yields nil, to +specialize_error+. If
# either produces an error, the message is logged at error level and that
# error is raised; otherwise the original +pg_error+ is re-raised. This
# method never returns normally.
def raise_error(pg_error)
  message = Get.error_message(pg_error)

  error = Condition.error(message)
  error = specialize_error(message) if error.nil?

  # Nothing more specific is known: surface the original error as-is.
  raise pg_error if error.nil?

  logger.error { message }
  raise error
end