class Vines::Stream
The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.
Constants
- ERROR
- PAD
- STREAM
Attributes
Public Class Methods
# File lib/vines/stream.rb, line 17 def initialize(config) @config = config end
Public Instance Methods
Advance the stream's state machine to the new state. XML nodes received by the stream will be passed to this state's `node` method.
state - The Stream::State
to process the stanzas next.
Returns the new Stream::State
.
# File lib/vines/stream.rb, line 176 def advance(state) @state = state end
# File lib/vines/stream.rb, line 116 def available_resources(*jid) router.available_resources(*jid, user.jid) end
# File lib/vines/stream.rb, line 135 def cert_domain_matches?(domain) @store.domain?(get_peer_cert, domain) end
Advance the state machine into the `Closed` state so any remaining queued nodes are not processed while we're waiting for EM to actually close the connection.
Returns nothing.
# File lib/vines/stream.rb, line 57 def close_connection(after_writing=false) super @closed = true advance(Client::Closed.new(self)) end
# File lib/vines/stream.rb, line 112 def connected_resources(jid) router.connected_resources(jid, user.jid) end
Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).
Returns nothing.
# File lib/vines/stream.rb, line 44 def create_parser @parser = Parser.new.tap do |parser| parser.stream_open {|node| @nodes.push(node) } parser.stream_close { close_connection } parser.stanza {|node| @nodes.push(node) } end end
# File lib/vines/stream.rb, line 152 def encrypt cert, key = @store.files_for_domain(domain) start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true) end
Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
# File lib/vines/stream.rb, line 159 def encrypt? !@store.files_for_domain(domain).nil? end
Stream
level errors close the stream while stanza and SASL
errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.
e - The StandardError, usually XmppError
, that occurred.
Returns nothing.
# File lib/vines/stream.rb, line 187 def error(e) case e when SaslError, StanzaError write(e.to_xml) when StreamError send_stream_error(e) close_stream else log.error(e) send_stream_error(StreamErrors::InternalServerError.new) close_stream end end
# File lib/vines/stream.rb, line 120 def interested_resources(*jid) router.interested_resources(*jid, user.jid) end
Initialize the stream after its connection to the server has completed. EventMachine calls this method when an incoming connection is accepted into the event loop.
Returns nothing.
# File lib/vines/stream.rb, line 26 def post_init @remote_addr, @local_addr = addresses @user, @closed, @stanza_size = nil, false, 0 @bucket = TokenBucket.new(100, 10) @store = Store.new(@config.certs) @nodes = EM::Queue.new process_node_queue create_parser log.info { "%s %21s -> %s" % ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] } end
Read bytes off the stream and feed them into the XML parser. EventMachine is responsible for calling this method on its event loop as connections become readable.
data - The byte String sent to the server from the client, hopefully XML.
Returns nothing.
# File lib/vines/stream.rb, line 70 def receive_data(data) return if @closed @stanza_size += data.bytesize if @stanza_size < max_stanza_size @parser << data rescue error(StreamErrors::NotWellFormed.new) else error(StreamErrors::PolicyViolation.new('max stanza size reached')) end end
Reset the connection's XML parser when a new <stream:stream> header is received.
Returns nothing.
# File lib/vines/stream.rb, line 84 def reset create_parser end
# File lib/vines/stream.rb, line 201 def router @config.router end
# File lib/vines/stream.rb, line 124 def ssl_verify_peer(pem) # Skip verifying if user accept self-signed certificates return true if self.vhost.accept_self_signed? # EM is supposed to close the connection when this returns false, # but it only does that for inbound connections, not when we # make a connection to another server. @store.trusted?(pem).tap do |trusted| close_connection unless trusted end end
Returns the storage system for the domain. If no domain is given, the stream's storage mechanism is returned.
# File lib/vines/stream.rb, line 90 def storage(domain=nil) @config.storage(domain || self.domain) end
# File lib/vines/stream.rb, line 163 def unbind router.delete(self) log.info { "%s %21s -> %s" % ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] } log.info { "Streams connected: #{router.size}" } end
Reload the user's information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.
user - The User
whose connection info needs refreshing.
Returns nothing.
# File lib/vines/stream.rb, line 106 def update_user_streams(user) connected_resources(user.jid.bare).each do |stream| stream.user.update_from(user) end end
Returns the Config::Host
virtual host for the stream's domain.
# File lib/vines/stream.rb, line 95 def vhost @config.vhost(domain) end
Send the data over the wire to this client.
data - The XML String or XML::Node to write to the socket.
Returns nothing.
# File lib/vines/stream.rb, line 144 def write(data) log_node(data, :out) if data.respond_to?(:to_xml) data = data.to_xml(:indent => 0) end send_data(data) end
Private Instance Methods
Determine the remote and local socket addresses used by this connection.
Returns a two-element Array of String addresses.
# File lib/vines/stream.rb, line 210 def addresses [get_peername, get_sockname].map do |addr| addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown' end end
Write a closing stream tag and close the connection. Subclasses can override this method for custom close behavior.
Returns nothing.
# File lib/vines/stream.rb, line 233 def close_stream write('</stream:stream>') close_connection_after_writing end
# File lib/vines/stream.rb, line 276 def enforce_rate_limit unless @bucket.take(1) raise StreamErrors::PolicyViolation.new('rate limit exceeded') end end
# File lib/vines/stream.rb, line 238 def error?(node) ns = node.namespace ? node.namespace.href : nil node.name == ERROR && ns == NAMESPACES[:stream] end
# File lib/vines/stream.rb, line 282 def log_node(node, direction) return unless log.debug? from, to = @remote_addr, @local_addr from, to = to, from if direction == :out label = (direction == :out) ? 'Sent' : 'Received' log.debug("%s %21s -> %s\n%s\n" % ["#{label} stanza:".ljust(PAD), from, to, node]) end
# File lib/vines/stream.rb, line 262 def process_node(node) log_node(node, :in) @stanza_size = 0 enforce_rate_limit if error?(node) close_stream else update_stream_id(node) state.node(node) end rescue => e error(e) end
Schedule a queue pop on the EM thread to handle the next element. This guarantees all stanzas received on this stream are processed in order.
http://tools.ietf.org/html/rfc6120#section-10.1
Once a node is processed, this method recursively schedules itself to pop the next node and so on. A single call to this method effectively begins an asynchronous node processing loop.
Returns nothing.
# File lib/vines/stream.rb, line 253 def process_node_queue @nodes.pop do |node| Fiber.new do process_node(node) process_node_queue end.resume unless @closed end end
Write the StreamError's xml to the stream. Subclasses can override this method with custom error writing behavior.
A call to `close_stream` should follow this method. Stream
level errors are fatal to the connection.
e - The StreamError
that caused the connection to close.
Returns nothing.
# File lib/vines/stream.rb, line 225 def send_stream_error(e) write(e.to_xml) end
# File lib/vines/stream.rb, line 301 def update_stream_id(id_or_node) if id_or_node.is_a? String @id = id_or_node.freeze elsif Node.stream?(id_or_node) # move stream? method somewhere else? @id = id_or_node['id'].freeze end end