class Vx::Lib::Consumer::Rpc::RpcClient

Constants

REP
REQ

Attributes

consumer[R]

Public Class Methods

new(consumer) click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 71
def initialize(consumer)
  @consumer = consumer
  @consumed = false
  @await    = {}
  @mutex    = Mutex.new
  @wakeup   = Mutex.new
end

Public Instance Methods

call(ns, method, params, options = {}) click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 123
def call(ns, method, params, options = {})
  timeout     = options[:timeout] || 3
  routing_key = options[:routing_key] || "vx.rpc.#{ns}".freeze
  call_id     = SecureRandom.uuid
  cond        = ConditionVariable.new
  result      = nil

  message = {
    method: method.to_s,
    params: params,
    id:     call_id
  }

  with_queue do |q|

    consumer.session.with_pub_channel do |ch|
      exch = ch.exchange RPC_EXCHANGE_NAME

      env = {
        payload:     message,
        rpc:         REQ,
        exchange:    exch.name,
        consumer:    consumer.params.consumer_name,
        properties:  { routing_key: routing_key, correlation_id: call_id },
        channel:     ch.id
      }

      @mutex.synchronize { @await[call_id] = cond }

      consumer.with_middlewares :pub, env do
        exch.publish(
          message.to_json,
          routing_key:    routing_key,
          correlation_id: call_id,
          reply_to:       q.name,
          content_type:   JSON_CONTENT_TYPE
        )
      end

      @wakeup.synchronize{
        cond.wait(@wakeup, timeout)
      }
      @mutex.synchronize do
        _, payload = @await.delete(call_id)
        if payload
          result = payload[RPC_PAYLOAD_RESULT]
        else
          nil
        end
      end
    end
  end

  result

end
cancel() click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 189
def cancel
  if subscriber?
    @subscriber.cancel
    @subscriber = nil
    @consumed   = false
  end
end
consume() click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 79
def consume
  return if @consumed

  ch = consumer.session.conn.create_channel
  consumer.session.assign_error_handlers_to_channel(ch)

  @q = ch.queue(RPC_EXCHANGE_NAME, exclusive: true)

  @subscriber =
    @q.subscribe do |delivery_info, properties, payload|
      handle_delivery ch, properties, payload
    end
  @consumed = true
end
handle_delivery(ch, properties, payload) click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 94
def handle_delivery(ch, properties, payload)

  if payload
    payload = ::JSON.parse(payload)
  end

  env = {
    consumer:   consumer.params.consumer_name,
    queue:      @q.name,
    rpc:        REP,
    channel:    ch.id,
    payload:    payload,
    properties: properties
  }

  consumer.with_middlewares :sub, env do
    call_id = properties[:correlation_id]
    c = @mutex.synchronize{ @await.delete(call_id) }
    if c
      @mutex.synchronize do
        @await[call_id] = [properties, payload]
      end
      @wakeup.synchronize do
        c.signal
      end
    end
  end
end
subscriber?() click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 185
def subscriber?
  !!@subscriber
end
with_queue() { |q| ... } click to toggle source
# File lib/vx/lib/consumer/rpc.rb, line 180
def with_queue
  consume
  yield @q
end