class Mimi::Messaging::Adapters::Base

An abstract messaging adapter.

An adapter implementation must implement the following methods:

An adapter implementation must register itself using `.register_adapter_name` method.

Attributes

serializer[R]

Public Class Methods

new(params = {}) click to toggle source

Creates an Adapter instance

@param params [Hash] adapter-specific configuration parameters

# File lib/mimi/messaging/adapters/base.rb, line 42
def initialize(params = {})
end
register_adapter_name(adapter_name) click to toggle source

Registers adapter class with given adapter name

@param adapter_name [String,Symbol]

# File lib/mimi/messaging/adapters/base.rb, line 29
def self.register_adapter_name(adapter_name)
  adapter_name = adapter_name.to_s
  if Mimi::Messaging::Adapters.registered_adapters.key?(adapter_name)
    raise "Mimi::Messaging adapter '#{adapter_name}' is already registered"
  end

  Mimi::Messaging::Adapters.registered_adapters[adapter_name] = self
end

Public Instance Methods

command(_target, _message, _opts = {}) click to toggle source

Sends the command to the given target

@param target [String] “<queue>/<method>” @param message [Mimi::Messaging::Message] @param opts [Hash] additional options

@return nil @raise [SomeError]

# File lib/mimi/messaging/adapters/base.rb, line 72
def command(_target, _message, _opts = {})
  raise "Method #command(target, message, opts) is not implemented by #{self.class}"
end
event(_target, _message, _opts = {}) click to toggle source

Broadcasts the event with the given target

@param target [String] “<topic>#<event_type>”, e.g. “customers#created” @param message [Mimi::Messaging::Message] @param opts [Hash] additional options

# File lib/mimi/messaging/adapters/base.rb, line 95
def event(_target, _message, _opts = {})
  raise "Method #event(target, message, opts) is not implemented by #{self.class}"
end
query(_target, _message, _opts = {}) click to toggle source

Executes the query to the given target and returns response

@param target [String] “<queue>/<method>” @param message [Mimi::Messaging::Message] @param opts [Hash] additional options, e.g. :timeout

@return [Hash] @raise [SomeError,TimeoutError]

# File lib/mimi/messaging/adapters/base.rb, line 85
def query(_target, _message, _opts = {})
  raise "Method #query(target, message, opts) is not implemented by #{self.class}"
end
register_message_serializer(serializer) click to toggle source

Registers the message serializer

Message serializer must implement methods serialize(Hash) -> String and deserialize(String) -> Hash

@param serializer [#serialize(),#deserialize()]

# File lib/mimi/messaging/adapters/base.rb, line 189
def register_message_serializer(serializer)
  raise "Message serializer is already registered in #{self.class}" if @serializer
  if !serializer.respond_to?(:serialize) || !serializer.respond_to?(:deserialize)
    raise "Invalid message serializer passed to #{self.class}"
  end

  @serializer = serializer
end
start() click to toggle source

Starts the adapter.

All the message processors must be started after the adapter is started. Before the adapter is started it MAY respond with an error to an attempt to start a message processor.

Serializer must be registered before any message is sent or received.

# File lib/mimi/messaging/adapters/base.rb, line 53
def start
  raise "Method #start() is not implemented by #{self.class}"
end
start_event_processor(_topic_name, processor, _opts = {}) click to toggle source

Starts an event processor without a queue

Processor must respond to call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param topic_name [String] “<topic>” @param processor [#call_event()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging/adapters/base.rb, line 145
def start_event_processor(_topic_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end
start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {}) click to toggle source

Starts an event processor with a queue

Processor must respond to call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param topic_name [String] “<topic>” @param queue_name [String] “<queue>” @param processor [#call_event()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging/adapters/base.rb, line 171
def start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end
start_request_processor(_queue_name, processor, _opts = {}) click to toggle source

Starts a request (command/query) processor.

Processor must respond to call_command() AND call_query() which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

NOTE: Method must be overloaded by a subclass.

@param queue_name [String] “<queue>” @param processor [#call_command(),#call_query()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging/adapters/base.rb, line 115
def start_request_processor(_queue_name, processor, _opts = {})
  # validates processor
  if (
    processor.respond_to?(:call_command) && processor.method(:call_command).arity >= 3 &&
    processor.respond_to?(:call_query) && processor.method(:call_query).arity >= 3
  )
    return
  end

  raise(
    ArgumentError,
    "Invalid request processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_command(method_name, message, opts) AND #call_query(...)"
  )
end
stop() click to toggle source

Stops all message processors and then stops the adapter.

# File lib/mimi/messaging/adapters/base.rb, line 59
def stop
  raise "Method #stop() is not implemented by #{self.class}"
end
stop_all_processors() click to toggle source

Stops all message (command, query and event) processors.

Stops currently registered processors and stops accepting new messages for processors.

# File lib/mimi/messaging/adapters/base.rb, line 203
def stop_all_processors
  raise "Method #stop_all_processors() is not implemented by #{self.class}"
end

Protected Instance Methods

deserialize(message) click to toggle source

Deserializes a message (String) received on-the-wire to a Hash

@param message [String] @return [Hash]

# File lib/mimi/messaging/adapters/base.rb, line 225
def deserialize(message)
  raise "Message serializer is not registered in #{self.class}" unless @serializer

  @serializer.deserialize(message)
end
serialize(message) click to toggle source

Serializes a message (Hash) to be sent on-the-wire using configured serializer

@param message [Hash] @return [String]

# File lib/mimi/messaging/adapters/base.rb, line 214
def serialize(message)
  raise "Message serializer is not registered in #{self.class}" unless @serializer

  @serializer.serialize(message)
end