module NATS

Copyright 2010-2018 The NATS Authors Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Constants

DEFAULT_DRAIN_TIMEOUT

Drain mode support

DEFAULT_PING_INTERVAL

Ping intervals

DEFAULT_PING_MAX
DEFAULT_PORT
DEFAULT_URI
FAST_PRODUCER_THRESHOLD

Maximum outbound size per client to trigger FP, 20MB

LANG
MAX_PENDING_SIZE
MAX_RECONNECT_ATTEMPTS
PROTOCOL_VERSION
RECONNECT_TIME_WAIT
VERSION

NOTE: These are all announced to the server on CONNECT

Attributes

close_cb[R]
disconnect_cb[R]
reconnect_cb[R]
bytes_received[R]
bytes_sent[R]
close_cb[R]
closing[R]
closing?[R]
disconnect_cb[R]
draining[R]
draining?[R]
msgs_received[R]
msgs_sent[R]
options[R]
pings[R]
reconnecting[R]
reconnecting?[R]
server_info[R]
server_pool[R]

Public Class Methods

connect(uri=nil, opts={}, &blk) click to toggle source

Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.

@param [String] uri The URI or comma-separated list of URIs of NATS servers to connect to. @param [Hash] opts @option opts [String|URI] :uri The URI to connect to, example nats://localhost:4222. @option opts [Boolean] :reconnect Boolean that can be used to suppress reconnect functionality. @option opts [Boolean] :debug Boolean that can be used to output additional debug information. @option opts [Boolean] :verbose Boolean that is sent to server for setting verbose protocol mode. @option opts [Boolean] :pedantic Boolean that is sent to server for setting pedantic mode. @option opts [Boolean] :ssl Boolean that is sent to server for setting TLS/SSL mode. @option opts [Hash] :tls Map of options for configuring the secure connection, passed to EM#start_tls directly. @option opts [Integer] :max_reconnect_attempts Integer that can be used to set the max number of reconnect tries. @option opts [Integer] :reconnect_time_wait Integer that can be used to set the number of seconds to wait between reconnect tries. @option opts [Integer] :ping_interval Integer that can be used to set the ping interval in seconds. @option opts [Integer] :max_outstanding_pings Integer that can be used to set the max number of outstanding pings before declaring a connection closed. @param [Block] &blk called when the connection is completed. Connection will be passed to the block. @return [NATS] connection to the server.

@example Connect to local NATS server.

NATS.connect do |nc|
  # ...
end

@example Setting custom server URI to connect.

NATS.connect("nats://localhost:4222") do |nc|
  # ...
end

@example Setting username and password to authenticate.

NATS.connect("nats://user:password@localhost:4222") do |nc|
  # ...
end

@example Specifying explicit list of servers via options.

NATS.connect(servers: ["nats://127.0.0.1:4222","nats://127.0.0.1:4223","nats://127.0.0.1:4224"]) do |nc|
  # ...
end

@example Using a comma-separated string to define the list of servers.

NATS.connect("nats://localhost:4223,nats://localhost:4224") do |nc|
  # ...
end

@example Only specifying endpoint uses NATS default scheme and port.

NATS.connect("demo.nats.io") do |nc|
  # ...
end

@example Setting infinite reconnect retries with 2 seconds back off against custom URI.

NATS.connect("demo.nats.io:4222", max_reconnect_attempts: -1, reconnect_time_wait: 2) do |nc|
  # ...
end
# File lib/nats/client.rb, line 151
# Create and return a connection to the server with the given options.
#
# Option precedence: explicit options first, then built-in defaults for
# anything left unset, then environment variables. The NATS_* variables
# override the matching option, except :uri, :name and :no_echo, where the
# environment is only a fallback for an unset option.
#
# @param [String, Hash] uri a URI (or comma separated list of URIs), or the
#   options hash itself when no URI string is given.
# @param [Hash] opts connection options; this hash is filled in with the
#   resolved defaults and handed to EM.connect.
# @param [Block] blk registered through on_connect on the new client.
# @return the client returned by EM.connect.
# @raise [Error] when the TLS options request verification but the CA file is
#   missing or unreadable.
def connect(uri=nil, opts={}, &blk)
  case uri
  when String
    # Initialize TLS defaults in case any url is using it.
    uris = opts[:uri] = process_uri(uri)
    opts[:tls] ||= {} if uris.any? { |u| u.scheme == 'tls' }
  when Hash
    opts = uri
  end

  # Defaults, applied only to options the caller left unset (nil).
  {
    :verbose => false,
    :pedantic => false,
    :reconnect => true,
    :ssl => false,
    :max_reconnect_attempts => MAX_RECONNECT_ATTEMPTS,
    :reconnect_time_wait => RECONNECT_TIME_WAIT,
    :ping_interval => DEFAULT_PING_INTERVAL,
    :max_outstanding_pings => DEFAULT_PING_MAX,
    :drain_timeout => DEFAULT_DRAIN_TIMEOUT
  }.each { |key, default| opts[key] = default if opts[key].nil? }

  # Override with ENV
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI

  # Boolean flags: only the (case-insensitive) string "true" enables them.
  {
    :verbose => 'NATS_VERBOSE',
    :pedantic => 'NATS_PEDANTIC',
    :debug => 'NATS_DEBUG',
    :reconnect => 'NATS_RECONNECT',
    :fast_producer_error => 'NATS_FAST_PRODUCER',
    :ssl => 'NATS_SSL'
  }.each do |key, name|
    opts[key] = ENV[name].downcase == 'true' unless ENV[name].nil?
  end

  # Integer settings. :drain_timeout is assigned like its siblings: a
  # conditional assignment here could never take effect because the default
  # has already been filled in above.
  {
    :max_reconnect_attempts => 'NATS_MAX_RECONNECT_ATTEMPTS',
    :reconnect_time_wait => 'NATS_RECONNECT_TIME_WAIT',
    :ping_interval => 'NATS_PING_INTERVAL',
    :max_outstanding_pings => 'NATS_MAX_OUTSTANDING_PINGS',
    :drain_timeout => 'NATS_DRAIN_TIMEOUT'
  }.each do |key, name|
    opts[key] = ENV[name].to_i unless ENV[name].nil?
  end

  opts[:name] ||= ENV['NATS_CONNECTION_NAME']
  # Parse the flag like the other booleans; the raw string "false" is truthy
  # in Ruby and would otherwise turn no_echo on.
  opts[:no_echo] ||= (ENV['NATS_NO_ECHO'].to_s.downcase == 'true')

  uri = opts[:uris] || opts[:servers] || opts[:uri]

  if opts[:tls]
    tls = opts[:tls]
    if tls[:ca_file]
      # Ensure that the file exists before going further
      # in order to report configuration errors during
      # connect synchronously.
      unless File.readable?(tls[:ca_file])
        raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % tls[:ca_file])
      end

      # Certificate is supplied so assume we mean verification by default,
      # but still allow disabling explicitly by setting to false.
      tls[:verify_peer] = true if tls[:verify_peer].nil?
    elsif tls[:verify_peer]
      raise(Error, "TLS Verification is enabled but ca_file is not set")
    else
      # Otherwise, disable verifying peer by default,
      # thus never reaching EM#ssl_verify_peer
      tls[:verify_peer] = false
    end

    # Allow overriding directly but default to those which server supports.
    tls[:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
    tls[:protocols]   ||= %w(tlsv1 tlsv1_1 tlsv1_2)
  end

  # If they pass an array here just pass along to the real connection, and use first as the first attempt..
  # Real connection will do proper walk throughs etc..
  unless uri.nil?
    uris = uri.kind_of?(Array) ? uri : [uri]
    uris.shuffle! unless opts[:dont_randomize_servers]
    u = uris.first
    @uri = u.is_a?(URI) ? u.dup : URI.parse(u)
  end

  # Install default callbacks unless the application already set its own.
  @err_cb = proc { |e| raise e } unless err_cb
  @close_cb = proc { } unless close_cb
  @disconnect_cb = proc { } unless disconnect_cb

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  return client
end
connected?() click to toggle source

@return [Boolean] Connected state

# File lib/nats/client.rb, line 282
# Reports whether the default client is connected; false when there is no
# default client yet.
# @return [Boolean] Connected state
def connected?
  client ? client.connected? : false
end
connected_server() click to toggle source

@return [URI] Connected server

# File lib/nats/client.rb, line 276
# Returns the default client's connected server, or nil when there is no
# default client.
# @return [URI] Connected server
def connected_server
  client ? client.connected_server : nil
end
create_inbox() click to toggle source

Returns a subject that can be used for “directed” communications. @return [String]

# File lib/nats/client.rb, line 370
# Builds a unique subject for "directed" communications: the "_INBOX."
# prefix followed by 13 random bytes rendered as hex.
# @return [String]
def create_inbox
  ['_INBOX', SecureRandom.hex(13)].join('.')
end
drain(&blk) click to toggle source

Drain gracefully disconnects from the server, letting subscribers process pending messages already sent by server and optionally calls the associated block. @param [Block] &blk called when drain is done and connection is closed.

# File lib/nats/client.rb, line 269
# Drains the default client, provided one exists, it is not already
# draining, and it is connected or reconnecting. Does nothing otherwise.
# @param [Block] &blk called when drain is done and connection is closed.
def drain(&blk)
  return unless client
  return if client.draining?
  return unless client.connected? || client.reconnecting?

  client.drain { blk.call if blk }
end
draining?() click to toggle source

@return [Boolean] Draining state

# File lib/nats/client.rb, line 294
# Reports whether the default client is draining; false when there is no
# default client.
# @return [Boolean] Draining state
def draining?
  client ? client.draining? : false
end
flush(*args, &blk) click to toggle source

Flushes all messages and subscriptions in the default connection @see NATS#flush

# File lib/nats/client.rb, line 376
# Flushes the default connection, creating it via connect on first use.
# Arguments and block are forwarded unchanged.
# @see NATS#flush
def flush(*args, &blk)
  @client ||= connect
  @client.flush(*args, &blk)
end
new(options) click to toggle source
# File lib/nats/client.rb, line 456
# Sets up the per-connection state: server pool, subscription table,
# callbacks inherited from the NATS module, counters, TLS/SSL flags,
# request/response multiplexing state, drain flags and NKEYS callbacks.
# @param [Hash] options the connection options.
def initialize(options)
  @options = options
  # Must run after @options is set: it builds the server pool from it.
  process_uri_options

  @buf = nil
  @ssid = 1
  @subs = {}

  # Callbacks default to whatever is registered on the NATS module.
  @err_cb = NATS.err_cb
  @close_cb = NATS.close_cb
  @reconnect_cb = NATS.reconnect_cb
  @disconnect_cb = NATS.disconnect_cb

  @reconnect_timer = nil
  @needed = nil
  @connected = false
  @closing = false
  @reconnecting = false
  @conn_cb_called = false

  # Statistics
  @msgs_received = 0
  @msgs_sent = 0
  @bytes_received = 0
  @bytes_sent = 0
  @pings = 0
  @pending_size = 0
  @server_info = { }

  # Mark whether we should be connecting securely, try best effort
  # in being compatible with present ssl support.
  @tls = options[:tls] ? options[:tls] : nil
  # When only :tls is given, @ssl takes whatever :ssl holds (possibly falsy).
  @ssl = (options[:ssl] || @tls) ? options[:ssl] : false

  # New style request/response implementation.
  @resp_sub = nil
  @resp_map = nil
  @resp_sub_prefix = nil
  @nuid = NATS::NUID.new

  # Drain mode
  @draining = false
  @drained_subs = false

  # NKEYS: the source ivars are only assigned when the option is present.
  credentials = options[:user_credentials]
  @user_credentials = credentials if credentials
  seed_path = options[:nkeys_seed]
  @nkeys_seed = seed_path if seed_path
  @user_nkey_cb = nil
  @user_jwt_cb = nil
  @signature_cb = nil

  setup_nkeys_connect if @user_credentials || @nkeys_seed
end
on_close(&callback) click to toggle source

Set the default on_close callback. @param [Block] &callback called when the client reaches a state in which it will no longer be connected.

# File lib/nats/client.rb, line 333
# Stores the default close callback and, when a default client already
# exists, registers it on that client too.
# @param [Block] &callback the close callback.
def on_close(&callback)
  @close_cb = callback
  return if @client.nil?

  @client.on_close(&callback)
end
on_disconnect(&callback) click to toggle source

Set the default on_disconnect callback. @param [Block] &callback called whenever client disconnects from a server.

# File lib/nats/client.rb, line 326
# Stores the default disconnect callback and, when a default client already
# exists, registers it on that client too.
# @param [Block] &callback the disconnect callback.
def on_disconnect(&callback)
  @disconnect_cb = callback
  return if @client.nil?

  @client.on_disconnect(&callback)
end
on_error(&callback) click to toggle source

Set the default on_error callback. @param [Block] &callback called when an error has been detected.

# File lib/nats/client.rb, line 313
# Stores the default error callback and flags that it was overridden.
# Unlike the sibling on_close/on_disconnect/on_reconnect setters, this does
# not forward the callback to an already-created @client.
# @param [Block] &callback called when an error has been detected.
def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end
on_reconnect(&callback) click to toggle source

Set the default on_reconnect callback. @param [Block] &callback called when a reconnect attempt is made.

# File lib/nats/client.rb, line 319
# Stores the default reconnect callback and, when a default client already
# exists, registers it on that client too.
# @param [Block] &callback the reconnect callback.
def on_reconnect(&callback)
  @reconnect_cb = callback
  return if @client.nil?

  @client.on_reconnect(&callback)
end
options() click to toggle source

@return [Hash] Options

# File lib/nats/client.rb, line 300
# Returns the default client's options, or an empty hash when there is no
# default client.
# @return [Hash] Options
def options
  client ? client.options : {}
end
pending_data_size(*args) click to toggle source

Return bytes outstanding for the default client connection. @see NATS#pending_data_size

# File lib/nats/client.rb, line 382
# Returns the bytes outstanding on the default connection, creating the
# connection via connect on first use.
# @see NATS#pending_data_size
def pending_data_size(*args)
  @client ||= connect
  @client.pending_data_size(*args)
end
publish(*args, &blk) click to toggle source

Publish a message using the default client connection. @see NATS#publish

# File lib/nats/client.rb, line 340
# Publishes through the default connection, creating it via connect on
# first use. Arguments and block are forwarded unchanged.
# @see NATS#publish
def publish(*args, &blk)
  @client ||= connect
  @client.publish(*args, &blk)
end
reconnecting?() click to toggle source

@return [Boolean] Reconnecting state

# File lib/nats/client.rb, line 288
# Reports whether the default client is reconnecting; false when there is
# no default client.
# @return [Boolean] Reconnecting state
def reconnecting?
  client ? client.reconnecting? : false
end
request(*args, &blk) click to toggle source

Publish a message and wait for a response on the default client connection. @see NATS#request

# File lib/nats/client.rb, line 364
# Issues a request through the default connection, creating it via connect
# on first use. Arguments and block are forwarded unchanged.
# @see NATS#request
def request(*args, &blk)
  @client ||= connect
  @client.request(*args, &blk)
end
server_info() click to toggle source

@return [Hash] Server information

# File lib/nats/client.rb, line 306
# Returns the default client's server information, or nil when there is no
# default client.
# @return [Hash] Server information
def server_info
  client.server_info if client
end
start(*args, &blk) click to toggle source

Create a default client connection to the server. @see NATS::connect

# File lib/nats/client.rb, line 236
# Creates the default client connection inside an EM.run block.
# Records whether the reactor was already running, and selects epoll or
# kqueue when EventMachine reports support for them.
# @see NATS::connect
# @raise [Error] when the reactor is not running and no block was given.
def start(*args, &blk)
  @reactor_was_running = EM.reactor_running?
  if !@reactor_was_running && !blk
    raise(Error, "EM needs to be running when NATS.start is called without a run block")
  end

  # Setup optimized select versions
  case
  when EM.epoll?
    EM.epoll
  when EM.kqueue?
    EM.kqueue
  when EM.library_type == :java
    # No warning needed, we're using Java NIO
  else
    Kernel.warn('Neither epoll nor kqueue are supported, performance may be impacted')
  end

  EM.run do
    @client = connect(*args, &blk)
  end
end
stop(&blk) click to toggle source

Close the default client connection and optionally call the associated block. @param [Block] &blk called when the connection is closed.

# File lib/nats/client.rb, line 256
# Closes the default client when it is connected or reconnecting, calls the
# optional block, then clears the module-level callbacks.
# @param [Block] &blk called after the close has been requested.
def stop(&blk)
  active = client && (client.connected? || client.reconnecting?)
  client.close if active
  blk.call if blk
  @err_cb = @close_cb = @reconnect_cb = @disconnect_cb = nil
end
subscribe(*args, &blk) click to toggle source

Subscribe using the default client connection. @see NATS#subscribe

# File lib/nats/client.rb, line 346
# Subscribes through the default connection, creating it via connect on
# first use. Arguments and block are forwarded unchanged.
# @see NATS#subscribe
def subscribe(*args, &blk)
  @client ||= connect
  @client.subscribe(*args, &blk)
end
timeout(*args, &blk) click to toggle source

Set a timeout for receiving messages for the subscription. @see NATS#timeout

# File lib/nats/client.rb, line 358
# Sets a subscription timeout through the default connection, creating it
# via connect on first use. Arguments and block are forwarded unchanged.
# @see NATS#timeout
def timeout(*args, &blk)
  @client ||= connect
  @client.timeout(*args, &blk)
end
unsubscribe(*args) click to toggle source

Cancel a subscription on the default client connection. @see NATS#unsubscribe

# File lib/nats/client.rb, line 352
# Cancels a subscription through the default connection, creating the
# connection via connect on first use.
# @see NATS#unsubscribe
def unsubscribe(*args)
  @client ||= connect
  @client.unsubscribe(*args)
end

Private Class Methods

process_uri(uris) click to toggle source
# File lib/nats/client.rb, line 413
# Turns a comma separated list of endpoints into URI objects.
#
# Each entry may carry a scheme ("nats://", "tls://", ...), user info
# ("user:password@") and a port. A missing scheme defaults to 'nats', a
# missing or empty host to 'localhost', and a missing port to DEFAULT_PORT.
# Whitespace around entries is ignored, so "a:4222, b:4222" is accepted.
#
# @param [String] uris comma separated list of server endpoints.
# @return [Array<URI::Generic>] one URI per entry, in the given order.
# @raise [URI::InvalidComponentError] when URI::Generic.build rejects a
#   component of an entry.
def process_uri(uris)
  uris.split(',').map do |entry|
    entry = entry.strip
    parts = {}

    # Scheme. partition always yields a (possibly empty) remainder, so an
    # entry such as "nats://" no longer leaves the endpoint nil.
    scheme, separator, remainder = entry.partition('://')
    if separator.empty?
      parts[:scheme] = 'nats'
      remainder = entry
    else
      parts[:scheme] = scheme
    end

    # UserInfo. The split limit keeps the endpoint a String even when
    # nothing follows the "@".
    if remainder.include?('@')
      parts[:userinfo], remainder = remainder.split('@', 2)
    end

    # Host and Port
    host, port = remainder.split(':')
    parts[:host] = (host.nil? || host.empty?) ? 'localhost' : host
    parts[:port] = port || DEFAULT_PORT

    URI::Generic.build(parts)
  end
end
uri_is_remote?(uri) click to toggle source
# File lib/nats/client.rb, line 409
# Treats every host other than 'localhost' and '127.0.0.1' as remote.
# @param [URI] uri
# @return [Boolean]
def uri_is_remote?(uri)
  !['localhost', '127.0.0.1'].include?(uri.host)
end

Public Instance Methods

auth_connection?() click to toggle source
# File lib/nats/client.rb, line 784
# Truthy when the connection should authenticate: the URI carries a user,
# a :token option is set, or the server info reports :auth_required.
# The result is not normalized to a boolean.
def auth_connection?
  return true unless @uri.user.nil?

  @options[:token] || @server_info[:auth_required]
end
cancel_ping_timer() click to toggle source
# File lib/nats/client.rb, line 1039
# Cancels the ping timer, if one is set, and forgets it.
def cancel_ping_timer
  return unless @ping_timer

  EM.cancel_timer(@ping_timer)
  @ping_timer = nil
end
cancel_reconnect_timer() click to toggle source
# File lib/nats/client.rb, line 1169
# Cancels the reconnect timer, if one is set, and forgets it.
def cancel_reconnect_timer
  return unless @reconnect_timer

  EM.cancel_timer(@reconnect_timer)
  @reconnect_timer = nil
end
client_using_secure_connection?() click to toggle source
# File lib/nats/client.rb, line 1019
# Truthy when the client was configured with TLS options or the ssl flag.
# Returns the TLS options themselves when present, otherwise the ssl flag.
def client_using_secure_connection?
  return @tls if @tls

  @ssl
end
close() click to toggle source

Close the connection to the server.

# File lib/nats/client.rb, line 767
# Close the connection to the server: mark it closing, stop the ping and
# reconnect timers, close the socket after pending writes when connected,
# and process the disconnect when a reconnect was in progress.
def close
  @closing = true
  cancel_ping_timer
  cancel_reconnect_timer
  if connected?
    close_connection_after_writing
  end
  return unless reconnecting?

  process_disconnect
end
connected_server() click to toggle source

@return [URI] Connected server

# File lib/nats/client.rb, line 1331
# Returns the URI of the current server while connected, nil otherwise.
# @return [URI] Connected server
def connected_server
  @uri if connected?
end
disconnect_error_string() click to toggle source
# File lib/nats/client.rb, line 1176
# Builds the disconnect message: a lost connection when @connected is set,
# a failed connection attempt otherwise.
# @return [String]
def disconnect_error_string
  @connected ? "Client disconnected from server on #{@uri}" : "Could not connect to server on #{@uri}"
end
discovered_servers() click to toggle source

Retrieves the list of servers which have been discovered via server connect_urls announcements

# File lib/nats/client.rb, line 1337
# Returns the server pool entries flagged as :discovered.
def discovered_servers
  server_pool.reject { |server| !server[:discovered] }
end
drain(&blk) click to toggle source

Drain gracefully closes the connection. @param [Block] blk called when drain is done and connection is closed.

# File lib/nats/client.rb, line 551
# Gracefully drains and then closes this connection.
#
# Sends UNSUB for every known subscription, waits for a server round trip
# via #flush, then polls every 0.1 seconds until pending_data_size is zero
# before calling #close. If options[:drain_timeout] seconds elapse first,
# a NATS::ClientError ("Drain Timeout") is reported through err_cb and the
# connection is closed anyway.
#
# @param [Block] blk called after the drain completed or timed out.
def drain(&blk)
  # Nothing to do when a drain or a close is already in progress.
  return if draining? or closing?
  @draining = true

  # Remove interest in all subjects to stop receiving messages.
  # Sent directly because #unsubscribe returns early while draining.
  @subs.each do |sid, _|
    send_command("UNSUB #{sid} #{CR_LF}")
  end

  # Roundtrip to ensure no more messages are received.
  flush do
    # Both timers are declared up front so each callback can cancel the other.
    drain_timeout_timer, draining_timer = nil, nil
    drain_timeout_timer = EM.add_timer(options[:drain_timeout]) do
      EM.cancel_timer(draining_timer)

      # Report the timeout via the error callback and just close
      err_cb.call(NATS::ClientError.new("Drain Timeout"))
      @draining = false
      close unless closing?
      blk.call if blk
    end

    # Periodically check for the pending data to be empty.
    draining_timer = EM.add_periodic_timer(0.1) do
      # NOTE(review): @buf presumably holds unparsed inbound data, so this
      # waits until received messages have been processed — confirm.
      next unless closing? or @buf.nil? or @buf.empty?

      # Subscriptions have been drained already so disallow publishing.
      @drained_subs = true
      # Keep polling until nothing is left to be written out.
      next unless pending_data_size == 0
      EM.cancel_timer(draining_timer)
      EM.cancel_timer(drain_timeout_timer)

      # We're done draining and can close now.
      @draining = false
      close unless closing?
      blk.call if blk
    end
  end
end
flush(&blk) click to toggle source

Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.

# File lib/nats/client.rb, line 732
# Queues the block via queue_server_rt. Without a block this is a no-op.
def flush(&blk)
  return unless blk

  queue_server_rt(&blk)
end
had_error?() click to toggle source
# File lib/nats/client.rb, line 1161
# Truthy when the first entry of the server pool is flagged with
# :error_received. Returns nil when the pool is empty.
def had_error?
  current = server_pool.first
  current && current[:error_received]
end
multiple_servers_available?() click to toggle source
# File lib/nats/client.rb, line 1157
# True when the server pool holds more than one entry. A missing pool is
# returned as-is (falsy).
def multiple_servers_available?
  pool = server_pool
  return pool unless pool

  pool.size > 1
end
on_close(&callback) click to toggle source

Define a callback to be called when client is disconnected from server. @param [Block] &callback called when the client reaches a state in which it will no longer be connected.

# File lib/nats/client.rb, line 762
# Stores the close callback for this connection, replacing any previous one.
# @param [Block] &callback the close callback.
def on_close(&callback)
  @close_cb = callback
end
on_connect(&callback) click to toggle source

Define a callback to be called when the client connection has been established. @param [Block] callback

# File lib/nats/client.rb, line 738
# Stores the connect callback for this connection, replacing any previous
# one.
# @param [Block] callback the connect callback.
def on_connect(&callback)
  @connect_cb = callback
end
on_disconnect(&callback) click to toggle source

Define a callback to be called when client is disconnected from server. @param [Block] &callback called whenever client disconnects from a server.

# File lib/nats/client.rb, line 756
# Stores the disconnect callback for this connection, replacing any
# previous one.
# @param [Block] &callback the disconnect callback.
def on_disconnect(&callback)
  @disconnect_cb = callback
end
on_error(&callback) click to toggle source

Define a callback to be called when errors occur on the client connection. @param [Block] &callback called when an error has been detected.

# File lib/nats/client.rb, line 744
# Stores the error callback for this connection and flags that the default
# handler was overridden.
# @param [Block] &callback called when an error has been detected.
def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end
on_reconnect(&callback) click to toggle source

Define a callback to be called when a reconnect attempt is made. @param [Block] &callback called when a reconnect attempt is made.

# File lib/nats/client.rb, line 750
# Stores the reconnect callback for this connection, replacing any previous
# one.
# @param [Block] &callback the reconnect callback.
def on_reconnect(&callback)
  @reconnect_cb = callback
end
pending_data_size() click to toggle source

Return bytes outstanding waiting to be sent to server.

# File lib/nats/client.rb, line 776
# Returns the bytes still waiting to be sent: the outbound data size plus
# the locally tracked @pending_size.
def pending_data_size
  outbound = get_outbound_data_size
  outbound + @pending_size
end
process_pong() click to toggle source
# File lib/nats/client.rb, line 1117
# Accounts for a received PONG: one more pong received, one fewer ping
# outstanding.
def process_pong
  @pongs_received = @pongs_received + 1
  @pings_outstanding = @pings_outstanding - 1
end
process_uri_options() click to toggle source

Parse out URIs, which can now be an array of server choices. The server pool will contain both explicit and implicit members.

# File lib/nats/client.rb, line 1322
# Rebuilds the server pool from the :uris, :servers or :uri option (first
# one set wins) and then binds the primary server. URI objects are
# duplicated; anything else is parsed with URI.parse.
def process_uri_options #:nodoc
  @server_pool = []
  configured = options[:uris] || options[:servers] || options[:uri]
  configured = [configured] unless configured.kind_of?(Array)
  configured.each do |entry|
    parsed = entry.is_a?(URI) ? entry.dup : URI.parse(entry)
    server_pool << { :uri => parsed }
  end
  bind_primary
end
publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk) click to toggle source

Publish a message to a given subject, with optional reply subject and completion block @param [String] subject @param [Object, to_s] msg @param [String] opt_reply @param [Block] blk, closure called when publish has been processed by the server.

# File lib/nats/client.rb, line 505
# Publishes a message on a subject. Returns without doing anything when the
# subject is missing or the subscriptions have already been drained.
# @param [String] subject
# @param [Object, to_s] msg converted with to_s before sending.
# @param [String] opt_reply optional reply subject.
# @param [Block] blk queued via queue_server_rt when given.
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  return if !subject || @drained_subs

  payload = msg.to_s
  size = payload.bytesize

  # Accounting
  @msgs_sent += 1
  @bytes_sent += size

  send_command("PUB #{subject} #{opt_reply} #{size}#{CR_LF}#{payload}#{CR_LF}")
  return unless blk

  queue_server_rt(&blk)
end
request(subject, data=nil, opts={}, &cb) click to toggle source

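Send a request. With a callback, the response is delivered to the callback and the subscription id is returned. Without a callback, the call must run inside a Fiber and returns the response (or an array of responses when opts[:max] is greater than 1) once it is received or when opts[:timeout] (default 0.5 seconds) expires. @param [String] subject @param [Object] data @param [Hash] opts @param [Block] cb @return [Object] sid when a callback is given, otherwise the response(s)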

# File lib/nats/client.rb, line 623
# Sends a request on +subject+ and obtains the response(s).
#
# With a callback: subscribes to a fresh inbox, publishes the request with
# that inbox as reply subject, and returns the subscription id. The callback
# receives zero, one (msg) or two (msg, reply) arguments depending on its
# arity.
#
# Without a callback: suspends the current Fiber until the expected number
# of responses arrived or opts[:timeout] seconds passed, then returns the
# response (expected == 1) or an array of responses (expected > 1).
#
# @param [String] subject nothing is sent when this is nil/false.
# @param [Object] data payload handed to #publish.
# @param [Hash] opts :max (responses to wait for, default 1) and :timeout
#   (seconds, default 0.5); with a callback it is passed to #subscribe.
#   The defaults for :max and :timeout are written back into this hash.
# @param [Block] cb optional response callback.
# @return [Object] sid with a callback, otherwise the response(s).
def request(subject, data=nil, opts={}, &cb)
  return unless subject

  # In case of using async request then fallback to auto unsubscribe
  # based request/response and not break compatibility too much since
  # new request/response style can only be used with fibers.
  if cb
    inbox = "_INBOX.#{@nuid.next}"
    s = subscribe(inbox, opts) { |msg, reply|
      # Call the callback with only as many arguments as it declares.
      case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
      end
    }
    publish(subject, data, inbox)
    return s
  end

  # If this is the first request being made, then need to start
  # the responses mux handler that handles the responses.
  start_resp_mux_sub! unless @resp_sub_prefix

  # Generate unique token for the reply subject.
  token = @nuid.next
  inbox = "#{@resp_sub_prefix}.#{token}"

  # Synchronous request/response requires using a Fiber
  # to be able to await the response.
  f = Fiber.current
  # @resp_map creates the per-token entry on first access.
  @resp_map[token][:fiber] = f

  # If awaiting more than a single response then use array
  # to include all that could be gathered before the deadline.
  expected = opts[:max] ||= 1
  @resp_map[token][:expected] = expected
  @resp_map[token][:msgs] = [] if expected > 1

  # Announce the request with the inbox using the token.
  publish(subject, data, inbox)

  # If deadline expires, then discard the token and resume fiber
  opts[:timeout] ||= 0.5
  t = EM.add_timer(opts[:timeout]) do
    if expected > 1
      # Hand back whatever was gathered so far.
      f.resume @resp_map[token][:msgs]
    else
      # Resumes with no value, so the caller receives nil.
      f.resume
    end

    @resp_map.delete(token)
  end

  # Wait for the response and cancel timeout callback if received.
  if expected > 1
    # Wait to receive all replies that can get before deadline.
    msgs = Fiber.yield
    EM.cancel_timer(t)

    # Slice and throwaway responses that are not needed.
    return msgs.slice(0, expected)
  else
    msg = Fiber.yield
    EM.cancel_timer(t)
    return msg
  end
end
server_using_secure_connection?() click to toggle source
# File lib/nats/client.rb, line 1023
# Truthy when the server info reports :ssl_required or :tls_required.
def server_using_secure_connection?
  ssl_required = @server_info[:ssl_required]
  ssl_required ? ssl_required : @server_info[:tls_required]
end
setup_nkeys_connect() click to toggle source
# File lib/nats/client.rb, line 1242
# Prepares the callbacks used for NKEYS/JWT authentication.
#
# With @nkeys_seed (path to a seed file) it defines @user_nkey_cb, which
# returns the public key, and @signature_cb, which signs the nonce. With
# @user_credentials (path to a decorated credentials file) it defines
# @user_jwt_cb, which returns the JWT, and @signature_cb. The files are
# read each time a callback runs. @nkeys_seed takes precedence when both
# are set.
#
# @raise [Error] when the nkeys or base64 libraries cannot be loaded; the
#   callbacks raise Error when the credentials file lacks the JWT or seed.
def setup_nkeys_connect
  # Loaded lazily so the dependency is only needed when NKEYS are used.
  begin
    require 'nkeys'
    require 'base64'
  rescue LoadError
    raise(Error, "nkeys is not installed")
  end

  # Returns the chomped line that follows the first line containing
  # +marker+ in the credentials file. Raises Error with +missing_msg+ when
  # the marker is absent or nothing follows it, so a truncated file cannot
  # yield a nil JWT or seed.
  read_line_after = lambda do |marker, missing_msg|
    lines = File.readlines(@user_credentials)
    marker_at = lines.index { |line| line.include?(marker) }
    value = marker_at && lines[marker_at + 1]
    value = value.chomp if value
    raise(Error, missing_msg) if value.nil? || value.empty?
    value
  end

  if @nkeys_seed
    @user_nkey_cb = proc {
      seed = File.read(@nkeys_seed).chomp
      kp = NKEYS::from_seed(seed)

      # Take a copy since original will be gone with the wipe.
      pub_key = kp.public_key.dup
      kp.wipe!

      pub_key
    }

    @signature_cb = proc { |nonce|
      seed = File.read(@nkeys_seed).chomp
      kp = NKEYS::from_seed(seed)
      raw_signed = kp.sign(nonce)
      kp.wipe!

      # Remove padding
      Base64.urlsafe_encode64(raw_signed).delete('=')
    }
  elsif @user_credentials
    # When the credentials are within a single decorated file.
    @user_jwt_cb = proc {
      read_line_after.call("BEGIN NATS USER JWT", "No JWT found in #{@user_credentials}")
    }

    @signature_cb = proc { |nonce|
      seed = read_line_after.call("BEGIN USER NKEY SEED", "No nkey user seed found in #{@user_credentials}")

      kp = NKEYS::from_seed(seed)
      raw_signed = kp.sign(nonce)

      # seed is a reference so also cleared when doing wipe,
      # which can be done since Ruby strings are mutable.
      kp.wipe

      # Remove padding
      Base64.urlsafe_encode64(raw_signed).delete('=')
    }
  end
end
should_delay_connect?(server) click to toggle source
# File lib/nats/client.rb, line 1122
# Decides whether the next connect attempt to +server+ should be delayed.
# A previously connected server is delayed whenever its reconnect attempt
# count is zero or more. Otherwise the attempt is delayed while less than
# :reconnect_time_wait seconds have passed since the last one.
# @param [Hash] server a server pool entry.
# @return [Boolean]
def should_delay_connect?(server)
  if server[:was_connected]
    server[:reconnect_attempts] >= 0
  elsif server[:last_reconnect_attempt]
    elapsed = MonotonicTime.now - server[:last_reconnect_attempt]
    elapsed < @options[:reconnect_time_wait]
  else
    false
  end
end
should_not_reconnect?() click to toggle source
# File lib/nats/client.rb, line 1165
# True when the :reconnect option is disabled (nil or false).
# @return [Boolean]
def should_not_reconnect?
  @options[:reconnect] ? false : true
end
ssl_handshake_completed() click to toggle source
# File lib/nats/client.rb, line 1058
# Marks the connection as connected.
# NOTE(review): presumably invoked by EventMachine once the TLS handshake
# finishes — confirm against the EM::Connection callbacks.
def ssl_handshake_completed
  @connected = true
end
ssl_verify_peer(cert) click to toggle source
# File lib/nats/client.rb, line 1027
# Verifies a peer certificate against the system default trust paths plus
# the configured :ca_file. On failure a NATS::ConnectError is handed to
# err_cb; if err_cb raises that error, it is rescued here and false is
# returned.
# @param [String] cert the certificate as accepted by
#   OpenSSL::X509::Certificate.new.
# @return the verification result.
def ssl_verify_peer(cert)
  peer_cert = OpenSSL::X509::Certificate.new(cert)
  trust_store = OpenSSL::X509::Store.new
  trust_store.set_default_paths
  trust_store.add_file(@options[:tls][:ca_file])
  verified = trust_store.verify(peer_cert)
  unless verified
    err_cb.call(NATS::ConnectError.new('TLS Verification failed checking issuer based on CA %s' % @options[:tls][:ca_file]))
  end
  verified
rescue NATS::ConnectError
  false
end
start_resp_mux_sub!() click to toggle source
# File lib/nats/client.rb, line 691
# Starts the single wildcard subscription that serves all fiber based
# requests. The last token of an incoming subject selects the pending
# request in @resp_map, and its fiber is resumed with the message (one
# expected) or with the gathered messages once enough have arrived.
def start_resp_mux_sub!
  @resp_sub_prefix = "_INBOX.#{@nuid.next}"
  # Entries are created on first access by token.
  @resp_map = Hash.new { |map, token| map[token] = { } }

  # Single subscription that will be handling all the requests
  # using fibers to yield the responses.
  subscribe("#{@resp_sub_prefix}.*") do |msg, reply, subject|
    token = subject.split('.').last

    # Discard the response if requestor not interested already.
    next unless @resp_map.key?(token)

    pending = @resp_map[token]
    fiber = pending[:fiber]
    expected = pending[:expected]

    if expected == 1
      fiber.resume(msg)
    elsif pending[:msgs].size < expected
      pending[:msgs] << msg

      # Wait to gather more messages or timeout.
      next if pending[:msgs].size < expected

      fiber.resume(pending[:msgs])
    end

    @resp_map.delete(token)
  end
end
subscribe(subject, opts={}, &callback) click to toggle source

Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to unsubscribe. @param [String] subject, optionally with wildcards. @param [Hash] opts, optional options hash, e.g. :queue, :max. @param [Block] callback, called when a message is delivered. @return [Object] sid, Subject Identifier

# File lib/nats/client.rb, line 525
# Registers a subscription and announces it to the server. Returns nil
# without subscribing when the subject is missing or the connection is
# draining.
# @param [String] subject
# @param [Hash] opts :queue (queue group) and :max (auto-unsubscribe limit).
# @param [Block] callback stored with the subscription.
# @return [Object] sid, the new subscription id.
def subscribe(subject, opts={}, &callback)
  return if !subject || draining?

  @ssid += 1
  sid = @ssid

  entry = { :subject => subject, :callback => callback, :received => 0 }
  entry[:queue] = opts[:queue] if opts[:queue]
  entry[:max] = opts[:max] if opts[:max]
  @subs[sid] = entry

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  # Setup server support for auto-unsubscribe
  unsubscribe(sid, opts[:max]) if opts[:max]
  sid
end
subscription_count() click to toggle source

Return the active subscription count. @return [Number]

# File lib/nats/client.rb, line 593
# Returns the number of entries in the subscription table.
# @return [Number]
def subscription_count
  @subs.length
end
timeout(sid, timeout, opts={}, &callback) click to toggle source

Setup a timeout for receiving messages for the subscription. @param [Object] sid @param [Number] timeout, float in seconds @param [Hash] opts, options, :auto_unsubscribe(true), :expected(1)

# File lib/nats/client.rb, line 601
# Arms a timeout on a subscription, replacing any timer already set on it.
# When the timer fires the subscription is removed (unless
# :auto_unsubscribe is false) and the callback, if any, is called with sid.
# Returns nil when the subscription is unknown.
# @param [Object] sid
# @param [Number] timeout seconds.
# @param [Hash] opts :auto_unsubscribe (default true), :expected (default 1).
def timeout(sid, timeout, opts={}, &callback)
  # Setup a timeout if requested
  sub = @subs[sid]
  return unless sub

  auto_unsubscribe = opts.fetch(:auto_unsubscribe, true)
  expected = opts.fetch(:expected, 1)

  previous_timer = sub[:timeout]
  EM.cancel_timer(previous_timer) if previous_timer

  sub[:timeout] = EM.add_timer(timeout) do
    unsubscribe(sid) if auto_unsubscribe
    callback.call(sid) if callback
  end
  sub[:expected] = expected
end
unsubscribe(sid, opt_max=nil) click to toggle source

Cancel a subscription. @param [Object] sid @param [Number] opt_max, optional number of responses to receive before auto-unsubscribing

# File lib/nats/client.rb, line 540
# Cancels a subscription, optionally only after opt_max messages. Does
# nothing while draining. The UNSUB command is sent even for an unknown
# sid. The local entry is kept only while it has received fewer than
# opt_max messages.
# @param [Object] sid
# @param [Number] opt_max optional number of responses to receive before
#   auto-unsubscribing.
def unsubscribe(sid, opt_max=nil)
  return if draining?

  max_suffix = opt_max.nil? ? nil : " #{opt_max}"
  send_command("UNSUB #{sid}#{max_suffix}#{CR_LF}")

  sub = @subs[sid]
  return unless sub

  sub[:max] = opt_max
  still_expecting = sub[:max] && (sub[:received] < sub[:max])
  @subs.delete(sid) unless still_expecting
end