class Stomp::Connection

Low level connection which maps commands and supports synchronous receives

Attributes

connection_frame[R]
disconnect_receipt[R]

Public Class Methods

default_port(ssl) click to toggle source

alias :obj_send :send

# File vendor/gems/stomp/connection.rb, line 17
def self.default_port(ssl)
  ssl ? 61612 : 61613
end
new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

A new Connection object accepts the following parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Connection.new("username", "password", "localhost", 61613, true)

Hash:

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :backup => false,
  :timeout => -1,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
}

e.g. c = Connection.new(hash)

TODO Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://user:pass@host:port
stomp://user:pass@host.domain.tld:port
# File vendor/gems/stomp/connection.rb, line 63
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @received_messages = []

  if login.is_a?(Hash)
    hashed_initialize(login)
  else
    @host = host
    @port = port
    @login = login
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parse_timeout = 5    # To override, use hashed parameters
    @logger = nil         # To override, use hashed parameters
  end

  # Use Mutexes:  only one lock per each thread
  # Revert to original implementation attempt
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end

Public Instance Methods

__old_receive() click to toggle source

Receive a frame, repeatedly retries until the frame is received

# File vendor/gems/stomp/connection.rb, line 359
def __old_receive
  # The receive may fail so we may need to retry.
  while TRUE
    begin
      
      used_socket = socket
      return _receive(used_socket)                  
    rescue => e
      @failure = $!
      raise unless @reliable
      @logger.error "receive failed: #{e}\n #{e.backtrace.join("\n")}"
      nil
    end
  end
end
abort(name, headers = {}) click to toggle source

Abort a transaction by name

# File vendor/gems/stomp/connection.rb, line 260
def abort(name, headers = {})
  headers[:transaction] = name
  transmit("ABORT", headers)
end
ack(message_id, headers = {}) click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe “/queue/a”, :ack => 'client'g

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File vendor/gems/stomp/connection.rb, line 226
def ack(message_id, headers = {})
  headers['message-id'] = message_id
  transmit("ACK", headers)
end
begin(name, headers = {}) click to toggle source

Begin a transaction, requires a name for the transaction

# File vendor/gems/stomp/connection.rb, line 217
def begin(name, headers = {})
  headers[:transaction] = name
  transmit("BEGIN", headers)
end
change_host() click to toggle source
# File vendor/gems/stomp/connection.rb, line 180
def change_host
  @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]

  # Set first as master and send it to the end of array
  current_host = @parameters[:hosts].shift
  @parameters[:hosts] << current_host

  @ssl = current_host[:ssl]
  @host = current_host[:host]
  @port = current_host[:port] || Connection::default_port(@ssl)
  @login = current_host[:login] || ""
  @passcode = current_host[:passcode] || ""

end
client_ack?(message) click to toggle source
# File vendor/gems/stomp/connection.rb, line 335
def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end
closed?() click to toggle source

Is this connection closed?

# File vendor/gems/stomp/connection.rb, line 212
def closed?
  @closed
end
commit(name, headers = {}) click to toggle source

Commit a transaction by name

# File vendor/gems/stomp/connection.rb, line 254
def commit(name, headers = {})
  headers[:transaction] = name
  transmit("COMMIT", headers)
end
credit(message_id, headers = {}) click to toggle source

Credit a message, used when a subscription has specified

# File vendor/gems/stomp/connection.rb, line 248
def credit(message_id, headers = {})
  headers['message-id'] = message_id
  transmit("CREDIT", headers)
end
disconnect(headers = {}) click to toggle source

Close this connection

# File vendor/gems/stomp/connection.rb, line 341
def disconnect(headers = {})
  transmit("DISCONNECT", headers)
  headers = headers.symbolize_keys
  @disconnect_receipt = receive if headers[:receipt]
  @logger.info("Connection disconnected for : #{log_params}")
  close_socket
end
hashed_initialize(params) click to toggle source
# File vendor/gems/stomp/connection.rb, line 93
def hashed_initialize(params)
  @parameters = refine_params(params)
  @reliable = true
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @logger =  Logger.new( @parameters[:logger].instance_variable_get( "@logdev" ).filename )
  @logger.formatter = @parameters[:logger].formatter
  #sets the first host to connect
  change_host
  if @logger && @logger.respond_to?(:on_connecting)
    @logger.on_connecting(log_params)
  end
end
increase_reconnect_delay() click to toggle source
# File vendor/gems/stomp/connection.rb, line 199
def increase_reconnect_delay

  @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off]
  @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]
  @reconnect_delay
end
keepalive(headers = {}) click to toggle source

Sends a keepalive frame to the server so that stale connection won't be reclaimed.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File vendor/gems/stomp/connection.rb, line 243
def keepalive(headers = {})      
  transmit("KEEPALIVE", headers)
end
max_reconnect_attempts?() click to toggle source
# File vendor/gems/stomp/connection.rb, line 195
def max_reconnect_attempts?
  !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
end
nack(message_id, headers = {}) click to toggle source

Nacknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe “/queue/a”, :ack => 'client'g

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File vendor/gems/stomp/connection.rb, line 235
def nack(message_id, headers = {})
  headers['message-id'] = message_id
  transmit("NACK", headers)
end
obj_send(*args) click to toggle source
# File vendor/gems/stomp/connection.rb, line 296
def obj_send(*args)
  __send__(*args)
end
open?() click to toggle source

Is this connection open?

# File vendor/gems/stomp/connection.rb, line 207
def open?
  !@closed
end
poll() click to toggle source

Return a pending message if one is available, otherwise return nil

# File vendor/gems/stomp/connection.rb, line 351
def poll
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive
end
publish(destination, message, headers = {}) click to toggle source

Publish message to destination

To disable content length header ( :suppress_content_length => true ) Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File vendor/gems/stomp/connection.rb, line 291
def publish(destination, message, headers = {})
  headers[:destination] = destination
  transmit("SEND", headers, message)      
end
receive() click to toggle source
# File vendor/gems/stomp/connection.rb, line 375
def receive
  super_result = __old_receive
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - clearing socket, resetting connection.\n" 
    @logger.info(errstr)
    @socket = nil
    super_result = __old_receive
  end
  return super_result
end
refine_params(params) click to toggle source
# File vendor/gems/stomp/connection.rb, line 158
def refine_params(params)
  params = params.uncamelize_and_symbolize_keys

  default_params = {
    :connect_headers => {},
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 2,
    :randomize => false,
    :backup => false,
    :timeout => -1,
    # Parse Timeout
    :parse_timeout => 5
  }

  default_params.merge(params)

end
socket() click to toggle source
# File vendor/gems/stomp/connection.rb, line 108
def socket
  
  @socket_semaphore.synchronize do
    used_socket = @socket
    used_socket = nil if closed?
    while used_socket.nil? || !@failure.nil?
      @failure = nil
      begin
        
        @logger.info("Opening socket : #{log_params}")
        used_socket = open_socket
        # Open complete
        
        @logger.info("Opened socket : #{log_params}")
        connect(used_socket)

        @connection_attempts = 0
      rescue
        @failure = $!
        used_socket = nil
        raise unless @reliable

        @logger.error "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"

        if max_reconnect_attempts?
          @reconnect_delay = 0.01
          @connection_attempts = 0
          raise Stomp::Error::MaxReconnectAttempts
        end

        sleep(@reconnect_delay)

        @connection_attempts += 1

        if @parameters
          change_host
          increase_reconnect_delay
        end

        unless @failure
          @reconnect_delay = 0
        end

      end
    end

    @socket = used_socket
  end
end
subscribe(name, headers = {}, subId = nil) click to toggle source

Subscribe to a destination, must specify a name

# File vendor/gems/stomp/connection.rb, line 266
def subscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("SUBSCRIBE", headers)

  # Store the sub so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    @subscriptions[subId] = headers
  end
end
unreceive(message, options = {}) click to toggle source

Send a message back to the source or to the dead letter queue

Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ) Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ) Accepts a force client acknowledgement option (:force_client_ack => true)

# File vendor/gems/stomp/connection.rb, line 305
def unreceive(message, options = {})
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys

  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if retry_count <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end
unsubscribe(name, headers = {}, subId = nil) click to toggle source

Unsubscribe from a destination, must specify a name

# File vendor/gems/stomp/connection.rb, line 278
def unsubscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("UNSUBSCRIBE", headers)
  if @reliable
    subId = name if subId.nil?
    @subscriptions.delete(subId)
  end
end

Private Instance Methods

_receive( read_socket ) click to toggle source
# File vendor/gems/stomp/connection.rb, line 388
def _receive( read_socket )

  while TRUE
    begin
      @read_semaphore.synchronize do    
      # Throw away leading newlines, which are actually trailing
      # newlines from the preceding message.
        Timeout::timeout(@parse_timeout, Stomp::Error::PacketReceiptTimeout) do
          begin
            last_char = read_socket.getc
           return nil if last_char.nil?
          end until parse_char(last_char) != "\n"
          read_socket.ungetc(last_char)
        end 
    
        # If the reading hangs for more than X seconds, abort the parsing process.
        # X defaults to 5.  Override allowed in connection hash parameters.
        Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
      
          # Reads the beginning of the message until it runs into a empty line
          message_header = ''
          line = ''
          begin
            message_header << line
            line = read_socket.gets
            return nil if line.nil?
          end until line =~ /^\s?\n$/

          # Checks if it includes content_length header
          content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
          message_body = ''

          # If it does, reads the specified amount of bytes
          char = ''
          if content_length
            message_body = read_socket.read content_length[1].to_i
            raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
          # Else reads, the rest of the message until the first \0
          else
            message_body << char while (char = parse_char(read_socket.getc)) != "\0"
          end

          # Adds the excluded \n and \0 and tries to create a new message with it
          return Message.new(message_header + "\n" + message_body + "\0")
        end
      end
    rescue Stomp::Error::PacketReceiptTimeout =>pe
      @logger.debug("Packet not received. Continuing")
      sleep 0.01
      next
    rescue Exception => e
      @logger.info(e.inspect)
    end
    break
  end          
end
_transmit(used_socket, command, headers = {}, body = '') click to toggle source
# File vendor/gems/stomp/connection.rb, line 478
def _transmit(used_socket, command, headers = {}, body = '')
  @transmit_semaphore.synchronize do
    # Handle nil body
    body = '' if body.nil?
    # The content-length should be expressed in bytes.
    # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
    # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
    body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length

    # ActiveMQ interprets every message as a BinaryMessage
    # if content_length header is included.
    # Using :suppress_content_length => true will suppress this behaviour
    # and ActiveMQ will interpret the message as a TextMessage.
    # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
    # Lets send this header in the message, so it can maintain state when using unreceive
    headers['content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
    used_socket.puts command
    headers.each {|k,v| used_socket.puts "#{k}:#{v}" }
    used_socket.puts "content-type: text/plain; charset=UTF-8"
    used_socket.puts
    used_socket.write body
    used_socket.write "\0"
  end
end
close_socket() click to toggle source
# File vendor/gems/stomp/connection.rb, line 534
def close_socket
  begin
    # Need to set @closed = true before closing the socket
    # within the @read_semaphore thread
    @read_semaphore.synchronize do
      @closed = true
      @socket.close
    end
  rescue
    #Ignoring if already closed
  end
  @closed
end
connect(used_socket) click to toggle source
# File vendor/gems/stomp/connection.rb, line 560
def connect(used_socket)
  headers = @connect_headers.clone
  headers[:login] = @login
  headers[:passcode] = @passcode
  _transmit(used_socket, "CONNECT", headers)
  
  @connection_frame = _receive(used_socket)
  @disconnect_receipt = nil
  # replay any subscriptions.
  @subscriptions.each { |k,v| _transmit(used_socket, "SUBSCRIBE", v) }
end
log_params() click to toggle source
# File vendor/gems/stomp/connection.rb, line 572
def log_params
  #lparms = @parameters.clone
  lparms = Hash.new
  lparms[:cur_host] = @host
  lparms[:cur_port] = @port
  lparms[:cur_recondelay] = @reconnect_delay
  lparms[:cur_conattempts] = @connection_attempts
  lparms
end
open_socket() click to toggle source
# File vendor/gems/stomp/connection.rb, line 548
def open_socket
  used_socket = @ssl ? open_ssl_socket : open_tcp_socket
  # try to close the old connection if any
  close_socket

  @closed = false
  # Use keepalive
  used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  used_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  used_socket
end
open_ssl_socket() click to toggle source
# File vendor/gems/stomp/connection.rb, line 509
def open_ssl_socket
  require 'openssl' unless defined?(OpenSSL)
  ctx = OpenSSL::SSL::SSLContext.new

  # For client certificate authentication:
  # key_path = ENV["STOMP_KEY_PATH"] || "~/stomp_keys"
  # ctx.cert = OpenSSL::X509::Certificate.new("#{key_path}/client.cer")
  # ctx.key = OpenSSL::PKey::RSA.new("#{key_path}/client.keystore")

  # For server certificate authentication:
  # truststores = OpenSSL::X509::Store.new
  # truststores.add_file("#{key_path}/client.ts")
  # ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
  # ctx.cert_store = truststores

  ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE

  ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
  def ssl.ready?
    ! @rbuffer.empty? || @io.ready?
  end
  ssl.connect
  ssl
end
open_tcp_socket() click to toggle source
# File vendor/gems/stomp/connection.rb, line 503
def open_tcp_socket
  tcp_socket = TCPSocket.open @host, @port

  tcp_socket
end
parse_char(char) click to toggle source
# File vendor/gems/stomp/connection.rb, line 445
def parse_char(char)
  RUBY_VERSION > '1.9' ? char : char.chr
end
transmit(command, headers = {}, body = '') click to toggle source
# File vendor/gems/stomp/connection.rb, line 449
def transmit(command, headers = {}, body = '')

  # The transmit may fail so we may need to retry.
  # But we should have some decent limit on retries
  max_transmit_attempts = 5
  transmit_attempts = 0

  while TRUE
    begin           
      used_socket = socket
      _transmit(used_socket, command, headers, body)
      return
    rescue Stomp::Error::MaxReconnectAttempts => e
      errstr = "transmit to #{@host} failed after max connection retry attempts: #{$!}\n Stack Trace: #{caller}"
      @logger.error(errstr)
      @failure = $!
      raise
    rescue => e
      @failure = $!
      if transmit_attempts >= max_transmit_attempts
        errstr = "transmit to #{@host} failed after max transmit retry attempts: #{$!}\n Stack Trace: #{caller}"
        @logger.error(errstr)
        raise Stomp::Error::MaxReconnectAttempts
      end
      transmit_attempts = transmit_attempts + 1
    end
  end
end