class Rmsg::Topic
Topic
handles publishing and subscribing to a topic with a key, over RabbitMQ.
Public Class Methods
new(params)
click to toggle source
@param params [Hash] @option params :rabbit [Rmsg::Rabbit] Example: Rmsg::Rabbit.new
@option params :topic [String] Example: ‘services’
# File lib/rmsg/topic.rb, line 8 def initialize(params) @rabbit = params[:rabbit] @exchange = @rabbit.channel.topic(params[:topic]) end
Public Instance Methods
publish(message, key)
click to toggle source
Publish a message with a routing key. @param message [Hash] Message to be published. @param key [String] Example: ‘users.key_changed’ @return [Exchange] The exchange used to publish.
# File lib/rmsg/topic.rb, line 17 def publish(message, key) @exchange.publish(message.to_json, :routing_key => key) end
subscribe(key) { |message| ... }
click to toggle source
Subscribe to the topic, listening for a specific key. Subscribing happens by continuously blocking the current process. It is specifically designed for long running processes. When receiving INT it will gracefully close. @param key [String] Example: ‘users.key_changed’ @yieldparam message [Hash] The message received, to be processed within the block.
# File lib/rmsg/topic.rb, line 27 def subscribe(key) @queue = @rabbit.channel.queue("", :exclusive => true) @queue.bind(@exchange, :routing_key => key) begin @queue.subscribe(:block => true) do |delivery_info, metadata, payload| message = JSON.parse(payload, symbolize_names: true) yield message end rescue Interrupt => _ @rabbit.close end end