class Nsq::ClientBase
Attributes
connections[R]
topic[R]
Public Instance Methods
connected?()
click to toggle source
# File lib/nsq/client_base.rb, line 13
# Reports whether at least one of the tracked connections is connected.
#
# Returns true when any value stored in @connections answers true to
# `connected?`, and false otherwise (including when there are none).
def connected?
  tracked = @connections.values
  tracked.any? { |connection| connection.connected? }
end
terminate()
click to toggle source
# File lib/nsq/client_base.rb, line 18
# Shuts the client down: kills the discovery thread if one was started,
# then drops every connection via drop_all_connections.
def terminate
  discovery = @discovery_thread
  discovery.kill if discovery
  drop_all_connections
end
Private Instance Methods
add_connection(nsqd, options = {})
click to toggle source
# File lib/nsq/client_base.rb, line 87
# Opens a connection to a single nsqd and registers it in @connections.
#
# nsqd    - String of the form "host:port"; it is also used as the key
#           under which the connection is stored.
# options - Hash merged over the default connection settings, so any key
#           given here overrides the default of the same name.
#
# Returns the newly created Connection.
def add_connection(nsqd, options = {})
  info "+ Adding connection #{nsqd}"

  # The port is handed to Connection exactly as split produced it (a String).
  host, port = nsqd.split(':')
  defaults = {
    host: host,
    port: port,
    ssl_context: @ssl_context,
    tls_options: @tls_options,
    tls_v1: @tls_v1
  }

  @connections[nsqd] = Connection.new(defaults.merge(options))
end
connections_changed()
click to toggle source
Optional hook for subclasses, invoked after connections are added or dropped. The default implementation does nothing.
# File lib/nsq/client_base.rb, line 117
# Optional subclass hook. The base implementation is intentionally empty;
# subclasses may override it to react when connections are added or dropped.
def connections_changed; end
discover_repeatedly(opts = {})
click to toggle source
Discovers nsqds from nsqlookupd repeatedly, on a background thread.
opts: nsqlookupds: ['127.0.0.1:4161'], topic: 'topic-to-find-nsqds-for', interval: 60
# File lib/nsq/client_base.rb, line 33
# Starts a background thread that keeps @connections in sync with the
# nsqds reported by nsqlookupd.
#
# opts - Hash read with these keys:
#        :nsqlookupds - passed to Discovery.new
#        :topic       - passed to nsqds_from_lookupd (may be nil)
#        :interval    - passed to sleep between discovery rounds
#
# The thread is stored in @discovery_thread so that terminate can kill it.
def discover_repeatedly(opts = {})
  @discovery_thread = Thread.new do
    @discovery = Discovery.new(opts[:nsqlookupds])

    loop do
      begin
        drop_and_add_connections(nsqds_from_lookupd(opts[:topic]))
      rescue DiscoveryException
        # No nsqlookupd could be reached. That is fine: keep the nsqd
        # connections we already have and try again on the next round.
        warn 'Could not connect to any nsqlookupd instances in discovery loop'
      end

      sleep opts[:interval]
    end
  end

  # Any other exception escaping the loop should not die silently with the
  # thread; abort_on_exception makes it surface in the main thread.
  @discovery_thread.abort_on_exception = true
end
drop_all_connections()
click to toggle source
# File lib/nsq/client_base.rb, line 109
# Drops every connection currently held in @connections.
def drop_all_connections
  # Iterate over a snapshot of the keys: drop_connection deletes entries
  # from @connections while we are walking the list.
  @connections.keys.each { |nsqd| drop_connection(nsqd) }
end
drop_and_add_connections(nsqds)
click to toggle source
# File lib/nsq/client_base.rb, line 65
# Reconciles @connections with the list of nsqds reported by lookupd.
#
# nsqds - Array of "host:port" Strings that should be connected.
#
# Connections whose key is not in nsqds are dropped, and a connection is
# added for every entry of nsqds that is not yet tracked. A failure to add
# one nsqd is logged and does not prevent the remaining ones from being
# tried. Finally connections_changed is invoked.
def drop_and_add_connections(nsqds)
  # drop nsqd connections that are no longer in lookupd
  (@connections.keys - nsqds).each { |nsqd| drop_connection(nsqd) }

  # add new ones
  (nsqds - @connections.keys).each do |nsqd|
    begin
      add_connection(nsqd)
    rescue StandardError => ex
      # Only ordinary errors are treated as "this nsqd is unavailable".
      # Interrupt, SystemExit, NoMemoryError and friends must propagate
      # instead of being logged as a connection failure.
      error "Failed to connect to nsqd @ #{nsqd}: #{ex}"
    end
  end

  # balance RDY state amongst the connections
  connections_changed
end
drop_connection(nsqd)
click to toggle source
# File lib/nsq/client_base.rb, line 101
# Removes the connection registered for the given nsqd and closes it.
#
# nsqd - the key ("host:port" String) under which the connection is stored.
#
# When no connection is stored under that key, nothing is closed; the drop
# is still logged and connections_changed is still invoked.
def drop_connection(nsqd)
  info "- Dropping connection #{nsqd}"

  if (removed = @connections.delete(nsqd))
    removed.close
  end

  connections_changed
end
nsqds_from_lookupd(topic = nil)
click to toggle source
# File lib/nsq/client_base.rb, line 56
# Asks @discovery for nsqds.
#
# topic - optional topic; when given (truthy) the lookup is delegated to
#         @discovery.nsqds_for_topic(topic), otherwise to @discovery.nsqds.
#
# Returns whatever the chosen @discovery call returns.
def nsqds_from_lookupd(topic = nil)
  return @discovery.nsqds unless topic

  @discovery.nsqds_for_topic(topic)
end