class Appfuel::Service::RpcClient

Attributes

call_id[RW]
channel[R]
condition[R]
config[R]
exchange[R]
lock[R]
reply_queue[R]
response[RW]

Public Instance Methods

close() click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 30
def close
  @bunny.close if connected?
end
publish(to_queue, action_route, msg, headers = {}) click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 7
def publish(to_queue, action_route, msg, headers = {})
  @mutex.synchronize do
    ensure_connection! unless connected?
  end

  self.call_id = SecureRandom.uuid
  msg          = msg.to_json
  @response    = nil
  params       = {
    routing_key:    to_queue,
    correlation_id: call_id,
    reply_to:       reply_queue.name,
    content_type:   'application/json',
    headers:        {action_route: action_route}.merge(headers)
  }

  exchange.publish(msg, params)
  lock.synchronize { condition.wait(lock) }

  result = JSON.parse(@response)
  Appfuel::ResponseHandler.new.create_response(result)
end

Private Instance Methods

ensure_connection!() click to toggle source
Calls superclass method
# File lib/appfuel/service/rpc_client.rb, line 36
def ensure_connection!
  super
  @reply_queue = channel.queue('', exclusive: true)
  subscribe
end
subscribe() click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 42
def subscribe
  @lock      = Mutex.new
  @condition = ConditionVariable.new
  that       = self

  reply_queue.bind(@opts[:exchange], routing_key: reply_queue.name)

  reply_queue.subscribe do |_delivery_info, properties, payload|
    if properties[:correlation_id] == that.call_id
      that.response = payload.to_s
      that.lock.synchronize { that.condition.signal }
    else
      Sneakers.logger.warn "request not found for correlation_id: " +
                           "(#{properties[:correlation_id]}"
    end
  end
end