class EventSource::Postgres::Put
Public Class Methods
build(session: nil)
click to toggle source
# File lib/event_source/postgres/put.rb, line 9 def self.build(session: nil) new.tap do |instance| instance.configure(session: session) end end
call(write_event, stream_name, expected_version: nil, session: nil)
click to toggle source
# File lib/event_source/postgres/put.rb, line 26 def self.call(write_event, stream_name, expected_version: nil, session: nil) instance = build(session: session) instance.(write_event, stream_name, expected_version: expected_version) end
configure(receiver, session: nil, attr_name: nil)
click to toggle source
# File lib/event_source/postgres/put.rb, line 20 def self.configure(receiver, session: nil, attr_name: nil) attr_name ||= :put instance = build(session: session) receiver.public_send "#{attr_name}=", instance end
statement()
click to toggle source
# File lib/event_source/postgres/put.rb, line 90 def self.statement @statement ||= "SELECT write_event($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::int);" end
Public Instance Methods
call(write_event, stream_name, expected_version: nil)
click to toggle source
# File lib/event_source/postgres/put.rb, line 31 def call(write_event, stream_name, expected_version: nil) logger.trace { "Putting event data (Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect})" } logger.trace(tags: [:data, :event_data]) { write_event.pretty_inspect } write_event.id ||= identifier.get id, type, data, metadata = destructure_event(write_event) expected_version = ExpectedVersion.canonize(expected_version) insert_event(id, stream_name, type, data, metadata, expected_version).tap do |position| logger.info { "Put event data (Position: #{position}, Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } logger.info(tags: [:data, :event_data]) { write_event.pretty_inspect } end end
configure(session: nil)
click to toggle source
# File lib/event_source/postgres/put.rb, line 15 def configure(session: nil) Session.configure(self, session: session) Identifier::UUID::Random.configure(self) end
destructure_event(write_event)
click to toggle source
# File lib/event_source/postgres/put.rb, line 46 def destructure_event(write_event) id = write_event.id type = write_event.type data = write_event.data metadata = write_event.metadata logger.debug(tags: [:data, :event_data]) { "ID: #{id.pretty_inspect}" } logger.debug(tags: [:data, :event_data]) { "Type: #{type.pretty_inspect}" } logger.debug(tags: [:data, :event_data]) { "Data: #{data.pretty_inspect}" } logger.debug(tags: [:data, :event_data]) { "Metadata: #{metadata.pretty_inspect}" } return id, type, data, metadata end
execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version)
click to toggle source
# File lib/event_source/postgres/put.rb, line 67 def execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version) logger.trace { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } params = [ id, stream_name, type, serialized_data, serialized_metadata, expected_version ] begin records = session.execute(self.class.statement, params) rescue PG::RaiseException => e raise_error e end logger.debug { "Executed insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } records end
insert_event(id, stream_name, type, data, metadata, expected_version)
click to toggle source
# File lib/event_source/postgres/put.rb, line 60 def insert_event(id, stream_name, type, data, metadata, expected_version) serialized_data = serialized_data(data) serialized_metadata = serialized_metadata(metadata) records = execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version) position(records) end
position(records)
click to toggle source
# File lib/event_source/postgres/put.rb, line 126 def position(records) position = nil unless records[0].nil? position = records[0].values[0] end position end
raise_error(pg_error)
click to toggle source
# File lib/event_source/postgres/put.rb, line 134 def raise_error(pg_error) error_message = pg_error.message if error_message.include? 'Wrong expected version' error_message.gsub!('ERROR:', '').strip! logger.error { error_message } raise ExpectedVersion::Error, error_message end raise pg_error end
serialized_data(data)
click to toggle source
# File lib/event_source/postgres/put.rb, line 94 def serialized_data(data) serialized_data = nil if data.is_a?(Hash) && data.empty? data = nil end unless data.nil? serializable_data = EventData::Hash[data] serialized_data = Transform::Write.(serializable_data, :json) end logger.debug(tags: [:data, :serialize]) { "Serialized Data: #{serialized_data.inspect}" } serialized_data end
serialized_metadata(metadata)
click to toggle source
# File lib/event_source/postgres/put.rb, line 110 def serialized_metadata(metadata) serialized_metadata = nil if metadata.is_a?(Hash) && metadata.empty? metadata = nil end unless metadata.nil? serializable_metadata = EventData::Hash[metadata] serialized_metadata = Transform::Write.(serializable_metadata, :json) end logger.debug(tags: [:data, :serialize]) { "Serialized Metadata: #{serialized_metadata.inspect}" } serialized_metadata end