class Appfuel::Service::RpcClient
Attributes
call_id[RW]
channel[R]
condition[R]
config[R]
exchange[R]
lock[R]
reply_queue[R]
response[RW]
Public Instance Methods
close()
click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 30
# Closes the object held in +@bunny+, but only while +connected?+ reports
# an open connection; otherwise this is a no-op.
#
# @return [Object, nil] whatever +@bunny.close+ returns, or nil when not
#   connected
def close
  return unless connected?

  @bunny.close
end
publish(to_queue, action_route, msg, headers = {})
click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 7
# Publishes +msg+ as a JSON request on the exchange and blocks the calling
# thread until a reply has been stored in +@response+.
#
# A fresh UUID is assigned to +call_id+ for every call and sent as the
# message's correlation id; the reply queue's name is sent as +reply_to+.
#
# @param to_queue [String] routing key the request is published with
# @param action_route [String] value sent in the +:action_route+ header
# @param msg [#to_json] payload; serialized with +to_json+ before publishing
# @param headers [Hash] extra message headers; merged over the default
#   +:action_route+ header, so a caller-supplied key wins on conflict
# @return [Object] whatever Appfuel::ResponseHandler#create_response returns
#   for the parsed reply
# @raise [JSON::ParserError] if the reply payload is not valid JSON
def publish(to_queue, action_route, msg, headers = {})
  @mutex.synchronize do
    ensure_connection! unless connected?
  end

  self.call_id = SecureRandom.uuid
  payload = msg.to_json
  @response = nil

  params = {
    routing_key: to_queue,
    correlation_id: call_id,
    reply_to: reply_queue.name,
    content_type: 'application/json',
    headers: { action_route: action_route }.merge(headers)
  }
  exchange.publish(payload, params)

  # Wait on a predicate rather than on the bare signal: the reply may be
  # stored (and signalled) before this thread reaches the wait, in which case
  # an unconditional wait would never be woken. The loop also guards against
  # spurious wakeups handing a nil response to the JSON parser.
  lock.synchronize do
    condition.wait(lock) while @response.nil?
  end

  result = JSON.parse(@response)
  Appfuel::ResponseHandler.new.create_response(result)
end
Private Instance Methods
ensure_connection!()
click to toggle source
Calls superclass method
# File lib/appfuel/service/rpc_client.rb, line 36
# Runs the superclass's connection setup, then declares the reply queue on
# the channel and starts consuming from it via +subscribe+.
#
# @return [Object] whatever +subscribe+ returns
def ensure_connection!
  super

  # NOTE(review): the empty name presumably asks the broker to generate a
  # queue name -- confirm against the channel implementation.
  queue_options = { exclusive: true }
  @reply_queue = channel.queue('', **queue_options)
  subscribe
end
subscribe()
click to toggle source
# File lib/appfuel/service/rpc_client.rb, line 42
# Creates the lock/condition pair used to hand replies to a waiting caller,
# binds the reply queue to the exchange named in +@opts[:exchange]+ (routing
# key = the reply queue's own name) and registers a consumer on it.
#
# The consumer stores the payload in +response+ and signals +condition+ when
# the delivery's correlation id equals the current +call_id+; any other
# delivery is only logged as a warning and otherwise ignored.
#
# @return [Object] whatever +reply_queue.subscribe+ returns
def subscribe
  @lock = Mutex.new
  @condition = ConditionVariable.new
  client = self # explicit reference to this client for use inside the consumer block

  reply_queue.bind(@opts[:exchange], routing_key: reply_queue.name)
  reply_queue.subscribe do |_delivery_info, properties, payload|
    correlation_id = properties[:correlation_id]

    if correlation_id == client.call_id
      # Store the reply and signal inside one critical section so the write
      # is made under the same lock that guards the condition variable.
      client.lock.synchronize do
        client.response = payload.to_s
        client.condition.signal
      end
    else
      Sneakers.logger.warn(
        "request not found for correlation_id: (#{correlation_id})"
      )
    end
  end
end