class STAN::Client
Attributes
client_id[R]
nats[R]
options[R]
pending_pub_acks[R]
sub_close_req_subject[R]
sub_map[R]
unsub_req_subject[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/stan/client.rb, line 51 def initialize super # Connection to NATS, either owned or borrowed @nats = nil @borrowed_nats_connection = false # STAN subscriptions map @sub_map = {} # Publish Ack map (guid => ack) @pub_ack_map = {} @pending_pub_acks = nil # Cluster to which we are connecting @cluster_id = nil @client_id = nil # Connect options @options = {} # NATS Streaming subjects # Inbox subscription for periodical heartbeat messages @hb_inbox = nil @hb_inbox_sid = nil # Subscription for processing received acks from the server @ack_subject = nil @ack_subject_sid = nil # Publish prefix set by stan to which we append our subject on publish. @pub_prefix = nil @sub_req_subject = nil @unsub_req_subject = nil @close_req_subject = nil @sub_close_req_subject = nil # For initial connect request to discover subjects used by # the streaming server. @discover_subject = nil end
Public Instance Methods
ack(msg)
click to toggle source
Ack takes a received message and publishes an ack manually
# File lib/stan/client.rb, line 357 def ack(msg) return unless msg.sub msg.sub.synchronize do ack_proto = STAN::Protocol::Ack.new({ subject: msg.proto.subject, sequence: msg.proto.sequence }).to_proto nats.publish(msg.sub.ack_inbox, ack_proto) end end
close()
click to toggle source
Close wraps us the session with the NATS Streaming server
# File lib/stan/client.rb, line 317 def close # Send close when going away only if we have been able to successfully connect if @close_req_subject req = STAN::Protocol::CloseRequest.new(clientID: @client_id) raw = nats.request(@close_req_subject, req.to_proto, timeout: options[:connect_timeout]) resp = STAN::Protocol::CloseResponse.decode(raw.data) unless resp.error.empty? raise Error.new(resp.error) end end # If we do not even have a connection then return already... return unless @nats # TODO: If connection to nats was borrowed then we should # unsubscribe from all topics from STAN. If not borrowed # and we own the connection, then we just close. begin # Remove all related subscriptions for STAN @sub_map.each_pair do |_, sub| nats.unsubscribe(sub.sid) end # Finally, remove the core subscriptions for STAN nats.unsubscribe(@hb_inbox_sid) nats.unsubscribe(@ack_subject_sid) nats.flush(options[:connect_timeout]) rescue NATS::IO::Timeout raise CloseReqTimeoutError.new("stan: close request timeout") ensure if @borrowed_nats_connection @nats = nil else @nats.close end end end
connect(cluster_id, client_id, opts={}, &blk)
click to toggle source
Plugs into a NATS Streaming cluster, establishing a connection to NATS in case there is not one available to be borrowed.
# File lib/stan/client.rb, line 96 def connect(cluster_id, client_id, opts={}, &blk) @cluster_id = cluster_id @client_id = client_id @options = opts # Defaults @options[:connect_timeout] ||= DEFAULT_CONNECT_TIMEOUT @options[:max_pub_acks_inflight] ||= DEFAULT_MAX_PUB_ACKS_INFLIGHT # Buffered queue for controlling inflight published acks @pending_pub_acks = SizedQueue.new(options[:max_pub_acks_inflight]) # Prepare connect discovery request @discover_subject = "#{DEFAULT_DISCOVER_SUBJECT}.#{@cluster_id}".freeze # Prepare delivered msgs acks processing subscription @ack_subject = "#{DEFAULT_ACKS_SUBJECT}.#{STAN.create_guid}".freeze if @nats.nil? case options[:nats] when Hash # Custom NATS options in case borrowed connection not present # can be passed to establish a connection and have stan client # owning it. @nats = NATS::IO::Client.new nats.connect(options[:nats]) when NATS::IO::Client @nats = options[:nats] @borrowed_nats_connection = true else # Try to connect with NATS defaults @nats = NATS::IO::Client.new nats.connect(servers: ["nats://127.0.0.1:4222"]) end end # If no connection to NATS present at this point then bail already raise BadConnectionError.new("stan: invalid connection to nats") unless @nats # Heartbeat subscription @hb_inbox = (STAN.create_inbox).freeze # Setup acks and heartbeats processing callbacks @hb_inbox_sid = nats.subscribe(@hb_inbox) { |raw, reply, subject| process_heartbeats(raw, reply, subject) } @ack_subject_sid = nats.subscribe(@ack_subject) { |raw, reply, subject| process_ack(raw, reply, subject) } nats.flush(options[:connect_timeout]) # Initial connect request to discover subjects to be used # for communicating with STAN. req = STAN::Protocol::ConnectRequest.new({ clientID: @client_id, heartbeatInbox: @hb_inbox }) # TODO: Check for error and bail if required begin raw = nats.request(@discover_subject, req.to_proto, timeout: options[:connect_timeout]) rescue NATS::IO::Timeout raise ConnectReqTimeoutError.new("stan: failed connecting to '#{@cluster_id}'") end resp = STAN::Protocol::ConnectResponse.decode(raw.data) unless resp.error.empty? # We didn't really connect but we call closing in order to # cleanup any other present state. # FIXME: Errors happening here should be reported async close rescue nil raise ConnectError.new(resp.error) end # Capture communication channels to STAN only when there # have not been any errors when connecting. @pub_prefix = resp.pubPrefix.freeze @sub_req_subject = resp.subRequests.freeze @unsub_req_subject = resp.unsubRequests.freeze @close_req_subject = resp.closeRequests.freeze @sub_close_req_subject = resp.subCloseRequests.freeze # If callback given then we send a close request on exit # and wrap up session to STAN. if blk blk.call(self) # Close session to the STAN cluster close end end
publish(subject, payload, opts={}, &blk)
click to toggle source
Publish will publish to the cluster and wait for an ack
# File lib/stan/client.rb, line 186 def publish(subject, payload, opts={}, &blk) raise BadConnectionError.new unless @pub_prefix stan_subject = "#{@pub_prefix}.#{subject}" future = nil guid = STAN.create_guid pe = STAN::Protocol::PubMsg.new({ clientID: @client_id, guid: guid, subject: subject, data: payload }) # Use buffered queue to control number of outstanding acks @pending_pub_acks << :ack if blk # Asynchronously handled if block given synchronize do # Map ack to guid @pub_ack_map[guid] = proc do |ack| # If block is given, handle the result asynchronously error = ack.error.empty? ? nil : Error.new(ack.error) case blk.arity when 0 then blk.call when 1 then blk.call(ack.guid) when 2 then blk.call(ack.guid, error) end @pub_ack_map.delete(ack.guid) end nats.publish(stan_subject, pe.to_proto, @ack_subject) end else # No block means waiting for response before giving back control future = new_cond opts[:timeout] ||= DEFAULT_ACK_WAIT synchronize do # Map ack to guid ack_response = nil # FIXME: Maybe use fiber instead? @pub_ack_map[guid] = proc do |ack| # Capture the ack response ack_response = ack future.signal end # Send publish request and wait for the ack response nats.publish(stan_subject, pe.to_proto, @ack_subject) start_time = NATS::MonotonicTime.now future.wait(opts[:timeout]) end_time = NATS::MonotonicTime.now if (end_time - start_time) > opts[:timeout] # Remove ack @pub_ack_map.delete(guid) raise TimeoutError.new("stan: timeout") end # Remove ack @pub_ack_map.delete(guid) return guid end end # TODO: Loop for processing of expired acks end
subscribe(subject, opts={}, &cb)
click to toggle source
Create subscription which dispatches messages to callback asynchronously
# File lib/stan/client.rb, line 257 def subscribe(subject, opts={}, &cb) raise BadConnectionError.new unless @sub_req_subject sub_options = {} sub_options.merge!(opts) sub_options[:ack_wait] ||= DEFAULT_ACK_WAIT sub_options[:max_inflight] ||= DEFAULT_MAX_INFLIGHT sub_options[:stan] = self sub = Subscription.new(subject, sub_options, cb) sub.extend(MonitorMixin) synchronize { @sub_map[sub.inbox] = sub } # Hold lock throughout sub.synchronize do # Listen for actual messages sid = nats.subscribe(sub.inbox) { |raw, reply, subject| process_msg(raw, reply, subject) } sub.sid = sid nats.flush(options[:connect_timeout]) # Create the subscription request announcing the inbox on which # we have made the NATS subscription for processing messages. # First, we normalize customized subscription options before # encoding to protobuf. sub_opts = normalize_sub_req_params(sub_options) # Set STAN subject and NATS inbox where we will be awaiting # for the messages to be delivered. sub_opts[:subject] = subject sub_opts[:inbox] = sub.inbox sr = STAN::Protocol::SubscriptionRequest.new(sub_opts) reply = nil response = nil begin reply = nats.request(@sub_req_subject, sr.to_proto, timeout: options[:connect_timeout]) response = STAN::Protocol::SubscriptionResponse.decode(reply.data) rescue NATS::IO::Timeout, Google::Protobuf::ParseError => e # FIXME: Error handling on unsubscribe nats.unsubscribe(sub.sid) raise e end unless response.error.empty? # FIXME: Error handling on unsubscribe should be async nats.unsubscribe(sub.sid) raise Error.new(response.error) end # Capture ack inbox for the subscription sub.ack_inbox = response.ackInbox.freeze return sub end rescue NATS::IO::Timeout raise SubReqTimeoutError.new("stan: subscribe request timeout on '#{subject}'") end
to_s()
click to toggle source
# File lib/stan/client.rb, line 368 def to_s %Q(#<STAN::Client @cluster_id="#{@cluster_id}" @client_id="#{@client_id}">) end
Private Instance Methods
normalize_sub_req_params(opts)
click to toggle source
# File lib/stan/client.rb, line 437 def normalize_sub_req_params(opts) sub_opts = {} sub_opts[:qGroup] = opts[:queue] if opts[:queue] sub_opts[:durableName] = opts[:durable_name] if opts[:durable_name] # Must announce the clientID as part of the protocol in each subscription sub_opts[:clientID] = @client_id sub_opts[:maxInFlight] = opts[:max_inflight] sub_opts[:ackWaitInSecs] = opts[:ack_wait] || opts[:ack_timeout] # TODO: Error checking when all combinations of options are not declared case opts[:start_at] when :new_only # By default, it already acts as :new_only which is # without no initial replay, similar to bare NATS, # but we allow setting it explicitly anyway. sub_opts[:startPosition] = :NewOnly when :last_received sub_opts[:startPosition] = :LastReceived when :time, :timedelta # If using timedelta, need to get current time in UnixNano format # FIXME: Implement support for :ago option which uses time in human. sub_opts[:startPosition] = :TimeDeltaStart start_at_time = opts[:time] * 1_000_000_000 sub_opts[:startTimeDelta] = (Time.now.to_f * 1_000_000_000) - start_at_time when :sequence sub_opts[:startPosition] = :SequenceStart sub_opts[:startSequence] = opts[:sequence] || 0 when :first, :beginning sub_opts[:startPosition] = :First else sub_opts[:startPosition] = :First if opts[:deliver_all_available] end sub_opts end
process_ack(data, reply, subject)
click to toggle source
Process received publishes acks
# File lib/stan/client.rb, line 375 def process_ack(data, reply, subject) # FIXME: This should handle errors asynchronously in case there are any # Process ack pub_ack = STAN::Protocol::PubAck.decode(data) unless pub_ack.error.empty? raise Error.new(pub_ack.error) end # Unblock publishing queue @pending_pub_acks.pop if @pending_pub_acks.size > 0 synchronize do # yield the ack response back to original publisher caller if cb = @pub_ack_map[pub_ack.guid] cb.call(pub_ack) end end end
process_heartbeats(data, reply, subject)
click to toggle source
Process heartbeats by replying to them
# File lib/stan/client.rb, line 396 def process_heartbeats(data, reply, subject) # No payload assumed, just reply to the heartbeat. nats.publish(reply, '') end
process_msg(data, reply, subject)
click to toggle source
Process any received messages
# File lib/stan/client.rb, line 402 def process_msg(data, reply, subject) msg = Msg.new msg.proto = STAN::Protocol::MsgProto.decode(data) msg_ack = STAN::Protocol::Ack.new({ subject: msg.proto.subject, sequence: msg.proto.sequence }) # Lookup the subscription sub = nil synchronize do sub = @sub_map[subject] end # Check if sub is no longer valid return unless sub # Store in msg for backlink msg.sub = sub cb = nil ack_subject = nil using_manual_acks = nil sub.synchronize do cb = sub.cb ack_subject = sub.ack_inbox using_manual_acks = sub.options[:manual_acks] end # Perform the callback if sub still subscribed cb.call(msg) if cb # Process auto-ack if not done manually nats.publish(ack_subject, msg_ack.to_proto) if not using_manual_acks end