module MatrixSdk::Protocols::MSC

Preliminary support for unmerged MSCs (Matrix Spec Changes)

Public Instance Methods

msc2108?() click to toggle source

Check if there's support for MSC2108 - Sync over Server Sent Events

# File lib/matrix_sdk/protocols/msc.rb, line 10
# Check if there's support for MSC2108 - Sync over Server Sent Events
#
# Probes the unauthenticated `/sync/sse` endpoint once and caches the outcome
# in @msc, whether positive or negative. A probe that fails for an unexpected
# reason (anything other than a Matrix request error) is logged, reported as
# unsupported, and NOT cached, so the next call will try again.
#
# @return [Boolean] true if the server appears to implement MSC2108
def msc2108?
  @msc ||= {}
  # `||=` would treat a cached `false` as "not cached" and hit the server
  # again on every call, so look for the key itself instead.
  return @msc[2108] if @msc.key?(2108)

  @msc[2108] =
    begin
      response = request(:get, :client_r0, '/sync/sse', skip_auth: true, headers: { accept: 'text/event-stream' })
      # Predicate methods should hand back a real boolean, not the response.
      response ? true : false
    rescue MatrixSdk::MatrixNotAuthorizedError # Returns 401 if implemented
      true
    rescue MatrixSdk::MatrixRequestError
      false
    end
rescue StandardError => e
  logger.debug "Failed to check MSC2108 status;\n#{e.inspect}"
  false
end
msc2108_sync_sse(since: nil, **params) { |data, event:, id:| ... } click to toggle source

Sync over Server Sent Events - MSC2108

@example Syncing over SSE

@since = 'some token'
api.msc2108_sync_sse(since: @since) do |data, event:, id:|
  if event == 'sync'
    handle(data) # data is the same as a normal sync response
    @since = id
  end
end

@see Protocols::CS#sync
@see https://github.com/matrix-org/matrix-doc/pull/2108/

rubocop:disable Metrics/MethodLength

# File lib/matrix_sdk/protocols/msc.rb, line 39
# Sync over Server Sent Events - MSC2108
#
# Builds a GET request for `<client_r0>/sync/sse`, runs it on a new Thread and
# yields every received `sync` / `sync_error` event to the given block.
#
# @param since [String, nil] when given, sent as the `last-event-id` header
# @param params [Hash] only :filter, :full_state and :set_presence are
#   forwarded as query parameters; :user_id is forwarded as well when
#   `protocol?(:AS)` is true
# @yield [data, event:, id:] called from the worker thread for each `sync` or
#   `sync_error` event
# @yieldparam data the result of `MatrixSdk::Response.new(self, parsed_json)`
# @yieldparam event [String] the value of the event's `event:` field
# @yieldparam id [String, nil] the value of the event's `id:` field, if any
# @return [Array(Thread, Hash)] the worker thread and a cancellation token;
#   setting `token[:run] = false` makes the worker close the socket and stop
#   the next time it checks the flag (after a received chunk has been handled)
# @raise [ArgumentError] if no block with an arity of exactly 2 is given
# @raise [RuntimeError] if there is no access token
# @note A non-success HTTP response raises MatrixRequestError inside the
#   worker thread, not in the caller.
def msc2108_sync_sse(since: nil, **params, &on_data)
  raise ArgumentError, 'Must be given a block accepting two args - data and { event:, id: }' \
    unless on_data.is_a?(Proc) && on_data.arity == 2
  raise 'Needs to be logged in' unless access_token # TODO: Better error

  # Whitelist the query parameters that are passed on to the server.
  query = params.select do |k, _v|
    %i[filter full_state set_presence].include? k
  end
  query[:user_id] = params.delete(:user_id) if protocol?(:AS) && params.key?(:user_id)

  # Work on a copy of the homeserver URI so the original is left untouched.
  req = Net::HTTP::Get.new(homeserver.dup.tap do |u|
    u.path = "#{api_to_path :client_r0}/sync/sse"
    u.query = URI.encode_www_form(query)
  end)
  req['accept'] = 'text/event-stream'
  req['accept-encoding'] = 'identity' # Disable compression on the SSE stream
  req['authorization'] = "Bearer #{access_token}"
  req['last-event-id'] = since if since

  # Mutable flag handed to the worker and returned to the caller; the caller
  # flips :run to false to request a stop.
  cancellation_token = { run: true }

  # rubocop:disable Metrics/BlockLength
  thread = Thread.new(cancellation_token) do |ctx|
    print_http(req)
    http.request req do |response|
      # Cancelled before the response headers arrived - nothing to read.
      break unless ctx[:run]

      print_http(response, body: false)
      # On a non-success status the body is parsed as a JSON error document.
      raise MatrixRequestError.new_by_code(JSON.parse(response.body, symbolize_names: true), response.code) unless response.is_a? Net::HTTPSuccess

      # Override buffer size for BufferedIO
      # The replacement is defined as a singleton method on this one socket
      # object and reads at most 1024 bytes per fill.
      # NOTE(review): presumably this is so small events are delivered
      # promptly instead of waiting on a larger read - confirm.
      socket = response.instance_variable_get :@socket
      if socket.is_a? Net::BufferedIO
        socket.instance_eval do
          def rbuf_fill
            bufsize_override = 1024
            loop do
              case rv = @io.read_nonblock(bufsize_override, exception: false)
              when String
                @rbuf << rv
                rv.clear
                return
              when :wait_readable
                @io.to_io.wait_readable(@read_timeout) || raise(Net::ReadTimeout)
              when :wait_writable
                @io.to_io.wait_writable(@read_timeout) || raise(Net::ReadTimeout)
              when nil
                raise EOFError, 'end of file reached'
              end
            end
          end
        end
      end

      # Random four-letter tag used to tell streams apart in the log output.
      stream_id = ('A'..'Z').to_a.sample(4).join

      logger.debug "MSC2108 : #{stream_id} : Starting SSE stream."

      buffer = ''
      response.read_body do |chunk|
        buffer += chunk

        # A blank line terminates an event; handle every complete event that
        # is currently in the buffer.
        while (index = buffer.index(/\r\n\r\n|\n\n/))
          # `index` is where the separator starts, so this removes the event
          # text plus the separator's first character only; the remainder of
          # the separator stays at the head of the buffer.
          stream = buffer.slice!(0..index)

          data = ''
          event = nil
          id = nil

          stream.split(/\r?\n/).each do |part|
            # Multiple data lines are joined with a newline.
            /^data:(.+)$/.match(part) do |m_data|
              data += "\n" unless data.empty?
              data += m_data[1].strip
            end
            /^event:(.+)$/.match(part) do |m_event|
              event = m_event[1].strip
            end
            /^id:(.+)$/.match(part) do |m_id|
              id = m_id[1].strip
            end
            # Lines starting with a colon are comments; they are only logged.
            /^:(.+)$/.match(part) do |m_comment|
              logger.debug "MSC2108 : #{stream_id} : Received comment '#{m_comment[1].strip}'"
            end
          end

          if %w[sync sync_error].include? event
            data = JSON.parse(data, symbolize_names: true)
            yield((MatrixSdk::Response.new self, data), event: event, id: id)
          elsif event
            logger.info "MSC2108 : #{stream_id} : Received unknown event '#{event}'; #{data}"
          end
        end

        # Cancellation is checked once per received chunk.
        unless ctx[:run]
          socket.close
          break
        end
      end
      break unless ctx[:run]
    end
  end
  # rubocop:enable Metrics/BlockLength

  # Thread#run wakes the worker and invokes the scheduler.
  thread.run

  [thread, cancellation_token]
end
rbuf_fill() click to toggle source
# File lib/matrix_sdk/protocols/msc.rb, line 73
# Appends the next batch of bytes from @io to @rbuf, reading at most 1024
# bytes at a time without blocking inside the read itself.
#
# When the IO is not ready, waits up to @read_timeout for it to become
# readable (or writable) and then tries again.
#
# @return [nil] once data has been appended to @rbuf
# @raise [Net::ReadTimeout] if the IO does not become ready in time
# @raise [EOFError] if the IO has reached end of file
def rbuf_fill
  max_bytes = 1024
  loop do
    outcome = @io.read_nonblock(max_bytes, exception: false)

    if outcome.is_a?(String)
      @rbuf << outcome
      # The bytes now live in @rbuf; empty the temporary string.
      outcome.clear
      return
    elsif outcome == :wait_readable
      raise Net::ReadTimeout unless @io.to_io.wait_readable(@read_timeout)
    elsif outcome == :wait_writable
      raise Net::ReadTimeout unless @io.to_io.wait_writable(@read_timeout)
    elsif outcome.nil?
      raise EOFError, 'end of file reached'
    end
  end
end
refresh_mscs() click to toggle source
# File lib/matrix_sdk/protocols/msc.rb, line 5
# Forgets every cached MSC support result by replacing the @msc cache with a
# new, empty Hash, so later checks that consult @msc start from scratch.
#
# @return [Hash] the new, empty cache
def refresh_mscs
  @msc = {}
end