class Mimi::Messaging::Adapters::Memory
Memory
is an in-memory implementation of a messaging adapter.
All message dispatching happens within a single thread, the same as the caller's, so all invocations are synchronous.
The Memory
adapter is intended only for use in tests and for the development of higher-level abstractions.
Public Instance Methods
command(target, message, opts = {})
click to toggle source
Sends COMMAND to target
@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]
# File lib/mimi/messaging/adapters/memory.rb, line 32
# Sends a COMMAND to the target and always returns nil.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash]
# @return [nil]
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def command(target, message, opts = {})
  acceptable = message.is_a?(Mimi::Messaging::Message)
  raise ArgumentError, "Message is expected" unless acceptable

  dispatch_command(target, message, opts)
  nil
end
event(target, message, opts = {})
click to toggle source
Sends EVENT to target
@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]
# File lib/mimi/messaging/adapters/memory.rb, line 56
# Sends an EVENT to the target; the result of dispatch_event is returned as-is.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash]
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def event(target, message, opts = {})
  unless message.is_a?(Mimi::Messaging::Message)
    raise ArgumentError, "Message is expected"
  end

  dispatch_event(target, message, opts)
end
query(target, message, opts = {})
click to toggle source
Sends QUERY to target
@param target [String] @param message [Mimi::Messaging::Message] @param opts [Hash]
# File lib/mimi/messaging/adapters/memory.rb, line 44
# Sends a QUERY to the target and returns the deserialized response.
#
# @param target [String]
# @param message [Mimi::Messaging::Message]
# @param opts [Hash]
# @return the result of deserialize applied to the serialized response
# @raise [ArgumentError] if message is not a Mimi::Messaging::Message
def query(target, message, opts = {})
  is_message = message.is_a?(Mimi::Messaging::Message)
  raise ArgumentError, "Message is expected" unless is_message

  deserialize(dispatch_query(target, message, opts))
end
start()
click to toggle source
# Starts the adapter. The method body is empty, so this is a no-op that returns nil.
# File lib/mimi/messaging/adapters/memory.rb, line 20 def start end
start_event_processor(topic_name, processor, _opts = {})
click to toggle source
Calls superclass method
Mimi::Messaging::Adapters::Base#start_event_processor
# File lib/mimi/messaging/adapters/memory.rb, line 67
# Registers a processor for events published on the given topic.
# Invokes the superclass implementation first (same arguments), then
# appends the processor to the topic's list, creating the list on first use.
#
# @param topic_name [String]
# @param processor [#call_event]
# @param _opts [Hash] unused here; forwarded to the superclass by bare `super`
# @return [Array] the topic's list of registered processors
def start_event_processor(topic_name, processor, _opts = {})
  super
  subscribers = (event_processors[topic_name] ||= [])
  subscribers << processor
end
start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
click to toggle source
Calls superclass method
Mimi::Messaging::Adapters::Base#start_event_processor_with_queue
# File lib/mimi/messaging/adapters/memory.rb, line 73
# Registers a processor for events on the given topic, grouped under a named queue.
# Invokes the superclass implementation first (same arguments), then stores
# the processor under topic -> queue, creating the nested containers on first use.
#
# @param topic_name [String]
# @param queue_name [String]
# @param processor [#call_event]
# @param opts [Hash] not read here; forwarded to the superclass by bare `super`
# @return [Array] the list of processors registered for this topic/queue pair
def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  super
  queues_for_topic = (event_processors_with_queue[topic_name] ||= {})
  (queues_for_topic[queue_name] ||= []) << processor
end
start_request_processor(queue_name, processor, _opts = {})
click to toggle source
Calls superclass method
Mimi::Messaging::Adapters::Base#start_request_processor
# File lib/mimi/messaging/adapters/memory.rb, line 61
# Registers a processor for requests (commands and queries) on the given queue.
# Invokes the superclass implementation first (same arguments), then
# appends the processor to the queue's list, creating the list on first use.
#
# @param queue_name [String]
# @param processor [#call_command, #call_query]
# @param _opts [Hash] unused here; forwarded to the superclass by bare `super`
# @return [Array] the queue's list of registered processors
def start_request_processor(queue_name, processor, _opts = {})
  super
  (request_processors[queue_name] ||= []).push(processor)
end
stop()
click to toggle source
# Stops the adapter. The method body is empty, so this is a no-op that returns nil.
# File lib/mimi/messaging/adapters/memory.rb, line 23 def stop end
stop_all_processors()
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 80
# Drops every registered processor by replacing all three registries
# (request, event, and event-with-queue) with fresh empty hashes.
#
# @return [Hash] the new, empty event-with-queue registry
def stop_all_processors
  @request_processors, @event_processors = {}, {}
  @event_processors_with_queue = {}
end
Private Instance Methods
dispatch_command(target, message, _opts = {})
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 101
# Delivers a command to one processor serving the target queue.
# The target is split on "/" into a queue name and a method name.
# If no processor is registered for the queue, nothing happens and nil is returned.
#
# @param target [String] "<queue_name>/<method_name>"
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused
def dispatch_command(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  candidates = request_processors[queue_name]
  return unless candidates

  # a randomly chosen processor among those serving the queue handles the command
  candidates.sample.call_command(method_name, transmitted_message(message), {})
end
dispatch_event(target, message, _opts = {})
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 120
# Delivers an event to every processor subscribed to the topic.
# The target is split on "#" into a topic name and an event type.
# Recipients are:
#   * every processor registered directly on the topic, and
#   * one randomly chosen processor per named queue bound to the topic.
#
# The recipient list is built as a fresh array on every call, so the
# registry of direct subscribers is never modified by a dispatch.
# (Appending to the registered array itself would make queue processors
# accumulate there and receive later events more than once.)
#
# @param target [String] "<topic_name>#<event_type>"
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused
# @return [Array] the processors the event was delivered to
def dispatch_event(target, message, _opts = {})
  topic_name, event_type = target.split("#")
  direct = event_processors[topic_name] || []
  queues = event_processors_with_queue[topic_name] || {}
  # one processor per queue, picked at random
  queued = queues.values.map(&:sample)
  # Array#+ allocates a new array, leaving the registered list untouched
  recipients = direct + queued
  recipients.each do |processor|
    processor.call_event(event_type, transmitted_message(message), {})
  end
end
dispatch_query(target, message, _opts = {})
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 110
# Delivers a query to one processor serving the target queue and returns
# the serialized response. The target is split on "/" into a queue name
# and a method name.
#
# @param target [String] "<queue_name>/<method_name>"
# @param message [Mimi::Messaging::Message]
# @param _opts [Hash] unused
# @return the processor's response passed through serialize
# @raise [Timeout::Error] if no processor is registered for the queue
def dispatch_query(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  candidates = request_processors[queue_name]
  raise Timeout::Error unless candidates

  # a randomly chosen processor among those serving the queue answers the query
  reply = candidates.sample.call_query(method_name, transmitted_message(message), {})
  serialize(reply)
end
event_processors()
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 137
# Lazily initialized registry of event processors, keyed by topic name.
#
# @return [Hash] the same hash instance on every call until it is reset
def event_processors
  @event_processors = {} unless @event_processors
  @event_processors
end
event_processors_with_queue()
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 141
# Lazily initialized registry of queue-bound event processors.
#
# @return [Hash] the same hash instance on every call until it is reset
def event_processors_with_queue
  @event_processors_with_queue = {} unless @event_processors_with_queue
  @event_processors_with_queue
end
request_processors()
click to toggle source
# File lib/mimi/messaging/adapters/memory.rb, line 133
# Lazily initialized registry of request processors, keyed by queue name.
#
# @return [Hash] the same hash instance on every call until it is reset
def request_processors
  @request_processors = {} unless @request_processors
  @request_processors
end
transmitted_message(message)
click to toggle source
Simulates a transmitted message, following serialization/deserialization:
message out -> message in
@param message [Mimi::Messaging::Message] @return [Mimi::Messaging::Message]
# File lib/mimi/messaging/adapters/memory.rb, line 94
# Simulates sending a message over the wire: the message is serialized,
# deserialized again, and wrapped in a new Message that carries the
# original message's headers.
#
# @param message [Mimi::Messaging::Message]
# @return [Mimi::Messaging::Message]
def transmitted_message(message)
  wire_payload = serialize(message)
  received_payload = deserialize(wire_payload)
  Mimi::Messaging::Message.new(received_payload, message.headers)
end