class Barrister::Amqp::Transport
Public Class Methods
new(service_name, options={})
click to toggle source
NOTE: A transport must implement a `request` method so that Barrister::Client can send requests to the server.
# File lib/barrister/amqp.rb, line 27
#
# Connects to the broker named by the AMQP_URL environment variable, declares
# the service queue and a private reply queue, and starts a non-blocking
# consumer that files every reply under its correlation id.
#
# @param service_name [String] name of the queue that requests are routed to
# @param options [Hash] optional settings
# @option options [Numeric] :timeout seconds to wait for a reply (default 1)
# @raise [KeyError] when AMQP_URL is not set
def initialize(service_name, options = {})
  # Resolve the URL first so a missing variable fails with an actionable
  # message before any connection work is attempted.
  amqp_url = ENV.fetch('AMQP_URL') do
    raise KeyError, 'Please set AMQP_URL to something like: amqp://user:password@host:port/vhost'
  end

  conn = Bunny.new(amqp_url)
  conn.start
  @ch = conn.create_channel
  @service_q = @ch.queue(service_name, auto_delete: false)
  @reply_q = @ch.queue('', exclusive: true)
  @x = @ch.default_exchange

  # One Queue per correlation id, created on first access by either the
  # consumer block below or by a caller waiting for that id.
  @response_table = Hash.new { |h, k| h[k] = Queue.new }
  @timeout = options[:timeout] || 1 # Timeout is in seconds

  # block: false — subscribe is asked not to block the calling thread.
  @reply_q.subscribe(block: false) do |_delivery_info, properties, payload|
    # Push anything that comes in on the reply queue to the matching waiter.
    @response_table[properties[:correlation_id]].push(payload)
  end
end
Public Instance Methods
request(message)
click to toggle source
# File lib/barrister/amqp.rb, line 42
#
# Publishes +message+ to the service queue and waits up to @timeout seconds
# for the reply carrying the same correlation id.
#
# @param message [Object] request to wrap and send (NOTE: could be an array)
# @return [Object] the JSON-decoded reply
# @raise [RpcException] code -32603 when no reply arrives within @timeout,
#   code -32000 when the reply is not valid JSON
def request(message)
  envelope = Config.wrapper.new(message).wrap # NOTE message could be an array
  request_id = envelope['id']
  print "[AMQP TRANSPORT --->] \n #{envelope} \n" if Config.debug

  # Create the per-request queue BEFORE publishing, so the reply consumer
  # finds an existing entry instead of racing this thread to create one.
  pending = @response_table[request_id]

  @x.publish(envelope['message'],
             correlation_id: request_id,
             reply_to: @reply_q.name,
             routing_key: @service_q.name)

  raw_reply = Timeout.timeout(@timeout) { pending.pop }

  JSON.parse(raw_reply).tap do |resp|
    print "[AMQP TRANSPORT <---] \n #{resp} \n" if Config.debug
  end
rescue Timeout::Error
  raise RpcException.new(-32603, "Request timed out")
rescue JSON::ParserError => e
  raise RpcException.new(-32000, "Bad response #{e.message}")
ensure
  # Always drop the table entry, including on timeout or a malformed reply,
  # so abandoned requests do not accumulate queues.
  @response_table.delete(request_id) if request_id
end