module MessageStore::Postgres::Get
Public Class Methods
build(stream_name, **args)
click to toggle source
# File lib/message_store/postgres/get.rb, line 31
#
# Builds a getter for the given stream name.
#
# The concrete class is chosen by `specialization(stream_name)`, and
# construction is delegated to that class's own `build`, forwarding the
# stream name and all keyword arguments unchanged.
def self.build(stream_name, **args)
  specialization(stream_name).build(stream_name, **args)
end
call(stream_name, **args)
click to toggle source
# File lib/message_store/postgres/get.rb, line 48
#
# Convenience entry point: builds a getter and invokes it in one step.
#
# The `:position` keyword is removed from `args` before the remaining
# keywords are passed to `build`; it is then handed to the built
# instance's `call`. When `:position` is absent, `nil` is passed.
def self.call(stream_name, **args)
  starting_position = args.delete(:position)

  getter = build(stream_name, **args)
  getter.call(starting_position)
end
configure(receiver, stream_name, **args)
click to toggle source
# File lib/message_store/postgres/get.rb, line 36
#
# Builds a getter and assigns it to an attribute of `receiver`.
#
# The `:attr_name` keyword selects the attribute to assign and is removed
# from `args` before the rest are forwarded to `build`. It falls back to
# `:get` when absent (or falsy). Assignment goes through the receiver's
# public writer, i.e. `receiver.<attr_name>=`.
def self.configure(receiver, stream_name, **args)
  attr_name = args.delete(:attr_name) || :get

  instance = build(stream_name, **args)
  receiver.public_send("#{attr_name}=", instance)
end
error_message(pg_error)
click to toggle source
# File lib/message_store/postgres/get.rb, line 128
#
# Extracts a cleaned-up message from a database error.
#
# Every occurrence of the literal text 'ERROR:' is removed from
# `pg_error.message`, and surrounding whitespace is stripped from the
# result.
def self.error_message(pg_error)
  raw_message = pg_error.message
  without_label = raw_message.gsub('ERROR:', '')
  without_label.strip
end
included(cls)
click to toggle source
# File lib/message_store/postgres/get.rb, line 4
#
# Hook invoked when this module is included into `cls`.
#
# Evaluates a set of declarations in the context of the including class:
# it mixes in MessageStore::Get, prepends Call and BatchSize, declares a
# `session` dependency on Session, and registers the listed method names
# through the `abstract` and `virtual` macros.
# NOTE(review): `dependency`, `abstract` and `virtual` are macros defined
# outside this file; presumably `abstract` marks methods the including
# class must implement and `virtual` marks optional overrides - confirm.
def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    # Prepend order is significant: BatchSize ends up ahead of Call.
    prepend Call
    prepend BatchSize

    dependency :session, Session

    %i[stream_name sql_command parameters parameter_values last_position log_text].each do |method_name|
      abstract method_name
    end

    %i[specialize_error assure].each do |method_name|
      virtual method_name
    end
  end
end
message_data(record)
click to toggle source
# File lib/message_store/postgres/get.rb, line 103
#
# Converts a raw result record into a MessageData::Read instance.
#
# The record is modified in place: its 'data' and 'metadata' entries are
# replaced with their deserialized forms and its 'time' entry with the
# value returned by Get::Time.utc_coerced. The updated record is then
# passed to MessageData::Read.build.
def self.message_data(record)
  deserialized_data = Get::Deserialize.data(record['data'])
  record['data'] = deserialized_data

  deserialized_metadata = Get::Deserialize.metadata(record['metadata'])
  record['metadata'] = deserialized_metadata

  coerced_time = Get::Time.utc_coerced(record['time'])
  record['time'] = coerced_time

  MessageData::Read.build(record)
end
specialization(stream_name)
click to toggle source
# File lib/message_store/postgres/get.rb, line 132
#
# Selects the getter class for a stream name: Category when
# StreamName.category?(stream_name) is truthy, Stream otherwise.
def self.specialization(stream_name)
  StreamName.category?(stream_name) ? Category : Stream
end
Public Instance Methods
configure(session: nil)
click to toggle source
# File lib/message_store/postgres/get.rb, line 44
#
# Configures this instance's session by delegating to Session.configure,
# passing the instance itself and the optional `session` (nil by default).
def configure(session: nil)
  Session.configure(self, session: session)
end
convert(result)
click to toggle source
# File lib/message_store/postgres/get.rb, line 91
#
# Converts every record of a query result into message data.
#
# Each record yielded by `result.map` is passed through
# Get.message_data. The number of input tuples is logged at trace level
# before conversion and the number of converted items at debug level
# afterwards. Returns the collection of converted items.
def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  converted = result.map { |record| Get.message_data(record) }

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{converted.length})" }

  converted
end
get_result(stream_name, position)
click to toggle source
# File lib/message_store/postgres/get.rb, line 75
#
# Executes the getter's SQL command and returns the raw result.
#
# Parameter values are computed from the stream name and position, then
# `sql_command` is executed through the session. A PG::RaiseException
# raised during execution is handed to `raise_error`. Logs at trace level
# before the query and at debug level (with the tuple count) afterwards.
def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }

  # Computed before the session and SQL command are touched.
  values = parameter_values(stream_name, position)

  begin
    result = session.execute(sql_command, values)
  rescue PG::RaiseException => pg_error
    raise_error(pg_error)
  end

  logger.debug(tag: :get) do
    "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})"
  end

  result
end
raise_error(pg_error)
click to toggle source
# File lib/message_store/postgres/get.rb, line 111
#
# Translates a database error into a more specific error and raises it.
#
# The cleaned message (see Get.error_message) is first offered to
# Condition.error; if that yields nil, it is offered to
# `specialize_error`. When either produces an error, the message is
# logged at error level and that error is raised. Otherwise the original
# `pg_error` is re-raised unchanged.
def raise_error(pg_error)
  message = Get.error_message(pg_error)

  specialized = Condition.error(message)
  specialized = specialize_error(message) if specialized.nil?

  # Nothing recognized the message: surface the database error as-is.
  raise pg_error if specialized.nil?

  logger.error { message }
  raise specialized
end