class Thrift::AmqpRpcServer

Public Class Methods

new(processor, opts={}) click to toggle source
# File lib/thrift/amqp/amqp_rpc_service.rb, line 28
# Sets up the server state and obtains a broker connection.
#
# processor - object stored in @processor for handling requests.
# opts      - Hash of settings:
#   :connection       - an already established connection to reuse. When it
#                       is nil a new Bunny connection is created and started
#                       from the keys below.
#   :host, :port      - required when :connection is not given.
#   :vhost            - defaults to "/".
#   :user, :password  - both default to "guest".
#   :ssl              - defaults to false.
#   :queue_name       - required; name of the service queue.
#   :protocol_factory - defaults to BinaryProtocolFactory.
#   :exchange         - optional exchange name; nil when not given.
#
# Raises ArgumentError when a required option is missing or nil.
def initialize(processor, opts={})
  needs_connection = opts[:connection].nil?

  # Every required option is validated before a connection is opened, so a
  # configuration error can never leave a started connection behind with
  # nobody holding a reference to close it.
  if needs_connection
    if opts[:host].nil?
      raise ArgumentError, ":host key not provided in opts dict to make connection"
    end

    if opts[:port].nil?
      raise ArgumentError, ":port key not provided in opts dict to make connection"
    end
  end

  # A nil queue name is rejected just like a missing one, matching how
  # :host and :port are checked.
  if opts[:queue_name].nil?
    raise ArgumentError, "A service queue name has not been specified"
  end

  @processor = processor
  @queue_name = opts[:queue_name]
  @protocol_factory = opts[:protocol_factory] || BinaryProtocolFactory
  @exchange = opts[:exchange] || nil

  if needs_connection
    @conn = Bunny.new(:host => opts[:host],
                      :port => opts[:port],
                      :vhost => opts[:vhost] || "/",
                      :user => opts[:user] || "guest",
                      :password => opts[:password] || "guest",
                      :ssl => opts[:ssl] || false)
    @conn.start
  else
    @conn = opts[:connection]
  end
end

Public Instance Methods

close() click to toggle source
# File lib/thrift/amqp/amqp_rpc_service.rb, line 65
# Shuts the server down: closes the request channel, when one has been
# created and it responds to +close+, and then the broker connection.
#
# The connection is closed from an +ensure+ clause so that an error raised
# while closing the channel cannot leave the connection open. Such an error
# still propagates to the caller once the connection has been closed.
def close
  if @request_channel && @request_channel.respond_to?(:close)
    @request_channel.close
  end
ensure
  #Always close the broker connection when closing the server
  @conn.close
end
serve(options={}) click to toggle source
# File lib/thrift/amqp/amqp_rpc_service.rb, line 78
# Subscribes to the service queue (with :block => true) and, for every
# delivered message, runs the processor over the payload and publishes
# whatever it wrote to the queue named by the message's +reply_to+ property.
#
# options - Hash of optional settings:
#   :log_messages     - when truthy, per-request details are written with
#                       print_log (default: false).
#   :max_messages     - second argument given to @conn.create_channel when
#                       the request channel is created (default: 10).
#   :response_timeout - lower bound, in seconds, on the time allowed for
#                       processing one request (default: 10). The message's
#                       +expiration+ property is used instead when larger.
#
# A request that exceeds its time limit is logged and dropped. Any other
# error raised while handling a request propagates out of the subscription
# after the per-request response channel has been closed.
def serve(options={})
  log_messages = options[:log_messages] || false
  max_messages = options[:max_messages].nil? ? 10 : options[:max_messages]
  response_timeout = options[:response_timeout] || 10

  #Create a channel to the service queue
  @request_channel = @conn.create_channel(nil, max_messages)

  if @exchange.nil?
    @service_exchange = @request_channel.default_exchange
    @request_queue = @request_channel.queue(@queue_name, :auto_delete => true)
  else
    @service_exchange = @request_channel.direct(@exchange, :durable => true)
    @request_queue = @request_channel.queue(@queue_name, :auto_delete => true).bind(@service_exchange, :routing_key => @queue_name)
  end

  @request_queue.subscribe(:block => true) do |_delivery_info, properties, payload|
    # Set before anything is logged so it is in place for the whole request.
    Thread.current["correlation_id"] = properties.correlation_id

    if log_messages
      print_log "---- Message received ----"
      print_log "HEADERS: #{properties}"
    end

    # properties.headers can be nil; falling back to an empty hash keeps
    # every header lookup below (including the one in the timeout handler)
    # from raising NoMethodError.
    headers = properties.headers || {}
    operation = headers['operation']
    response_required = headers.has_key?('response_required') ? headers['response_required'] : true

    response_channel = @conn.create_channel

    begin
      response_exchange = response_channel.default_exchange

      # NOTE(review): expiration is compared with response_timeout as if both
      # were seconds - confirm the unit the clients put in that property.
      process_timeout = [response_timeout.to_i, properties.expiration.to_i].max

      print_log "Request to process #{@queue_name}.#{operation} in #{process_timeout}sec" if log_messages

      input = StringIO.new payload
      out = StringIO.new
      transport = IOStreamTransport.new input, out
      protocol = @protocol_factory.new.get_protocol transport

      start_time = Time.now
      Timeout.timeout(process_timeout, ProcessingTimeout) do
        @processor.process protocol, protocol
      end
      processing_time = Time.now - start_time

      # Nothing is published when the processor produced no output.
      if out.length > 0
        #rewind the buffer for reading
        out.rewind

        print_log "Time to process request: #{processing_time}sec  Response length: #{out.length}"   if log_messages

        if response_required
          response_exchange.publish(out.read(out.length),
                                    :routing_key => properties.reply_to,
                                    :correlation_id => properties.correlation_id,
                                    :content_type => 'application/octet-stream' )
        end
      end
    rescue ProcessingTimeout
      print_log "A timeout has occurred (#{process_timeout}sec) trying to call #{@queue_name}.#{operation}"
    ensure
      # Runs on success, timeout and unexpected errors alike, so a failing
      # request cannot leak its response channel.
      response_channel.close
    end
  end
end

Private Instance Methods

print_log(message="") click to toggle source