module Messaging::Write

Public Class Methods

included(cls) click to toggle source
# File lib/messaging/write.rb, line 5
def self.included(cls)
  cls.class_exec do
    include Dependency
    include Virtual
    include Log::Dependency

    dependency :message_writer
    dependency :telemetry, ::Telemetry

    extend Build
    extend Call
    extend Configure

    abstract :configure

    const_set :Substitute, Substitute
  end
end
register_telemetry_sink(writer) click to toggle source
# File lib/messaging/write.rb, line 127
def self.register_telemetry_sink(writer)
  sink = Telemetry.sink
  writer.telemetry.register sink
  sink
end

Public Instance Methods

call(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil) click to toggle source
# File lib/messaging/write.rb, line 48
def call(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil)
  unless message_or_batch.is_a? Array
    logger.trace(tag: :write) { "Writing message (Type: #{message_or_batch.class.message_type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})" }
  else
    logger.trace(tag: :write) do
      message_types = message_or_batch.map {|message| message.class.message_type }.uniq.join(', ')
      "Writing batch (Types: #{message_types}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    end
  end
  logger.trace(tags: [:data, :message]) { message_or_batch.pretty_inspect }

  message_batch = Array(message_or_batch)

  message_data_batch = message_data_batch(message_batch, reply_stream_name)
  last_position = message_writer.(message_data_batch, stream_name, expected_version: expected_version)

  unless message_or_batch.is_a? Array
    logger.info(tag: :write) { "Wrote message (Type: #{message_or_batch.class.message_type}, Stream Name: #{stream_name}, Position: #{last_position}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})" }
  else
    logger.info(tag: :write) do
      message_types = message_or_batch.map {|message| message.class.message_type }.uniq.join(', ')
      "Wrote batch (Types: #{message_types}, Stream Name: #{stream_name}, Position: #{last_position}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    end
  end
  logger.info(tags: [:data, :message]) { message_data_batch.pretty_inspect }

  message_batch.each do |message|
    telemetry.record :written, Telemetry::Data.new(message, stream_name, expected_version, reply_stream_name)
  end

  last_position
end
Also aliased as: write
initial(message, stream_name) click to toggle source
# File lib/messaging/write.rb, line 122
def initial(message, stream_name)
  write(message, stream_name, expected_version: :no_stream)
end
Also aliased as: write_initial
message_data_batch(message_batch, reply_stream_name=nil) click to toggle source
# File lib/messaging/write.rb, line 82
def message_data_batch(message_batch, reply_stream_name=nil)
  message_data_batch = []
  message_batch.each do |message|
    unless reply_stream_name.nil?
       message.metadata.reply_stream_name = reply_stream_name
    end

    message_data_batch << Message::Export.(message)
  end

  message_data_batch
end
reply(message) click to toggle source
# File lib/messaging/write.rb, line 95
def reply(message)
  if message.is_a? Array
    error_msg = "Cannot reply with a batch"
    logger.error { error_msg }
    raise Error, error_msg
  end

  metadata = message.metadata
  reply_stream_name = metadata.reply_stream_name

  logger.trace(tags: [:write, :reply]) { "Replying (Message Type: #{message.message_type}, Stream Name: #{reply_stream_name.inspect})" }

  if reply_stream_name.nil?
    error_msg = "Message has no reply stream name. Cannot reply. (Message Type: #{message.message_type})"
    logger.error { error_msg }
    logger.error(tags: [:data, :message]) { message.pretty_inspect }
    raise Error, error_msg
  end

  metadata.clear_reply_stream_name

  write(message, reply_stream_name).tap do
    logger.info(tags: [:write, :reply]) { "Replied (Message Type: #{message.message_type}, Stream Name: #{reply_stream_name})" }
    telemetry.record :replied, Telemetry::Data.new(message, reply_stream_name)
  end
end
write(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil)
Alias for: call
write_initial(message, stream_name)
Alias for: initial