class STAN::Client

Attributes

client_id[R]
nats[R]
options[R]
pending_pub_acks[R]
sub_close_req_subject[R]
sub_map[R]
unsub_req_subject[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/stan/client.rb, line 51
# Puts the client into its disconnected state: every field is reset to an
# empty container or nil. Nothing is sent over the network here.
def initialize
  super

  # NATS connection (owned or borrowed) plus the flag recording which one it is
  @nats, @borrowed_nats_connection = nil, false

  # STAN subscriptions map, and the publish ack map (guid => ack)
  @sub_map, @pub_ack_map = {}, {}
  @pending_pub_acks = nil

  # Cluster we are connecting to and the id we announce ourselves with
  @cluster_id, @client_id = nil, nil

  # Connect options
  @options = {}

  # NATS Streaming subjects

  # Inbox (and its subscription id) for periodical heartbeat messages
  @hb_inbox, @hb_inbox_sid = nil, nil

  # Subject (and its subscription id) for acks received from the server
  @ack_subject, @ack_subject_sid = nil, nil

  # Publish prefix set by stan to which the subject is appended on publish,
  # followed by the request subjects.
  @pub_prefix, @sub_req_subject, @unsub_req_subject = nil, nil, nil
  @close_req_subject, @sub_close_req_subject = nil, nil

  # Subject of the initial connect request used to discover the subjects
  # of the streaming server.
  @discover_subject = nil
end

Public Instance Methods

ack(msg) click to toggle source

Ack takes a received message and publishes an ack manually

# File lib/stan/client.rb, line 357
# Manually acknowledges a received message by publishing an Ack (built from
# the message's subject and sequence) to the ack inbox of its subscription.
# Does nothing when the message has no subscription attached.
def ack(msg)
  subscription = msg.sub
  return unless subscription

  # The subscription lock is held while building and sending the ack
  subscription.synchronize do
    received = msg.proto
    ack_fields = { subject: received.subject, sequence: received.sequence }
    payload = STAN::Protocol::Ack.new(ack_fields).to_proto
    nats.publish(subscription.ack_inbox, payload)
  end
end
close() click to toggle source

Close wraps up the session with the NATS Streaming server

# File lib/stan/client.rb, line 317
# Ends the session with the NATS Streaming server and releases the NATS
# connection.
#
# Steps:
# 1. If a close-request subject was learned during connect, send a
#    CloseRequest and raise Error when the response carries an error.
# 2. Unsubscribe every subscription in @sub_map plus the heartbeat and ack
#    subscriptions, then flush.
# 3. Always (even if step 2 fails) forget the server-assigned subjects and
#    let go of the connection: a borrowed connection is only dropped, an
#    owned one is closed.
#
# Calling close again afterwards is a no-op, because the connection and the
# close-request subject have been cleared. Previously the stale subject made
# a second call send a request on a nil or already closed connection.
#
# Raises Error when the server rejects the close request and
# CloseReqTimeoutError when the final flush times out.
def close
  # If we do not even have a connection then return already...
  return unless @nats

  # Send close when going away only if we have been able to successfully connect
  if @close_req_subject
    req = STAN::Protocol::CloseRequest.new(clientID: @client_id)
    raw = nats.request(@close_req_subject, req.to_proto, timeout: options[:connect_timeout])

    resp = STAN::Protocol::CloseResponse.decode(raw.data)
    raise Error.new(resp.error) unless resp.error.empty?
  end

  # TODO: If connection to nats was borrowed then we should
  # unsubscribe from all topics from STAN.  If not borrowed
  # and we own the connection, then we just close.
  begin
    # Remove all related subscriptions for STAN
    @sub_map.each_value { |sub| nats.unsubscribe(sub.sid) }

    # Finally, remove the core subscriptions for STAN
    nats.unsubscribe(@hb_inbox_sid)
    nats.unsubscribe(@ack_subject_sid)
    nats.flush(options[:connect_timeout])
  rescue NATS::IO::Timeout
    raise CloseReqTimeoutError.new("stan: close request timeout")
  ensure
    # The session is over: drop the subjects handed out by the server so that
    # publish/subscribe fail their connection guard and a repeated close does
    # not send another close request.
    @pub_prefix = nil
    @sub_req_subject = nil
    @unsub_req_subject = nil
    @close_req_subject = nil
    @sub_close_req_subject = nil

    # Detach from the connection before closing it so the client is left
    # without a connection even if closing raises.
    connection = @nats
    owned = !@borrowed_nats_connection
    @nats = nil
    @borrowed_nats_connection = false
    connection.close if owned
  end
end
connect(cluster_id, client_id, opts={}, &blk) click to toggle source

Plugs into a NATS Streaming cluster, establishing a connection to NATS in case there is not one available to be borrowed.

# File lib/stan/client.rb, line 96
# Plugs into a NATS Streaming cluster, establishing a connection to NATS in
# case there is not one available to be borrowed.
#
# cluster_id - id of the cluster, appended to the discover subject.
# client_id  - id announced to the server in the connect request.
# opts       - connect options. Recognised here: :connect_timeout,
#              :max_pub_acks_inflight and :nats (a Hash of NATS options for a
#              connection this client will own, or an existing
#              NATS::IO::Client to borrow). The hash is copied, so the
#              caller's hash is never modified and may be frozen.
# blk        - optional block called with the client; the session is closed
#              when the block finishes, including when it raises.
#
# Returns the block's value when a block is given, nil otherwise.
#
# Raises BadConnectionError when no NATS connection is available,
# ConnectReqTimeoutError when the connect request times out and ConnectError
# when the server answers the connect request with an error.
def connect(cluster_id, client_id, opts={}, &blk)
  @cluster_id = cluster_id
  @client_id  = client_id

  # Copy before filling in defaults so they do not leak into the caller's hash
  @options    = opts.dup

  # Defaults
  @options[:connect_timeout] ||= DEFAULT_CONNECT_TIMEOUT
  @options[:max_pub_acks_inflight] ||= DEFAULT_MAX_PUB_ACKS_INFLIGHT

  # Buffered queue for controlling inflight published acks
  @pending_pub_acks = SizedQueue.new(options[:max_pub_acks_inflight])

  # Prepare connect discovery request
  @discover_subject = "#{DEFAULT_DISCOVER_SUBJECT}.#{@cluster_id}".freeze

  # Prepare delivered msgs acks processing subscription
  @ack_subject = "#{DEFAULT_ACKS_SUBJECT}.#{STAN.create_guid}".freeze

  if @nats.nil?
    case options[:nats]
    when Hash
      # Custom NATS options in case borrowed connection not present
      # can be passed to establish a connection and have stan client
      # owning it.
      @nats = NATS::IO::Client.new
      nats.connect(options[:nats])
    when NATS::IO::Client
      @nats = options[:nats]
      @borrowed_nats_connection = true
    else
      # Try to connect with NATS defaults
      @nats = NATS::IO::Client.new
      nats.connect(servers: ["nats://127.0.0.1:4222"])
    end
  end

  # If no connection to NATS present at this point then bail already
  raise BadConnectionError.new("stan: invalid connection to nats") unless @nats

  # Heartbeat subscription
  @hb_inbox = (STAN.create_inbox).freeze

  # Setup acks and heartbeats processing callbacks
  @hb_inbox_sid    = nats.subscribe(@hb_inbox)    { |raw, reply, subject| process_heartbeats(raw, reply, subject) }
  @ack_subject_sid = nats.subscribe(@ack_subject) { |raw, reply, subject| process_ack(raw, reply, subject) }
  nats.flush(options[:connect_timeout])

  # Initial connect request to discover subjects to be used
  # for communicating with STAN.
  req = STAN::Protocol::ConnectRequest.new({
    clientID: @client_id,
    heartbeatInbox: @hb_inbox
  })

  # TODO: Check for error and bail if required
  begin
    raw = nats.request(@discover_subject, req.to_proto, timeout: options[:connect_timeout])
  rescue NATS::IO::Timeout
    raise ConnectReqTimeoutError.new("stan: failed connecting to '#{@cluster_id}'")
  end

  resp = STAN::Protocol::ConnectResponse.decode(raw.data)
  unless resp.error.empty?
    # We didn't really connect but we call closing in order to
    # cleanup any other present state.
    # FIXME: Errors happening here should be reported async
    close rescue nil

    raise ConnectError.new(resp.error)
  end

  # Capture communication channels to STAN only when there
  # have not been any errors when connecting.
  @pub_prefix = resp.pubPrefix.freeze
  @sub_req_subject = resp.subRequests.freeze
  @unsub_req_subject = resp.unsubRequests.freeze
  @close_req_subject = resp.closeRequests.freeze
  @sub_close_req_subject = resp.subCloseRequests.freeze

  # If callback given then we send a close request on exit
  # and wrap up session to STAN.
  if blk
    begin
      blk.call(self)
    ensure
      # Close session to the STAN cluster even when the block raised
      close
    end
  end
end
publish(subject, payload, opts={}, &blk) click to toggle source

Publish will publish to the cluster and wait for an ack

# File lib/stan/client.rb, line 186
# Publishes a message to the cluster.
#
# subject - STAN subject; it is appended to the publish prefix learned on
#           connect to form the NATS subject.
# payload - message data.
# opts    - :timeout, how long the synchronous form waits for the ack
#           (defaults to DEFAULT_ACK_WAIT). The hash is only read.
# blk     - when given, publishing is asynchronous: the block is called once
#           the ack arrives, with no arguments, the guid, or the guid and an
#           error (nil when the ack carries none) depending on how many
#           parameters it declares.
#
# Without a block the call waits until the ack for this message has been
# delivered to it and returns the message guid; TimeoutError is raised when
# no ack arrives within the timeout. Whether an ack arrived is decided by the
# ack itself, not by elapsed time, so an early wakeup keeps waiting and an
# ack that was received is never reported as a timeout.
#
# Raises BadConnectionError when the client has no publish prefix yet.
def publish(subject, payload, opts={}, &blk)
  raise BadConnectionError.new unless @pub_prefix

  stan_subject = "#{@pub_prefix}.#{subject}"
  guid = STAN.create_guid

  pe = STAN::Protocol::PubMsg.new({
    clientID: @client_id,
    guid: guid,
    subject: subject,
    data: payload
  })

  # Use buffered queue to control number of outstanding acks
  @pending_pub_acks << :ack

  if blk
    # Asynchronously handled if block given
    synchronize do
      # Map ack to guid
      @pub_ack_map[guid] = proc do |ack|
        begin
          error = ack.error.empty? ? nil : Error.new(ack.error)
          case blk.arity
          when 0 then blk.call
          when 1 then blk.call(ack.guid)
          else
            # Two parameters, or a block with optional/splat parameters
            blk.call(ack.guid, error)
          end
        ensure
          # Forget the ack even when the callback raised
          @pub_ack_map.delete(ack.guid)
        end
      end

      nats.publish(stan_subject, pe.to_proto, @ack_subject)
    end
  else
    # No block means waiting for response before giving back control
    future = new_cond
    timeout = opts[:timeout] || DEFAULT_ACK_WAIT

    synchronize do
      ack_response = nil

      # FIXME: Maybe use fiber instead?
      @pub_ack_map[guid] = proc do |ack|
        # Capture the ack response
        ack_response = ack
        future.signal
      end

      begin
        # Send publish request and wait for the ack response
        nats.publish(stan_subject, pe.to_proto, @ack_subject)

        deadline = NATS::MonotonicTime.now + timeout
        while ack_response.nil?
          remaining = deadline - NATS::MonotonicTime.now
          break unless remaining > 0

          # Woken either by the ack callback or by the wait running out;
          # the loop condition tells the two apart.
          future.wait(remaining)
        end
      ensure
        # Remove ack
        @pub_ack_map.delete(guid)
      end

      raise TimeoutError.new("stan: timeout") if ack_response.nil?

      return guid
    end
  end
  # TODO: Loop for processing of expired acks
end
subscribe(subject, opts={}, &cb) click to toggle source

Create subscription which dispatches messages to callback asynchronously

# File lib/stan/client.rb, line 257
# Creates a subscription which dispatches messages to the callback
# asynchronously.
#
# subject - STAN subject to subscribe to.
# opts    - subscription options; :ack_wait and :max_inflight get defaults
#           (DEFAULT_ACK_WAIT, DEFAULT_MAX_INFLIGHT) on a private copy, the
#           rest is interpreted by normalize_sub_req_params.
# cb      - block invoked for each received message.
#
# Returns the Subscription once the server accepted the request.
#
# A subscription that could not be established is removed from @sub_map
# again, so the map only holds live subscriptions.
#
# Raises BadConnectionError when not connected, SubReqTimeoutError when the
# flush or the subscription request times out, Error when the server answers
# with an error, and re-raises Google::Protobuf::ParseError for an
# undecodable response.
def subscribe(subject, opts={}, &cb)
  raise BadConnectionError.new unless @sub_req_subject

  sub_options = {}
  sub_options.merge!(opts)
  sub_options[:ack_wait] ||= DEFAULT_ACK_WAIT
  sub_options[:max_inflight] ||= DEFAULT_MAX_INFLIGHT
  sub_options[:stan] = self

  sub = Subscription.new(subject, sub_options, cb)
  sub.extend(MonitorMixin)
  synchronize { @sub_map[sub.inbox] = sub }

  begin
    # Hold lock throughout
    sub.synchronize do
      # Listen for actual messages
      sid = nats.subscribe(sub.inbox) { |raw, reply_to, inbox| process_msg(raw, reply_to, inbox) }
      sub.sid = sid
      nats.flush(options[:connect_timeout])

      # Create the subscription request announcing the inbox on which
      # we have made the NATS subscription for processing messages.
      # First, we normalize customized subscription options before
      # encoding to protobuf.
      sub_opts = normalize_sub_req_params(sub_options)

      # Set STAN subject and NATS inbox where we will be awaiting
      # for the messages to be delivered.
      sub_opts[:subject] = subject
      sub_opts[:inbox] = sub.inbox

      sr = STAN::Protocol::SubscriptionRequest.new(sub_opts)
      response = nil
      begin
        reply = nats.request(@sub_req_subject, sr.to_proto, timeout: options[:connect_timeout])
        response = STAN::Protocol::SubscriptionResponse.decode(reply.data)
      rescue NATS::IO::Timeout, Google::Protobuf::ParseError
        # FIXME: Error handling on unsubscribe
        nats.unsubscribe(sub.sid)
        raise
      end

      unless response.error.empty?
        # FIXME: Error handling on unsubscribe should be async
        nats.unsubscribe(sub.sid)
        raise Error.new(response.error)
      end

      # Capture ack inbox for the subscription
      sub.ack_inbox = response.ackInbox.freeze

      return sub
    end
  rescue StandardError
    # The subscription never became usable: forget it. Done after the
    # subscription lock has been released, so the two locks are never nested.
    synchronize { @sub_map.delete(sub.inbox) }
    raise
  end
rescue NATS::IO::Timeout
  raise SubReqTimeoutError.new("stan: subscribe request timeout on '#{subject}'")
end
to_s() click to toggle source
# File lib/stan/client.rb, line 368
# Short human-readable description showing the cluster id and client id.
def to_s
  format('#<STAN::Client @cluster_id="%s" @client_id="%s">', @cluster_id, @client_id)
end

Private Instance Methods

normalize_sub_req_params(opts) click to toggle source
# File lib/stan/client.rb, line 437
# Translates the user-facing subscription options into the field names of a
# subscription request.
#
# opts - Hash with any of :queue, :durable_name, :max_inflight, :ack_wait
#        (or :ack_timeout), :start_at, :time, :sequence and
#        :deliver_all_available.
#
# Returns a new Hash; opts itself is not modified.
def normalize_sub_req_params(opts)
  request = {}

  queue_group = opts[:queue]
  durable_name = opts[:durable_name]
  request[:qGroup] = queue_group if queue_group
  request[:durableName] = durable_name if durable_name

  # Must announce the clientID as part of the protocol in each subscription
  request[:clientID] = @client_id
  request[:maxInFlight] = opts[:max_inflight]
  request[:ackWaitInSecs] = opts[:ack_wait] || opts[:ack_timeout]

  # Start positions that need no extra field. :new_only is what happens by
  # default anyway (no initial replay, similar to bare NATS), but setting it
  # explicitly is allowed.
  plain_positions = {
    new_only: :NewOnly,
    last_received: :LastReceived,
    first: :First,
    beginning: :First
  }

  # TODO: Error checking when all combinations of options are not declared
  start_at = opts[:start_at]
  case start_at
  when :time, :timedelta
    # The delta is the distance between now and the requested time, both
    # scaled by 1_000_000_000.
    # FIXME: Implement support for :ago option which uses time in human.
    request[:startPosition] = :TimeDeltaStart
    requested_at = opts[:time] * 1_000_000_000
    request[:startTimeDelta] = (Time.now.to_f * 1_000_000_000) - requested_at
  when :sequence
    request[:startPosition] = :SequenceStart
    request[:startSequence] = opts[:sequence] || 0
  else
    position = plain_positions[start_at]
    # Without a recognised :start_at, :deliver_all_available asks for everything
    position = :First if position.nil? && opts[:deliver_all_available]
    request[:startPosition] = position if position
  end

  request
end
process_ack(data, reply, subject) click to toggle source

Process received publish acks

# File lib/stan/client.rb, line 375
# Handles one publish ack received from the server.
#
# data    - encoded PubAck.
# reply   - unused.
# subject - unused.
#
# The inflight slot taken by the publisher is released for every ack,
# including acks that carry an error, so failed publishes do not permanently
# use up the publish window. The release never blocks: when the queue is
# already empty it is simply skipped.
#
# Raises Error when the ack carries an error; in that case the callback
# registered for the guid is not invoked.
def process_ack(data, reply, subject)
  # FIXME: This should handle errors asynchronously in case there are any

  # Process ack
  pub_ack = STAN::Protocol::PubAck.decode(data)

  # Unblock publishing queue
  begin
    @pending_pub_acks.pop(true)
  rescue ThreadError
    # Nothing pending; a non-blocking pop signals that by raising
  end

  unless pub_ack.error.empty?
    raise Error.new(pub_ack.error)
  end

  synchronize do
    # yield the ack response back to original publisher caller
    if cb = @pub_ack_map[pub_ack.guid]
      cb.call(pub_ack)
    end
  end
end
process_heartbeats(data, reply, subject) click to toggle source

Process heartbeats by replying to them

# File lib/stan/client.rb, line 396
# Answers a heartbeat by publishing an empty message to its reply subject.
# The heartbeat payload (data) and subject are not looked at.
def process_heartbeats(data, reply, subject)
  empty_payload = ''
  nats.publish(reply, empty_payload)
end
process_msg(data, reply, subject) click to toggle source

Process any received messages

# File lib/stan/client.rb, line 402
# Handles one message delivered on a subscription inbox.
#
# data    - encoded MsgProto.
# reply   - unused.
# subject - NATS inbox the message arrived on; used to look up the
#           subscription in @sub_map.
#
# The message is decoded, handed to the subscription callback (if any) and
# then acknowledged on the subscription's ack inbox, unless the subscription
# was created with :manual_acks. Messages for unknown inboxes are dropped.
def process_msg(data, reply, subject)
  message = Msg.new
  message.proto = STAN::Protocol::MsgProto.decode(data)
  auto_ack = STAN::Protocol::Ack.new({
    subject: message.proto.subject,
    sequence: message.proto.sequence
  })

  # Lookup the subscription; nothing to do when it is no longer valid
  subscription = synchronize { @sub_map[subject] }
  return unless subscription

  # Store in msg for backlink
  message.sub = subscription

  # Read what we need under the subscription lock, act on it afterwards
  handler, ack_inbox, manual_acks = subscription.synchronize do
    [subscription.cb, subscription.ack_inbox, subscription.options[:manual_acks]]
  end

  # Perform the callback if sub still subscribed
  handler.call(message) if handler

  # Process auto-ack if not done manually
  nats.publish(ack_inbox, auto_ack.to_proto) unless manual_acks
end