class STAN::Subscription

Attributes

ack_inbox[RW]
cb[R]
durable_name[R]
inbox[R]
options[R]
queue[R]
sid[RW]
stan[R]
subject[R]

Public Class Methods

new(subject, opts={}, cb) click to toggle source
# File lib/stan/client.rb, line 479
# Builds a subscription for +subject+.
#
# subject - subject the subscription is for.
# opts    - options hash; the :queue, :stan and :durable_name entries are
#           read here and the whole hash is kept as +options+.
# cb      - callback object stored as +cb+.
def initialize(subject, opts={}, cb)
  # Values handed in by the caller.
  @subject, @options, @cb = subject, opts, cb

  # Settings pulled out of the options hash (nil when absent).
  @queue        = opts[:queue]
  @stan         = opts[:stan]
  @durable_name = opts[:durable_name]

  # Every subscription gets its own freshly created inbox.
  @inbox = STAN.create_inbox

  # Both start out unset: sid is the inbox subscription sid.
  @sid = @ack_inbox = nil
end

Public Instance Methods

close() click to toggle source
# File lib/stan/client.rb, line 532
# Closes this subscription: drops the NATS subscription registered under
# +sid+, stops the client from tracking +ack_inbox+, and sends a close
# request built from the client id, subject and ack inbox.
#
# Raises Error when the reply carries a non-empty error string.
def close
  synchronize { stan.nats.unsubscribe(sid) }

  # While holding the client's lock: forget the ack inbox and read the
  # subject that close requests are sent to.
  close_subject = nil
  stan.synchronize do
    stan.sub_map.delete(ack_inbox)
    close_subject = stan.sub_close_req_subject
  end

  payload = {
    clientID: stan.client_id,
    subject: subject,
    inbox: ack_inbox
  }
  close_req = STAN::Protocol::UnsubscribeRequest.new(payload)

  # The client's :connect_timeout option is reused as the request timeout.
  reply = stan.nats.request(
    close_subject,
    close_req.to_proto,
    { timeout: stan.options[:connect_timeout] }
  )

  result = STAN::Protocol::SubscriptionResponse.decode(reply.data)
  return if result.error.empty?

  # FIXME: Error handling on unsubscribe/close
  raise Error.new(result.error)
end
to_s() click to toggle source
# File lib/stan/client.rb, line 491
# Renders a one-line summary of the subscription: subject, queue, durable
# name and both inboxes as quoted values, followed by the unquoted sid.
def to_s
  quoted = {
    subject: @subject,
    queue: @queue,
    durable_name: @durable_name,
    inbox: @inbox,
    ack_inbox: @ack_inbox
  }.map { |name, value| %Q(@#{name}="#{value}") }

  "#<STAN::Subscription #{quoted.join(' ')} @sid=#{@sid}>"
end
unsubscribe() click to toggle source

Unsubscribe removes interest in the subscription. For durables, it means that the durable interest is also removed from the server. Restarting a durable with the same name will not resume the subscription; it will be considered a new one.

# File lib/stan/client.rb, line 499
# Unsubscribes: drops the NATS subscription registered under +sid+, stops
# the client from tracking +ack_inbox+, and sends an unsubscribe request
# built from the client id, subject and ack inbox. When a durable name is
# set it is included in the request as well.
#
# Raises Error when the reply carries a non-empty error string.
def unsubscribe
  synchronize { stan.nats.unsubscribe(sid) }

  # While holding the client's lock: forget the ack inbox and read the
  # subject that unsubscribe requests are sent to.
  request_subject = nil
  stan.synchronize do
    stan.sub_map.delete(ack_inbox)
    request_subject = stan.unsub_req_subject
  end

  payload = {
    clientID: stan.client_id,
    subject: subject,
    inbox: ack_inbox
  }
  request = STAN::Protocol::UnsubscribeRequest.new(payload)
  request.durableName = durable_name if durable_name

  # The client's :connect_timeout option is reused as the request timeout.
  reply = stan.nats.request(
    request_subject,
    request.to_proto,
    { timeout: stan.options[:connect_timeout] }
  )

  result = STAN::Protocol::SubscriptionResponse.decode(reply.data)
  return if result.error.empty?

  # FIXME: Error handling on unsubscribe
  raise Error.new(result.error)
end