class Bosh::Director::NatsRpc
Remote procedure call client wrapping NATS
Public Class Methods
new(nats_uri)
click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 5
#
# Prepares an RPC client for the NATS server at +nats_uri+. No connection is
# opened here; the URI is only stored for a later call to #connect.
#
# nats_uri - URI of the NATS server, passed to NATS.connect when connecting.
def initialize(nats_uri)
  # Pending request callbacks keyed by request id, guarded by @lock.
  @requests = {}
  @lock = Mutex.new
  @logger = Config.logger
  # Prefix of the reply subjects used by this director process.
  @inbox_name = "director.#{Config.process_uuid}"
  @nats_uri = nats_uri
end
Public Instance Methods
cancel_request(request_id)
click to toggle source
Stops listening for a response
# File lib/bosh/director/nats_rpc.rb, line 44
#
# Drops the callback registered under +request_id+, so a response arriving
# later for that id finds no callback to invoke.
#
# request_id - id previously returned by #send_request.
#
# Returns the removed callback, or nil when nothing was registered under
# that id.
def cancel_request(request_id)
  @lock.synchronize do
    @requests.delete(request_id)
  end
end
generate_request_id()
click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 48
#
# Produces a fresh identifier for an outgoing request.
#
# Returns a String UUID obtained from SecureRandom.uuid.
def generate_request_id
  SecureRandom.uuid
end
nats()
click to toggle source
Returns a lazily connected NATS client
# File lib/bosh/director/nats_rpc.rb, line 14
#
# Returns the NATS client, calling #connect the first time it is needed and
# reusing the memoized client afterwards.
def nats
  @nats ||= connect
end
send_message(client, payload)
click to toggle source
Publishes a payload (encoded as JSON) without expecting a response
# File lib/bosh/director/nats_rpc.rb, line 19
#
# Publishes +payload+ to the +client+ subject without registering any
# callback for a reply.
#
# client  - NATS subject to publish to.
# payload - object serialized with JSON.generate before publishing.
def send_message(client, payload)
  encoded = JSON.generate(payload)
  @logger.debug("SENT: #{client} #{encoded}")
  # The publish is handed to EM.schedule rather than performed inline.
  EM.schedule { nats.publish(client, encoded) }
end
send_request(client, request, &callback)
click to toggle source
Sends a request (encoded as JSON) and listens for the response
# File lib/bosh/director/nats_rpc.rb, line 28
#
# Publishes +request+ to the +client+ subject and registers +callback+ to be
# invoked with the response.
#
# client   - NATS subject to publish to.
# request  - Hash describing the request. It is modified in place: a
#            "reply_to" key holding this request's reply subject is added
#            before the hash is serialized to JSON.
# callback - block stored under the generated request id; #handle_response
#            calls it with the decoded reply.
#
# Returns the generated request id, which can be passed to #cancel_request.
def send_request(client, request, &callback)
  id = generate_request_id
  request["reply_to"] = "#{@inbox_name}.#{id}"

  # Register the callback before publishing so a reply always finds it.
  @lock.synchronize { @requests[id] = callback }

  encoded = JSON.generate(request)
  @logger.debug("SENT: #{client} #{encoded}")

  EM.schedule do
    # Make sure the inbox subscription exists before the request goes out.
    subscribe_inbox
    nats.publish(client, encoded)
  end

  id
end
Private Instance Methods
connect()
click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 54
#
# Creates the NATS client if it does not exist yet.
#
# Returns the NATS client stored in @nats.
def connect
  # double-check locking to reduce synchronization: skip the lock entirely
  # once a client exists, and re-check under the lock before creating one
  return @nats unless @nats.nil?

  @lock.synchronize do
    @nats = NATS.connect(:uri => @nats_uri, :autostart => false) if @nats.nil?
  end

  @nats
end
handle_response(message, subject)
click to toggle source
# File lib/bosh/director/nats_rpc.rb, line 82
#
# Dispatches a message received on the inbox to the callback registered for
# its request.
#
# message - raw response body; an empty string is passed to the callback as
#           nil, anything else is decoded with JSON.parse.
# subject - NATS subject the message arrived on; its last "."-separated
#           segment is taken as the request id.
#
# The callback is removed from @requests before it is invoked, so it runs at
# most once per request. Messages whose request id has no registered callback
# are ignored.
#
# Errors derived from StandardError (invalid JSON, failures inside the
# callback) are logged as warnings and not re-raised. Other exceptions such
# as SystemExit or Interrupt are deliberately left to propagate so that
# process shutdown and fatal conditions are not silently swallowed.
def handle_response(message, subject)
  @logger.debug("RECEIVED: #{subject} #{message}")
  begin
    request_id = subject.split(".").last
    callback = @lock.synchronize { @requests.delete(request_id) }
    if callback
      payload = message.empty? ? nil : JSON.parse(message)
      callback.call(payload)
    end
  rescue StandardError => e
    @logger.warn(e.message)
  end
end
subscribe_inbox()
click to toggle source
Subscribes to the inbox, if not already subscribed
# File lib/bosh/director/nats_rpc.rb, line 67
#
# Subscribes to every subject under this director's inbox prefix
# ("<inbox_name>.>") unless a subscription has already been recorded in
# @subject_id. Each message received is forwarded to #handle_response.
def subscribe_inbox
  # double-check locking to reduce synchronization
  return unless @subject_id.nil?

  # nats lazy-load needs to be outside the synchronized block
  # (#connect takes the same @lock)
  client = nats

  @lock.synchronize do
    next unless @subject_id.nil?

    @subject_id = client.subscribe("#{@inbox_name}.>") do |message, _, subject|
      handle_response(message, subject)
    end
  end
end