class Mimi::Messaging::Adapters::Memory

A Memory is an in-memory implementation of a messaging adapter.

All message dispatching happens within a single thread, the same as the caller's, so all invocations are synchronous.

The Memory adapter is intended only for use in tests and for developing higher-level abstractions.

Public Instance Methods

command(target, message, opts = {}) click to toggle source

Sends COMMAND to target

@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]

# File lib/mimi/messaging/adapters/memory.rb, line 32
# Sends a COMMAND to the target. Commands are fire-and-forget: whatever the
# dispatch produces is discarded and nil is returned.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash] forwarded as-is to dispatch_command
# @return [nil]
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def command(target, message, opts = {})
  unless message.is_a?(Mimi::Messaging::Message)
    raise ArgumentError, "Message is expected"
  end

  dispatch_command(target, message, opts)

  # Explicit nil so the dispatch result never leaks to the caller.
  nil
end
event(target, message, opts = {}) click to toggle source

Sends EVENT to target

@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]

# File lib/mimi/messaging/adapters/memory.rb, line 56
# Sends an EVENT to the target.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash] forwarded as-is to dispatch_event
# @return whatever dispatch_event returns
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def event(target, message, opts = {})
  message_given = message.is_a?(Mimi::Messaging::Message)
  raise ArgumentError, "Message is expected" unless message_given

  dispatch_event(target, message, opts)
end
query(target, message, opts = {}) click to toggle source

Sends QUERY to target

@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]

# File lib/mimi/messaging/adapters/memory.rb, line 44
# Sends a QUERY to the target and returns the deserialized response.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash] forwarded as-is to dispatch_query
# @return the result of deserializing what dispatch_query returned
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def query(target, message, opts = {})
  unless message.is_a?(Mimi::Messaging::Message)
    raise ArgumentError, "Message is expected"
  end

  # dispatch_query hands back a serialized response; decode it for the caller.
  deserialize(dispatch_query(target, message, opts))
end
start() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 20
# Intentionally does nothing.
#
# @return [nil]
def start
  nil
end
start_event_processor(topic_name, processor, _opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 67
# Registers an event processor for a topic, after delegating to the
# superclass implementation with the same arguments.
#
# @param topic_name [String] key under which the processor is stored
# @param processor [Object] appended to the topic's processor list
# @param _opts [Hash] unused here; still forwarded by the bare `super`
# @return [Array] the topic's processor list, including the new processor
def start_event_processor(topic_name, processor, _opts = {})
  super
  # Create the topic's list on first registration, then append.
  (event_processors[topic_name] ||= []) << processor
end
start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 73
# Registers an event processor for a topic under a named queue, after
# delegating to the superclass implementation with the same arguments.
#
# @param topic_name [String] first-level key of the registry
# @param queue_name [String] second-level key of the registry
# @param processor [Object] appended to the list for that topic and queue
# @param opts [Hash] not read here; still forwarded by the bare `super`
# @return [Array] the processor list for that topic and queue
def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  super
  # Two-level registry: topic name -> queue name -> list of processors.
  queues_for_topic = (event_processors_with_queue[topic_name] ||= {})
  (queues_for_topic[queue_name] ||= []) << processor
end
start_request_processor(queue_name, processor, _opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 61
# Registers a request processor for a queue, after delegating to the
# superclass implementation with the same arguments.
#
# @param queue_name [String] key under which the processor is stored
# @param processor [Object] appended to the queue's processor list
# @param _opts [Hash] unused here; still forwarded by the bare `super`
# @return [Array] the queue's processor list, including the new processor
def start_request_processor(queue_name, processor, _opts = {})
  super
  # Create the queue's list on first registration, then append.
  (request_processors[queue_name] ||= []) << processor
end
stop() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 23
# Intentionally does nothing.
#
# @return [nil]
def stop
  nil
end
stop_all_processors() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 80
# Drops every registered processor by replacing the three registries with
# fresh, empty hashes.
#
# @return [Hash] the new, empty queue-bound event processor registry
def stop_all_processors
  # Separate hash literals on purpose: the registries must not share storage.
  @request_processors, @event_processors = {}, {}
  @event_processors_with_queue = {}
end

Private Instance Methods

dispatch_command(target, message, _opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 101
# Delivers a command to one randomly chosen request processor registered for
# the target's queue. Does nothing when no processor list exists for it.
#
# @param target [String] split on "/" into queue name and method name
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused; the processor always receives an empty hash
# @return the processor's call_command result, or nil if the queue is unknown
def dispatch_command(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  candidates = request_processors[queue_name]
  return unless candidates

  # Any one of the processors serving the queue may take the command.
  candidates.sample.call_command(method_name, transmitted_message(message), {})
end
dispatch_event(target, message, _opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 120
# Delivers an event to every processor registered directly on the topic, plus
# one randomly chosen processor from each queue bound to the topic.
#
# The delivery list is built as a fresh array. Appending the sampled queue
# processors to the array stored in event_processors would permanently grow
# the registry, so later events would reach queue processors several times.
#
# @param target [String] split on "#" into topic name and event type
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused; processors always receive an empty hash
# @return [Array] the processors the event was delivered to
def dispatch_event(target, message, _opts = {})
  topic_name, event_type = target.split("#")

  topic_processors = event_processors[topic_name] || []
  queues = event_processors_with_queue[topic_name] || {}
  # One processor per queue: processors sharing a queue compete for the event.
  queue_processors = queues.values.map(&:sample)

  # Array#+ allocates a new array, leaving the registered lists untouched.
  (topic_processors + queue_processors).each do |processor|
    processor.call_event(event_type, transmitted_message(message), {})
  end
end
dispatch_query(target, message, _opts = {}) click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 110
# Delivers a query to one randomly chosen request processor registered for
# the target's queue and returns the serialized response.
#
# @param target [String] split on "/" into queue name and method name
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused; the processor always receives an empty hash
# @return the serialized form of the processor's call_query result
# @raise [Timeout::Error] if no processor list exists for the queue
def dispatch_query(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  candidates = request_processors[queue_name]
  raise Timeout::Error unless candidates

  # Any one of the processors serving the queue may answer the query.
  chosen = candidates.sample
  serialize(chosen.call_query(method_name, transmitted_message(message), {}))
end
event_processors() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 137
# Lazily initialized registry of event processors; start_event_processor
# stores a list of processors under each topic name.
#
# @return [Hash]
def event_processors
  return @event_processors if @event_processors

  @event_processors = {}
end
event_processors_with_queue() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 141
# Lazily initialized registry of queue-bound event processors;
# start_event_processor_with_queue stores them by topic name, then queue name.
#
# @return [Hash]
def event_processors_with_queue
  return @event_processors_with_queue if @event_processors_with_queue

  @event_processors_with_queue = {}
end
request_processors() click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 133
# Lazily initialized registry of request processors; start_request_processor
# stores a list of processors under each queue name.
#
# @return [Hash]
def request_processors
  return @request_processors if @request_processors

  @request_processors = {}
end
transmitted_message(message) click to toggle source

Simulates a transmitted message, following serialization/deserialization:

message out -> message in

@param message [Mimi::Messaging::Message] @return [Mimi::Messaging::Message]

# File lib/mimi/messaging/adapters/memory.rb, line 94
# Simulates sending a message over the wire: the message is serialized,
# deserialized again, and wrapped in a new Message.
#
#   message out -> message in
#
# The headers object is passed to the new Message as-is; it does not go
# through the serialize/deserialize round trip.
#
# @param message [Mimi::Messaging::Message]
# @return [Mimi::Messaging::Message]
def transmitted_message(message)
  wire_payload = serialize(message)
  received_payload = deserialize(wire_payload)
  Mimi::Messaging::Message.new(received_payload, message.headers)
end