class Webtube

Constants

Frame

Note that [[body]] holds the /raw/ data; that is, if

[masked?]

is true, it will need to be unmasked to get

the payload. Call [[payload]] in order to abstract this away.

OPCODE_BINARY
OPCODE_CLOSE
OPCODE_CONTINUATION

Not all the possible 16 values are defined by the standard.

OPCODE_PING
OPCODE_PONG
OPCODE_TEXT
VERSION

Attributes

allow_opcodes[RW]
allow_rsv_bits[RW]
context[RW]
header[RW]

The following three slots are not used by the [[Webtube]] infrastructrue. They have been defined purely so that application code could easily associate data it finds significant to [[Webtube]] instances.

session[RW]
[accept_webtube]

saves the request object here

url[R]

Public Class Methods

connect(url, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], http_header: {}) click to toggle source

Attempts to set up a [[WebSocket]] connection to the given

[url]]. Returns the [[Webtube]

instance if successful or

raise an appropriate [[Webtube::WebSocketUpgradeFailed]].

# File lib/webtube.rb, line 358
def self::connect url,
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    http_header: {},
    ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER,
    ssl_cert_store: nil,
        # or an [[OpenSSL::X509::Store]] instance
    tcp_connect_timeout: nil, # or number of seconds
    tcp_nodelay: true,
    close_socket: true,
    on_http_request: nil,
    on_http_response: nil,
    on_ssl_handshake: nil,
    on_tcp_connect: nil
  loc = Webtube::Location.new url

  socket = if tcp_connect_timeout.nil? then
    TCPSocket.new loc.host, loc.port
  else
    Timeout.timeout tcp_connect_timeout, Net::OpenTimeout do
      TCPSocket.new loc.host, loc.port
    end
  end
  if tcp_nodelay then
    socket.setsockopt Socket::IPPROTO_TCP,
        Socket::TCP_NODELAY, 1
  end
  on_tcp_connect.call socket if on_tcp_connect

  if loc.ssl? then
    # construct an SSL context
    if ssl_cert_store.nil? then
      ssl_cert_store = OpenSSL::X509::Store.new
      ssl_cert_store.set_default_paths
    end
    ssl_context = OpenSSL::SSL::SSLContext.new
    ssl_context.cert_store = ssl_cert_store
    ssl_context.verify_mode = ssl_verify_mode
    # wrap the socket
    socket = OpenSSL::SSL::SSLSocket.new socket, ssl_context
    socket.sync_close = true
    socket.hostname = loc.host # Server Name Indication
    socket.connect # perform SSL handshake
    socket.post_connection_check loc.host
    on_ssl_handshake.call socket if on_ssl_handshake
  end

  socket = Net::BufferedIO.new socket

  # transmit the request
  req = Webtube::Request.new loc, http_header
  composed_request = req.to_s
  socket.write composed_request
  on_http_request.call composed_request if on_http_request

  # wait for response
  response = Net::HTTPResponse.read_new socket

  if on_http_response then
    # reconstitute the response as a string
    #
    # (XXX: this loses some diagnostically useful bits, but
    # [[Net::HTTPResponse::read_new]] just doesn't preserve
    # the pristine original)
    s = "#{response.code} #{response.message}\r\n"
    response.each_header do |k, v|
      s << "#{k}: #{v}\r\n"
    end
    s << "\r\n"
    on_http_response.call s
  end

  # Check that the server is seeing us now
  # FIXME: ensure that the socket will be closed in case of
  # exception
  d = rejection response, req.expected_accept
  raise Webtube::WebSocketDeclined.new(d) \
      if d

  # Can the server speak our protocol version?
  unless (response['Sec-WebSocket-Version'] || '13').
      strip.split(/\s*,\s*/).include? '13' then
    raise Webtube::WebSocketVersionMismatch.new(
        "Sec-WebSocket-Version negotiation failed")
  end

  # The connection has been set up.  Now we can instantiate
  # [[Webtube]].
  wt = Webtube.new socket, false,
      allow_rsv_bits: allow_rsv_bits,
      allow_opcodes: allow_opcodes,
      close_socket: close_socket
  wt.instance_variable_set :@url, loc.to_s
  return wt
end
new(socket, serverp, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], close_socket: true) click to toggle source
Calls superclass method
# File lib/webtube.rb, line 37
def initialize socket,
    serverp,
        # If true, we will expect incoming data masked and
        # will not mask outgoing data.  If false, we will
        # expect incoming data unmasked and will mask outgoing
        # data.
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    close_socket: true
  super()
  @socket = socket
  @serverp = serverp
  @allow_rsv_bits = allow_rsv_bits
  @allow_opcodes = allow_opcodes
  @close_socket = close_socket
  @defrag_buffer = []
  @alive = true
  @send_mutex = Mutex.new
      # Guards message sending, so that fragmented messages
      # won't get interleaved, and the [[@alive]] flag.
  @run_mutex = Mutex.new
      # Guards the main read loop.
  @receiving_frame = false
      # Are we currently receiving a frame for the
      # [[Webtube#run]] main loop?
  @reception_interrupt_mutex = Mutex.new
      # guards [[@receiving_frame]]
  return
end
rejection(response, expected_accept) click to toggle source

Checks whether the given [[Net::HTTPResponse]] represents a valid WebSocket upgrade acceptance. Returns [[nil]] if so, or a human-readable string explaining the issue if not.

[expected_accept]

is the value [[Sec-WebSocket-Accept]] is

expected to hold, generated from the [[Sec-WebSocket-Key]] header field.

# File lib/webtube.rb, line 460
def self::rejection response, expected_accept
  unless response.code == '101' then
    return "the HTTP response code was not 101"
  end
  unless (response['Connection'] || '').downcase ==
      'upgrade' then
    return "the HTTP response did not say " +
        "'Connection: upgrade'"
  end
  unless (response['Upgrade'] || '').downcase ==
      'websocket' then
    return "the HTTP response did not say " +
        "'Upgrade: websocket'"
  end
  unless (response['Sec-WebSocket-Accept'] || '') ==
      expected_accept then
    return "the HTTP response did not say " +
        "'Sec-WebSocket-Accept: #{expected_accept}'"
  end
  return nil
end

Public Instance Methods

close(status_code = 1000, explanation = "") click to toggle source

Closes the connection, thus preventing further transmission.

If [[status_code]] is supplied, it will be passed to the other side in the [[OPCODE_CLOSE]] frame. The default is 1000 which indicates normal closure. Sending a status code can be explicitly suppressed by passing [[nil]] instead of an integer; then, an empty close frame will be sent. Due to the way a close frame's payload is structured, this will also suppress delivery of [[close_explanation]], even if non-empty.

Note that RFC 6455 requires the explanation to be encoded in UTF-8. Accordingly, this method will re-encode it unless it is already in UTF-8.

# File lib/webtube.rb, line 312
def close status_code = 1000, explanation = ""
  # prepare the payload for the close frame
  payload = ""
  if status_code then
    payload = [status_code].pack('n')
    if explanation then
      payload << explanation.encode(Encoding::UTF_8)
    end
  end
  # let the other side know we're closing
  send_message payload, OPCODE_CLOSE
  # break the main reception loop
  @send_mutex.synchronize do
    @alive = false
  end
  # if waiting for a frame (or parsing one), interrupt it
  @reception_interrupt_mutex.synchronize do
    @thread.raise AbortReceiveLoop.new if @receiving_frame
  end
  @socket.close if @close_socket
  return
end
hash() click to toggle source

The application may want to store many Webtube instances in a hash or a set. In order to facilitate this, we'll need

[hash]

and [[eql?]]. The latter is already adequately –

comparing by identity – implemented by [[Object]]; in order to ensure the former hashes by identity, we'll override it.

# File lib/webtube.rb, line 628
def hash
  return object_id
end
inspect() click to toggle source
# File lib/webtube.rb, line 335
def inspect
  s = "#<Webtube@0x%0x" % object_id
  s << " " << (@server ? 'from' : 'to')
  unless @url.nil? then
    s << " " << @url
  else
    # [[@socket]] is a [[Net::BufferedIO]] instance, so
    # [[@socket.io]] is either a plain socket or an SSL
    # wrapper
    af, port, hostname = @socket.io.peeraddr
    s << " %s:%i" % [hostname, port]
  end
  s << " @allow_rsv_bits=%s" % @allow_rsv_bits.inspect \
      unless @allow_rsv_bits.nil?
  s << " @allow_opcodes=%s" % @allow_opcodes.inspect \
      unless @allow_opcodes.nil?
  s << ">"
  return s
end
run(listener) click to toggle source

Run a loop to read all the messages and control frames coming in via this WebSocket, and hand events to the given [[listener]]. The listener can implement the following methods:

  • onopen(webtube) will be called as soon as the channel is set up.

  • onmessage(webtube, message_body, opcode) will be called with each arriving data message once it has been defragmented. The data will be passed to it as a

    [String]], encoded in [[UTF-8]

    for [[OPCODE_TEXT]]

    messages and in [[ASCII-8BIT]] for all the other message opcodes.

  • oncontrolframe(webtube, frame) will be called upon receipt of a control frame whose opcode is listed in the

    [allow_opcodes]

    parameter of this [[Webtube]] instance.

    The frame is represented by an instance of

    [Webtube::Frame]]. Note that [[Webtube]

    handles

    connection closures ([[OPCODE_CLOSE]]) and ponging all the pings ([[OPCODE_PING]]) automatically.

  • onping(webtube, frame) will be called upon receipt of an

    [OPCODE_PING]

    frame. [[Webtube]] will take care of

    ponging all the pings, but the listener may want to process such an event for statistical information.

  • onpong(webtube, frame) will be called upon receipt of an

    [OPCODE_PONG]

    frame.

  • onclose(webtube) will be called upon closure of the connection, for any reason.

  • onannoyedclose(webtube, frame) will be called upon receipt of an [[OPCODE_CLOSE]] frame with an explicit status code other than 1000. This typically indicates that the other side is annoyed, so the listener may want to log the condition for debugging or further analysis. Normally, once the handler returns, [[Webtube]] will respond with a close frame of the same status code and close the connection, but the handler may call [[Webtube#close]] to request a closure with a different status code or without one.

  • onexception(webtube, exception) will be called if an unhandled exception is raised during the [[Webtube]]'s lifecycle, including all of the listener event handlers. It may log the exception but should return normally so that the [[Webtube]] can issue a proper close frame for the other end and invoke the [[onclose]] handler, after which the exception will be raised again so the caller of

    [Webtube#run]

    will have a chance to handle it.

Before calling any of the handlers, [[respond_to?]] will be used to check implementedness.

If an exception occurs during processing, it (that is, the

[Exception]

instance) may implement a specific status code

to be passed to the other end via the [[OPCODE_CLOSE]] frame by implementing the [[websocket_close_status_code]] method returning the code as an integer. The default code, used if the exception does not specify one, is 1011 'unexpected condition'. An exception may explicitly suppress sending any code by having [[websocket_close_status_code]] return

[nil]

instead of an integer.

# File lib/webtube.rb, line 134
def run listener
  @run_mutex.synchronize do
    @thread = Thread.current
    begin
      listener.onopen self if listener.respond_to? :onopen
      while @send_mutex.synchronize{@alive} do
        begin
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = true
          end
          frame = Webtube::Frame.read_from_socket @socket
        ensure
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = false
          end
        end
        unless (frame.rsv & ~@allow_rsv_bits) == 0 then
          raise Webtube::UnknownReservedBit.new(frame: frame)
        end
        if @serverp then
          unless frame.masked?
            raise Webtube::UnmaskedFrameToServer.new(
                frame: frame)
          end
        else
          unless !frame.masked? then
            raise Webtube::MaskedFrameToClient.new(
                frame: frame)
          end
        end
        if !frame.control_frame? then
          # data frame
          if frame.opcode != Webtube::OPCODE_CONTINUATION then
            # initial frame
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
            unless @defrag_buffer.empty? then
              raise Webtube::MissingContinuationFrame.new
            end
          else
            # continuation frame
            if @defrag_buffer.empty? then
              raise Webtube::UnexpectedContinuationFrame.new(
                  frame: frame)
            end
          end
          @defrag_buffer.push frame
          if frame.fin? then
            opcode = @defrag_buffer.first.opcode
            data = @defrag_buffer.map(&:payload).join ''
            @defrag_buffer = []
            if opcode == Webtube::OPCODE_TEXT then
              # text messages must be encoded in UTF-8, per
              # RFC 6455
              data.force_encoding Encoding::UTF_8
              unless data.valid_encoding? then
                data.force_encoding Encoding::ASCII_8BIT
                raise Webtube::BadlyEncodedText.new(
                    data: data)
              end
            end
            listener.onmessage self, data, opcode \
                if listener.respond_to? :onmessage
          end
        elsif (0x08 .. 0x0F).include? frame.opcode then
          # control frame
          unless frame.fin? then
            raise Webtube::FragmentedControlFrame.new(
                frame: frame)
          end
          case frame.opcode
          when Webtube::OPCODE_CLOSE then
            message = frame.payload
            if message.length >= 2 then
              status_code, = message.unpack 'n'
              unless status_code == 1000 then
                listener.onannoyedclose self, frame \
                    if listener.respond_to? :onannoyedclose
              end
            else
              status_code = 1000
            end
            close status_code
          when Webtube::OPCODE_PING then
            listener.onping self, frame \
                if listener.respond_to? :onping
            send_message frame.payload, Webtube::OPCODE_PONG
          when Webtube::OPCODE_PONG then
            listener.onpong self, frame \
                if listener.respond_to? :onpong
          else
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
          end
          listener.oncontrolframe self, frame \
              if @allow_opcodes.include?(frame.opcode) and
                  listener.respond_to?(:oncontrolframe)
        else
          raise 'assertion failed'
        end
      end
    rescue AbortReceiveLoop
      # we're out of the loop now, so nothing further to do
    rescue Exception => e
      status_code =
          if e.respond_to? :websocket_close_status_code then
            e.websocket_close_status_code
          else
            1011 # 'unexpected condition'
          end
      listener.onexception self, e \
          if listener.respond_to? :onexception
      begin
        close status_code
      rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
        # ignore, we have a bigger exception to handle
      end
      raise e
    ensure
      @thread = nil
      listener.onclose self \
          if listener.respond_to? :onclose
    end
  end
  return
end
send_message(message, opcode = Webtube::OPCODE_TEXT) click to toggle source

Send a given message payload to the other party, using the given opcode. By default, the [[opcode]] is [[Webtube::OPCODE_TEXT]]. Re-encodes the payload if given in a non-UTF-8 encoding and [[opcode == Webtube::OPCODE_TEXT]].

# File lib/webtube.rb, line 268
def send_message message, opcode = Webtube::OPCODE_TEXT
  if opcode == Webtube::OPCODE_TEXT and
      message.encoding != Encoding::UTF_8 then
    message = message.encode Encoding::UTF_8
  end
  @send_mutex.synchronize do
    raise 'WebSocket connection no longer live' unless @alive
    # In order to ensure that the local kernel will treat our
    # (data) frames atomically during the [[write]] syscall,
    # we'll want to ensure that the frame size does not exceed
    # 512 bytes -- the minimum permitted size for
    # [[PIPE_BUF]].  At this frame size, the header size is up
    # to four bytes for unmasked or eight bytes for masked
    # frames.
    #
    # (FIXME: in retrospect, that seems like an unpractical
    # consideration.  We should probably use path MTU
    # instead.)
    Webtube::Frame.each_frame_for_message(
        message: message,
        opcode: opcode,
        masked: !@serverp,
        max_frame_body_size:
            512 - (!@serverp ? 8 : 4)) do |frame|
      @socket.write frame.header + frame.body
    end
  end
  return
end