class Thrift::AmqpRpcClientTransport

Public Class Methods

new(service_queue_name, opts={}) click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 32
# Builds a client transport that publishes requests using +service_queue_name+
# as the routing key and receives replies on a private, exclusive reply queue.
#
# service_queue_name - routing key (and :service_name header) for published requests.
# opts               - Hash of options:
#   :connection - existing connection to reuse; it must respond to +create_channel+.
#                 A supplied connection is neither started nor closed by this transport.
#   :host       - broker host; required when :connection is absent.
#   :port       - broker port; required when :connection is absent.
#   :vhost      - defaults to "/".
#   :user       - defaults to "guest".
#   :password   - defaults to "guest".
#   :ssl        - defaults to false.
#   :timeout    - default per-call timeout used by flush; defaults to 100.
#   :from_name  - sent as the :from_name header; defaults to "Unknown Client".
#   :exchange   - name of a direct exchange to publish to; the channel's
#                 default exchange is used when absent.
#
# Raises ArgumentError when :connection is absent and :host or :port is missing.
# Any error raised while setting up the channel, exchange or reply queue is
# re-raised after the connection opened here (if any) has been closed.
def initialize(service_queue_name, opts={})
  @service_queue_name = service_queue_name
  @outbuf = Bytes.empty_byte_buffer
  @timeout = opts[:timeout] || 100

  if opts[:connection].nil?
    raise ArgumentError, ":host key not provided in opts dict to make connection" if opts[:host].nil?
    raise ArgumentError, ":port key not provided in opts dict to make connection" if opts[:port].nil?

    @conn = Bunny.new(:host     => opts[:host],
                      :port     => opts[:port],
                      :vhost    => opts[:vhost] || "/",
                      :user     => opts[:user] || "guest",
                      :password => opts[:password] || "guest",
                      :ssl      => opts[:ssl] || false)
    @conn.start
    # Remember that this transport owns the connection so close() tears it down.
    @connection_started = true
  else
    @conn = opts[:connection]
    @connection_started = false
  end

  @from_name = opts[:from_name].nil? ? "Unknown Client" : opts[:from_name]
  @exchange  = opts[:exchange] || nil

  begin
    @ch                        = @conn.create_channel
    @service_exchange          = @exchange.nil? ? @ch.default_exchange : declare_exchange
    @service_response_exchange = @ch.default_exchange
    @reply_queue               = @ch.queue("", :exclusive => true)
  rescue StandardError => setup_error
    # The caller never receives this object when construction fails, so a
    # connection opened above would otherwise stay open with nobody able to
    # close it. Connections supplied by the caller are left alone.
    if @connection_started
      @connection_started = false
      begin
        @conn.close
      rescue StandardError
        # Best effort only: the setup failure is the error worth reporting.
      end
    end
    raise setup_error
  end

  @is_opened = true
end

Public Instance Methods

close() click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 76
# Tears the transport down: deletes the reply queue, closes the channel and,
# when this transport opened the connection itself, closes the connection too.
# A connection supplied by the caller is left open.
#
# Every teardown step is attempted even if an earlier one raises, and the
# transport is marked closed regardless, so a failed queue deletion can no
# longer leave the channel/connection open with open? still reporting true.
# The error from the failing step is propagated to the caller.
#
# Returns false after closing, or nil when the transport was already closed.
def close
  return unless @is_opened

  begin
    @reply_queue.delete
  ensure
    begin
      @ch.close
    ensure
      @is_opened = false
      if @connection_started
        # Clear the ownership flag first so a failing close is not retried.
        @connection_started = false
        @conn.close
      end
    end
  end

  @is_opened
end
declare_exchange() click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 70
# Returns the direct exchange named by @exchange on the current channel.
#
# The exchange is first requested normally; if that raises any StandardError,
# the same exchange is requested again with :no_declare set and that result
# is returned instead.
def declare_exchange
  begin
    @ch.direct(@exchange)
  rescue StandardError
    @ch.direct(@exchange, :no_declare => true)
  end
end
flush(options={}) click to toggle source

If the :blocking option is true (the default), wait for a response message on the reply_to queue; otherwise publish the request and return immediately.

# File lib/thrift/amqp/amqp_rpc_client.rb, line 96
# Publishes the buffered request and, for blocking calls, waits for the reply
# whose correlation id matches this request.
#
# options - Hash of per-call settings:
#   :operation    - operation name placed in the message headers (default "").
#   :blocking     - true (default): wait for the matching reply and make it
#                   available to read; false: publish and return.
#   :msg_timeout  - timeout for this call (default: the transport's timeout).
#   :log_messages - true to trace publish/receive via print_log (default false).
#
# Raises ResponseTimeout when a blocking call gets no matching reply in time;
# any other error is propagated unchanged.
#
# The output buffer is emptied on every exit path, including a failed publish,
# so bytes from a failed request are never prepended to the next one.
#
# Returns the fresh, empty output buffer.
def flush(options={})
  operation    = options.fetch(:operation, "")
  blocking     = options.fetch(:blocking, true)
  msg_timeout  = options.fetch(:msg_timeout, @timeout)
  log_messages = options.fetch(:log_messages, false)

  correlation_id = generate_uuid

  headers = {:service_name      => @service_queue_name,
             :operation         => operation,
             :response_required => blocking,   # Tell the receiver if a response is required
             :from_name         => @from_name
  }

  begin
    # Publish the message
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log_messages
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # NOTE(review): msg_timeout is treated as seconds by Timeout below, while
    # AMQP per-message expiration is, as far as I know, given in milliseconds.
    # Confirm the intended unit before changing it.
    @service_exchange.publish(@outbuf,
                              :routing_key    => @service_queue_name,
                              :correlation_id => correlation_id,
                              :expiration     => msg_timeout,
                              :reply_to       => @reply_queue.name,
                              :headers        => headers)

    # If this is a standard RPC blocking call, wait for a response from the
    # service provider, or time out and log the timeout.
    if blocking
      @response = ""
      begin
        # Adding 1sec to timeout to account for clock differences
        Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
            if log_messages
              response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            # Replies carrying another correlation id are ignored.
            if properties[:correlation_id] == correlation_id
              @response = payload

              # Once the return message has been received there is no need
              # to continue the subscription.
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # NOTE(review): the subscription is not cancelled on this path; confirm
        # whether a leftover consumer on the reply queue can affect later calls.
        #
        # Work-around for behaviour seen in a multi-threaded workflow
        # environment: a timeout that fires after the reply was stored is ignored.
        if @response == ""
          msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        end
        print_log "Ignoring timeout - #{@response}", correlation_id
      end
      @inbuf = StringIO.new Bytes.force_binary_encoding(@response)
    end
  ensure
    @outbuf = Bytes.empty_byte_buffer
  end

  @outbuf
end
open?() click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 90
# Returns the transport's open flag: set to true at the end of construction
# and to false by close.
def open?
  @is_opened
end
read(sz) click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 91
# Reads up to +sz+ bytes from the buffered response stored by the most recent
# blocking flush, returning whatever the underlying buffer's read returns.
def read(sz)
  @inbuf.read(sz)
end
write(buf) click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 92
# Appends +buf+, passed through Bytes.force_binary_encoding, to the pending
# output buffer that the next flush will publish.
#
# Returns the output buffer.
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end

Protected Instance Methods

generate_uuid() click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 165
# Returns the string form of a new UUID obtained from
# UUIDTools::UUID.timestamp_create; flush uses it as the correlation id.
def generate_uuid
  timestamp_uuid = UUIDTools::UUID.timestamp_create
  timestamp_uuid.to_s
end
print_log(message="", correlation_id="") click to toggle source