class Barrister::Amqp::Transport

Public Class Methods

new(service_name, options={}) click to toggle source

NOTE: a transport needs to implement a request method so that Barrister::Client can send requests to the server.

# File lib/barrister/amqp.rb, line 27
# Opens a Bunny connection to the broker named by ENV['AMQP_URL'], declares
# the service queue and a private reply queue, and starts a non-blocking
# subscription that files every incoming reply under its correlation id.
#
# service_name - name of the queue that requests are routed to.
# options      - Hash of settings; :timeout is the number of seconds to wait
#                for a reply (defaults to 1).
#
# Raises KeyError when AMQP_URL is not set.
def initialize(service_name, options={})
  # Resolve the URL first so a missing variable fails with an actionable
  # message instead of a bare "key not found".
  amqp_url = ENV.fetch('AMQP_URL') do
    raise KeyError, 'Please set AMQP_URL to something like: amqp://user:password@host:port/vhost'
  end

  conn = Bunny.new(amqp_url)
  conn.start
  @ch             = conn.create_channel
  @service_q      = @ch.queue(service_name, auto_delete: false)
  @reply_q        = @ch.queue('', exclusive: true)
  @x              = @ch.default_exchange
  # One Queue per correlation id, created lazily on first access.
  @response_table = Hash.new { |h, k| h[k] = Queue.new }
  @timeout        = options[:timeout] || 1 # Timeout is in seconds

  # block: false keeps the constructor from blocking on the subscription.
  @reply_q.subscribe(block: false) do |_delivery_info, properties, payload|
    # Push anything that arrives on the reply queue to the matching waiter.
    @response_table[properties[:correlation_id]].push(payload)
  end
end

Public Instance Methods

request(message) click to toggle source
# File lib/barrister/amqp.rb, line 42
# Publishes +message+ to the service queue and waits for the matching reply.
#
# message - the request to send; it is wrapped by Config.wrapper, whose
#           result supplies the 'id' (used as correlation id) and the
#           'message' payload. NOTE: message could be an array.
#
# Returns the reply payload parsed from JSON.
# Raises RpcException (-32603) when no reply arrives within @timeout seconds.
# Raises RpcException (-32000) when the reply is not valid JSON.
def request(message)
  envelope   = Config.wrapper.new(message).wrap
  request_id = envelope['id']
  print "[AMQP TRANSPORT --->] \n #{envelope} \n" if Config.debug

  # Register the per-request queue BEFORE publishing, so the subscriber
  # thread finds the existing entry rather than racing this thread to
  # create one (which could leave the reply in a queue nobody pops).
  inbox = @response_table[request_id]

  begin
    @x.publish(envelope['message'], { correlation_id: request_id, reply_to: @reply_q.name, routing_key: @service_q.name })
    response = Timeout.timeout(@timeout) { inbox.pop }
  ensure
    # Always drop the entry, including on timeout, so abandoned requests
    # do not accumulate in the table.
    @response_table.delete(request_id)
  end

  JSON.parse(response).tap do |resp|
    print "[AMQP TRANSPORT <---] \n #{resp} \n" if Config.debug
  end
rescue Timeout::Error
  raise RpcException.new(-32603, "Request timed out")
rescue JSON::ParserError => e
  raise RpcException.new(-32000, "Bad response #{e.message}")
end