class Thrift::AmqpRpcClientTransport
Public Class Methods
new(service_queue_name, opts={})
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 32
#
# Creates a client transport that publishes requests for +service_queue_name+
# and receives replies on an exclusive reply queue declared with an empty name.
#
# Recognised +opts+ keys:
# [:connection]      an existing connection to reuse; it is neither started
#                    nor closed by this transport
# [:host, :port]     required when no :connection is given
# [:vhost]           defaults to "/"
# [:user, :password] both default to "guest"
# [:ssl]             defaults to false
# [:timeout]         defaults to 100; +flush+ uses it as the default
#                    message timeout
# [:from_name]       defaults to "Unknown Client"
# [:exchange]        name of a direct exchange to publish to; the channel's
#                    default exchange is used when absent
#
# Raises ArgumentError when no :connection is given and :host or :port is
# missing. If channel, exchange or reply-queue setup fails, a connection that
# was opened here is closed again before the error is re-raised.
def initialize(service_queue_name, opts={})
  @service_queue_name = service_queue_name
  @outbuf = Bytes.empty_byte_buffer
  @timeout = opts[:timeout] || 100

  if opts[:connection].nil?
    if opts[:host].nil?
      raise ArgumentError, ":host key not provided in opts dict to make connection"
    end
    if opts[:port].nil?
      raise ArgumentError, ":port key not provided in opts dict to make connection"
    end

    @conn = Bunny.new(:host => opts[:host],
                      :port => opts[:port],
                      :vhost => opts[:vhost] || "/",
                      :user => opts[:user] || "guest",
                      :password => opts[:password] || "guest",
                      :ssl => opts[:ssl] || false)
    @conn.start
    # Remember that this transport owns the connection so +close+ tears it down.
    @connection_started = true
  else
    @conn = opts[:connection]
    @connection_started = false
  end

  from_name = opts[:from_name]
  @from_name = from_name.nil? ? "Unknown Client" : from_name
  @exchange = opts[:exchange] || nil

  begin
    @ch = @conn.create_channel
    @service_exchange = @exchange.nil? ? @ch.default_exchange : declare_exchange
    @service_response_exchange = @ch.default_exchange
    @reply_queue = @ch.queue("", :exclusive => true)
  rescue StandardError => setup_error
    # The caller never receives this object when the constructor raises, so a
    # connection opened above would otherwise stay open with nobody to close it.
    if @connection_started
      begin
        @conn.close
      rescue StandardError
        # Ignored on purpose: the setup failure is the error worth reporting.
      end
      @connection_started = false
    end
    raise setup_error
  end

  @is_opened = true
end
Public Instance Methods
close()
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 76
#
# Closes the transport: deletes the reply queue, closes the channel and, when
# this transport started the connection itself, closes the connection too.
#
# Every teardown step runs even if an earlier one raises, and the transport is
# marked closed in all cases; the error from the failing step still
# propagates to the caller.
#
# Returns false after closing, or nil when the transport was not open.
def close
  return unless @is_opened

  begin
    @reply_queue.delete
  ensure
    begin
      @ch.close
    ensure
      begin
        if @connection_started
          @conn.close
          @connection_started = false
        end
      ensure
        @is_opened = false
      end
    end
  end

  # Always false at this point; kept as the return value for compatibility.
  @is_opened
end
declare_exchange()
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 70
#
# Returns a direct exchange named after @exchange on the current channel.
# The exchange is declared first; if that raises any StandardError, a handle
# is requested again with +no_declare: true+ instead.
#
# NOTE(review): a failed declaration may leave the channel closed by the
# broker - confirm that the fallback handle is still usable for publishing.
def declare_exchange
  begin
    @ch.direct(@exchange)
  rescue StandardError
    @ch.direct(@exchange, no_declare: true)
  end
end
flush(options={})
click to toggle source
If the :blocking option is true (the default), wait for a response message on the reply-to queue; otherwise just publish the request and return without waiting.
# File lib/thrift/amqp/amqp_rpc_client.rb, line 96
#
# Publishes the buffered request to the service exchange and, for blocking
# calls, waits for the matching reply on the reply queue.
#
# Recognised +options+ keys:
# [:operation]    operation name placed in the message headers (default "")
# [:blocking]     wait for a reply when true (default true)
# [:msg_timeout]  message timeout; defaults to the transport's timeout
# [:log_messages] log the publish and any received replies (default false)
#
# For a blocking call the reply payload becomes the read buffer. Replies whose
# correlation id does not match this request are ignored.
#
# The write buffer is reset on every exit path, including a failed publish, so
# bytes of a failed request are never prepended to the next one.
#
# Raises ResponseTimeout when no matching reply arrives within
# +msg_timeout + 1+ seconds. Returns the fresh, empty write buffer.
def flush(options={})
  operation    = options.fetch(:operation, "")
  blocking     = options.fetch(:blocking, true)
  msg_timeout  = options.fetch(:msg_timeout, @timeout)
  log_messages = options.fetch(:log_messages, false)

  correlation_id = generate_uuid

  headers = {
    :service_name => @service_queue_name,
    :operation => operation,
    :response_required => blocking, # tells the receiver whether a reply is expected
    :from_name => @from_name
  }

  begin
    if log_messages
      print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id
    end

    # Monotonic clock: only used to report the elapsed time in the log.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # NOTE(review): msg_timeout is treated as seconds by the wait below, while
    # AMQP brokers commonly read :expiration as milliseconds - confirm units.
    @service_exchange.publish(@outbuf,
                              :routing_key => @service_queue_name,
                              :correlation_id => correlation_id,
                              :expiration => msg_timeout,
                              :reply_to => @reply_queue.name,
                              :headers => headers)

    # Standard blocking RPC call: wait for the service's reply or time out.
    if blocking
      @response = ""
      begin
        # One extra second of slack to account for clock differences.
        Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
            if log_messages
              response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            if properties[:correlation_id] == correlation_id
              @response = payload
              # The reply has arrived, so the subscription is no longer needed.
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # NOTE(review): the consumer is only cancelled when a matching reply
        # arrives; on timeout it appears to stay registered - confirm.
        #
        # A timeout that fires after a reply was already stored is ignored
        # (workaround for behaviour seen in multi-threaded workflows).
        if @response == ""
          msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        else
          print_log "Ignoring timeout - #{@response}", correlation_id
        end
      end
      @inbuf = StringIO.new Bytes.force_binary_encoding(@response)
    end
  ensure
    @outbuf = Bytes.empty_byte_buffer
  end

  @outbuf
end
open?()
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 90
#
# Returns the transport's open flag: true once the constructor has finished,
# false after +close+.
def open?
  @is_opened
end
read(sz)
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 91
#
# Reads up to +sz+ bytes from the buffer holding the most recent reply
# (filled by a blocking +flush+) and returns whatever that buffer's +read+
# returns.
def read(sz)
  @inbuf.read(sz)
end
write(buf)
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 92
#
# Appends +buf+, passed through Bytes.force_binary_encoding, to the pending
# request buffer that the next +flush+ publishes. Returns that buffer.
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end
Protected Instance Methods
generate_uuid()
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 165
#
# Returns a new UUID string from UUIDTools::UUID.timestamp_create; +flush+
# uses it as the correlation id of a request.
def generate_uuid
  uuid = UUIDTools::UUID.timestamp_create
  uuid.to_s
end
print_log(message="", correlation_id="")
click to toggle source
# File lib/thrift/amqp/amqp_rpc_client.rb, line 169
#
# Writes one log line to standard output made of the current UTC time, the
# calling thread's object id, +correlation_id+ and +message+.
def print_log(message="", correlation_id="")
  timestamp = Time.now.utc
  thread_id = Thread.current.object_id
  puts "#{timestamp} C Thread: #{thread_id} CID:#{correlation_id} - #{message}"
end