class EventSource::Postgres::Put

Public Class Methods

build(session: nil) click to toggle source
# File lib/event_source/postgres/put.rb, line 9
# Builds a new instance and configures its dependencies.
#
# session - optional session forwarded to the instance's #configure
#           (default: nil).
#
# Returns the configured instance.
def self.build(session: nil)
  instance = new
  instance.configure(session: session)
  instance
end
call(write_event, stream_name, expected_version: nil, session: nil) click to toggle source
# File lib/event_source/postgres/put.rb, line 26
# Convenience entry point: builds an instance using the given session and
# immediately invokes it.
#
# write_event      - the event to write, passed through to the instance.
# stream_name      - the target stream name, passed through to the instance.
# expected_version - optional expected version, passed through (default: nil).
# session          - optional session handed to .build (default: nil).
#
# Returns whatever the instance's #call returns.
def self.call(write_event, stream_name, expected_version: nil, session: nil)
  build(session: session).call(write_event, stream_name, expected_version: expected_version)
end
configure(receiver, session: nil, attr_name: nil) click to toggle source
# File lib/event_source/postgres/put.rb, line 20
# Builds an instance and assigns it to an attribute of the receiver.
#
# receiver  - object that responds to the public writer "<attr_name>=".
# session   - optional session handed to .build (default: nil).
# attr_name - name of the attribute to assign; falls back to :put when not
#             given.
#
# Returns the result of invoking the writer on the receiver.
def self.configure(receiver, session: nil, attr_name: nil)
  attr_name = :put unless attr_name
  built = build(session: session)
  receiver.public_send("#{attr_name}=", built)
end
statement() click to toggle source
# File lib/event_source/postgres/put.rb, line 90
# SQL text that invokes the write_event database function with six
# positional bind parameters. The string is built on first use and stored
# in @statement, so later calls return the same object.
def self.statement
  return @statement if @statement

  @statement = "SELECT write_event($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::int);"
end

Public Instance Methods

call(write_event, stream_name, expected_version: nil) click to toggle source
# File lib/event_source/postgres/put.rb, line 31
def call(write_event, stream_name, expected_version: nil)
  logger.trace { "Putting event data (Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :event_data]) { write_event.pretty_inspect }

  write_event.id ||= identifier.get

  id, type, data, metadata = destructure_event(write_event)
  expected_version = ExpectedVersion.canonize(expected_version)

  insert_event(id, stream_name, type, data, metadata, expected_version).tap do |position|
    logger.info { "Put event data (Position: #{position}, Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }
    logger.info(tags: [:data, :event_data]) { write_event.pretty_inspect }
  end
end
configure(session: nil) click to toggle source
# File lib/event_source/postgres/put.rb, line 15
# Configures this instance's dependencies: first the session (via
# Session.configure), then the identifier generator (via
# Identifier::UUID::Random.configure).
#
# session - optional session forwarded to Session.configure (default: nil).
#
# Returns the result of Identifier::UUID::Random.configure.
def configure(session: nil)
  receiver = self
  Session.configure(receiver, session: session)
  Identifier::UUID::Random.configure(receiver)
end
destructure_event(write_event) click to toggle source
# File lib/event_source/postgres/put.rb, line 46
# Pulls the individual attributes out of an event and logs each of them at
# debug level.
#
# write_event - object exposing id, type, data and metadata.
#
# Returns a four-element Array: [id, type, data, metadata].
def destructure_event(write_event)
  fields = [write_event.id, write_event.type, write_event.data, write_event.metadata]
  event_id, event_type, event_data, event_metadata = fields

  logger.debug(tags: [:data, :event_data]) { "ID: #{event_id.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Type: #{event_type.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Data: #{event_data.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Metadata: #{event_metadata.pretty_inspect}" }

  fields
end
execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version) click to toggle source
# File lib/event_source/postgres/put.rb, line 67
# Runs the class-level insert statement through the session with the given
# values bound, in order, as the statement's parameters.
#
# A PG::RaiseException raised by the session is handed to #raise_error.
#
# Returns whatever session.execute returns.
def execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version)
  logger.trace { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  # Order must match the positional parameters of the statement.
  bind_values = [id, stream_name, type, serialized_data, serialized_metadata, expected_version]

  result =
    begin
      session.execute(self.class.statement, bind_values)
    rescue PG::RaiseException => pg_error
      raise_error pg_error
    end

  logger.debug { "Executed insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  result
end
insert_event(id, stream_name, type, data, metadata, expected_version) click to toggle source
# File lib/event_source/postgres/put.rb, line 60
# Serializes the event's data and metadata, executes the insert, and
# extracts the position from the returned records.
#
# Returns the value produced by #position for the query's records.
def insert_event(id, stream_name, type, data, metadata, expected_version)
  data_text = serialized_data(data)
  metadata_text = serialized_metadata(metadata)

  position(execute_query(id, stream_name, type, data_text, metadata_text, expected_version))
end
position(records) click to toggle source
# File lib/event_source/postgres/put.rb, line 126
# Extracts the position from the records returned by the insert.
#
# records - indexable collection whose elements respond to #values.
#
# Returns the first value of the first record, or nil when there is no
# first record.
def position(records)
  first_record = records[0]
  return nil if first_record.nil?

  first_record.values[0]
end
raise_error(pg_error) click to toggle source
# File lib/event_source/postgres/put.rb, line 134
# Translates a database error into a domain error where applicable.
#
# pg_error - the exception raised by the database session.
#
# Raises ExpectedVersion::Error when the message contains
# 'Wrong expected version'; the raised message has any 'ERROR:' marker
# removed and surrounding whitespace stripped, and is also logged at error
# level. Any other error is re-raised unchanged.
def raise_error(pg_error)
  original_message = pg_error.message
  raise pg_error unless original_message.include?('Wrong expected version')

  # Use the non-mutating gsub/strip: gsub! returns nil when there is no
  # 'ERROR:' marker to remove (which made the chained strip! raise
  # NoMethodError), and the bang variants would also modify the string
  # owned by the original exception.
  error_message = original_message.gsub('ERROR:', '').strip
  logger.error { error_message }
  raise ExpectedVersion::Error, error_message
end
serialized_data(data) click to toggle source
# File lib/event_source/postgres/put.rb, line 94
# Converts event data to its JSON text form.
#
# data - the event data; nil and empty Hashes are treated as "no data".
#
# Returns the output of Transform::Write for the data wrapped in
# EventData::Hash, or nil when there is no data. The result is logged at
# debug level either way.
def serialized_data(data)
  blank = data.nil? || (data.is_a?(Hash) && data.empty?)
  json_text = blank ? nil : Transform::Write.(EventData::Hash[data], :json)

  logger.debug(tags: [:data, :serialize]) { "Serialized Data: #{json_text.inspect}" }
  json_text
end
serialized_metadata(metadata) click to toggle source
# File lib/event_source/postgres/put.rb, line 110
# Converts event metadata to its JSON text form.
#
# metadata - the event metadata; nil and empty Hashes are treated as
#            "no metadata".
#
# Returns the output of Transform::Write for the metadata wrapped in
# EventData::Hash, or nil when there is no metadata. The result is logged
# at debug level either way.
def serialized_metadata(metadata)
  metadata = nil if metadata.is_a?(Hash) && metadata.empty?

  encoded =
    if metadata.nil?
      nil
    else
      Transform::Write.(EventData::Hash[metadata], :json)
    end

  logger.debug(tags: [:data, :serialize]) { "Serialized Metadata: #{encoded.inspect}" }
  encoded
end