class RJR::Nodes::AMQP

AMQP node definition, implements the {RJR::Node} interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.

Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.

@example Listening for json-rpc requests over amqp

# initialize node,
server = RJR::Nodes::AMQP.new :node_id => 'server', :broker => 'localhost'

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') do |name|
  "Hello #{name}!"
end

# listen, and block
server.listen
server.join

@example Invoking json-rpc requests over amqp

client = RJR::Nodes::AMQP.new :node_id => 'client', :broker => 'localhost'
puts client.invoke('server-queue', 'hello', 'mo') # the queue name is set to "#{node_id}-queue"

Constants

INDIRECT_NODE
PERSISTENT_NODE
RJR_NODE_TYPE

Public Class Methods

new(args = {}) click to toggle source

AMQPNode initializer

@param [Hash] args the options to create the amqp node with @option args [String] :broker the amqp message broker to connect to

Calls superclass method RJR::Node::new
# File lib/rjr/nodes/amqp.rb, line 129
# AMQP node initializer.
#
# Passes args to the superclass initializer, then records the broker
# connection settings and creates the mutex that send_msg and subscribe
# synchronize on.
#
# @param [Hash] args options to create the node with; the keys read here are
#   :host (or :broker), :port, :vhost, :user (or :username),
#   :pass (or :password) and :ssl
def initialize(args = {})
  super(args)

  # settings that accept an alias: the first key is used when it is set,
  # otherwise the value falls back to the second key
  @host = args[:host] || args[:broker]
  @user = args[:user] || args[:username]
  @pass = args[:pass] || args[:password]

  # settings with a single key
  @port, @vhost, @ssl = args.values_at(:port, :vhost, :ssl)

  @amqp_lock = Mutex.new
end

Public Instance Methods

invoke(routing_key, rpc_method, *args) click to toggle source

Instructs node to send rpc request, and wait for and return response.

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or from a callback, as doing so will block the message subscription used to receive responses.

@param [String] routing_key destination queue to send request to @param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method with @return [Object] the json result retrieved from destination converted to a ruby object @raise [Exception] if the destination raises an exception, it will be converted to json and re-raised here

# File lib/rjr/nodes/amqp.rb, line 185
# Send a json-rpc request to the given queue, then block until
# wait_for_result hands back the matching result.
#
# Do not call this from the em event loop or one of its callbacks: the
# blocking wait would stall the subscription that receives the response.
#
# @param [String] routing_key destination queue to send the request to
# @param [String] rpc_method json-rpc method to invoke on the destination
# @param [Array] args arguments passed to the remote method
# @return [Object] the second element of the result returned by wait_for_result
# @raise [Exception] the third element of the result, when one is present
def invoke(routing_key, rpc_method, *args)
  request = Messages::Request.new(:method  => rpc_method,
                                  :args    => args,
                                  :headers => @message_headers)

  @@em.schedule do
    init_node do
      # begin listening for the result before the request goes out
      subscribe
      send_msg(request.to_s, :routing_key => routing_key, :reply_to => @queue_name)
    end
  end

  # TODO optional timeout for response
  response = wait_for_result(request)

  # a result with more than two elements carries an error in its third slot
  raise response[2] if response.size > 2

  response[1]
end
listen() click to toggle source

Instruct Node to start listening for and dispatching rpc requests.

Implementation of RJR::Node#listen

# File lib/rjr/nodes/amqp.rb, line 164
# Instruct the node to start listening for and dispatching rpc requests.
#
# The work is scheduled on the em instance: the node is initialized and,
# once ready, subscribes to its queue to start receiving messages.
#
# @return [RJR::Nodes::AMQP] self
def listen
  @@em.schedule { init_node { subscribe } }
  self
end
notify(routing_key, rpc_method, *args) click to toggle source

Instructs node to send rpc notification (immediately returns / no response is generated)

Implementation of RJR::Node#notify

@param [String] routing_key destination queue to send request to @param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method with

# File lib/rjr/nodes/amqp.rb, line 216
# Send a json-rpc notification to the given queue. No response is awaited,
# but the call blocks until the publish callback of send_msg has fired.
#
# @param [String] routing_key destination queue to send the notification to
# @param [String] rpc_method json-rpc method to invoke on the destination
# @param [Array] args arguments passed to the remote method
# @return [nil]
def notify(routing_key, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule do
    init_node {
      send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name){
        # flag and signal under the lock so the waiter cannot miss the update
        published_l.synchronize { invoked = true ; published_c.signal }
      }
    }
  end

  # Re-check the flag after every wakeup: a wait on a condition variable can
  # return without a matching signal, and a single `wait ... unless invoked`
  # would then let this method return before the message was published.
  published_l.synchronize { published_c.wait(published_l) until invoked }
  nil
end
send_msg(msg, metadata, &on_publish) click to toggle source

Publish a message using the amqp exchange

Implementation of RJR::Node#send_msg

# File lib/rjr/nodes/amqp.rb, line 147
# Publish a message using the amqp exchange, holding @amqp_lock while the
# publish call is made.
#
# @param [String] msg payload handed to the exchange
# @param [Hash] metadata its :routing_key and :reply_to values are forwarded
#   to the exchange's publish call
# @yield optional callback, run with no arguments from the publish block
# @return [nil]
def send_msg(msg, metadata, &on_publish)
  @amqp_lock.synchronize do
    #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected
    @exchange.publish(msg,
                      :routing_key => metadata[:routing_key],
                      :reply_to    => metadata[:reply_to]) do |*_publish_args|
      # whatever the exchange yields is ignored, only the caller is notified
      on_publish.call if on_publish
    end
  end

  nil
end
to_s() click to toggle source
# File lib/rjr/nodes/amqp.rb, line 140
# Render the node as a string listing its node id, host, port, vhost and
# queue name, separated by commas.
#
# @return [String]
def to_s
  attributes = [@node_id, @host, @port, @vhost, @queue_name]
  "RJR::Nodes::AMQP<#{attributes.map(&:to_s).join(',')}>"
end

Private Instance Methods

amqp_options() click to toggle source
# File lib/rjr/nodes/amqp.rb, line 59
# Build the options hash passed to ::AMQP.connect.
#
# Only settings holding a truthy value are included, so anything left
# unset (nil) or false is omitted from the result.
#
# @return [Hash] subset of :host, :port, :vhost, :user, :pass, :ssl
def amqp_options
  settings = {
    :host  => @host,
    :port  => @port,
    :vhost => @vhost,
    :user  => @user,
    :pass  => @pass,
    :ssl   => @ssl
  }
  settings.select { |_name, value| value }
end
init_node(&on_init) click to toggle source

Internal helper, initialize the amqp subsystem

# File lib/rjr/nodes/amqp.rb, line 71
# Internal helper, initialize the amqp subsystem and then run the callback.
#
# If @conn already exists and reports itself connected, the callback runs
# right away. Otherwise a connection is opened with amqp_options, and from
# inside the connect block the channel, queue and default exchange are
# stored on the node before the callback is run.
#
# @yield invoked with no arguments once the node's amqp state is in place
def init_node(&on_init)
  already_connected = !@conn.nil? && @conn.connected?
  if already_connected
    on_init.call
    return
  end

  @conn = ::AMQP.connect(amqp_options) do |conn|
    # XXX not sure why this is needed but the amqp
    # em interface won't shut down cleanly otherwise
    ::AMQP.connection = conn

    conn.on_tcp_connection_failure { puts "OTCF #{@node_id}" }

    ### connect to qpid broker
    @channel = ::AMQP::Channel.new(conn)

    # qpid constructs that will be created for node
    @queue_name = "#{@node_id.to_s}-queue"
    @queue      = @channel.queue(@queue_name, :auto_delete => true)
    @exchange   = @channel.default_exchange

    @listening = false
    #@disconnected = false

    @exchange.on_return do |basic_return, _metadata, payload|
      puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
      #@disconnected = true # FIXME member will be set on wrong class
      # TODO these are only run when we fail to send message to queue,
      # need to detect when that queue is shutdown & other events
      %w[error closed].each { |event| connection_event(event.to_sym) }
    end

    on_init.call
  end
end
subscribe() click to toggle source

Internal helper, subscribe to messages using the amqp queue

# File lib/rjr/nodes/amqp.rb, line 108
# Internal helper, subscribe to messages using the amqp queue.
#
# Does nothing when @listening is already set. Otherwise, while holding
# @amqp_lock, marks the node as listening and registers a queue subscriber
# that passes each message to handle_message.
#
# @return [nil]
def subscribe
  return if @listening

  @amqp_lock.synchronize do
    @listening = true
    @queue.subscribe do |metadata, msg|
      # swap reply to and routing key
      reply_route = {:routing_key => metadata.reply_to, :reply_to => @queue_name}
      handle_message(msg, reply_route)
    end
  end

  nil
end