module Mimi::Messaging

Mimi::Messaging implements a messaging layer of a microservice application.

Usage: [TBD]

Constants

DEFAULT_LOG_AT_LEVEL

The level at which Mimi::Messaging logs by default.

EVENT_TARGET_REGEX

Event target validation pattern: “[<name>.]<name>#<identifier>”, where <name> consists of the characters A-Za-z0-9_- and <identifier> consists of the characters A-Za-z0-9_.

Example: “shop.orders#created”

REQUEST_TARGET_REGEX

Request target validation pattern: “[<name>.]<name>/<identifier>”, where <name> consists of the characters A-Za-z0-9_- and <identifier> consists of the characters A-Za-z0-9_.

Example: “shop.orders/list”

VERSION

Public Class Methods

adapter() click to toggle source

Returns the configured adapter

@return [Mimi::Messaging::Adapter]

# File lib/mimi/messaging.rb, line 84
# Returns the adapter instance held by the module.
#
# @return [Mimi::Messaging::Adapter]
# @raise [Error] if no adapter has been configured yet
def self.adapter
  return @adapter if @adapter

  raise Error, "Mimi::Messaging adapter is not configured"
end
command(target, message = {}, opts = {}) click to toggle source

Sends the command to the given target

Example:

Mimi::Messaging.command("users/create", name: "John Smith")

@param target [String] “<queue>/<method>” @param message [Hash,Mimi::Messaging::Message] @param opts [Hash] additional adapter-specific options

@return nil

# File lib/mimi/messaging.rb, line 198
# Sends a command to the given target through the adapter.
#
# The message is wrapped into a Mimi::Messaging::Message before it is handed over.
#
# @param target [String] "<queue>/<method>", must match REQUEST_TARGET_REGEX
# @param message [Hash,Mimi::Messaging::Message]
# @param opts [Hash] additional adapter-specific options
# @return the value returned by the adapter's #command
# @raise [ArgumentError] if the target or the message is invalid
# @raise [Error] if the adapter is not started
def self.command(target, message = {}, opts = {})
  unless REQUEST_TARGET_REGEX.match(target)
    raise ArgumentError, "Invalid target argument"
  end
  unless message.is_a?(Hash)
    raise ArgumentError, "Invalid message, Hash or Message is expected"
  end
  unless started?(:adapter)
    raise Error, "Failed to send command, adapter is not started"
  end

  transport = adapter
  transport.command(target, Mimi::Messaging::Message.new(message), opts)
end
configure(options) click to toggle source

Configure the Messaging layer

Configures the adapter (type) and the adapter specific options.

@param options [Hash] options passed to the adapter @option options [String,Symbol] :mq_adapter Adapter type, one of “memory”, “test” etc

# File lib/mimi/messaging.rb, line 58
# Configures the Messaging layer.
#
# Looks up the adapter class registered under options[:mq_adapter], instantiates it
# with a copy of the options and registers the current message serializer with it.
#
# All validation (including the serializer check) is done before any module state is
# changed, so a validation failure does not leave behind new options or an adapter
# that has no serializer registered.
#
# @param options [Hash] options passed to the adapter
# @option options [String,Symbol] :mq_adapter Adapter type, one of "memory", "test" etc
# @raise [ArgumentError] if options is not a Hash
# @raise [ConfigurationError] if :mq_adapter is missing, names an adapter that is not
#   registered, or no message serializer has been registered
def self.configure(options)
  raise ArgumentError, "Hash is expected as options" unless options.is_a?(Hash)
  raise ConfigurationError, ":mq_adapter is expected to be set" unless options.key?(:mq_adapter)

  adapter_name = options[:mq_adapter].to_s
  registered_adapters = Mimi::Messaging::Adapters.registered_adapters
  adapter_class = registered_adapters[adapter_name]
  unless adapter_class
    raise(
      ConfigurationError,
      "Failed to find adapter with name '#{adapter_name}', " \
      "registered adapters are: #{registered_adapters.keys.join(', ')}"
    )
  end
  raise ConfigurationError, "Message serializer is not registered" unless @serializer

  # Only now, with everything validated, replace the module state.
  @options = options.dup
  @adapter = adapter_class.new(@options)
  @adapter.register_message_serializer(@serializer)
end
event(target, message = {}, opts = {}) click to toggle source

Broadcasts the event with the given target

@param target [String] “<topic>#<event_type>”, e.g. “customers#created” @param message [Hash,Mimi::Messaging::Message] @param opts [Hash] additional options

# File lib/mimi/messaging.rb, line 233
# Broadcasts an event with the given target through the adapter.
#
# The message is wrapped into a Mimi::Messaging::Message before it is handed over.
#
# @param target [String] "<topic>#<event_type>", must match EVENT_TARGET_REGEX
# @param message [Hash,Mimi::Messaging::Message]
# @param opts [Hash] additional options
# @return the value returned by the adapter's #event
# @raise [ArgumentError] if the target or the message is invalid
# @raise [Error] if the adapter is not started
def self.event(target, message = {}, opts = {})
  unless EVENT_TARGET_REGEX.match(target)
    raise ArgumentError, "Invalid target argument"
  end
  unless message.is_a?(Hash)
    raise ArgumentError, "Invalid message, Hash or Message is expected"
  end
  unless started?(:adapter)
    raise Error, "Failed to broadcast event, adapter is not started"
  end

  transport = adapter
  transport.event(target, Mimi::Messaging::Message.new(message), opts)
end
log(message) click to toggle source

Logs with configured logger at configured logging level

@param message [String]

# File lib/mimi/messaging.rb, line 381
# Logs the message with the configured logger at the configured logging level.
#
# Does nothing if no logger is set or the level is :none. The level is taken from
# options[:mq_log_at_level], falling back to DEFAULT_LOG_AT_LEVEL; the fallback also
# applies when the module has not been configured yet (options is nil).
#
# @param message [String]
# @return the value returned by the logger, or nil if nothing was logged
def self.log(message)
  return unless logger

  configured_options = options || {} # .log may be called before .configure
  log_at_level = (configured_options[:mq_log_at_level] || DEFAULT_LOG_AT_LEVEL).to_sym
  return if log_at_level == :none

  # public_send: the level comes from configuration and must only ever reach
  # the logger's public interface, never private methods.
  logger.public_send(log_at_level, message)
end
logger() click to toggle source

Returns configured logger

@return [Logger] or compatible

# File lib/mimi/messaging.rb, line 373
# Returns the logger held by the module, or nil if none has been set.
#
# @return [Logger, nil] a Logger or a compatible object
def self.logger
  defined?(@logger) ? @logger : nil
end
options() click to toggle source

Returns the module configured options

@return [Hash]

# File lib/mimi/messaging.rb, line 94
# Returns the options held by the module, or nil if none have been set.
#
# @return [Hash, nil]
def self.options
  defined?(@options) ? @options : nil
end
query(target, message = {}, opts = {}) click to toggle source

Executes the query to the given target and returns response

Raises Timeout::Error if the response from the target was not received in time.

Example:

result = Mimi::Messaging.query("users/find", id: 157)

@param target [String] “<queue>/<method>” @param message [Hash,Mimi::Messaging::Message] @param opts [Hash] additional options, e.g. :timeout

@return [Hash]

# File lib/mimi/messaging.rb, line 219
# Executes a query against the given target through the adapter and returns the response.
#
# The message is wrapped into a Mimi::Messaging::Message before it is handed over.
#
# @param target [String] "<queue>/<method>", must match REQUEST_TARGET_REGEX
# @param message [Hash,Mimi::Messaging::Message]
# @param opts [Hash] additional options, e.g. :timeout
# @return the value returned by the adapter's #query
# @raise [ArgumentError] if the target or the message is invalid
# @raise [Error] if the adapter is not started
def self.query(target, message = {}, opts = {})
  unless REQUEST_TARGET_REGEX.match(target)
    raise ArgumentError, "Invalid target argument"
  end
  unless message.is_a?(Hash)
    raise ArgumentError, "Invalid message, Hash or Message is expected"
  end
  unless started?(:adapter)
    raise Error, "Failed to send query, adapter is not started"
  end

  transport = adapter
  transport.query(target, Mimi::Messaging::Message.new(message), opts)
end
register_event_processor(topic_name, processor, opts = {}) click to toggle source

Registers an event processor without a queue

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param topic_name [String] “<topic>” @param processor [#call_event()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging.rb, line 301
# Registers an event processor without a queue.
#
# The processor must respond to #call_event and that method must be callable with
# three positional arguments (method_name, message, opts). Handlers that declare
# optional or splat parameters, e.g. call_event(method_name, message, opts = {}),
# are accepted as well.
#
# If both the adapter and the processors are started, the processor is started
# immediately; in any case it is appended to the list of registered processors.
#
# @param topic_name [String] "<topic>"
# @param processor [#call_event()]
# @param opts [Hash] additional adapter-specific options (stored as a copy)
# @return [Array<Hash>] the list of registered message processors
# @raise [ArgumentError] if the processor does not provide a suitable #call_event
def self.register_event_processor(topic_name, processor, opts = {})
  # True when the named handler can be invoked with three positional arguments.
  accepts_three_args = lambda do |handler_name|
    next false unless processor.respond_to?(handler_name)

    handler = processor.method(handler_name)
    next true if handler.arity >= 3 # everything the plain arity check accepted stays accepted

    # Method#arity is negative for optional/splat parameters, so inspect the parameter list.
    kinds = handler.parameters.map(&:first)
    required = kinds.count(:req)
    required <= 3 && (kinds.include?(:rest) || required + kinds.count(:opt) >= 3)
  end

  unless accepts_three_args.call(:call_event)
    raise(
      ArgumentError,
      "Invalid event processor passed to .register_event_processor(), " \
      "expected to respond to #call_event(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :event,
    topic_name: topic_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end
register_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) click to toggle source

Registers an event processor with a queue

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param topic_name [String] “<topic>” @param queue_name [String] “<queue>” @param processor [#call_event()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging.rb, line 342
# Registers an event processor with a queue.
#
# The processor must respond to #call_event and that method must be callable with
# three positional arguments (method_name, message, opts). Handlers that declare
# optional or splat parameters, e.g. call_event(method_name, message, opts = {}),
# are accepted as well.
#
# If both the adapter and the processors are started, the processor is started
# immediately; in any case it is appended to the list of registered processors.
#
# @param topic_name [String] "<topic>"
# @param queue_name [String] "<queue>"
# @param processor [#call_event()]
# @param opts [Hash] additional adapter-specific options (stored as a copy)
# @return [Array<Hash>] the list of registered message processors
# @raise [ArgumentError] if the processor does not provide a suitable #call_event
def self.register_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  # True when the named handler can be invoked with three positional arguments.
  accepts_three_args = lambda do |handler_name|
    next false unless processor.respond_to?(handler_name)

    handler = processor.method(handler_name)
    next true if handler.arity >= 3 # everything the plain arity check accepted stays accepted

    # Method#arity is negative for optional/splat parameters, so inspect the parameter list.
    kinds = handler.parameters.map(&:first)
    required = kinds.count(:req)
    required <= 3 && (kinds.include?(:rest) || required + kinds.count(:opt) >= 3)
  end

  unless accepts_three_args.call(:call_event)
    raise(
      ArgumentError,
      "Invalid event processor passed to .register_event_processor_with_queue(), " \
      "expected to respond to #call_event(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :event_with_queue,
    topic_name: topic_name,
    queue_name: queue_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end
register_request_processor(queue_name, processor, opts = {}) click to toggle source

Registers the request (command/query) processor.

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to call_command() AND call_query(), each of which accepts 3 arguments: (method, message, opts).

TBD: It must ack! or nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param queue_name [String] “<queue>” @param processor [#call_command()] @param opts [Hash] additional adapter-specific options

# File lib/mimi/messaging.rb, line 258
# Registers the request (command/query) processor.
#
# The processor must respond to both #call_command and #call_query, and each must be
# callable with three positional arguments (method_name, message, opts). Handlers
# that declare optional or splat parameters, e.g.
# call_query(method_name, message, opts = {}), are accepted as well.
#
# If both the adapter and the processors are started, the processor is started
# immediately; in any case it is appended to the list of registered processors.
#
# @param queue_name [String] "<queue>"
# @param processor [#call_command(), #call_query()]
# @param opts [Hash] additional adapter-specific options (stored as a copy)
# @return [Array<Hash>] the list of registered message processors
# @raise [ArgumentError] if the processor does not provide suitable handlers
def self.register_request_processor(queue_name, processor, opts = {})
  # True when the named handler can be invoked with three positional arguments.
  accepts_three_args = lambda do |handler_name|
    next false unless processor.respond_to?(handler_name)

    handler = processor.method(handler_name)
    next true if handler.arity >= 3 # everything the plain arity check accepted stays accepted

    # Method#arity is negative for optional/splat parameters, so inspect the parameter list.
    kinds = handler.parameters.map(&:first)
    required = kinds.count(:req)
    required <= 3 && (kinds.include?(:rest) || required + kinds.count(:opt) >= 3)
  end

  unless accepts_three_args.call(:call_command) && accepts_three_args.call(:call_query)
    raise(
      ArgumentError,
      "Invalid request processor passed to .register_request_processor(), " \
      "expected to respond to #call_command(...) AND #call_query(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :request,
    queue_name: queue_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end
start(params = {}) click to toggle source

Starts the Messaging module

Starts the adapter if it is not started yet. The current message serializer is registered with the adapter earlier, by configure(). Starting the adapter opens connections with a message broker.

Automatically starts all currently registered message processors, unless the :processors option is false.

Example:

# to only start the adapter, so that we can send messages,
# but not process incoming messages:
Mimi::Messaging.start(processors: false)

# to start everything
Mimi::Messaging.start

@param params [Hash] additional parameters @option params [true,false] :adapter (default: true)

start the adapter

@option params [true,false] :processors (default: true)

automatically registers message processors
# File lib/mimi/messaging.rb, line 121
# Starts the Messaging module.
#
# Starts the adapter and then all registered message processors, each only if it is
# not started yet and the corresponding option is truthy.
#
# @param params [Hash] additional parameters
# @option params [true,false] :adapter (default: true) start the adapter
# @option params [true,false] :processors (default: true) start the registered message processors
# @return [true]
# @raise [Error] if the adapter is not configured
def self.start(params = {})
  adapter # raises unless the adapter is configured
  log("#{name} starting with adapter '#{options[:mq_adapter]}'")

  defaults = { adapter: true, processors: true }
  start_adapter, start_processors = defaults.merge(params).values_at(:adapter, :processors)

  if !started?(:adapter) && start_adapter
    adapter.start
    started!(:adapter)
  end

  if !started?(:processors) && start_processors
    start_all_message_processors
    started!(:processors)
  end

  true
end
stop(params = {}) click to toggle source

Stops the Messaging module

Stops all currently registered message processors, unless :processors option is false.

Stops the adapter, unless :adapter option is false. Stopping the adapter closes connections with a message broker.

Example:

# to start everything
Mimi::Messaging.start

# to only stop the message processors, so that we can send messages
# but not process incoming messages:
Mimi::Messaging.stop(adapter: false, processors: true)

# to stop everything
Mimi::Messaging.stop

@param params [Hash] additional parameters @option params [true,false] :processors (default: true)

stop all message processors

@option params [true,false] :adapter (default: true)

stop the adapter
# File lib/mimi/messaging.rb, line 167
# Stops the Messaging module.
#
# Stops all message processors first and then the adapter, each only if the
# corresponding option is truthy, and marks the stopped subsystems as not started.
#
# @param params [Hash] additional parameters
# @option params [true,false] :processors (default: true) stop all message processors
# @option params [true,false] :adapter (default: true) stop the adapter
# @return [true]
def self.stop(params = {})
  defaults = { adapter: true, processors: true }
  stop_adapter, stop_processors = defaults.merge(params).values_at(:adapter, :processors)

  if stop_processors
    stop_all_processors
    started!(:processors, false)
  end

  if stop_adapter
    adapter.stop # TODO: stopping adapter without stopping processors? TBD
    started!(:adapter, false)
  end

  log("#{name} stopped")
  true
end
unregister_all_processors() click to toggle source

Deregisters all message processors

# File lib/mimi/messaging.rb, line 473
# Stops all message processors and then empties the list of registered processors.
#
# @return [Array] the (now empty) list of registered message processors
def self.unregister_all_processors
  stop_all_processors
  message_processors.clear
end
use(options) click to toggle source

Sets up the Messaging module

Sets up Messaging layer dependencies configuration, e.g. configures logger, message serializer etc.

# File lib/mimi/messaging.rb, line 46
# Sets up the dependencies of the Messaging module.
#
# Only the keys present in options are applied; a key that is absent leaves the
# corresponding dependency unchanged.
#
# @param options [Hash]
# @option options :serializer the message serializer
# @option options :logger the logger
# @return the logger that was set, or nil if options has no :logger key
def self.use(options)
  if options.key?(:serializer)
    @serializer = options[:serializer]
  end
  return unless options.key?(:logger)

  @logger = options[:logger]
end

Private Class Methods

message_processors() click to toggle source

Returns the set of registered message processors

@return [Array{Hash}]

# File lib/mimi/messaging.rb, line 423
# Returns the list of registered message processors, creating it on first use.
#
# @return [Array<Hash>]
def self.message_processors
  return @message_processors if defined?(@message_processors) && @message_processors

  @message_processors = []
end
start_all_message_processors() click to toggle source

Starts all registered message processors at the adapter

# File lib/mimi/messaging.rb, line 457
# Starts every registered message processor at the adapter.
#
# @return [Array<Hash>] the list of registered message processors
def self.start_all_message_processors
  message_processors.each do |processor_params|
    start_message_processor(processor_params)
  end
end
start_message_processor(message_processor_params) click to toggle source

Starts the message processor at the configured and started adapter

@param message_processor_params [Hash]

# File lib/mimi/messaging.rb, line 432
# Starts the message processor at the adapter and marks it as started.
#
# Does nothing if the processor is already marked as started. Depending on
# params[:type] the processor is handed to the adapter's #start_request_processor,
# #start_event_processor or #start_event_processor_with_queue.
#
# @param message_processor_params [Hash] with keys :type, :processor, :opts, :started
#   and, depending on the type, :queue_name and/or :topic_name
# @return [true, nil] true once started, nil if it was already started
# @raise [RuntimeError] if the processor type is not one of the known types
def self.start_message_processor(message_processor_params)
  return if message_processor_params[:started] # do not start processor twice

  params = message_processor_params
  processor = params[:processor]
  case params[:type]
  when :request
    log "#{self} starting request processor #{processor}@#{params[:queue_name]}"
    adapter.start_request_processor(params[:queue_name], processor, params[:opts])
  when :event
    log "#{self} starting event processor #{processor}@#{params[:topic_name]}"
    adapter.start_event_processor(params[:topic_name], processor, params[:opts])
  when :event_with_queue
    log "#{self} starting event processor #{processor}@#{params[:topic_name]}/#{params[:queue_name]}"
    adapter.start_event_processor_with_queue(
      params[:topic_name], params[:queue_name], processor, params[:opts]
    )
  else
    # Report the offending type itself; referencing an undefined name here would
    # mask this error with a NameError.
    raise "Unexpected message processor type: #{params[:type].inspect}"
  end
  params[:started] = true
end
started!(name, value = true) click to toggle source

Sets the state of the given subsystem

Example:

started!(:adapter, false)

@param name [Symbol] @param value [true,false] (default: true)

# File lib/mimi/messaging.rb, line 413
# Records whether the given subsystem is started.
#
# The value is coerced to a strict boolean before it is stored.
#
# @param name [Symbol]
# @param value [true,false] (default: true)
# @return [true,false] the stored state
def self.started!(name, value = true)
  flag = value ? true : false
  (@started ||= {})[name] = flag
end
started?(name) click to toggle source

Returns true if the given subsystem started

Example:

started?(:adapter)

@param name [Symbol] @return [true,false]

# File lib/mimi/messaging.rb, line 399
# Returns whether the given subsystem is started.
#
# A subsystem whose state has never been set counts as not started; the result is
# always a strict boolean, never nil.
#
# @param name [Symbol]
# @return [true,false]
def self.started?(name)
  @started ||= {}
  !!@started[name]
end
stop_all_processors() click to toggle source

Stops all registered message processors at the adapter

# File lib/mimi/messaging.rb, line 464
# Stops all message processors at the adapter and marks every registered
# processor as not started.
#
# @return [Array<Hash>] the list of registered message processors
def self.stop_all_processors
  log "#{self} stopping all message processors"
  adapter.stop_all_processors
  message_processors.each do |processor_params|
    processor_params[:started] = false
  end
end