class Stomp::Client

Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.

Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.

Attributes

parameters[R]

Parameters hash

Public Class Methods

new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) click to toggle source

A new Client object can be initialized using three forms:

Hash (this is the recommended Client initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  # These are the default parameters and do not need to be set
  :reliable => true,                  # reliable (use failover)
  :initial_reconnect_delay => 0.01,   # initial delay before reconnect (secs)
  :max_reconnect_delay => 30.0,       # max delay before reconnect
  :use_exponential_back_off => true,  # increase delay between reconnect attempts
  :back_off_multiplier => 2,          # next delay multiplier
  :max_reconnect_attempts => 0,       # retry forever, use # for maximum attempts
  :randomize => false,                # do not randomize hosts hash before reconnect
  :connect_timeout => 0,              # Timeout for TCP/TLS connects, use # for max seconds
  :connect_headers => {},             # user supplied CONNECT headers (req'd for Stomp 1.1+)
  :parse_timeout => 5,                # IO::select wait time on socket reads
  :logger => nil,                     # user supplied callback logger instance
  :dmh => false,                      # do not support multihomed IPV4 / IPV6 hosts during failover
  :closed_check => true,              # check first if closed in each protocol method
  :hbser => false,                    # raise on heartbeat send exception
  :stompconn => false,                # Use STOMP instead of CONNECT
  :usecrlf => false,                  # Use CRLF command and header line ends (1.2+)
  :max_hbread_fails => 0,             # Max HB read fails before retry.  0 => never retry
  :max_hbrlck_fails => 0,             # Max HB read lock obtain fails before retry.  0 => never retry
  :fast_hbs_adjust => 0.0,            # Fast heartbeat senders sleep adjustment, seconds, needed ...
                                      # For fast heartbeat senders.  'fast' == YMMV.  If not
                                      # correct for your environment, expect unnecessary fail overs
  :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
  :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
  :start_timeout => 0,                # Timeout around Stomp::Client initialization
  :sslctx_newparm => nil,             # Param for SSLContext.new
  :ssl_post_conn_check => true,       # Further verify broker identity
  :nto_cmd_read => true,              # No timeout on COMMAND read
}

e.g. c = Stomp::Client.new(hash)

Positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)
# File lib/stomp/client.rb, line 83
# Build a client from one of the supported argument forms, then connect and
# start the listener thread.
#
# The first argument is offered in turn to the Hash parser, the stomp:// URL
# parser and the failover:// URL parser; if none of them accepts it, all
# arguments are treated as positional parameters.
#
# Start-up (error handler, connection, listener thread) runs inside
# Timeout.timeout(@start_timeout).
#
# Raises Stomp::Error::StartTimeoutException when that timeout expires.
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  # Each parser returns a truthy value once it has populated @parameters.
  unless parse_hash_params(login) || parse_stomp_url(login) || parse_failover_url(login)
    parse_positional_params(login, passcode, host, port, reliable)
  end

  # Default the logger inside the parameters hash as well as on the client.
  @parameters[:logger] ||= Stomp::NullLogger.new
  @logger = @parameters[:logger]
  @start_timeout = @parameters[:start_timeout] || 0
  @parameters[:client_main] = Thread.current
  check_arguments!

  begin
    Timeout.timeout(@start_timeout) do
      create_error_handler
      create_connection(autoflush)
      start_listeners
    end
  rescue Timeout::Error
    raise Stomp::Error::StartTimeoutException.new(@start_timeout)
  end
end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) click to toggle source

open is syntactic sugar for 'Client.new', see 'initialize' for usage.

# File lib/stomp/client.rb, line 139
# Convenience constructor: forwards its arguments, unchanged and in order,
# to Client.new and returns the new client.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  positional = [login, passcode, host, port, reliable]
  Client.new(*positional)
end

Public Instance Methods

abort(name, headers = {}) click to toggle source

Abort aborts work in a transaction by name.

# File lib/stomp/client.rb, line 155
# Abort the named transaction, then hand every message that was ack'd under
# that transaction name back to its listener.
#
# name    - transaction name; also the key looked up in the replay table
# headers - optional headers passed through to the connection's abort
#
# Returns the replayed list, or nil when nothing was recorded for +name+.
def abort(name, headers = {})
  @connection.abort(name, headers)

  # Detach the list before replaying it. Leaving it in the table would replay
  # the same messages again if a later transaction reuses this name, and a
  # listener that acks in the same transaction during replay would append to
  # the very array being iterated.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  # find_listener also calls the listener
  replay_list.each { |message| find_listener(message) }
end
ack(message, headers = {}) { |r| ... } click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe("/queue/a", {:ack => 'client'}) ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 204
# Acknowledge +message+ on the connection.
#
# When headers carries :transaction, the message is also remembered under
# that transaction id so it can be replayed if the transaction is aborted.
# When a block is given, a receipt is requested and the block is called
# with the receipt frame.
def ack(message, headers = {})
  transaction = headers[:transaction]
  # Keep ack'd messages around in case the transaction is rolled back.
  (@replay_messages_by_txn[transaction] ||= []) << message if transaction

  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    headers = headers.merge(:receipt => receipt_id)
  end

  ack_info = ack_context_for(message, headers)
  @connection.ack(ack_info[:message_id], ack_info[:headers])
end
Also aliased as: acknowledge
ack_context_for(message, headers) click to toggle source
# File lib/stomp/client.rb, line 232
# Work out which message header identifies +message+ in an ACK/NACK for the
# current protocol level, and the headers to send with it.
#
# Returns a Hash with :message_id and :headers.
def ack_context_for(message, headers)
  case protocol
  when Stomp::SPL_12
    id_header = 'ack'
  when Stomp::SPL_11
    id_header = 'message-id'
    # At this level the subscription id is added to the outgoing headers.
    headers = headers.merge(:subscription => message.headers['subscription'])
  else
    id_header = 'message-id'
  end

  { :message_id => message.headers[id_header], :headers => headers }
end
acknowledge(message, headers = {})

For posterity, we alias:

Alias for: ack
autoflush() click to toggle source

autoflush returns the current connection's autoflush setting.

# File lib/stomp/client.rb, line 358
# Report the autoflush setting of the underlying connection.
def autoflush
  @connection.autoflush
end
autoflush=(af) click to toggle source

autoflush= sets the current connection's autoflush setting.

# File lib/stomp/client.rb, line 353
# Set the autoflush setting of the underlying connection.
#
# af - the new setting, passed to the connection unchanged
def autoflush=(af)
  @connection.autoflush = af
end
begin(name, headers = {}) click to toggle source

Begin starts work in a transaction by name.

# File lib/stomp/client.rb, line 150
# Start a transaction by delegating to the connection.
#
# name    - transaction name
# headers - optional headers passed through unchanged
def begin(name, headers = {})
  @connection.begin(name, headers)
end
close(headers={}) click to toggle source

close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.

# File lib/stomp/client.rb, line 290
# Release this client's resources: terminate the listener thread first,
# then disconnect the connection, passing +headers+ along.
#
# Returns whatever the connection's disconnect returns.
def close(headers = {})
  @listener_thread.exit
  @connection.disconnect(headers)
end
closed?() click to toggle source

closed? tests if this client connection is closed.

# File lib/stomp/client.rb, line 279
# Ask the underlying connection whether it is closed.
def closed?
  @connection.closed?
end
commit(name, headers = {}) click to toggle source

Commit commits work in a transaction by name.

# File lib/stomp/client.rb, line 168
# Commit the named transaction.
#
# Messages ack'd inside a transaction are kept for replay on abort; once the
# transaction commits they are no longer needed, so the replay list is
# dropped before the COMMIT is sent.
#
# name    - transaction name
# headers - optional headers passed through to the connection's commit
def commit(name, headers = {})
  # abort looks the list up by transaction name, so that is the key to clear.
  @replay_messages_by_txn.delete(name)
  # Also honour an explicit :transaction header, as earlier versions did.
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) unless txn_id.nil?

  @connection.commit(name, headers)
end
connection_frame() click to toggle source

Return the broker's CONNECTED frame to the client. Misnamed.

# File lib/stomp/client.rb, line 264
# Return the connection's connection_frame (per the page text, the broker's
# CONNECTED frame).
def connection_frame
  @connection.connection_frame
end
create_error_handler() click to toggle source
# File lib/stomp/client.rb, line 110
# Install @error_listener, the callable that handles ERROR frames.
#
# The listener wraps the frame in an exception chosen from the frame body,
# forgets any receipt listener registered for the frame's 'receipt-id', and
# raises the exception in the thread that called this method.
def create_error_handler
  client_thread = Thread.current
  # Not every Ruby has Thread#report_on_exception=.
  client_thread.report_on_exception = false if client_thread.respond_to?(:report_on_exception=)

  @error_listener = lambda do |error|
    exception_class =
      case error.body
      when /ResourceAllocationException/i then Stomp::Error::ProducerFlowControlException
      when /ProtocolException/i then Stomp::Error::ProtocolException
      else Stomp::Error::BrokerException
      end
    exception = exception_class.new(error)

    receipt_id = error.headers['receipt-id']
    @receipt_listeners.delete(receipt_id) if receipt_id
    client_thread.raise(exception)
  end
end
disconnect_receipt() click to toggle source

Return any RECEIPT frame received by DISCONNECT.

# File lib/stomp/client.rb, line 269
# Return the connection's disconnect_receipt (per the page text, any RECEIPT
# frame received for DISCONNECT).
def disconnect_receipt
  @connection.disconnect_receipt
end
hbrecv_count() click to toggle source

hbrecv_count returns the current connection's heartbeat receive count.

# File lib/stomp/client.rb, line 342
# Heartbeat receive count, as reported by the underlying connection.
def hbrecv_count
  @connection.hbrecv_count
end
hbrecv_interval() click to toggle source

hbrecv_interval returns the connection's heartbeat receive interval.

# File lib/stomp/client.rb, line 332
# Heartbeat receive interval, as reported by the underlying connection.
def hbrecv_interval
  @connection.hbrecv_interval
end
hbsend_count() click to toggle source

hbsend_count returns the current connection's heartbeat send count.

# File lib/stomp/client.rb, line 337
# Heartbeat send count, as reported by the underlying connection.
def hbsend_count
  @connection.hbsend_count
end
hbsend_interval() click to toggle source

hbsend_interval returns the connection's heartbeat send interval.

# File lib/stomp/client.rb, line 327
# Heartbeat send interval, as reported by the underlying connection.
def hbsend_interval
  @connection.hbsend_interval
end
join(limit = nil) click to toggle source

join the listener thread for this client, generally used to wait for a quit signal.

# File lib/stomp/client.rb, line 145
# Join the listener thread.
#
# limit - passed straight to Thread#join (nil by default)
#
# Returns whatever Thread#join returns for the listener thread.
def join(limit = nil)
  @listener_thread.join(limit)
end
jruby?() click to toggle source

jruby? tests if the connection has detected a JRuby environment.

# File lib/stomp/client.rb, line 284
# Return the connection's jruby flag.
def jruby?
  @connection.jruby
end
nack(message, headers = {}) click to toggle source

Stomp 1.1+ NACK.

# File lib/stomp/client.rb, line 226
# Send a NACK for +message+ (Stomp 1.1+). The message id and outgoing
# headers are derived by ack_context_for.
def nack(message, headers = {})
  message_id, nack_headers = ack_context_for(message, headers).values_at(:message_id, :headers)
  @connection.nack(message_id, nack_headers)
end
open?() click to toggle source

open? tests if this client connection is open.

# File lib/stomp/client.rb, line 274
# Ask the underlying connection whether it is open.
def open?
  @connection.open?
end
poll() click to toggle source

Poll for asynchronous messages issued by the broker. Returns nil if no message is available, else the message.

# File lib/stomp/client.rb, line 348
# Delegate to the connection's poll and return its result.
def poll
  @connection.poll
end
protocol() click to toggle source

protocol returns the current client's protocol level.

# File lib/stomp/client.rb, line 307
# Protocol level, as reported by the underlying connection.
def protocol
  @connection.protocol
end
publish(destination, message, headers = {}) { |r| ... } click to toggle source

Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 254
# Publish +message+ to +destination+.
#
# Header keys are symbolized first. When a block is given, a receipt is
# requested and the block is called with the receipt frame.
#
# Raises Stomp::Error::DestinationRequired when destination is nil/false.
def publish(destination, message, headers = {})
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    headers = headers.merge(:receipt => receipt_id)
  end

  @connection.publish(destination, message, headers)
end
running() click to toggle source

running checks if the thread was created and is not dead.

# File lib/stomp/client.rb, line 296
# Tell whether the listener thread exists and has not died.
#
# Returns nil when no listener thread has been created, otherwise true or
# false depending on whether Thread#status is truthy.
def running
  thread = @listener_thread
  thread && (thread.status ? true : false)
end
set_logger(logger) click to toggle source

set_logger identifies a new callback logger.

# File lib/stomp/client.rb, line 301
# Replace the callback logger on this client and on its connection.
#
# Returns whatever the connection's set_logger returns.
def set_logger(logger)
  @logger = logger
  @connection.set_logger(@logger)
end
sha1(data) click to toggle source

sha1 returns a SHA1 sum of a given string.

# File lib/stomp/client.rb, line 317
# Delegate to the connection's sha1 helper for +data+.
def sha1(data)
  digest = @connection.sha1(data)
  digest
end
subscribe(destination, headers = {}) { |msg| ... } click to toggle source

Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/client.rb, line 177
# Subscribe to +destination+; the given block becomes the listener for
# messages on that subscription.
#
# Header keys are symbolized, and the subscription id (supplied, or built
# by build_subscription_id) is sent as the :id header.
#
# Raises Stomp::Error::NoListenerGiven without a block,
# Stomp::Error::DestinationRequired without a destination, and
# Stomp::Error::DuplicateSubscription when the id already has a listener.
def subscribe(destination, headers = {})
  raise Stomp::Error::NoListenerGiven unless block_given?
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  # The subscription id correlates incoming messages with this listener.
  subscription_id = build_subscription_id(destination, headers)
  headers = headers.merge(:id => subscription_id)
  raise Stomp::Error::DuplicateSubscription if @listeners[subscription_id]

  @listeners[subscription_id] = lambda { |msg| yield msg }
  @connection.subscribe(destination, headers)
end
unreceive(message, options = {}) click to toggle source

Unreceive a message, sending it back to its queue or to the DLQ.

# File lib/stomp/client.rb, line 246
# Unreceive +message+ by delegating to the connection.
#
# message - the message to hand back
# options - passed through to the connection unchanged
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
unsubscribe(destination, headers = {}) click to toggle source

Unsubscribe from a subscription by name.

# File lib/stomp/client.rb, line 193
# Unsubscribe from +destination+ and forget the listener registered for the
# subscription.
#
# Header keys are symbolized, and the subscription id (supplied, or built
# by build_subscription_id) is sent as the :id header.
#
# Raises Stomp::Error::DestinationRequired when destination is nil/false.
# Returns nil.
def unsubscribe(destination, headers = {})
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers = headers.merge(:id => build_subscription_id(destination, headers))
  @connection.unsubscribe(destination, headers)
  # Remove the entry outright; assigning nil would leave a dead key behind
  # for every subscription id ever used.
  @listeners.delete(headers[:id])
  nil
end
uuid() click to toggle source

uuid returns a type 4 UUID.

# File lib/stomp/client.rb, line 322
# Return a UUID obtained from the underlying connection.
def uuid
  @connection.uuid
end
valid_utf8?(s) click to toggle source

valid_utf8? validates any given string for UTF8 compliance.

# File lib/stomp/client.rb, line 312
# Ask the underlying connection whether +s+ is valid UTF-8.
def valid_utf8?(s)
  verdict = @connection.valid_utf8?(s)
  verdict
end

Private Instance Methods

build_subscription_id(destination, headers) click to toggle source
# File lib/client/utils.rb, line 80
# Determine the subscription id for +destination+.
#
# An id supplied in +headers+ wins (:id first, then 'id'); otherwise the id
# is the SHA1 hex digest of the destination.
def build_subscription_id(destination, headers)
  # Plain conditionals: the previous `return ... until` form only behaved
  # like a conditional because `return` left the loop on its first pass.
  return headers[:id] unless headers[:id].nil?
  return headers['id'] unless headers['id'].nil?

  Digest::SHA1.hexdigest(destination)
end
check_arguments!() click to toggle source

A sanity check of required arguments.

# File lib/client/utils.rb, line 106
# Sanity-check @parameters before connecting.
#
# Raises ArgumentError when :hosts is missing or not an Array, when a host's
# :port is an empty string or falls outside 1..65535 after to_i, or when
# :reliable is neither true nor false. Host names are not validated here.
# Warns on stderr when :reliable is combined with a positive start timeout.
def check_arguments!
  hosts = @parameters[:hosts]
  raise ArgumentError, "missing :hosts parameter" unless hosts
  raise ArgumentError, "invalid :hosts type" unless hosts.is_a?(Array)

  hosts.each do |hv|
    port = hv[:port]
    raise ArgumentError, "empty :port value in #{hv.inspect}" if port == ''
    next if port.nil? # no port requested, nothing to validate

    tpv = port.to_i
    raise ArgumentError, "invalid :port value=#{tpv} from #{hv.inspect}" unless tpv.between?(1, 65535)
  end

  reliable = @parameters[:reliable]
  raise ArgumentError unless [true, false].include?(reliable)

  return unless reliable && @start_timeout > 0

  warn "WARN detected :reliable == true and :start_timeout > 0"
  warn "WARN this may cause incorrect fail-over behavior"
  warn "WARN use :start_timeout => 0 to correct fail-over behavior"
end
create_connection(autoflush) click to toggle source
# File lib/stomp/client.rb, line 131
# Create the Connection from @parameters, keep it in @connection and apply
# the requested autoflush setting to it.
def create_connection(autoflush)
  conn = Connection.new(@parameters)
  @connection = conn
  conn.autoflush = autoflush
end
create_listener_maps() click to toggle source
# File lib/client/utils.rb, line 171
# Reset the listener bookkeeping and build @listener_map, which maps a frame
# command to the callable that handles frames of that type.
#
# MESSAGE frames go to find_listener, RECEIPT frames to
# find_receipt_listener and ERROR frames to @error_listener. Any other
# command resolves to a handler that reports the unknown frame type through
# the connection's logger, or via warn if the logger did not handle it.
def create_listener_maps
  @listeners = {}
  @receipt_listeners = {}
  @replay_messages_by_txn = {}

  unknown_frame_listener = lambda do |message|
    @failure = nil
    text = "Received unknown frame type: '#{message.command}'\n"
    warn text unless @connection.slog(:on_miscerr, @connection.log_params, text)
  end

  # A Hash default block is called with (hash, key) and its return value is
  # what the lookup yields. The listener thread calls the looked-up value
  # with the frame, so the default must be a callable taking the frame; it
  # must not try to treat the block argument as the frame itself.
  @listener_map = Hash.new { |_map, _command| unknown_frame_listener }

  @listener_map[Stomp::CMD_MESSAGE] = lambda { |message| find_listener(message) }
  @listener_map[Stomp::CMD_RECEIPT] = lambda { |message| find_receipt_listener(message) }
  @listener_map[Stomp::CMD_ERROR]   = @error_listener
end
filter_options(options) click to toggle source

filter_options returns a new Hash of filtered options.

# File lib/client/utils.rb, line 129
# Translate failover-URL options (camelCase String keys, String values)
# into the client's parameter Hash. Options that are absent fall back to
# the defaults written below; the two delays are given in milliseconds and
# converted to seconds.
def filter_options(options)
  seconds = lambda { |key, default_ms| (options[key] || default_ms).to_f / 1000 }
  integer = lambda { |key, default| (options[key] || default).to_i }

  {
    :initial_reconnect_delay => seconds.call("initialReconnectDelay", 10),
    :max_reconnect_delay => seconds.call("maxReconnectDelay", 30000),
    :use_exponential_back_off => options["useExponentialBackOff"] != "false", # Default: true
    :back_off_multiplier => integer.call("backOffMultiplier", 2),
    :max_reconnect_attempts => integer.call("maxReconnectAttempts", 0),
    :randomize => options["randomize"] == "true", # Default: false
    :connect_timeout => 0
  }
end
find_listener(message) click to toggle source

find_listener returns the listener for a given subscription in a given message.

# File lib/client/utils.rb, line 143
# Look up the listener for the subscription named in +message+ and call it
# with the message.
#
# Returns the listener's result, or nil when no listener is registered.
def find_listener(message)
  subscription_id = message.headers['subscription']
  if subscription_id.nil?
    # Backward compatibility: a message may carry no subscription id, in
    # which case one is synthesized and stored under :id in its headers.
    set_subscription_id_if_missing(message.headers['destination'], message.headers)
    subscription_id = message.headers[:id]
  end

  handler = @listeners[subscription_id]
  handler.call(message) if handler
end
find_receipt_listener(message) click to toggle source
# File lib/client/utils.rb, line 163
# Call the listener registered for the frame's 'receipt-id', then
# unregister it. Does nothing when no listener is registered for that id.
def find_receipt_listener(message)
  receipt_id = message.headers['receipt-id']
  listener = @receipt_listeners[receipt_id]
  return unless listener

  listener.call(message)
  @receipt_listeners.delete(receipt_id)
end
parse_failover_url(login) click to toggle source

e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param

# File lib/client/utils.rb, line 40
# Try to interpret +login+ as a failover URL.
#
# On a match, @parameters is built from the hosts found in the URL plus the
# filtered URL options, and :reliable is forced to true.
#
# Returns true on a match, nil otherwise.
# Raises Stomp::Error::MalformedFailoverOptionsError when the option string
# does not split into key/value pairs.
def parse_failover_url(login)
  original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings
  md = FAILOVER_REGEX.match(login)
  $VERBOSE = original_verbose
  return nil unless md

  finhosts = parse_hosts(login)

  options = {}
  raw_options = md[-1] # last capture holds the option string, if any
  if raw_options
    parts = raw_options.split(/&|=/)
    raise Stomp::Error::MalformedFailoverOptionsError unless parts.size.even?
    options = Hash[*parts]
  end

  @parameters = { :hosts => finhosts }.merge!(filter_options(options))
  @parameters[:reliable] = true
  true
end
parse_hash_params(params) click to toggle source
# File lib/client/utils.rb, line 11
# Accept +params+ as the client's parameters when it is a Hash.
#
# The Hash itself (not a copy) becomes @parameters, and :reliable is set to
# true unless the caller explicitly passed false.
#
# Returns true when params was a Hash, false otherwise.
def parse_hash_params(params)
  return false unless params.is_a?(Hash)

  # Do not override user choice of false.
  params[:reliable] = true unless params[:reliable] == false
  @parameters = params

  true
end
parse_hosts(url) click to toggle source

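Parse every stomp:// or stomp+ssl:// host entry found in a URL string (for example a failover URL) into an Array of host Hashes.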

# File lib/client/utils.rb, line 88
# Collect every stomp:// or stomp+ssl:// host entry found in +url+.
#
# Returns an Array of Hashes with :ssl, :login, :passcode, :host and :port;
# login and passcode default to "" when absent.
def parse_hosts(url)
  original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings
  host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/
  hosts = url.scan(host_match).map do |match|
    {
      :ssl => match[0] == "+ssl",
      :login => match[3] || "",
      :passcode => match[4] || "",
      :host => match[5],
      :port => match[6].to_i
    }
  end
  $VERBOSE = original_verbose
  hosts
end
parse_positional_params(login, passcode, host, port, reliable) click to toggle source
# File lib/client/utils.rb, line 63
# Build @parameters from positional arguments: a single host entry plus the
# :reliable flag. The port is converted with to_i.
#
# Always returns true.
def parse_positional_params(login, passcode, host, port, reliable)
  single_host = {
    :login => login,
    :passcode => passcode,
    :host => host,
    :port => port.to_i
  }
  @parameters = { :reliable => reliable, :hosts => [single_host] }
  true
end
parse_stomp_url(login) click to toggle source
# File lib/client/utils.rb, line 21
# Try to interpret +login+ as a single stomp:// URL.
#
# On a match, @login, @passcode, @host and @port are set from the URL and
# @parameters is built with that one host and :reliable => false.
#
# Returns true on a match, false otherwise.
def parse_stomp_url(login)
  original_verbose = $VERBOSE
  begin
    $VERBOSE = nil # shut off warnings
    url = /^stomp:\/\/#{URL_REPAT}/.match(login)
  ensure
    # Restore the global even if the match raises (e.g. a non-String
    # argument); otherwise warnings would stay off for the whole process.
    $VERBOSE = original_verbose
  end
  return false unless url

  @login = url[3] || ""
  @passcode = url[4] || ""
  @host = url[5]
  @port = url[6].to_i
  @parameters = { :reliable => false,
                  :hosts => [ { :login => @login,
                                :passcode => @passcode,
                                :host => @host,
                                :port => @port } ] }
  true
end
register_receipt_listener(listener) click to toggle source

Register a receipt listener.

# File lib/client/utils.rb, line 157
# Register +listener+ under a freshly generated receipt id.
#
# Returns the id, which callers send as the :receipt header.
def register_receipt_listener(listener)
  uuid.tap { |receipt_id| @receipt_listeners[receipt_id] = listener }
end
set_subscription_id_if_missing(destination, headers) click to toggle source

Set a subscription id in the headers hash if one does not already exist. For simplicity's sake, all subscriptions have a subscription ID. Setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.github.com/

# File lib/client/utils.rb, line 76
# Store the subscription id for +destination+ under :id in +headers+
# (mutating the Hash) and return it. The id comes from
# build_subscription_id.
def set_subscription_id_if_missing(destination, headers)
  headers.store(:id, build_subscription_id(destination, headers))
end
start_listeners() click to toggle source

Start a single listener thread. Misnamed I think.

# File lib/client/utils.rb, line 189
# Build the listener maps, then start the single listener thread.
#
# The thread loops forever: it receives a frame from the connection and
# dispatches it through @listener_map by frame command. A nil frame raises
# Stomp::Error::NilMessageError unless :reliable is set, in which case it
# is skipped.
def start_listeners
  create_listener_maps

  @listener_thread = Thread.start do
    loop do
      frame = @connection.receive
      # AMQ specific behavior
      raise Stomp::Error::NilMessageError if frame.nil? && !@parameters[:reliable]

      # frame can be nil on rapid AMQ stop/start sequences
      @listener_map[frame.command].call(frame) if frame
    end
  end
end