class ManageIQ::Messaging::Client

The abstract client class. It defines the methods needed to publish and subscribe to messages. Creating an instance of a concrete subclass directly is not recommended; instead, call the class method Client.open with the desired protocol. For example:

client = ManageIQ::Messaging::Client.open(
  :protocol   => 'Stomp',
  :host       => 'localhost',
  :port       => 61616,
  :password   => 'smartvm',
  :username   => 'admin',
  :client_ref => 'generic_1',
  :encoding   => 'json'
)

To close the connection, call client.close explicitly. Alternatively, if a block is given to the open method, the connection is closed automatically before exiting the block. For example:

ManageIQ::Messaging::Client.open(
  :protocol   => 'Stomp',
  :host       => 'localhost',
  :port       => 61616,
  :password   => 'smartvm',
  :username   => 'admin',
  :client_ref => 'generic_1'
) do |client|
  # do stuff with the client
end

Public Class Methods

open(options) { |client| ... }

Open or create a connection to the message broker. Expected options keys are:

  • :protocol (Implemented: 'Stomp', 'Kafka'. Default 'Stomp')

  • :encoding ('yaml' or 'json'. Default 'yaml')

Other connection options are specific to the underlying messaging system.

Returns a Client instance if no block is given.
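As a rough illustration of how the :protocol option is resolved, the sketch below mirrors the constant lookup used by open against a hypothetical SketchMessaging namespace. The namespace, its empty classes, and the client_class_for helper are assumptions made for this example and are not part of the library. It suggests that a string and a symbol resolve the same way, and that the name is case-sensitive.

```ruby
# Hypothetical namespace mirroring ManageIQ::Messaging::<Protocol>::Client.
module SketchMessaging
  module Stomp
    class Client; end
  end

  module Kafka
    class Client; end
  end
end

# Same lookup shape as Client.open: default to :Stomp, then resolve the constant.
def client_class_for(options)
  protocol = options[:protocol] || :Stomp
  Object.const_get("SketchMessaging::#{protocol}::Client")
end

p client_class_for({})                      # default protocol
p client_class_for({:protocol => 'Kafka'})  # string form
p client_class_for({:protocol => :Kafka})   # symbol form resolves to the same class

begin
  client_class_for({:protocol => 'stomp'})  # wrong case
rescue NameError => e
  puts "lookup failed: #{e.class}"
end
```

An unknown or mis-cased protocol surfaces as a NameError from the constant lookup rather than as a messaging-specific error, so callers may want to validate the option before calling open.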

# File lib/manageiq/messaging/client.rb, line 40
def self.open(options)
  protocol = options[:protocol] || :Stomp
  client = Object.const_get("ManageIQ::Messaging::#{protocol}::Client").new(options)

  return client unless block_given?

  begin
    yield client
  ensure
    client.close
  end
  nil
end
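The block form's guarantee can be checked with a small stand-in. The sketch below copies the yield/ensure shape of open into a hypothetical SketchClient class (the class and the open_and_fail helper are assumptions for illustration, not library code) and shows that the client is closed even when the block raises.

```ruby
# Hypothetical stand-in for a concrete client; not part of the library.
class SketchClient
  attr_reader :closed

  def initialize(options)
    @options = options
    @closed  = false
  end

  def close
    @closed = true
  end

  # Same shape as Client.open above: yield the client, then always close it.
  def self.open(options)
    client = new(options)
    return client unless block_given?

    begin
      yield client
    ensure
      client.close
    end
    nil
  end
end

# Captures the yielded client so its state can be inspected after the block.
def open_and_fail
  seen = nil
  begin
    SketchClient.open({:host => 'localhost'}) do |client|
      seen = client
      raise 'processing failed'
    end
  rescue RuntimeError
    # The error still reaches the caller; by this point the client is closed.
  end
  seen
end

puts open_and_fail.closed # prints true
```

Note that the block form returns nil, so any value computed inside the block has to be captured in an outer variable, as seen does here.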

Public Instance Methods

publish_message(options, &block)

Publish a message to a queue. The message will be delivered to only one subscriber. Expected keys in options are:

  • :service (service and affinity are used to determine the queue name)

  • :affinity (optional)

  • :class_name (optional)

  • :message (e.g. method name or message type)

  • :payload (message body, a string or a user object that can be serialized)

  • :sender (optional, identify the sender)

  • :headers (optional, additional headers to add to the message)

Other options are specific to the underlying messaging system.

Optionally, a callback block can be provided to wait for the consumer to send an acknowledgment. Not every underlying messaging system supports callbacks. Example:

client.publish_message(
  :service  => 'ems_operation',
  :affinity => 'ems_amazon1',
  :message  => 'power_on',
  :payload  => {
    :ems_ref => 'u987',
    :id      => '123'
  }
) do |result|
  ansible_install_pkg(vm1) if result == 'running'
end

# File lib/manageiq/messaging/client.rb, line 80
def publish_message(options, &block)
  assert_options(options, [:message, :service])

  publish_message_impl(options, &block)
end

publish_messages(messages)

Publish multiple messages to a queue. This is a batch version of #publish_message, intended for better performance: all messages are sent in one batch. Every element of the messages array is an options hash.

# File lib/manageiq/messaging/client.rb, line 91
def publish_messages(messages)
  publish_messages_impl(messages)
end

publish_topic(messages)

Publish one or more messages to a topic. All subscribers will receive a copy of each message. messages can be either a hash or an array of hashes. Expected keys are:

  • :service (service is used to determine the topic address)

  • :event (event name)

  • :payload (message body, a string or a user object that can be serialized)

  • :sender (optional, identify the sender)

  • :headers (optional, additional headers to add to the message)

Other options are specific to the underlying messaging system.

# File lib/manageiq/messaging/client.rb, line 173
def publish_topic(messages)
  messages = Array.wrap(messages)
  messages.each { |msg| assert_options(msg, [:event, :service]) }

  publish_topic_impl(messages)
end
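The source above normalizes its argument with Array.wrap, which comes from ActiveSupport. The sketch below uses a minimal stand-in, wrap_messages (a hypothetical helper written for this example), to show why a single hash is wrapped rather than converted with Kernel#Array.

```ruby
# Minimal stand-in for ActiveSupport's Array.wrap, for illustration only.
def wrap_messages(messages)
  if messages.nil?
    []
  elsif messages.respond_to?(:to_ary)
    messages.to_ary
  else
    [messages]
  end
end

single = {:service => 'provider_events', :event => 'powered_on'}

p wrap_messages(single).length           # 1: the hash is kept as one message
p wrap_messages([single, single]).length # 2: an array passes through unchanged
p Array(single).length                   # 2: Kernel#Array splits the hash into key/value pairs
```

Because of that last behavior, Kernel#Array would turn one message into a list of key/value pairs, each of which would then fail the :event and :service check.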

subscribe_background_job(options)

Subscribe to receive from a queue and run each message as a background job. Expected keys in options are:

  • :service (service and affinity are used to determine the queue)

  • :affinity (optional)

  • :auto_ack (default true; if false, the client.ack method must be called explicitly)

Other options are specific to the underlying messaging system.

This subscriber consumes messages sent through publish_message with required options keys, for example:

client.publish_message(
  :service    => 'generic',
  :class_name => 'MiqTask',
  :message    => 'update_attributes', # method name, for instance method :instance_id is required
  :payload    => {
    :instance_id => 2, # database id of class instance stored in rails DB
    :args        => [{:status => 'Timeout'}] # argument list expected by the method
  }
)

A background job is assumed not to be retriable. It is auto-acked as soon as the request is received.
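The payload keys in the example above suggest a dispatch along the following lines. This is only a sketch of one plausible reading: the real dispatch lives in the protocol-specific subscribe_background_job_impl, which is not shown here, and SketchTask, its in-memory store, and run_background_job are all hypothetical names introduced for illustration.

```ruby
# Hypothetical model class standing in for something like MiqTask.
class SketchTask
  STORE = {}

  attr_reader :id, :attributes

  def initialize(id)
    @id         = id
    @attributes = {}
    STORE[id]   = self
  end

  # Stand-in for a database lookup by id.
  def self.find(id)
    STORE.fetch(id)
  end

  def update_attributes(attrs)
    @attributes.merge!(attrs)
  end
end

# Assumed dispatch: resolve the class, load the instance when :instance_id
# is present, then call the named method with the argument list.
def run_background_job(class_name, message, payload)
  klass  = Object.const_get(class_name)
  target = payload[:instance_id] ? klass.find(payload[:instance_id]) : klass
  target.public_send(message, *payload[:args])
end

SketchTask.new(2)
run_background_job(
  'SketchTask',
  'update_attributes',
  {:instance_id => 2, :args => [{:status => 'Timeout'}]}
)
p SketchTask.find(2).attributes
```

Under this reading, omitting :instance_id would send the message to the class itself, which matches the note that :instance_id is required for instance methods.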

# File lib/manageiq/messaging/client.rb, line 157
def subscribe_background_job(options)
  assert_options(options, [:service])

  subscribe_background_job_impl(options)
end

subscribe_messages(options, &block)

Subscribe to receive messages from a queue. Expected keys in options are:

  • :service (service and affinity are used to determine the queue)

  • :affinity (optional)

  • :auto_ack (default true; if false, the client.ack method must be called explicitly)

Other options are specific to the underlying messaging system.

A callback block is needed to consume the messages:

client.subscribe_messages(options) do |messages|
  messages.each do |msg|
    # msg is a type of ManageIQ::Messaging::ReceivedMessage
    # attributes in msg
    msg.sender
    msg.message
    msg.payload
    msg.ack_ref

    msg.ack # needed only when options[:auto_ack] is false
    # process the message
  end
end

With the auto_ack option defaulting to true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block. The code in the block can decide when to ack according to whether a message can be retried: ack the message at the beginning of processing if it is not retriable; otherwise ack it after the message is processed. Any un-acked message will be redelivered to the next subscriber AFTER the current subscriber disconnects, normally or abnormally (e.g. crashed).

To ack a message, call msg.ack.
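The two acking strategies described above can be sketched with a stand-in message object. SketchMessage and the handler names below are hypothetical and exist only for this example; with the real library the object would be a ManageIQ::Messaging::ReceivedMessage delivered to a subscriber opened with :auto_ack => false.

```ruby
# Hypothetical stand-in for ManageIQ::Messaging::ReceivedMessage.
class SketchMessage
  attr_reader :payload, :acked

  def initialize(payload)
    @payload = payload
    @acked   = false
  end

  def ack
    @acked = true
  end
end

# Not retriable: ack first, so a failure never leads to a redelivery.
def handle_at_most_once(msg)
  msg.ack
  yield msg.payload
end

# Retriable: ack only after the work succeeds; a failure leaves it un-acked.
def handle_at_least_once(msg)
  yield msg.payload
  msg.ack
end

# Runs a handler whose processing step always fails, then reports the ack state.
def acked_after_failure?(handler)
  msg = SketchMessage.new({:id => '123'})
  begin
    send(handler, msg) { |_payload| raise 'processing failed' }
  rescue RuntimeError
    # Swallowed for the demo; a real subscriber would log or re-raise.
  end
  msg.acked
end

p acked_after_failure?(:handle_at_most_once)  # true
p acked_after_failure?(:handle_at_least_once) # false
```

In the second case the message stays un-acked, so it would become eligible for redelivery once the subscriber disconnects.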

# File lib/manageiq/messaging/client.rb, line 128
def subscribe_messages(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_messages_impl(options, &block)
end

subscribe_topic(options, &block)

Subscribe to receive topic type messages. Expected keys in options are:

  • :service (service is used to determine the topic address)

Other options are specific to the underlying messaging system.

Some messaging systems allow subscribers, on reconnecting, to consume events that were published while the client was offline. Additional options are needed to turn on this feature.

A callback block is needed to consume the topic:

client.subscribe_topic(:service => 'provider_events', :auto_ack => false) do |msg|
  # msg is a type of ManageIQ::Messaging::ReceivedMessage
  # attributes in msg
  msg.sender
  msg.message
  msg.payload
  msg.ack_ref

  msg.ack # needed only when options[:auto_ack] is false
  # process the message
end

With the auto_ack option defaulting to true, each message is acked automatically immediately after delivery. Some messaging systems allow the subscriber to ack each message in the callback block. The code in the block can decide when to ack according to whether a message can be retried: ack the message at the beginning of processing if it is not retriable; otherwise ack it after the message is processed. Any un-acked message will be redelivered to the next subscriber AFTER the current subscriber disconnects, normally or abnormally (e.g. crashed).

To ack a message, call msg.ack.

# File lib/manageiq/messaging/client.rb, line 213
def subscribe_topic(options, &block)
  raise "A block is required" unless block_given?
  assert_options(options, [:service])

  subscribe_topic_impl(options, &block)
end

Private Instance Methods

assert_options(options, keys)
# File lib/manageiq/messaging/client.rb, line 226
def assert_options(options, keys)
  missing = keys - options.keys
  raise ArgumentError, "options must contain keys #{missing}" unless missing.empty?
end
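To see what a caller gets back when required keys are missing, the sketch below lifts the helper above to a top-level method and captures the error message. missing_keys_message is a hypothetical wrapper added only for this demonstration.

```ruby
# Copy of the private helper shown above, lifted to a top-level method for the demo.
def assert_options(options, keys)
  missing = keys - options.keys
  raise ArgumentError, "options must contain keys #{missing}" unless missing.empty?
end

# Returns the error message, or nil when all required keys are present.
def missing_keys_message(options, keys)
  assert_options(options, keys)
  nil
rescue ArgumentError => e
  e.message
end

puts missing_keys_message({:service => 'ems_operation'}, [:message, :service])
# prints: options must contain keys [:message]

# String keys do not match symbol keys, so both are reported as missing.
puts missing_keys_message({'service' => 'x', 'message' => 'y'}, [:message, :service])
# prints: options must contain keys [:message, :service]
```

The check compares keys only, so a required key that is present with a nil value passes validation.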

logger()
# File lib/manageiq/messaging/client.rb, line 222
def logger
  ManageIQ::Messaging.logger
end