module NATS
Copyright 2010-2018 The NATS Authors
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Constants
- DEFAULT_DRAIN_TIMEOUT
Drain mode support
- DEFAULT_PING_INTERVAL
Ping intervals
- DEFAULT_PING_MAX
- DEFAULT_PORT
- DEFAULT_URI
- FAST_PRODUCER_THRESHOLD
Maximum outbound size per client to trigger FP, 20MB
- LANG
- MAX_PENDING_SIZE
- MAX_RECONNECT_ATTEMPTS
- PROTOCOL_VERSION
- RECONNECT_TIME_WAIT
- VERSION
NOTE: These are all announced to the server on CONNECT
Attributes
Public Class Methods
Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.
@param [String] uri The URI or comma separated list of URIs of NATS servers to connect to.
@param [Hash] opts
@option opts [String|URI] :uri The URI to connect to, example nats://localhost:4222
@option opts [Boolean] :reconnect Boolean that can be used to suppress reconnect functionality.
@option opts [Boolean] :debug Boolean that can be used to output additional debug information.
@option opts [Boolean] :verbose Boolean that is sent to server for setting verbose protocol mode.
@option opts [Boolean] :pedantic Boolean that is sent to server for setting pedantic mode.
@option opts [Boolean] :ssl Boolean that is sent to server for setting TLS/SSL mode.
@option opts [Hash] :tls Map of options for configuring secure connection handed to EM#start_tls directly.
@option opts [Integer] :max_reconnect_attempts Integer that can be used to set the max number of reconnect tries
@option opts [Integer] :reconnect_time_wait Integer that can be used to set the number of seconds to wait between reconnect tries
@option opts [Integer] :ping_interval Integer that can be used to set the ping interval in seconds.
@option opts [Integer] :max_outstanding_pings Integer that can be used to set the max number of outstanding pings before declaring a connection closed.
@param [Block] &blk called when the connection is completed. Connection will be passed to the block.
@return [NATS] connection to the server.
@example Connect to local NATS server.
NATS.connect do |nc| # ... end
@example Setting custom server URI to connect.
NATS.connect("nats://localhost:4222") do |nc| # ... end
@example Setting username and password to authenticate.
NATS.connect("nats://user:password@localhost:4222") do |nc| # ... end
@example Specifying explicit list of servers via options.
NATS.connect(servers: ["nats://127.0.0.1:4222","nats://127.0.0.1:4223","nats://127.0.0.1:4224"]) do |nc| # ... end
@example Using comma separated string to define list of servers.
NATS.connect("nats://localhost:4223,nats://localhost:4224") do |nc| # ... end
@example Only specifying endpoint uses NATS default scheme and port.
NATS.connect("demo.nats.io") do |nc| # ... end
@example Setting infinite reconnect retries with 2 seconds back off against custom URI.
NATS.connect("demo.nats.io:4222", max_reconnect_attempts: -1, reconnect_time_wait: 2) do |nc| # ... end
# File lib/nats/client.rb, line 151
# Create and return a connection to the server with the given options.
#
# Options are resolved in three steps: explicit values in +opts+, then
# built-in defaults for anything left nil, then NATS_* environment
# variables, which override both for the keys they cover.
#
# @param uri [String, Hash, nil] a URI, a comma separated list of URIs, or
#   the options hash itself (in which case +opts+ is ignored).
# @param opts [Hash] connection options; this hash is modified in place.
# @param blk [Proc] registered through on_connect on the new client.
# @return [Object] the client object returned by EM.connect.
# @raise [Error] when TLS peer verification is requested but no readable
#   ca_file is configured.
def connect(uri=nil, opts={}, &blk)
  case uri
  when String
    # Initialize TLS defaults in case any url is using it.
    uris = opts[:uri] = process_uri(uri)
    opts[:tls] ||= {} if uris.any? { |u| u.scheme == 'tls' }
  when Hash
    opts = uri
  end

  # Defaults (only applied to options the caller left unset).
  {
    :verbose                => false,
    :pedantic               => false,
    :reconnect              => true,
    :ssl                    => false,
    :max_reconnect_attempts => MAX_RECONNECT_ATTEMPTS,
    :reconnect_time_wait    => RECONNECT_TIME_WAIT,
    :ping_interval          => DEFAULT_PING_INTERVAL,
    :max_outstanding_pings  => DEFAULT_PING_MAX,
    :drain_timeout          => DEFAULT_DRAIN_TIMEOUT
  }.each { |key, value| opts[key] = value if opts[key].nil? }

  # Override with ENV
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI

  # Boolean switches: only the literal string "true" (any case) enables.
  {
    :verbose             => 'NATS_VERBOSE',
    :pedantic            => 'NATS_PEDANTIC',
    :debug               => 'NATS_DEBUG',
    :reconnect           => 'NATS_RECONNECT',
    :fast_producer_error => 'NATS_FAST_PRODUCER',
    :ssl                 => 'NATS_SSL'
  }.each { |key, var| opts[key] = ENV[var].downcase == 'true' unless ENV[var].nil? }

  # Integer settings. :drain_timeout used to be assigned with ||= here,
  # which could never take effect because the default above had already
  # filled it in; it now behaves like the other ENV overrides.
  {
    :max_reconnect_attempts => 'NATS_MAX_RECONNECT_ATTEMPTS',
    :reconnect_time_wait    => 'NATS_RECONNECT_TIME_WAIT',
    :ping_interval          => 'NATS_PING_INTERVAL',
    :max_outstanding_pings  => 'NATS_MAX_OUTSTANDING_PINGS',
    :drain_timeout          => 'NATS_DRAIN_TIMEOUT'
  }.each { |key, var| opts[key] = ENV[var].to_i unless ENV[var].nil? }

  opts[:name] ||= ENV['NATS_CONNECTION_NAME']
  # NOTE(review): any non-empty NATS_NO_ECHO value (even "false") is truthy
  # here, unlike the boolean switches above; left unchanged on purpose.
  opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false

  uri = opts[:uris] || opts[:servers] || opts[:uri]

  if opts[:tls]
    case
    when opts[:tls][:ca_file]
      # Ensure that the file exists before going further
      # in order to report configuration errors during
      # connect synchronously.
      if !File.readable?(opts[:tls][:ca_file])
        raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % opts[:tls][:ca_file])
      end

      # Certificate is supplied so assume we mean verification by default,
      # but still allow disabling explicitly by setting to false.
      # (A plain ||= would turn an explicit false back into true.)
      opts[:tls][:verify_peer] = true if opts[:tls][:verify_peer].nil?
    when (opts[:tls][:verify_peer] && !opts[:tls][:ca_file])
      raise(Error, "TLS Verification is enabled but ca_file is not set")
    else
      # Otherwise, disable verifying peer by default,
      # thus never reaching EM#ssl_verify_peer
      opts[:tls][:verify_peer] = false
    end

    # Allow overriding directly but default to those which server supports.
    opts[:tls][:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
    opts[:tls][:protocols] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
  end

  # If they pass an array here just pass along to the real connection, and use first as the first attempt..
  # Real connection will do proper walk throughs etc..
  unless uri.nil?
    uris = uri.kind_of?(Array) ? uri : [uri]
    # In-place shuffle is intentional: the same array is read again from
    # opts by the connection instance.
    uris.shuffle! unless opts[:dont_randomize_servers]
    u = uris.first
    @uri = u.is_a?(URI) ? u.dup : URI.parse(u)
  end

  # Install fallback callbacks when none were registered.
  @err_cb = proc { |e| raise e } unless err_cb
  @close_cb = proc { } unless close_cb
  @disconnect_cb = proc { } unless disconnect_cb

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  client
end
@return [Boolean] Connected state
# File lib/nats/client.rb, line 282
# Connected state of the default client.
# @return [Boolean] false when there is no default client, otherwise the
#   value of the client's own connected?.
def connected?
  client ? client.connected? : false
end
@return [URI] Connected server
# File lib/nats/client.rb, line 276
# Server the default client reports as connected.
# @return [URI, nil] nil when there is no default client.
def connected_server
  client ? client.connected_server : nil
end
Returns a subject that can be used for “directed” communications. @return [String]
# File lib/nats/client.rb, line 370
# Builds a subject usable for "directed" communications: the "_INBOX."
# prefix followed by 13 random bytes rendered as hex.
# @return [String]
def create_inbox
  suffix = SecureRandom.hex(13)
  "_INBOX.#{suffix}"
end
Drain gracefully disconnects from the server, letting subscribers process pending messages already sent by server and optionally calls the associated block. @param [Block] &blk called when drain is done and connection is closed.
# File lib/nats/client.rb, line 269
# Drains the default client, then invokes the optional block.
# Does nothing (returns nil) when there is no default client, when it is
# already draining, or when it is neither connected nor reconnecting.
# @param blk [Proc] called from inside the client's drain callback.
def drain(&blk)
  return unless client
  return if client.draining?
  return unless client.connected? || client.reconnecting?

  client.drain { blk.call if blk }
end
@return [Boolean] Draining state
# File lib/nats/client.rb, line 294
# Draining state of the default client.
# @return [Boolean] false when there is no default client.
def draining?
  client ? client.draining? : false
end
Flushes all messages and subscriptions in the default connection @see NATS#flush
# File lib/nats/client.rb, line 376
# Forwards flush to the default client, creating it via connect if it
# does not exist yet.
def flush(*args, &blk)
  @client ||= connect
  @client.flush(*args, &blk)
end
# File lib/nats/client.rb, line 456 def initialize(options) @options = options process_uri_options @buf = nil @ssid, @subs = 1, {} @err_cb = NATS.err_cb @close_cb = NATS.close_cb @reconnect_cb = NATS.reconnect_cb @disconnect_cb = NATS.disconnect_cb @reconnect_timer, @needed = nil, nil @connected, @closing, @reconnecting, @conn_cb_called = false, false, false, false @msgs_received = @msgs_sent = @bytes_received = @bytes_sent = @pings = 0 @pending_size = 0 @server_info = { } # Mark whether we should be connecting securely, try best effort # in being compatible with present ssl support. @ssl = false @tls = nil @tls = options[:tls] if options[:tls] @ssl = options[:ssl] if options[:ssl] or @tls # New style request/response implementation. @resp_sub = nil @resp_map = nil @resp_sub_prefix = nil @nuid = NATS::NUID.new # Drain mode @draining = false @drained_subs = false # NKEYS @user_credentials = options[:user_credentials] if options[:user_credentials] @nkeys_seed = options[:nkeys_seed] if options[:nkeys_seed] @user_nkey_cb = nil @user_jwt_cb = nil @signature_cb = nil # NKEYS setup_nkeys_connect if @user_credentials or @nkeys_seed end
Set the default on_close callback. @param [Block] &callback called when the client reaches a state in which it will no longer be connected.
# File lib/nats/client.rb, line 333
# Stores the default close callback and, when a default client already
# exists, registers it on that client too.
# @param callback [Proc]
def on_close(&callback)
  @close_cb = callback
  return if @client.nil?

  @client.on_close(&callback)
end
Set the default on_disconnect callback. @param [Block] &callback called whenever client disconnects from a server.
# File lib/nats/client.rb, line 326
# Stores the default disconnect callback and, when a default client
# already exists, registers it on that client too.
# @param callback [Proc]
def on_disconnect(&callback)
  @disconnect_cb = callback
  return if @client.nil?

  @client.on_disconnect(&callback)
end
Set the default on_error callback. @param [Block] &callback called when an error has been detected.
# File lib/nats/client.rb, line 313
# Stores the default error callback and flags that it was overridden.
# Unlike the other default on_* setters in this file it does not forward
# to an existing client.
# @param callback [Proc]
def on_error(&callback)
  # Parallel assignment kept so the return value stays [callback, true].
  @err_cb, @err_cb_overridden = callback, true
end
Set the default on_reconnect callback. @param [Block] &callback called when a reconnect attempt is made.
# File lib/nats/client.rb, line 319
# Stores the default reconnect callback and, when a default client
# already exists, registers it on that client too.
# @param callback [Proc]
def on_reconnect(&callback)
  @reconnect_cb = callback
  return if @client.nil?

  @client.on_reconnect(&callback)
end
@return [Hash] Options
# File lib/nats/client.rb, line 300
# Options of the default client.
# @return [Hash] an empty hash when there is no default client.
def options
  client ? client.options : {}
end
Return bytes outstanding for the default client connection. @see NATS#pending_data_size
# File lib/nats/client.rb, line 382
# Forwards pending_data_size to the default client, creating it via
# connect if it does not exist yet.
def pending_data_size(*args)
  @client ||= connect
  @client.pending_data_size(*args)
end
Publish a message using the default client connection. @see NATS#publish
# File lib/nats/client.rb, line 340
# Forwards publish to the default client, creating it via connect if it
# does not exist yet.
def publish(*args, &blk)
  @client ||= connect
  @client.publish(*args, &blk)
end
@return [Boolean] Reconnecting state
# File lib/nats/client.rb, line 288
# Reconnecting state of the default client.
# @return [Boolean] false when there is no default client.
def reconnecting?
  client ? client.reconnecting? : false
end
Publish a message and wait for a response on the default client connection. @see NATS#request
# File lib/nats/client.rb, line 364
# Forwards request to the default client, creating it via connect if it
# does not exist yet.
def request(*args, &blk)
  @client ||= connect
  @client.request(*args, &blk)
end
@return [Hash] Server information
# File lib/nats/client.rb, line 306
# Server information held by the default client.
# @return [Hash, nil] nil when there is no default client.
def server_info
  client ? client.server_info : nil
end
Create a default client connection to the server. @see NATS::connect
# File lib/nats/client.rb, line 236
# Creates the default client connection inside EM.run.
#
# @param args forwarded to connect.
# @param blk [Proc] forwarded to connect.
# @raise [Error] when the EM reactor is not running and no block is given.
def start(*args, &blk)
  @reactor_was_running = EM.reactor_running?
  if !@reactor_was_running && !blk
    raise(Error, "EM needs to be running when NATS.start is called without a run block")
  end

  # Setup optimized select versions; only warn when neither is available
  # and we are not on the Java backend (which uses NIO).
  if EM.epoll?
    EM.epoll
  elsif EM.kqueue?
    EM.kqueue
  elsif EM.library_type != :java
    Kernel.warn('Neither epoll nor kqueue are supported, performance may be impacted')
  end

  EM.run { @client = connect(*args, &blk) }
end
Close the default client connection and optionally call the associated block. @param [Block] &blk called when the connection is closed.
# File lib/nats/client.rb, line 256
# Closes the default client if it is connected or reconnecting, calls the
# optional block, then clears the default callbacks.
# @param blk [Proc] called right after the close request.
def stop(&blk)
  active = client && (client.connected? || client.reconnecting?)
  client.close if active
  blk.call if blk
  @err_cb = @close_cb = @reconnect_cb = @disconnect_cb = nil
end
Subscribe using the default client connection. @see NATS#subscribe
# File lib/nats/client.rb, line 346
# Forwards subscribe to the default client, creating it via connect if it
# does not exist yet.
def subscribe(*args, &blk)
  @client ||= connect
  @client.subscribe(*args, &blk)
end
Set a timeout for receiving messages for the subscription. @see NATS#timeout
# File lib/nats/client.rb, line 358
# Forwards timeout to the default client, creating it via connect if it
# does not exist yet.
def timeout(*args, &blk)
  @client ||= connect
  @client.timeout(*args, &blk)
end
Cancel a subscription on the default client connection. @see NATS#unsubscribe
# File lib/nats/client.rb, line 352
# Forwards unsubscribe to the default client, creating it via connect if
# it does not exist yet.
def unsubscribe(*args)
  @client ||= connect
  @client.unsubscribe(*args)
end
Private Class Methods
# File lib/nats/client.rb, line 413
# Turns a comma separated list of server endpoints into URI objects.
#
# Each entry may omit the scheme (defaults to 'nats'), the host (defaults
# to "localhost") and the port (defaults to DEFAULT_PORT). Whitespace
# around entries is ignored so "a:4222, b:4222" is accepted.
#
# @param uris [String] one endpoint or several separated by commas.
# @return [Array<URI::Generic>] one URI per entry, in the given order.
def process_uri(uris)
  uris.split(',').map do |entry|
    uri = entry.strip
    opts = {}

    # Scheme. partition never yields nil, so "nats://" with nothing after
    # it falls through to the host/port defaults instead of failing on nil.
    if uri.include?("://")
      scheme, _, uri = uri.partition("://")
      opts[:scheme] = scheme
    else
      opts[:scheme] = 'nats'
    end

    # UserInfo. Split on the last "@" so the endpoint is always the text
    # after the final "@".
    if uri.include?("@")
      userinfo, _, endpoint = uri.rpartition("@")
      host, port = endpoint.split(":")
      opts[:userinfo] = userinfo
    else
      host, port = uri.split(":")
    end

    # Host and Port
    opts[:host] = host || "localhost"
    opts[:port] = port || DEFAULT_PORT

    URI::Generic.build(opts)
  end
end
# File lib/nats/client.rb, line 409
# True unless the URI's host is exactly 'localhost' or '127.0.0.1'.
# @param uri [URI]
# @return [Boolean]
def uri_is_remote?(uri)
  !['localhost', '127.0.0.1'].include?(uri.host)
end
Public Instance Methods
# File lib/nats/client.rb, line 784
# Whether credentials apply to this connection: a user in the URI, a
# :token option, or the server's :auth_required flag.
# @return [Object] true when the URI carries a user; otherwise the token
#   or the auth_required value (may be nil).
def auth_connection?
  return true unless @uri.user.nil?

  @options[:token] || @server_info[:auth_required]
end
# File lib/nats/client.rb, line 1039
# Cancels the ping timer, if one is set, and forgets it.
def cancel_ping_timer
  return unless @ping_timer

  EM.cancel_timer(@ping_timer)
  @ping_timer = nil
end
# File lib/nats/client.rb, line 1169
# Cancels the reconnect timer, if one is set, and forgets it.
def cancel_reconnect_timer
  return unless @reconnect_timer

  EM.cancel_timer(@reconnect_timer)
  @reconnect_timer = nil
end
# File lib/nats/client.rb, line 1019
# Whether this client was configured with TLS options or the ssl flag.
# @return [Object] the @tls value when truthy, otherwise @ssl.
def client_using_secure_connection?
  @tls || @ssl
end
Close the connection to the server.
# File lib/nats/client.rb, line 767
# Close the connection to the server: marks the client as closing, stops
# the ping and reconnect timers, then either closes the socket once
# pending writes finish (when connected) or runs the disconnect handling
# (when in the middle of a reconnect).
def close
  @closing = true
  cancel_ping_timer
  cancel_reconnect_timer

  if connected?
    close_connection_after_writing
  end

  if reconnecting?
    process_disconnect
  end
end
@return [URI] Connected server
# File lib/nats/client.rb, line 1331
# The URI of the current server, only while connected.
# @return [URI, nil]
def connected_server
  @uri if connected?
end
# File lib/nats/client.rb, line 1176
# Message describing a lost connection, worded by whether the client had
# been connected.
# @return [String]
def disconnect_error_string
  if @connected
    "Client disconnected from server on #{@uri}"
  else
    "Could not connect to server on #{@uri}"
  end
end
Retrieves the list of servers which have been discovered via server connect_urls announcements
# File lib/nats/client.rb, line 1337
# Entries of the server pool flagged with :discovered.
# @return [Array<Hash>]
def discovered_servers
  server_pool.select do |server|
    server[:discovered]
  end
end
Drain gracefully closes the connection. @param [Block] blk called when drain is done and connection is closed.
# File lib/nats/client.rb, line 551 def drain(&blk) return if draining? or closing? @draining = true # Remove interest in all subjects to stop receiving messages. @subs.each do |sid, _| send_command("UNSUB #{sid} #{CR_LF}") end # Roundtrip to ensure no more messages are received. flush do drain_timeout_timer, draining_timer = nil, nil drain_timeout_timer = EM.add_timer(options[:drain_timeout]) do EM.cancel_timer(draining_timer) # Report the timeout via the error callback and just close err_cb.call(NATS::ClientError.new("Drain Timeout")) @draining = false close unless closing? blk.call if blk end # Periodically check for the pending data to be empty. draining_timer = EM.add_periodic_timer(0.1) do next unless closing? or @buf.nil? or @buf.empty? # Subscriptions have been drained already so disallow publishing. @drained_subs = true next unless pending_data_size == 0 EM.cancel_timer(draining_timer) EM.cancel_timer(drain_timeout_timer) # We're done draining and can close now. @draining = false close unless closing? blk.call if blk end end end
Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.
# File lib/nats/client.rb, line 732
# Queues the block via queue_server_rt. Without a block this does
# nothing.
# @param blk [Proc]
def flush(&blk)
  return unless blk

  queue_server_rt(&blk)
end
# File lib/nats/client.rb, line 1161
# Whether the first server in the pool is flagged :error_received.
# @return [Object] nil when the pool is empty, otherwise the flag value.
def had_error?
  primary = server_pool.first
  primary && primary[:error_received]
end
# File lib/nats/client.rb, line 1157
# Whether the server pool holds more than one entry.
# @return [Boolean, nil] the falsy pool value itself when there is no pool.
def multiple_servers_available?
  pool = server_pool
  return pool unless pool

  pool.size > 1
end
Define a callback to be called when client is disconnected from server. @param [Block] &callback called when the client reaches a state in which it will no longer be connected.
# File lib/nats/client.rb, line 762
# Stores the close callback for this connection.
# @param callback [Proc]
# @return [Proc] the stored callback.
def on_close(&callback)
  @close_cb = callback
end
Define a callback to be called when the client connection has been established. @param [Block] callback
# File lib/nats/client.rb, line 738
# Stores the callback for when the connection has been established.
# @param callback [Proc]
# @return [Proc] the stored callback.
def on_connect(&callback)
  @connect_cb = callback
end
Define a callback to be called when client is disconnected from server. @param [Block] &callback called whenever client disconnects from a server.
# File lib/nats/client.rb, line 756
# Stores the disconnect callback for this connection.
# @param callback [Proc]
# @return [Proc] the stored callback.
def on_disconnect(&callback)
  @disconnect_cb = callback
end
Define a callback to be called when errors occur on the client connection. @param [Block] &callback called when an error has been detected.
# File lib/nats/client.rb, line 744
# Stores the error callback for this connection and flags that it was
# overridden.
# @param callback [Proc]
def on_error(&callback)
  # Parallel assignment kept so the return value stays [callback, true].
  @err_cb, @err_cb_overridden = callback, true
end
Define a callback to be called when a reconnect attempt is made. @param [Block] &callback called when a reconnect attempt is made.
# File lib/nats/client.rb, line 750
# Stores the reconnect callback for this connection.
# @param callback [Proc]
# @return [Proc] the stored callback.
def on_reconnect(&callback)
  @reconnect_cb = callback
end
Return bytes outstanding waiting to be sent to server.
# File lib/nats/client.rb, line 776
# Bytes still waiting to go out: the outbound data size reported by
# get_outbound_data_size plus this client's own @pending_size.
# @return [Integer]
def pending_data_size
  outbound = get_outbound_data_size
  outbound + @pending_size
end
# File lib/nats/client.rb, line 1117
# Bookkeeping for a received PONG: one more pong seen, one fewer ping
# outstanding.
# @return [Integer] the updated outstanding ping count.
def process_pong
  @pongs_received += 1
  @pings_outstanding -= 1
end
Parse out URIs which can now be an array of server choices. The server pool will contain both explicit and implicit members.
# File lib/nats/client.rb, line 1322
# Rebuilds the server pool from the :uris, :servers or :uri option (first
# one present wins) and then calls bind_primary. URI objects are
# duplicated; anything else is parsed with URI.parse.
def process_uri_options #:nodoc
  @server_pool = []
  configured = options[:uris] || options[:servers] || options[:uri]
  candidates = configured.kind_of?(Array) ? configured : [configured]
  candidates.each do |candidate|
    parsed = candidate.is_a?(URI) ? candidate.dup : URI.parse(candidate)
    server_pool << { :uri => parsed }
  end
  bind_primary
end
Publish a message to a given subject, with optional reply subject and completion block @param [String] subject @param [Object, to_s] msg @param [String] opt_reply @param [Block] blk, closure called when publish has been processed by the server.
# File lib/nats/client.rb, line 505
# Sends a PUB command for the subject.
#
# @param subject [String] nothing is sent when this is nil/false.
# @param msg [Object, #to_s] payload; converted with to_s.
# @param opt_reply [String, nil] reply subject.
# @param blk [Proc] queued via queue_server_rt when given.
# @return [Object, nil] nil when skipped (no subject, or subscriptions
#   already drained) or when no block was given.
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  return if !subject || @drained_subs

  payload = msg.to_s

  # Accounting
  @msgs_sent += 1
  @bytes_sent += payload.bytesize if payload

  send_command("PUB #{subject} #{opt_reply} #{payload.bytesize}#{CR_LF}#{payload}#{CR_LF}")
  queue_server_rt(&blk) if blk
end
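Send a request and have the response delivered to the supplied callback. @param [String] subject @param [Object] data @param [Hash] opts, optional options hash, e.g. :max, :timeout. @param [Block] callback @return [Object] sid when a callback is given, otherwise the response message(s)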
# File lib/nats/client.rb, line 623 def request(subject, data=nil, opts={}, &cb) return unless subject # In case of using async request then fallback to auto unsubscribe # based request/response and not break compatibility too much since # new request/response style can only be used with fibers. if cb inbox = "_INBOX.#{@nuid.next}" s = subscribe(inbox, opts) { |msg, reply| case cb.arity when 0 then cb.call when 1 then cb.call(msg) else cb.call(msg, reply) end } publish(subject, data, inbox) return s end # If this is the first request being made, then need to start # the responses mux handler that handles the responses. start_resp_mux_sub! unless @resp_sub_prefix # Generate unique token for the reply subject. token = @nuid.next inbox = "#{@resp_sub_prefix}.#{token}" # Synchronous request/response requires using a Fiber # to be able to await the response. f = Fiber.current @resp_map[token][:fiber] = f # If awaiting more than a single response then use array # to include all that could be gathered before the deadline. expected = opts[:max] ||= 1 @resp_map[token][:expected] = expected @resp_map[token][:msgs] = [] if expected > 1 # Announce the request with the inbox using the token. publish(subject, data, inbox) # If deadline expires, then discard the token and resume fiber opts[:timeout] ||= 0.5 t = EM.add_timer(opts[:timeout]) do if expected > 1 f.resume @resp_map[token][:msgs] else f.resume end @resp_map.delete(token) end # Wait for the response and cancel timeout callback if received. if expected > 1 # Wait to receive all replies that can get before deadline. msgs = Fiber.yield EM.cancel_timer(t) # Slice and throwaway responses that are not needed. return msgs.slice(0, expected) else msg = Fiber.yield EM.cancel_timer(t) return msg end end
# File lib/nats/client.rb, line 1023
# Whether the server info carries :ssl_required or :tls_required.
# @return [Object] the first truthy flag value, otherwise the
#   :tls_required value (possibly nil).
def server_using_secure_connection?
  info = @server_info
  info[:ssl_required] || info[:tls_required]
end
# File lib/nats/client.rb, line 1242 def setup_nkeys_connect begin require 'nkeys' require 'base64' rescue LoadError raise(Error, "nkeys is not installed") end case when @nkeys_seed @user_nkey_cb = proc { seed = File.read(@nkeys_seed).chomp kp = NKEYS::from_seed(seed) # Take a copy since original will be gone with the wipe. pub_key = kp.public_key.dup kp.wipe! pub_key } @signature_cb = proc { |nonce| seed = File.read(@nkeys_seed).chomp kp = NKEYS::from_seed(seed) raw_signed = kp.sign(nonce) kp.wipe! encoded = Base64.urlsafe_encode64(raw_signed) encoded.gsub('=', '') } when @user_credentials # When the credentials are within a single decorated file. @user_jwt_cb = proc { jwt_start = "BEGIN NATS USER JWT".freeze found = false jwt = nil File.readlines(@user_credentials).each do |line| case when found jwt = line.chomp break when line.include?(jwt_start) found = true end end raise(Error, "No JWT found in #{@user_credentials}") if not found jwt } @signature_cb = proc { |nonce| seed_start = "BEGIN USER NKEY SEED".freeze found = false seed = nil File.readlines(@user_credentials).each do |line| case when found seed = line.chomp break when line.include?(seed_start) found = true end end raise(Error, "No nkey user seed found in #{@user_credentials}") if not found kp = NKEYS::from_seed(seed) raw_signed = kp.sign(nonce) # seed is a reference so also cleared when doing wipe, # which can be done since Ruby strings are mutable. kp.wipe encoded = Base64.urlsafe_encode64(raw_signed) # Remove padding encoded.gsub('=', '') } end end
# File lib/nats/client.rb, line 1122
# Decides whether connecting to the given pool entry should be delayed.
#
# @param server [Hash] pool entry; reads :was_connected,
#   :reconnect_attempts and :last_reconnect_attempt.
# @return [Boolean] for a previously connected server, whether its
#   reconnect attempt count is >= 0; otherwise whether the last attempt
#   was less than options[:reconnect_time_wait] ago; false when neither
#   field is set.
def should_delay_connect?(server)
  if server[:was_connected]
    server[:reconnect_attempts] >= 0
  elsif server[:last_reconnect_attempt]
    elapsed = MonotonicTime.now - server[:last_reconnect_attempt]
    elapsed < @options[:reconnect_time_wait]
  else
    false
  end
end
# File lib/nats/client.rb, line 1165
# True when the :reconnect option is nil or false.
# @return [Boolean]
def should_not_reconnect?
  reconnect_enabled = @options[:reconnect]
  !reconnect_enabled
end
# File lib/nats/client.rb, line 1058
# Marks the client as connected.
# NOTE(review): presumably invoked by EventMachine once the TLS handshake
# finishes — confirm against the EM connection callbacks.
def ssl_handshake_completed
  @connected = true
end
# File lib/nats/client.rb, line 1027
# Verifies a peer certificate against the default trust paths plus the
# configured :ca_file. On failure a NATS::ConnectError is passed to
# err_cb; if that callback raises the same error, it is swallowed and
# false is returned.
#
# @param cert [String] certificate data accepted by
#   OpenSSL::X509::Certificate.new.
# @return [Boolean] the verification result.
def ssl_verify_peer(cert)
  ca_file = @options[:tls][:ca_file]
  peer_cert = OpenSSL::X509::Certificate.new(cert)

  trust = OpenSSL::X509::Store.new
  trust.set_default_paths
  trust.add_file ca_file

  verified = trust.verify(peer_cert)
  unless verified
    err_cb.call(NATS::ConnectError.new('TLS Verification failed checking issuer based on CA %s' % @options[:tls][:ca_file]))
  end
  verified
rescue NATS::ConnectError
  false
end
# File lib/nats/client.rb, line 691 def start_resp_mux_sub! @resp_sub_prefix = "_INBOX.#{@nuid.next}" @resp_map = Hash.new { |h,k| h[k] = { }} # Single subscription that will be handling all the requests # using fibers to yield the responses. subscribe("#{@resp_sub_prefix}.*") do |msg, reply, subject| token = subject.split('.').last # Discard the response if requestor not interested already. next unless @resp_map.key? token # Take fiber that will be passed the response f = @resp_map[token][:fiber] expected = @resp_map[token][:expected] if expected == 1 f.resume msg @resp_map.delete(token) next end if @resp_map[token][:msgs].size < expected @resp_map[token][:msgs] << msg msgs = @resp_map[token][:msgs] if msgs.size >= expected f.resume(msgs) else # Wait to gather more messages or timeout. next end end @resp_map.delete(token) end end
Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to unsubscribe. @param [String] subject, optionally with wildcards. @param [Hash] opts, optional options hash, e.g. :queue, :max. @param [Block] callback, called when a message is delivered. @return [Object] sid, Subject Identifier
# File lib/nats/client.rb, line 525
# Registers a subscription and sends the SUB command.
#
# @param subject [String] nothing happens when this is nil/false.
# @param opts [Hash] :queue (queue group) and :max (auto-unsubscribe
#   after that many messages).
# @param callback [Proc] stored with the subscription.
# @return [Integer, nil] the new subscription id, or nil when skipped
#   (no subject, or the client is draining).
def subscribe(subject, opts={}, &callback)
  return if !subject || draining?

  @ssid += 1
  sid = @ssid

  entry = { :subject => subject, :callback => callback, :received => 0 }
  entry[:queue] = opts[:queue] if opts[:queue]
  entry[:max] = opts[:max] if opts[:max]
  @subs[sid] = entry

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")

  # Setup server support for auto-unsubscribe
  unsubscribe(sid, opts[:max]) if opts[:max]
  sid
end
Return the active subscription count. @return [Number]
# File lib/nats/client.rb, line 593
# Number of entries currently held in the subscription table.
# @return [Integer]
def subscription_count
  @subs.size
end
Setup a timeout for receiving messages for the subscription. @param [Object] sid @param [Number] timeout, float in seconds @param [Hash] opts, options, :auto_unsubscribe(true), :expected(1)
# File lib/nats/client.rb, line 601
# Arms (or re-arms) a timer for a subscription.
#
# @param sid [Object] subscription id; unknown ids are ignored.
# @param timeout [Numeric] seconds until the timer fires.
# @param opts [Hash] :auto_unsubscribe (default true) unsubscribes when
#   the timer fires; :expected (default 1) is stored on the subscription.
# @param callback [Proc] called with sid when the timer fires.
# @return [Object, nil] the stored :expected value, or nil for an unknown
#   sid.
def timeout(sid, timeout, opts={}, &callback)
  sub = @subs[sid]
  return unless sub

  auto_unsubscribe = opts.fetch(:auto_unsubscribe, true)
  expected = opts.fetch(:expected, 1)

  # Replace any timer armed earlier for this subscription.
  EM.cancel_timer(sub[:timeout]) if sub[:timeout]

  sub[:timeout] = EM.add_timer(timeout) do
    unsubscribe(sid) if auto_unsubscribe
    callback.call(sid) if callback
  end
  sub[:expected] = expected
end
Cancel a subscription. @param [Object] sid @param [Number] opt_max, optional number of responses to receive before auto-unsubscribing
# File lib/nats/client.rb, line 540
# Sends UNSUB for the subscription and drops it locally unless it still
# has messages left to receive under opt_max.
#
# @param sid [Object] subscription id.
# @param opt_max [Integer, nil] number of messages after which the
#   subscription ends; nil removes it right away.
# @return [Hash, nil] the removed subscription entry, if any.
def unsubscribe(sid, opt_max=nil)
  return if draining?

  suffix = opt_max.nil? ? nil : " #{opt_max}"
  send_command("UNSUB #{sid}#{suffix}#{CR_LF}")

  sub = @subs[sid]
  return unless sub

  sub[:max] = opt_max
  still_pending = sub[:max] && (sub[:received] < sub[:max])
  @subs.delete(sid) unless still_pending
end