class STAN::Subscription
Attributes
ack_inbox[RW]
cb[R]
durable_name[R]
inbox[R]
options[R]
queue[R]
sid[RW]
stan[R]
subject[R]
Public Class Methods
new(subject, opts={}, cb)
click to toggle source
# File lib/stan/client.rb, line 479 def initialize(subject, opts={}, cb) @subject = subject @queue = opts[:queue] @inbox = STAN.create_inbox @sid = nil # inbox subscription sid @options = opts @cb = cb @ack_inbox = nil @stan = opts[:stan] @durable_name = opts[:durable_name] end
Public Instance Methods
close()
click to toggle source
# File lib/stan/client.rb, line 532 def close synchronize do stan.nats.unsubscribe(self.sid) end # Make client stop tracking the subscription inbox # and grab close request subject under the lock. sub_close_subject = nil stan.synchronize do stan.sub_map.delete(self.ack_inbox) sub_close_subject = stan.sub_close_req_subject end sub_close_req = STAN::Protocol::UnsubscribeRequest.new({ clientID: stan.client_id, subject: self.subject, inbox: self.ack_inbox }) raw = stan.nats.request(sub_close_subject, sub_close_req.to_proto, { timeout: stan.options[:connect_timeout] }) response = STAN::Protocol::SubscriptionResponse.decode(raw.data) unless response.error.empty? # FIXME: Error handling on unsubscribe/close raise Error.new(response.error) end end
to_s()
click to toggle source
# File lib/stan/client.rb, line 491 def to_s %Q(#<STAN::Subscription @subject="#{@subject}" @queue="#{@queue}" @durable_name="#{@durable_name}" @inbox="#{@inbox}" @ack_inbox="#{@ack_inbox}" @sid=#{@sid}>) end
unsubscribe()
click to toggle source
Unsubscribe removes interest in the subscription. For durables, it means that the durable interest is also removed from the server. Restarting a durable with the same name will not resume the subscription, it will be considered a new one.
# File lib/stan/client.rb, line 499 def unsubscribe synchronize do stan.nats.unsubscribe(self.sid) end # Make client stop tracking the subscription inbox # and grab unsub request subject under the lock. unsub_subject = nil stan.synchronize do stan.sub_map.delete(self.ack_inbox) unsub_subject = stan.unsub_req_subject end unsub_req = STAN::Protocol::UnsubscribeRequest.new({ clientID: stan.client_id, subject: self.subject, inbox: self.ack_inbox }) if self.durable_name unsub_req.durableName = self.durable_name end raw = stan.nats.request(unsub_subject, unsub_req.to_proto, { timeout: stan.options[:connect_timeout] }) response = STAN::Protocol::SubscriptionResponse.decode(raw.data) unless response.error.empty? # FIXME: Error handling on unsubscribe raise Error.new(response.error) end end