class Bosh::Director::NatsRpc

Remote procedure call client wrapping NATS

Public Class Methods

new(nats_uri) click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 5
# Builds an RPC client for the given NATS endpoint.
#
# No connection is opened here; only the bookkeeping state is prepared.
#
# @param nats_uri [String] URI later handed to NATS.connect
def initialize(nats_uri)
  @nats_uri   = nats_uri
  @requests   = {}        # request id => callback awaiting a reply
  @lock       = Mutex.new # guards @requests and lazy initialization
  @logger     = Config.logger
  @inbox_name = "director.#{Config.process_uuid}"
end

Public Instance Methods

cancel_request(request_id) click to toggle source

Stops listening for a response

# File lib/bosh/director/nats_rpc.rb, line 44
# Stops listening for the response to a previously sent request.
#
# @param request_id [String] id returned by send_request
# @return [Object, nil] the callback that was registered for the request,
#   or nil when no such request was pending
def cancel_request(request_id)
  @lock.synchronize do
    @requests.delete(request_id)
  end
end
generate_request_id() click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 48
# Produces an identifier for a new request.
#
# @return [String] the value returned by SecureRandom.uuid
def generate_request_id
  SecureRandom.uuid
end
nats() click to toggle source

Returns a lazily connected NATS client

# File lib/bosh/director/nats_rpc.rb, line 14
# Returns the NATS client, calling connect the first time it is needed.
#
# The client is cached in @nats, so later calls reuse the same object.
#
# @return [Object] the client returned by connect
def nats
  return @nats if @nats

  @nats = connect
end
send_message(client, payload) click to toggle source

Publishes a payload (encoded as JSON) without expecting a response

# File lib/bosh/director/nats_rpc.rb, line 19
# Publishes a payload (encoded as JSON) without expecting a response.
#
# The publish itself is handed to EM.schedule rather than performed inline.
#
# @param client [String] NATS subject to publish to
# @param payload [Object] value serialized with JSON.generate
def send_message(client, payload)
  encoded = JSON.generate(payload)
  @logger.debug("SENT: #{client} #{encoded}")
  EM.schedule { nats.publish(client, encoded) }
end
send_request(client, request, &callback) click to toggle source

Sends a request (encoded as JSON) and listens for the response

# File lib/bosh/director/nats_rpc.rb, line 28
# Sends a request (encoded as JSON) and listens for the response.
#
# A "reply_to" subject of the form "<inbox name>.<request id>" is written
# into +request+ (the caller's hash is modified), and +callback+ is stored
# under the request id until a reply arrives or the request is cancelled.
# Subscribing to the inbox and publishing are handed to EM.schedule.
#
# @param client [String] NATS subject to publish the request to
# @param request [Hash] request body; gains a "reply_to" entry
# @yield [response] invoked with the reply when one is received
# @return [String] the request id, usable with cancel_request
# @raise [JSON::GeneratorError] when +request+ cannot be encoded; in that
#   case no callback is left registered
def send_request(client, request, &callback)
  request_id = generate_request_id
  request["reply_to"] = "#{@inbox_name}.#{request_id}"

  # Encode before registering the callback: if encoding raises, the caller
  # never receives the request id, so a registered entry could never be
  # cancelled and would stay in @requests indefinitely.
  message = JSON.generate(request)

  @lock.synchronize { @requests[request_id] = callback }

  @logger.debug("SENT: #{client} #{message}")
  EM.schedule do
    # Subscribe first so the inbox is listening before the request goes out.
    subscribe_inbox
    nats.publish(client, message)
  end
  request_id
end

Private Instance Methods

connect() click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 54
# Creates the NATS client if it does not exist yet and returns it.
#
# @nats is checked once without the lock and again while holding @lock, so
# NATS.connect is only invoked when @nats is still nil inside the lock.
#
# @return [Object] the client stored in @nats
def connect
  # Fast path: skip the lock entirely once a client exists.
  return @nats unless @nats.nil?

  @lock.synchronize do
    # Re-check under the lock before creating the client.
    @nats = NATS.connect(:uri => @nats_uri, :autostart => false) if @nats.nil?
  end
  @nats
end
handle_response(message, subject) click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 82
# Dispatches a reply received on the inbox to the callback waiting for it.
#
# The request id is the last dot-separated segment of +subject+. The
# matching callback is removed from @requests and called with the parsed
# JSON body, or with nil when the body is empty. Replies with no registered
# callback are ignored.
#
# Ordinary errors (StandardError) raised while parsing or inside the
# callback are logged as warnings and not re-raised. Non-StandardError
# exceptions such as SystemExit, Interrupt or NoMemoryError are deliberately
# allowed to propagate instead of being swallowed.
#
# @param message [String] raw reply body
# @param subject [String] subject the reply was delivered on
def handle_response(message, subject)
  @logger.debug("RECEIVED: #{subject} #{message}")
  begin
    request_id = subject.split(".").last
    # Removing the entry while looking it up means a callback is used for
    # at most one reply.
    callback = @lock.synchronize { @requests.delete(request_id) }
    if callback
      message = message.empty? ? nil : JSON.parse(message)
      callback.call(message)
    end
  rescue StandardError => e
    @logger.warn(e.message)
  end
end
subscribe_inbox() click to toggle source

Subscribes to the inbox, if not already subscribed

# File lib/bosh/director/nats_rpc.rb, line 67
# Subscribes to "<inbox name>.>" unless a subscription id is already stored.
#
# @subject_id is checked once without the lock and again while holding
# @lock, so client.subscribe is called only when no id has been recorded.
# Every message delivered to the subscription is passed to handle_response.
def subscribe_inbox
  return unless @subject_id.nil?

  # Resolve the client before taking @lock: connect acquires the same lock
  # when it has to create the client.
  client = nats
  @lock.synchronize do
    next unless @subject_id.nil?

    @subject_id = client.subscribe("#{@inbox_name}.>") do |message, _, subject|
      handle_response(message, subject)
    end
  end
end