class Vines::Stream

The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.

Constants

ERROR
PAD
STREAM

Attributes

config[R]
domain[R]
id[R]
state[R]
user[RW]

Public Class Methods

new(config) click to toggle source
# File lib/vines/stream.rb, line 17
def initialize(config)
  @config = config
end

Public Instance Methods

advance(state) click to toggle source

Advance the stream's state machine to the new state. XML nodes received by the stream will be passed to this state's `node` method.

state - The Stream::State to process the stanzas next.

Returns the new Stream::State.

# File lib/vines/stream.rb, line 176
def advance(state)
  @state = state
end
available_resources(*jid) click to toggle source
# File lib/vines/stream.rb, line 116
def available_resources(*jid)
  router.available_resources(*jid, user.jid)
end
cert_domain_matches?(domain) click to toggle source
# File lib/vines/stream.rb, line 135
def cert_domain_matches?(domain)
  @store.domain?(get_peer_cert, domain)
end
close_connection(after_writing=false) click to toggle source

Advance the state machine into the `Closed` state so any remaining queued nodes are not processed while we're waiting for EM to actually close the connection.

Returns nothing.

Calls superclass method
# File lib/vines/stream.rb, line 57
def close_connection(after_writing=false)
  super
  @closed = true
  advance(Client::Closed.new(self))
end
connected_resources(jid) click to toggle source
# File lib/vines/stream.rb, line 112
def connected_resources(jid)
  router.connected_resources(jid, user.jid)
end
create_parser() click to toggle source

Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).

Returns nothing.

# File lib/vines/stream.rb, line 44
def create_parser
  @parser = Parser.new.tap do |parser|
    parser.stream_open {|node| @nodes.push(node) }
    parser.stream_close { close_connection }
    parser.stanza {|node| @nodes.push(node) }
  end
end
encrypt() click to toggle source
# File lib/vines/stream.rb, line 152
def encrypt
  cert, key = @store.files_for_domain(domain)
  start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true)
end
encrypt?() click to toggle source

Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.

# File lib/vines/stream.rb, line 159
def encrypt?
  !@store.files_for_domain(domain).nil?
end
error(e) click to toggle source

Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.

e - The StandardError, usually XmppError, that occurred.

Returns nothing.

# File lib/vines/stream.rb, line 187
def error(e)
  case e
  when SaslError, StanzaError
    write(e.to_xml)
  when StreamError
    send_stream_error(e)
    close_stream
  else
    log.error(e)
    send_stream_error(StreamErrors::InternalServerError.new)
    close_stream
  end
end
interested_resources(*jid) click to toggle source
# File lib/vines/stream.rb, line 120
def interested_resources(*jid)
  router.interested_resources(*jid, user.jid)
end
post_init() click to toggle source

Initialize the stream after its connection to the server has completed. EventMachine calls this method when an incoming connection is accepted into the event loop.

Returns nothing.

# File lib/vines/stream.rb, line 26
def post_init
  @remote_addr, @local_addr = addresses
  @user, @closed, @stanza_size = nil, false, 0
  @bucket = TokenBucket.new(100, 10)
  @store = Store.new(@config.certs)
  @nodes = EM::Queue.new
  process_node_queue
  create_parser
  log.info { "%s %21s -> %s" %
    ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] }
end
receive_data(data) click to toggle source

Read bytes off the stream and feed them into the XML parser. EventMachine is responsible for calling this method on its event loop as connections become readable.

data - The byte String sent to the server from the client, hopefully XML.

Returns nothing.

# File lib/vines/stream.rb, line 70
def receive_data(data)
  return if @closed
  @stanza_size += data.bytesize
  if @stanza_size < max_stanza_size
    @parser << data rescue error(StreamErrors::NotWellFormed.new)
  else
    error(StreamErrors::PolicyViolation.new('max stanza size reached'))
  end
end
reset() click to toggle source

Reset the connection's XML parser when a new <stream:stream> header is received.

Returns nothing.

# File lib/vines/stream.rb, line 84
def reset
  create_parser
end
router() click to toggle source
# File lib/vines/stream.rb, line 201
def router
  @config.router
end
ssl_verify_peer(pem) click to toggle source
# File lib/vines/stream.rb, line 124
def ssl_verify_peer(pem)
  # Skip verifying if user accept self-signed certificates
  return true if self.vhost.accept_self_signed?
  # EM is supposed to close the connection when this returns false,
  # but it only does that for inbound connections, not when we
  # make a connection to another server.
  @store.trusted?(pem).tap do |trusted|
    close_connection unless trusted
  end
end
storage(domain=nil) click to toggle source

Returns the storage system for the domain. If no domain is given, the stream's storage mechanism is returned.

# File lib/vines/stream.rb, line 90
def storage(domain=nil)
  @config.storage(domain || self.domain)
end
unbind() click to toggle source
# File lib/vines/stream.rb, line 163
def unbind
  router.delete(self)
  log.info { "%s %21s -> %s" %
    ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
  log.info { "Streams connected: #{router.size}" }
end
update_user_streams(user) click to toggle source

Reload the user's information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.

user - The User whose connection info needs refreshing.

Returns nothing.

# File lib/vines/stream.rb, line 106
def update_user_streams(user)
  connected_resources(user.jid.bare).each do |stream|
    stream.user.update_from(user)
  end
end
vhost() click to toggle source

Returns the Config::Host virtual host for the stream's domain.

# File lib/vines/stream.rb, line 95
def vhost
  @config.vhost(domain)
end
write(data) click to toggle source

Send the data over the wire to this client.

data - The XML String or XML::Node to write to the socket.

Returns nothing.

# File lib/vines/stream.rb, line 144
def write(data)
  log_node(data, :out)
  if data.respond_to?(:to_xml)
    data = data.to_xml(:indent => 0)
  end
  send_data(data)
end

Private Instance Methods

addresses() click to toggle source

Determine the remote and local socket addresses used by this connection.

Returns a two-element Array of String addresses.

# File lib/vines/stream.rb, line 210
def addresses
  [get_peername, get_sockname].map do |addr|
    addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
  end
end
close_stream() click to toggle source

Write a closing stream tag and close the connection. Subclasses can override this method for custom close behavior.

Returns nothing.

# File lib/vines/stream.rb, line 233
def close_stream
  write('</stream:stream>')
  close_connection_after_writing
end
enforce_rate_limit() click to toggle source
# File lib/vines/stream.rb, line 276
def enforce_rate_limit
  unless @bucket.take(1)
    raise StreamErrors::PolicyViolation.new('rate limit exceeded')
  end
end
error?(node) click to toggle source
# File lib/vines/stream.rb, line 238
def error?(node)
  ns = node.namespace ? node.namespace.href : nil
  node.name == ERROR && ns == NAMESPACES[:stream]
end
log_node(node, direction) click to toggle source
# File lib/vines/stream.rb, line 282
def log_node(node, direction)
  return unless log.debug?
  from, to = @remote_addr, @local_addr
  from, to = to, from if direction == :out
  label = (direction == :out) ? 'Sent' : 'Received'
  log.debug("%s %21s -> %s\n%s\n" %
    ["#{label} stanza:".ljust(PAD), from, to, node])
end
process_node(node) click to toggle source
# File lib/vines/stream.rb, line 262
def process_node(node)
  log_node(node, :in)
  @stanza_size = 0
  enforce_rate_limit
  if error?(node)
    close_stream
  else
    update_stream_id(node)
    state.node(node)
  end
rescue => e
  error(e)
end
process_node_queue() click to toggle source

Schedule a queue pop on the EM thread to handle the next element. This guarantees all stanzas received on this stream are processed in order.

http://tools.ietf.org/html/rfc6120#section-10.1

Once a node is processed, this method recursively schedules itself to pop the next node and so on. A single call to this method effectively begins an asynchronous node processing loop.

Returns nothing.

# File lib/vines/stream.rb, line 253
def process_node_queue
  @nodes.pop do |node|
    Fiber.new do
      process_node(node)
      process_node_queue
    end.resume unless @closed
  end
end
send_stream_error(e) click to toggle source

Write the StreamError's xml to the stream. Subclasses can override this method with custom error writing behavior.

A call to `close_stream` should follow this method. Stream level errors are fatal to the connection.

e - The StreamError that caused the connection to close.

Returns nothing.

# File lib/vines/stream.rb, line 225
def send_stream_error(e)
  write(e.to_xml)
end
update_stream_id(id_or_node) click to toggle source
# File lib/vines/stream.rb, line 301
def update_stream_id(id_or_node)
  if id_or_node.is_a? String
    @id = id_or_node.freeze
  elsif Node.stream?(id_or_node) # move stream? method somewhere else?
    @id = id_or_node['id'].freeze
  end
end
valid_address?(jid) click to toggle source

Determine if this is a valid domain-only JID that can be used in stream initiation stanza headers.

jid - The String or JID to verify (e.g. 'wonderland.lit').

Return true if the jid is domain-only.

# File lib/vines/stream.rb, line 297
def valid_address?(jid)
  JID.new(jid).domain? rescue false
end