class MessageStore::Postgres::Put

Public Class Methods

build(session: nil) click to toggle source
# File lib/message_store/postgres/put.rb, line 10
def self.build(session: nil)
  new.tap do |instance|
    instance.configure(session: session)
  end
end
call(write_message, stream_name, expected_version: nil, session: nil) click to toggle source
# File lib/message_store/postgres/put.rb, line 27
def self.call(write_message, stream_name, expected_version: nil, session: nil)
  instance = build(session: session)
  instance.(write_message, stream_name, expected_version: expected_version)
end
configure(receiver, session: nil, attr_name: nil) click to toggle source
# File lib/message_store/postgres/put.rb, line 21
def self.configure(receiver, session: nil, attr_name: nil)
  attr_name ||= :put
  instance = build(session: session)
  receiver.public_send "#{attr_name}=", instance
end
statement() click to toggle source
# File lib/message_store/postgres/put.rb, line 91
def self.statement
  @statement ||= "SELECT write_message($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::bigint);"
end

Public Instance Methods

call(write_message, stream_name, expected_version: nil) click to toggle source
# File lib/message_store/postgres/put.rb, line 32
def call(write_message, stream_name, expected_version: nil)
  logger.trace(tag: :put) { "Putting message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :message_data]) { write_message.pretty_inspect }

  write_message.id ||= identifier.get

  id, type, data, metadata = destructure_message(write_message)
  expected_version = ExpectedVersion.canonize(expected_version)

  insert_message(id, stream_name, type, data, metadata, expected_version).tap do |position|
    logger.info(tag: :put) { "Put message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect}, Position: #{position})" }
    logger.info(tags: [:data, :message_data]) { write_message.pretty_inspect }
  end
end
configure(session: nil) click to toggle source
# File lib/message_store/postgres/put.rb, line 16
def configure(session: nil)
  Session.configure(self, session: session)
  Identifier::UUID::Random.configure(self)
end
destructure_message(write_message) click to toggle source
# File lib/message_store/postgres/put.rb, line 47
def destructure_message(write_message)
  id = write_message.id
  type = write_message.type
  data = write_message.data
  metadata = write_message.metadata

  logger.debug(tags: [:data, :message_data]) { "ID: #{id.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Type: #{type.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Data: #{data.pretty_inspect}" }
  logger.debug(tags: [:data, :message_data]) { "Metadata: #{metadata.pretty_inspect}" }

  return id, type, data, metadata
end
execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) click to toggle source
# File lib/message_store/postgres/put.rb, line 68
def execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version)
  logger.trace(tag: :put) { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  params = [
    id,
    stream_name,
    type,
    transformed_data,
    transformed_metadata,
    expected_version
  ]

  begin
    records = session.execute(self.class.statement, params)
  rescue PG::RaiseException => e
    raise_error e
  end

  logger.debug(tag: :put) { "Executed insert (Type: #{type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  records
end
insert_message(id, stream_name, type, data, metadata, expected_version) click to toggle source
# File lib/message_store/postgres/put.rb, line 61
def insert_message(id, stream_name, type, data, metadata, expected_version)
  transformed_data = transformed_data(data)
  transformed_metadata = transformed_metadata(metadata)
  records = execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version)
  position(records)
end
position(records) click to toggle source
# File lib/message_store/postgres/put.rb, line 127
def position(records)
  position = nil
  unless records[0].nil?
    position = records[0].values[0]
  end
  position
end
raise_error(pg_error) click to toggle source
# File lib/message_store/postgres/put.rb, line 135
def raise_error(pg_error)
  error_message = pg_error.message
  if error_message.include? 'Wrong expected version'
    error_message.gsub!('ERROR:', '').strip!
    logger.error { error_message }
    raise ExpectedVersion::Error, error_message
  end
  raise pg_error
end
transformed_data(data) click to toggle source
# File lib/message_store/postgres/put.rb, line 95
def transformed_data(data)
  transformed_data = nil

  if data.is_a?(Hash) && data.empty?
    data = nil
  end

  unless data.nil?
    transformable_data = MessageData::Hash[data]
    transformed_data = Transform::Write.(transformable_data, :json)
  end

  logger.debug(tags: [:data, :serialize]) { "Transformed Data: #{transformed_data.inspect}" }
  transformed_data
end
transformed_metadata(metadata) click to toggle source
# File lib/message_store/postgres/put.rb, line 111
def transformed_metadata(metadata)
  transformed_metadata = nil

  if metadata.is_a?(Hash) && metadata.empty?
    metadata = nil
  end

  unless metadata.nil?
    transformable_metadata = MessageData::Hash[metadata]
    transformed_metadata = Transform::Write.(transformable_metadata, :json)
  end

  logger.debug(tags: [:data, :serialize]) { "Transformed Metadata: #{transformed_metadata.inspect}" }
  transformed_metadata
end