module MatrixSdk::Protocols::MSC
Preliminary support for unmerged MSCs (Matrix Spec Changes)
Public Instance Methods
msc2108?()
click to toggle source
Check if there's support for MSC2108 - Sync over Server Sent Events
# File lib/matrix_sdk/protocols/msc.rb, line 10 def msc2108? @msc ||= {} @msc[2108] ||= \ begin request(:get, :client_r0, '/sync/sse', skip_auth: true, headers: { accept: 'text/event-stream' }) rescue MatrixSdk::MatrixNotAuthorizedError # Returns 401 if implemented true rescue MatrixSdk::MatrixRequestError false end rescue StandardError => e logger.debug "Failed to check MSC2108 status;\n#{e.inspect}" false end
msc2108_sync_sse(since: nil, **params) { |response self, data, event: event, id: id| ... }
click to toggle source
Sync over Server Sent Events - MSC2108
@example Syncing over SSE
@since = 'some token' api.msc2108_sync_sse(since: @since) do |data, event:, id:| if event == 'sync' handle(data) # data is the same as a normal sync response @since = id end end
@see Protocols::CS#sync
@see github.com/matrix-org/matrix-doc/pull/2108/ rubocop:disable Metrics/MethodLength
# File lib/matrix_sdk/protocols/msc.rb, line 39 def msc2108_sync_sse(since: nil, **params, &on_data) raise ArgumentError, 'Must be given a block accepting two args - data and { event:, id: }' \ unless on_data.is_a?(Proc) && on_data.arity == 2 raise 'Needs to be logged in' unless access_token # TODO: Better error query = params.select do |k, _v| %i[filter full_state set_presence].include? k end query[:user_id] = params.delete(:user_id) if protocol?(:AS) && params.key?(:user_id) req = Net::HTTP::Get.new(homeserver.dup.tap do |u| u.path = "#{api_to_path :client_r0}/sync/sse" u.query = URI.encode_www_form(query) end) req['accept'] = 'text/event-stream' req['accept-encoding'] = 'identity' # Disable compression on the SSE stream req['authorization'] = "Bearer #{access_token}" req['last-event-id'] = since if since cancellation_token = { run: true } # rubocop:disable Metrics/BlockLength thread = Thread.new(cancellation_token) do |ctx| print_http(req) http.request req do |response| break unless ctx[:run] print_http(response, body: false) raise MatrixRequestError.new_by_code(JSON.parse(response.body, symbolize_names: true), response.code) unless response.is_a? Net::HTTPSuccess # Override buffer size for BufferedIO socket = response.instance_variable_get :@socket if socket.is_a? Net::BufferedIO socket.instance_eval do def rbuf_fill bufsize_override = 1024 loop do case rv = @io.read_nonblock(bufsize_override, exception: false) when String @rbuf << rv rv.clear return when :wait_readable @io.to_io.wait_readable(@read_timeout) || raise(Net::ReadTimeout) when :wait_writable @io.to_io.wait_writable(@read_timeout) || raise(Net::ReadTimeout) when nil raise EOFError, 'end of file reached' end end end end end stream_id = ('A'..'Z').to_a.sample(4).join logger.debug "MSC2108 : #{stream_id} : Starting SSE stream." buffer = '' response.read_body do |chunk| buffer += chunk while (index = buffer.index(/\r\n\r\n|\n\n/)) stream = buffer.slice!(0..index) data = '' event = nil id = nil stream.split(/\r?\n/).each do |part| /^data:(.+)$/.match(part) do |m_data| data += "\n" unless data.empty? data += m_data[1].strip end /^event:(.+)$/.match(part) do |m_event| event = m_event[1].strip end /^id:(.+)$/.match(part) do |m_id| id = m_id[1].strip end /^:(.+)$/.match(part) do |m_comment| logger.debug "MSC2108 : #{stream_id} : Received comment '#{m_comment[1].strip}'" end end if %w[sync sync_error].include? event data = JSON.parse(data, symbolize_names: true) yield((MatrixSdk::Response.new self, data), event: event, id: id) elsif event logger.info "MSC2108 : #{stream_id} : Received unknown event '#{event}'; #{data}" end end unless ctx[:run] socket.close break end end break unless ctx[:run] end end # rubocop:enable Metrics/BlockLength thread.run [thread, cancellation_token] end
rbuf_fill()
click to toggle source
# File lib/matrix_sdk/protocols/msc.rb, line 73 def rbuf_fill bufsize_override = 1024 loop do case rv = @io.read_nonblock(bufsize_override, exception: false) when String @rbuf << rv rv.clear return when :wait_readable @io.to_io.wait_readable(@read_timeout) || raise(Net::ReadTimeout) when :wait_writable @io.to_io.wait_writable(@read_timeout) || raise(Net::ReadTimeout) when nil raise EOFError, 'end of file reached' end end end
refresh_mscs()
click to toggle source
# File lib/matrix_sdk/protocols/msc.rb, line 5 def refresh_mscs @msc = {} end