class Webtube
Constants
- Frame
Note that [[body]] holds the /raw/ data; that is, if
- [masked?]
-
is true, it will need to be unmasked to get
the payload. Call [[payload]] in order to abstract this away.
- OPCODE_BINARY
- OPCODE_CLOSE
- OPCODE_CONTINUATION
Not all the possible 16 values are defined by the standard.
- OPCODE_PING
- OPCODE_PONG
- OPCODE_TEXT
- VERSION
Attributes
The following three slots are not used by the [[Webtube]] infrastructrue. They have been defined purely so that application code could easily associate data it finds significant to [[Webtube]] instances.
- [accept_webtube]
-
saves the request object here
Public Class Methods
Attempts to set up a [[WebSocket]] connection to the given
- [url]]. Returns the [[Webtube]
-
instance if successful or
raise an appropriate [[Webtube::WebSocketUpgradeFailed]].
# File lib/webtube.rb, line 358 def self::connect url, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], http_header: {}, ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_cert_store: nil, # or an [[OpenSSL::X509::Store]] instance tcp_connect_timeout: nil, # or number of seconds tcp_nodelay: true, close_socket: true, on_http_request: nil, on_http_response: nil, on_ssl_handshake: nil, on_tcp_connect: nil loc = Webtube::Location.new url socket = if tcp_connect_timeout.nil? then TCPSocket.new loc.host, loc.port else Timeout.timeout tcp_connect_timeout, Net::OpenTimeout do TCPSocket.new loc.host, loc.port end end if tcp_nodelay then socket.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 end on_tcp_connect.call socket if on_tcp_connect if loc.ssl? then # construct an SSL context if ssl_cert_store.nil? then ssl_cert_store = OpenSSL::X509::Store.new ssl_cert_store.set_default_paths end ssl_context = OpenSSL::SSL::SSLContext.new ssl_context.cert_store = ssl_cert_store ssl_context.verify_mode = ssl_verify_mode # wrap the socket socket = OpenSSL::SSL::SSLSocket.new socket, ssl_context socket.sync_close = true socket.hostname = loc.host # Server Name Indication socket.connect # perform SSL handshake socket.post_connection_check loc.host on_ssl_handshake.call socket if on_ssl_handshake end socket = Net::BufferedIO.new socket # transmit the request req = Webtube::Request.new loc, http_header composed_request = req.to_s socket.write composed_request on_http_request.call composed_request if on_http_request # wait for response response = Net::HTTPResponse.read_new socket if on_http_response then # reconstitute the response as a string # # (XXX: this loses some diagnostically useful bits, but # [[Net::HTTPResponse::read_new]] just doesn't preserve # the pristine original) s = "#{response.code} #{response.message}\r\n" response.each_header do |k, v| s << "#{k}: #{v}\r\n" end s << "\r\n" on_http_response.call s end # Check that the server is seeing us now # FIXME: ensure that the socket will be closed in case of # exception d = rejection response, req.expected_accept raise Webtube::WebSocketDeclined.new(d) \ if d # Can the server speak our protocol version? unless (response['Sec-WebSocket-Version'] || '13'). strip.split(/\s*,\s*/).include? '13' then raise Webtube::WebSocketVersionMismatch.new( "Sec-WebSocket-Version negotiation failed") end # The connection has been set up. Now we can instantiate # [[Webtube]]. wt = Webtube.new socket, false, allow_rsv_bits: allow_rsv_bits, allow_opcodes: allow_opcodes, close_socket: close_socket wt.instance_variable_set :@url, loc.to_s return wt end
# File lib/webtube.rb, line 37 def initialize socket, serverp, # If true, we will expect incoming data masked and # will not mask outgoing data. If false, we will # expect incoming data unmasked and will mask outgoing # data. allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], close_socket: true super() @socket = socket @serverp = serverp @allow_rsv_bits = allow_rsv_bits @allow_opcodes = allow_opcodes @close_socket = close_socket @defrag_buffer = [] @alive = true @send_mutex = Mutex.new # Guards message sending, so that fragmented messages # won't get interleaved, and the [[@alive]] flag. @run_mutex = Mutex.new # Guards the main read loop. @receiving_frame = false # Are we currently receiving a frame for the # [[Webtube#run]] main loop? @reception_interrupt_mutex = Mutex.new # guards [[@receiving_frame]] return end
Checks whether the given [[Net::HTTPResponse]] represents a valid WebSocket upgrade acceptance. Returns [[nil]] if so, or a human-readable string explaining the issue if not.
- [expected_accept]
-
is the value [[Sec-WebSocket-Accept]] is
expected to hold, generated from the [[Sec-WebSocket-Key]] header field.
# File lib/webtube.rb, line 460 def self::rejection response, expected_accept unless response.code == '101' then return "the HTTP response code was not 101" end unless (response['Connection'] || '').downcase == 'upgrade' then return "the HTTP response did not say " + "'Connection: upgrade'" end unless (response['Upgrade'] || '').downcase == 'websocket' then return "the HTTP response did not say " + "'Upgrade: websocket'" end unless (response['Sec-WebSocket-Accept'] || '') == expected_accept then return "the HTTP response did not say " + "'Sec-WebSocket-Accept: #{expected_accept}'" end return nil end
Public Instance Methods
Closes the connection, thus preventing further transmission.
If [[status_code]] is supplied, it will be passed to the other side in the [[OPCODE_CLOSE]] frame. The default is 1000 which indicates normal closure. Sending a status code can be explicitly suppressed by passing [[nil]] instead of an integer; then, an empty close frame will be sent. Due to the way a close frame's payload is structured, this will also suppress delivery of [[close_explanation]], even if non-empty.
Note that RFC 6455 requires the explanation to be encoded in UTF-8. Accordingly, this method will re-encode it unless it is already in UTF-8.
# File lib/webtube.rb, line 312 def close status_code = 1000, explanation = "" # prepare the payload for the close frame payload = "" if status_code then payload = [status_code].pack('n') if explanation then payload << explanation.encode(Encoding::UTF_8) end end # let the other side know we're closing send_message payload, OPCODE_CLOSE # break the main reception loop @send_mutex.synchronize do @alive = false end # if waiting for a frame (or parsing one), interrupt it @reception_interrupt_mutex.synchronize do @thread.raise AbortReceiveLoop.new if @receiving_frame end @socket.close if @close_socket return end
The application may want to store many Webtube
instances in a hash or a set. In order to facilitate this, we'll need
- [hash]
-
and [[eql?]]. The latter is already adequately –
comparing by identity – implemented by [[Object]]; in order to ensure the former hashes by identity, we'll override it.
# File lib/webtube.rb, line 628 def hash return object_id end
# File lib/webtube.rb, line 335 def inspect s = "#<Webtube@0x%0x" % object_id s << " " << (@server ? 'from' : 'to') unless @url.nil? then s << " " << @url else # [[@socket]] is a [[Net::BufferedIO]] instance, so # [[@socket.io]] is either a plain socket or an SSL # wrapper af, port, hostname = @socket.io.peeraddr s << " %s:%i" % [hostname, port] end s << " @allow_rsv_bits=%s" % @allow_rsv_bits.inspect \ unless @allow_rsv_bits.nil? s << " @allow_opcodes=%s" % @allow_opcodes.inspect \ unless @allow_opcodes.nil? s << ">" return s end
Run a loop to read all the messages and control frames coming in via this WebSocket, and hand events to the given [[listener]]. The listener can implement the following methods:
-
onopen(webtube) will be called as soon as the channel is set up.
-
onmessage(webtube, message_body, opcode) will be called with each arriving data message once it has been defragmented. The data will be passed to it as a
- [String]], encoded in [[UTF-8]
-
for [[OPCODE_TEXT]]
messages and in [[ASCII-8BIT]] for all the other message opcodes.
-
oncontrolframe(webtube, frame) will be called upon receipt of a control frame whose opcode is listed in the
- [allow_opcodes]
-
parameter of this [[Webtube]] instance.
The frame is represented by an instance of
- [Webtube::Frame]]. Note that [[Webtube]
-
handles
connection closures ([[OPCODE_CLOSE]]) and ponging all the pings ([[OPCODE_PING]]) automatically.
-
onping(webtube, frame) will be called upon receipt of an
- [OPCODE_PING]
-
frame. [[Webtube]] will take care of
ponging all the pings, but the listener may want to process such an event for statistical information.
-
onpong(webtube, frame) will be called upon receipt of an
- [OPCODE_PONG]
-
frame.
-
onclose(webtube) will be called upon closure of the connection, for any reason.
-
onannoyedclose(webtube, frame) will be called upon receipt of an [[OPCODE_CLOSE]] frame with an explicit status code other than 1000. This typically indicates that the other side is annoyed, so the listener may want to log the condition for debugging or further analysis. Normally, once the handler returns, [[Webtube]] will respond with a close frame of the same status code and close the connection, but the handler may call [[Webtube#close]] to request a closure with a different status code or without one.
-
onexception(webtube, exception) will be called if an unhandled exception is raised during the [[Webtube]]'s lifecycle, including all of the listener event handlers. It may log the exception but should return normally so that the [[Webtube]] can issue a proper close frame for the other end and invoke the [[onclose]] handler, after which the exception will be raised again so the caller of
- [Webtube#run]
-
will have a chance to handle it.
Before calling any of the handlers, [[respond_to?]] will be used to check implementedness.
If an exception occurs during processing, it (that is, the
- [Exception]
-
instance) may implement a specific status code
to be passed to the other end via the [[OPCODE_CLOSE]] frame by implementing the [[websocket_close_status_code]] method returning the code as an integer. The default code, used if the exception does not specify one, is 1011 'unexpected condition'. An exception may explicitly suppress sending any code by having [[websocket_close_status_code]] return
- [nil]
-
instead of an integer.
# File lib/webtube.rb, line 134 def run listener @run_mutex.synchronize do @thread = Thread.current begin listener.onopen self if listener.respond_to? :onopen while @send_mutex.synchronize{@alive} do begin @reception_interrupt_mutex.synchronize do @receiving_frame = true end frame = Webtube::Frame.read_from_socket @socket ensure @reception_interrupt_mutex.synchronize do @receiving_frame = false end end unless (frame.rsv & ~@allow_rsv_bits) == 0 then raise Webtube::UnknownReservedBit.new(frame: frame) end if @serverp then unless frame.masked? raise Webtube::UnmaskedFrameToServer.new( frame: frame) end else unless !frame.masked? then raise Webtube::MaskedFrameToClient.new( frame: frame) end end if !frame.control_frame? then # data frame if frame.opcode != Webtube::OPCODE_CONTINUATION then # initial frame unless @allow_opcodes.include? frame.opcode then raise Webtube::UnknownOpcode.new(frame: frame) end unless @defrag_buffer.empty? then raise Webtube::MissingContinuationFrame.new end else # continuation frame if @defrag_buffer.empty? then raise Webtube::UnexpectedContinuationFrame.new( frame: frame) end end @defrag_buffer.push frame if frame.fin? then opcode = @defrag_buffer.first.opcode data = @defrag_buffer.map(&:payload).join '' @defrag_buffer = [] if opcode == Webtube::OPCODE_TEXT then # text messages must be encoded in UTF-8, per # RFC 6455 data.force_encoding Encoding::UTF_8 unless data.valid_encoding? then data.force_encoding Encoding::ASCII_8BIT raise Webtube::BadlyEncodedText.new( data: data) end end listener.onmessage self, data, opcode \ if listener.respond_to? :onmessage end elsif (0x08 .. 0x0F).include? frame.opcode then # control frame unless frame.fin? then raise Webtube::FragmentedControlFrame.new( frame: frame) end case frame.opcode when Webtube::OPCODE_CLOSE then message = frame.payload if message.length >= 2 then status_code, = message.unpack 'n' unless status_code == 1000 then listener.onannoyedclose self, frame \ if listener.respond_to? :onannoyedclose end else status_code = 1000 end close status_code when Webtube::OPCODE_PING then listener.onping self, frame \ if listener.respond_to? :onping send_message frame.payload, Webtube::OPCODE_PONG when Webtube::OPCODE_PONG then listener.onpong self, frame \ if listener.respond_to? :onpong else unless @allow_opcodes.include? frame.opcode then raise Webtube::UnknownOpcode.new(frame: frame) end end listener.oncontrolframe self, frame \ if @allow_opcodes.include?(frame.opcode) and listener.respond_to?(:oncontrolframe) else raise 'assertion failed' end end rescue AbortReceiveLoop # we're out of the loop now, so nothing further to do rescue Exception => e status_code = if e.respond_to? :websocket_close_status_code then e.websocket_close_status_code else 1011 # 'unexpected condition' end listener.onexception self, e \ if listener.respond_to? :onexception begin close status_code rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN # ignore, we have a bigger exception to handle end raise e ensure @thread = nil listener.onclose self \ if listener.respond_to? :onclose end end return end
Send a given message payload to the other party, using the given opcode. By default, the [[opcode]] is [[Webtube::OPCODE_TEXT]]. Re-encodes the payload if given in a non-UTF-8 encoding and [[opcode == Webtube::OPCODE_TEXT]].
# File lib/webtube.rb, line 268 def send_message message, opcode = Webtube::OPCODE_TEXT if opcode == Webtube::OPCODE_TEXT and message.encoding != Encoding::UTF_8 then message = message.encode Encoding::UTF_8 end @send_mutex.synchronize do raise 'WebSocket connection no longer live' unless @alive # In order to ensure that the local kernel will treat our # (data) frames atomically during the [[write]] syscall, # we'll want to ensure that the frame size does not exceed # 512 bytes -- the minimum permitted size for # [[PIPE_BUF]]. At this frame size, the header size is up # to four bytes for unmasked or eight bytes for masked # frames. # # (FIXME: in retrospect, that seems like an unpractical # consideration. We should probably use path MTU # instead.) Webtube::Frame.each_frame_for_message( message: message, opcode: opcode, masked: !@serverp, max_frame_body_size: 512 - (!@serverp ? 8 : 4)) do |frame| @socket.write frame.header + frame.body end end return end