class Rmsg::Topic

Topic handles publishing and subscribing to a topic with a key, over RabbitMQ.

Public Class Methods

new(params)

@param params [Hash]
@option params :rabbit [Rmsg::Rabbit] Example: Rmsg::Rabbit.new
@option params :topic [String] Example: 'services'

# File lib/rmsg/topic.rb, line 8
def initialize(params)
  @rabbit = params[:rabbit]
  @exchange = @rabbit.channel.topic(params[:topic])
end

Public Instance Methods

publish(message, key)

Publish a message with a routing key.

@param message [Hash] Message to be published.
@param key [String] Example: 'users.key_changed'
@return [Exchange] The exchange used to publish.

# File lib/rmsg/topic.rb, line 17
def publish(message, key)
  @exchange.publish(message.to_json, :routing_key => key)
end
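
The JSON round trip between publish and a subscriber's block can be sketched with the standard library alone. This is a minimal illustration rather than Rmsg code, and the sample message is made up. It suggests that Hash keys come back as symbols (the subscriber parses with symbolize_names: true), while symbol values come back as plain strings.

```ruby
require 'json'

# Hypothetical message; any JSON-serializable Hash behaves the same way.
message = { id: 1, event: :key_changed, tags: ['a', 'b'] }

# What publish hands to the exchange: a JSON string.
payload = message.to_json

# What a subscriber's block receives after the payload is parsed.
received = JSON.parse(payload, symbolize_names: true)
```

Because the received Hash is not identical to the one that was sent, it is probably safest to publish only strings, numbers, booleans, arrays and nested Hashes.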

subscribe(key) { |message| ... }

Subscribe to the topic, listening for a specific key. Subscribing blocks the current process until it is interrupted, so it is intended for long-running processes. On receiving an interrupt (SIGINT, for example Ctrl-C), it closes gracefully by calling close on the Rabbit instance.

@param key [String] Example: 'users.key_changed'
@yieldparam message [Hash] The message received, to be processed within the block.

# File lib/rmsg/topic.rb, line 27
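def subscribe(key)
  # Server-named, exclusive queue: private to this connection and deleted when it closes.
  @queue = @rabbit.channel.queue("", :exclusive => true)
  @queue.bind(@exchange, :routing_key => key)
  begin
    # :block => true keeps the calling thread waiting for deliveries.
    @queue.subscribe(:block => true) do |delivery_info, metadata, payload|
      message = JSON.parse(payload, symbolize_names: true)
      yield message
    end
  rescue Interrupt => _
    # Ctrl-C (SIGINT) raises Interrupt; close before returning.
    @rabbit.close
  end
end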
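
The block-until-interrupted behaviour of subscribe can be imitated without a broker. The sketch below is a stand-in under stated assumptions: FakeConnection and consume are hypothetical names, not part of Rmsg or Bunny, and the Interrupt is raised by hand where a real process would receive it from Ctrl-C.

```ruby
require 'json'

# Hypothetical stand-in for the connection wrapper; only #close matters here.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical consumer loop: parses each payload the way subscribe does
# and closes the connection when an Interrupt arrives.
def consume(connection, payloads)
  payloads.each do |payload|
    # :interrupt simulates the user pressing Ctrl-C mid-stream.
    raise Interrupt if payload == :interrupt

    yield JSON.parse(payload, symbolize_names: true)
  end
rescue Interrupt
  connection.close
end

connection = FakeConnection.new
seen = []
consume(connection, ['{"id":1}', :interrupt, '{"id":2}']) { |message| seen << message }
```

In this sketch only the first payload reaches the block, and the connection is closed once the simulated interrupt fires. Messages that arrive after the interrupt are never processed.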