class BPS::Publisher::NATS

Constants

CLIENT_OPTS
FLUSH_TIMEOUT

Public Class Methods

coercer() click to toggle source

@return [BPS::Coercer] the options coercer.

# File lib/bps/publisher/nats.rb, line 48
# Returns the options coercer, building it from CLIENT_OPTS on first use
# and reusing the same frozen instance on every later call.
#
# @return [BPS::Coercer] the frozen, memoized options coercer.
def self.coercer
  return @coercer if @coercer

  @coercer = BPS::Coercer.new(CLIENT_OPTS).freeze
end
new(**opts) click to toggle source

@param opts [Hash] the connection options; only keys listed in CLIENT_OPTS are passed on to the NATS client.

Calls superclass method
# File lib/bps/publisher/nats.rb, line 53
# Creates the publisher and connects a NATS client.
#
# When +:tls_ca_file+ is given and +:tls+ is not, an
# OpenSSL::SSL::SSLContext is created with default parameters, pointed at
# that CA file, and used as the +:tls+ option (the +:tls_ca_file+ key is
# removed from the options in that case).
#
# @param opts [Hash] connection options; only the keys present in
#   CLIENT_OPTS are forwarded to the client's +connect+ call.
def initialize(**opts)
  super()

  # An explicit :tls option always wins over :tls_ca_file.
  if opts[:tls_ca_file] && !opts[:tls]
    tls_context = OpenSSL::SSL::SSLContext.new
    tls_context.set_params
    tls_context.ca_file = opts.delete(:tls_ca_file)
    opts[:tls] = tls_context
  end

  @topics = {}
  @client = ::NATS::IO::Client.new

  # Drop anything the client does not understand before connecting.
  connect_opts = opts.slice(*CLIENT_OPTS.keys)
  @client.connect(**connect_opts)
end
parse_url(url) click to toggle source
# File lib/bps/publisher/nats.rb, line 36
def self.parse_url(url)
  port = url.port&.to_s || '4222'
  servers = CGI.unescape(url.host).split(',').map do |host|
    addr = "nats://#{host}"
    addr << ':' << port unless /:\d+$/.match?(addr)
    addr
  end
  opts = CGI.parse(url.query || '').transform_values {|v| v.size == 1 ? v[0] : v }
  opts.merge(servers: servers)
end

Public Instance Methods

close() click to toggle source
# File lib/bps/publisher/nats.rb, line 73
# Closes the publisher by calling +close+ on the underlying client.
#
# @return whatever the client's +close+ returns.
def close
  @client.close
end
topic(name) click to toggle source
# File lib/bps/publisher/nats.rb, line 69
# Returns the topic handle for +name+, creating it on first request and
# reusing the cached instance afterwards.
#
# @param name [Object] the topic name, also used as the cache key.
# @return [Topic] an instance of this class's nested +Topic+, built with
#   the client and the given name.
def topic(name)
  existing = @topics[name]
  return existing if existing

  @topics[name] = self.class::Topic.new(@client, name)
end