class Vx::Lib::Consumer::Rpc::RpcClient
Constants
- REP
- REQ
Attributes
consumer[R]
Public Class Methods
new(consumer)
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 71 def initialize(consumer) @consumer = consumer @consumed = false @await = {} @mutex = Mutex.new @wakeup = Mutex.new end
Public Instance Methods
call(ns, method, params, options = {})
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 123 def call(ns, method, params, options = {}) timeout = options[:timeout] || 3 routing_key = options[:routing_key] || "vx.rpc.#{ns}".freeze call_id = SecureRandom.uuid cond = ConditionVariable.new result = nil message = { method: method.to_s, params: params, id: call_id } with_queue do |q| consumer.session.with_pub_channel do |ch| exch = ch.exchange RPC_EXCHANGE_NAME env = { payload: message, rpc: REQ, exchange: exch.name, consumer: consumer.params.consumer_name, properties: { routing_key: routing_key, correlation_id: call_id }, channel: ch.id } @mutex.synchronize { @await[call_id] = cond } consumer.with_middlewares :pub, env do exch.publish( message.to_json, routing_key: routing_key, correlation_id: call_id, reply_to: q.name, content_type: JSON_CONTENT_TYPE ) end @wakeup.synchronize{ cond.wait(@wakeup, timeout) } @mutex.synchronize do _, payload = @await.delete(call_id) if payload result = payload[RPC_PAYLOAD_RESULT] else nil end end end end result end
cancel()
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 189 def cancel if subscriber? @subscriber.cancel @subscriber = nil @consumed = false end end
consume()
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 79 def consume return if @consumed ch = consumer.session.conn.create_channel consumer.session.assign_error_handlers_to_channel(ch) @q = ch.queue(RPC_EXCHANGE_NAME, exclusive: true) @subscriber = @q.subscribe do |delivery_info, properties, payload| handle_delivery ch, properties, payload end @consumed = true end
handle_delivery(ch, properties, payload)
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 94 def handle_delivery(ch, properties, payload) if payload payload = ::JSON.parse(payload) end env = { consumer: consumer.params.consumer_name, queue: @q.name, rpc: REP, channel: ch.id, payload: payload, properties: properties } consumer.with_middlewares :sub, env do call_id = properties[:correlation_id] c = @mutex.synchronize{ @await.delete(call_id) } if c @mutex.synchronize do @await[call_id] = [properties, payload] end @wakeup.synchronize do c.signal end end end end
subscriber?()
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 185 def subscriber? !!@subscriber end
with_queue() { |q| ... }
click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 180 def with_queue consume yield @q end