module Messaging::Write
Public Class Methods
included(cls)
click to toggle source
# File lib/messaging/write.rb, line 5
#
# Mixin hook: runs when a class includes this module. Everything below is
# evaluated in the context of the including class (via class_exec), in the
# order written.
#
# cls - the class that included this module
def self.included(cls)
  cls.class_exec do
    # Instance-level mixins
    include Dependency
    include Virtual
    include Log::Dependency

    # Collaborators declared through the dependency macro
    dependency :message_writer
    dependency :telemetry, ::Telemetry

    # Class-level mixins
    extend Build
    extend Call
    extend Configure

    abstract :configure

    # Re-publish the Substitute constant visible from this module on the
    # including class
    const_set(:Substitute, Substitute)
  end
end
register_telemetry_sink(writer)
click to toggle source
# File lib/messaging/write.rb, line 127
#
# Obtains a sink from Telemetry.sink, registers it with the given writer's
# telemetry, and returns that sink.
#
# writer - an object exposing a telemetry that responds to register
def self.register_telemetry_sink(writer)
  Telemetry.sink.tap do |sink|
    writer.telemetry.register(sink)
  end
end
Public Instance Methods
call(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil)
click to toggle source
# File lib/messaging/write.rb, line 48
#
# Writes a single message, or an Array of messages, to a stream.
#
# message_or_batch  - one message, or an Array of messages
# stream_name       - name of the stream to write to
# expected_version  - passed through to the message writer (default: nil)
# reply_stream_name - passed to message_data_batch when converting the
#                     messages (default: nil)
#
# Each message is converted with message_data_batch, the converted batch is
# handed to message_writer, and a :written telemetry record is made per
# message. Returns the value returned by message_writer.
def call(message_or_batch, stream_name, expected_version: nil, reply_stream_name: nil)
  batch = message_or_batch.is_a?(Array)

  # Log text is built inside the blocks so that nothing is computed unless
  # the logger actually evaluates them
  logger.trace(tag: :write) do
    if batch
      message_types = message_or_batch.map { |message| message.class.message_type }.uniq.join(', ')
      "Writing batch (Types: #{message_types}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    else
      "Writing message (Type: #{message_or_batch.class.message_type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    end
  end
  logger.trace(tags: [:data, :message]) { message_or_batch.pretty_inspect }

  message_batch = Array(message_or_batch)

  # Named differently from the message_data_batch method to avoid shadowing it
  exported_batch = message_data_batch(message_batch, reply_stream_name)
  last_position = message_writer.(exported_batch, stream_name, expected_version: expected_version)

  logger.info(tag: :write) do
    if batch
      message_types = message_or_batch.map { |message| message.class.message_type }.uniq.join(', ')
      "Wrote batch (Types: #{message_types}, Stream Name: #{stream_name}, Position: #{last_position}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    else
      "Wrote message (Type: #{message_or_batch.class.message_type}, Stream Name: #{stream_name}, Position: #{last_position}, Expected Version: #{expected_version.inspect}, Reply Stream Name: #{reply_stream_name.inspect})"
    end
  end
  logger.info(tags: [:data, :message]) { exported_batch.pretty_inspect }

  message_batch.each do |message|
    telemetry.record :written, Telemetry::Data.new(message, stream_name, expected_version, reply_stream_name)
  end

  last_position
end
Also aliased as: write
initial(message, stream_name)
click to toggle source
# File lib/messaging/write.rb, line 122
#
# Writes the message to the stream with an expected version of :no_stream,
# delegating to write and returning its result.
def initial(message, stream_name)
  expected_version = :no_stream

  write(message, stream_name, expected_version: expected_version)
end
Also aliased as: write_initial
message_data_batch(message_batch, reply_stream_name=nil)
click to toggle source
# File lib/messaging/write.rb, line 82
#
# Converts each message in the batch with Message::Export and returns the
# results as a new Array, in the same order.
#
# message_batch     - collection of messages
# reply_stream_name - when not nil, assigned to every message's
#                     metadata.reply_stream_name before that message is
#                     exported (the message itself is mutated)
def message_data_batch(message_batch, reply_stream_name=nil)
  message_batch.map do |message|
    message.metadata.reply_stream_name = reply_stream_name unless reply_stream_name.nil?

    Message::Export.(message)
  end
end
reply(message)
click to toggle source
# File lib/messaging/write.rb, line 95
#
# Writes a message to the stream named by its own metadata's
# reply_stream_name.
#
# message - a single message; an Array is rejected
#
# The reply stream name is read from the metadata, then
# metadata.clear_reply_stream_name is called before the message is written.
# A :replied telemetry record is made after the write.
#
# Returns the result of write.
# Raises Error when given an Array, or when the metadata has no reply
# stream name.
def reply(message)
  if message.is_a?(Array)
    batch_error = "Cannot reply with a batch"
    logger.error { batch_error }
    raise Error, batch_error
  end

  metadata = message.metadata
  reply_stream_name = metadata.reply_stream_name

  logger.trace(tags: [:write, :reply]) { "Replying (Message Type: #{message.message_type}, Stream Name: #{reply_stream_name.inspect})" }

  if reply_stream_name.nil?
    missing_error = "Message has no reply stream name. Cannot reply. (Message Type: #{message.message_type})"
    logger.error { missing_error }
    logger.error(tags: [:data, :message]) { message.pretty_inspect }
    raise Error, missing_error
  end

  metadata.clear_reply_stream_name

  position = write(message, reply_stream_name)

  logger.info(tags: [:write, :reply]) { "Replied (Message Type: #{message.message_type}, Stream Name: #{reply_stream_name})" }
  telemetry.record :replied, Telemetry::Data.new(message, reply_stream_name)

  position
end