class Celluloid::EventSource
Constants
- CLOSED
- CONNECTING
- MAX_RECONNECT_TIME
- OPEN
- VERSION
Attributes
ready_state[R]
url[R]
with_credentials[R]
Public Class Methods
new(uri, options = {}) { |self| ... }
click to toggle source
Constructor for an EventSource.
@param uri [String] the event stream URI
@param options [Hash] the configuration options
@option options [Hash] :headers Headers to send with the request
@option options [Float] :read_timeout Timeout (in seconds) after which to restart the connection if
the server has sent no data
@option options [Float] :reconnect_delay Initial delay (in seconds) between connection attempts; this will
be increased exponentially if there are repeated failures
# File lib/celluloid/eventsource.rb, line 43
# Builds an EventSource for +uri+, lets the caller register handlers via the
# optional block, then starts listening asynchronously.
#
# @param uri [String] the event stream URI
# @param options [Hash] the configuration options (the caller's hash is not mutated)
# @option options [Boolean] :with_credentials value stored in @with_credentials (default false)
# @option options [Hash] :headers headers merged over the default request headers
# @option options [Float] :read_timeout seconds without data before a read times out;
#   0 (the default) disables the timeout
# @option options [String] :proxy proxy URL, consulted only when neither HTTP_PROXY
#   nor http_proxy is set in the environment
# @option options [Float] :reconnect_delay initial delay (in seconds) between
#   connection attempts (default 1)
# @yield [self] invoked before listening starts
def initialize(uri, options = {})
  self.url = uri
  # Work on a copy: :with_credentials is deleted from the hash below.
  options = options.dup
  @ready_state = CONNECTING
  @with_credentials = options.delete(:with_credentials) { false }
  @headers = default_request_headers.merge(options.fetch(:headers, {}))
  # The option is documented as a Float; integer coercion would truncate a
  # sub-second value such as 0.5 to 0 and silently disable the timeout.
  @read_timeout = options.fetch(:read_timeout, 0).to_f

  # Environment variables take precedence over the explicit :proxy option.
  proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy]
  if proxy
    proxy_uri = URI(proxy)
    # Only http/https proxies are honoured; any other scheme is ignored.
    @proxy = proxy_uri if %w[http https].include?(proxy_uri.scheme)
  end

  @reconnect_timeout = options.fetch(:reconnect_delay, 1)
  # No-op handlers so callbacks can always be invoked without nil checks.
  @on = { open: ->{}, message: ->(_) {}, error: ->(_) {} }
  @chunked = false

  yield self if block_given?

  async.listen
end
Public Instance Methods
close()
click to toggle source
# File lib/celluloid/eventsource.rb, line 94
# Closes the underlying socket, if one exists, and marks this source CLOSED.
def close
  socket = @socket
  socket.close if socket
  @ready_state = CLOSED
end
closed?()
click to toggle source
# File lib/celluloid/eventsource.rb, line 76
# @return [Boolean] true when ready_state equals CLOSED
def closed?
  ready_state == CLOSED
end
connected?()
click to toggle source
# File lib/celluloid/eventsource.rb, line 72
# @return [Boolean] true when ready_state equals OPEN
def connected?
  ready_state == OPEN
end
listen()
click to toggle source
# File lib/celluloid/eventsource.rb, line 80
# Connects and processes the event stream repeatedly until the source is closed.
#
# @raise [UnexpectedContentType] re-raised unchanged so it reaches the caller
def listen
  until closed?
    begin
      establish_connection
      process_stream
    rescue UnexpectedContentType
      # Let these flow to the top
      raise
    rescue StandardError => error
      # Any other error just triggers another pass through the loop (reconnect).
      info "Reconnecting after exception: #{error}"
    end
  end
end
on(event_name, &action)
click to toggle source
# File lib/celluloid/eventsource.rb, line 99
# Registers +action+ as the handler for +event_name+.
#
# @param event_name [#to_sym] event name; converted to a Symbol key
# @param action [Proc] block stored as the handler
def on(event_name, &action)
  handler_key = event_name.to_sym
  @on.store(handler_key, action)
end
on_error(&action)
click to toggle source
# File lib/celluloid/eventsource.rb, line 111
# Registers the block as the :error handler.
#
# @param action [Proc] block stored under the :error key
def on_error(&action)
  @on.store(:error, action)
end
on_message(&action)
click to toggle source
# File lib/celluloid/eventsource.rb, line 107
# Registers the block as the :message handler.
#
# @param action [Proc] block stored under the :message key
def on_message(&action)
  @on.store(:message, action)
end
on_open(&action)
click to toggle source
# File lib/celluloid/eventsource.rb, line 103
# Registers the block as the :open handler.
#
# @param action [Proc] block stored under the :open key
def on_open(&action)
  @on.store(:open, action)
end
url=(uri)
click to toggle source
# File lib/celluloid/eventsource.rb, line 68
# Stores the stream location, converting it with Kernel#URI.
#
# @param uri [String, URI] the event stream location
def url=(uri)
  parsed = URI(uri)
  @url = parsed
end
Private Instance Methods
chunked?()
click to toggle source
# File lib/celluloid/eventsource.rb, line 200
# @return the current value of @chunked
def chunked?
  @chunked
end
connect_string()
click to toggle source
# File lib/celluloid/eventsource.rb, line 262
# Builds the HTTP CONNECT request written to the proxy socket.
#
# A Proxy-Authorization (Basic) header is added when the proxy URI carries a
# user or a password; a missing half is encoded as an empty string.
#
# @return [String] the request text, terminated by a blank line
def connect_string
  target = "#{url.host}:#{url.port}"
  request = "CONNECT #{target} HTTP/1.1\r\n"
  request << "Host: #{target}\r\n"
  if @proxy.user || @proxy.password
    credentials = [@proxy.user || '', @proxy.password || ''].join(":")
    request << "Proxy-Authorization: Basic #{Base64.strict_encode64(credentials)}\r\n"
  end
  request << "\r\n"
end
default_request_headers()
click to toggle source
# File lib/celluloid/eventsource.rb, line 192
# Headers sent with every request unless overridden by the caller.
#
# @return [Hash{String => String}] Accept, Cache-Control and Host headers
def default_request_headers
  {
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'Host' => url.host
  }
end
establish_connection()
click to toggle source
# Opens the socket (optionally through an HTTP proxy, optionally wrapped in
# SSL), sends the GET request and reads the response headers, retrying in a
# loop with a randomised, exponentially growing delay.
#
# Flow, as written:
# - With @proxy set: connect to the proxy, write connect_string, feed response
#   lines to the parser until a blank line; on a non-200 status call the
#   :error handler and return without retrying.
# - Otherwise connect directly to @url.host / @url.port.
# - When ssl? is true the socket is wrapped in an SSL socket and connected.
# - After writing request_string, lines are fed to the parser until
#   parser.headers? is true.
# - Non-200 response: the remainder is read until EOF and the :error handler
#   is called; the socket is deliberately not closed, and control falls
#   through to the delay and another attempt.
# - Content-Type containing "text/event-stream": @chunked is derived from the
#   Transfer-Encoding header, ready_state becomes OPEN, the :open handler
#   runs and the method returns.
# - Any other Content-Type: close, call the :error handler and raise
#   UnexpectedContentType, which is re-raised to the caller.
# - Any other StandardError is logged with warn and followed by a retry.
#
# Delay: base = min(@reconnect_timeout * 2**attempts, MAX_RECONNECT_TIME);
# the actual sleep is base/2 plus a random amount below base/2.
#
# NOTE(review): the ResponseParser is created once, outside the retry loop, and
# is also used for the proxy CONNECT response — confirm it tolerates being fed
# more than one response.
# NOTE(review): a retry assigns a new @socket without closing the previous
# one — confirm whether that is intended.
# File lib/celluloid/eventsource.rb, line 121 def establish_connection parser = ResponseParser.new reconnect_attempts = 0 reconnect_jitter_rand = Random.new loop do begin if @proxy sock = ::TCPSocket.new(@proxy.host, @proxy.port) @socket = Celluloid::IO::TCPSocket.new(sock) @socket.write(connect_string) @socket.flush while (line = readline_with_timeout(@socket).chomp) != '' do parser << line end unless parser.status_code == 200 @on[:error].call({status_code: parser.status_code, body: parser.chunk}) return end else sock = ::TCPSocket.new(@url.host, @url.port) @socket = Celluloid::IO::TCPSocket.new(sock) end if ssl? @socket = Celluloid::IO::SSLSocket.new(@socket) @socket.connect end @socket.write(request_string) @socket.flush() until parser.headers? parser << readline_with_timeout(@socket) end if parser.status_code != 200 until @socket.eof? parser << readline_with_timeout(@socket) end # If the server returns a non-200, we don't want to close-- we just want to # report an error # close @on[:error].call({status_code: parser.status_code, body: parser.chunk}) elsif parser.headers['Content-Type'] && parser.headers['Content-Type'].include?("text/event-stream") @chunked = !parser.headers["Transfer-Encoding"].nil? && parser.headers["Transfer-Encoding"].include?("chunked") @ready_state = OPEN @on[:open].call return # Success, don't retry else close info "Invalid Content-Type #{parser.headers['Content-Type']}" @on[:error].call({status_code: parser.status_code, body: "Invalid Content-Type #{parser.headers['Content-Type']}. 
Expected text/event-stream"}) raise UnexpectedContentType end rescue UnexpectedContentType raise # Let these flow to the top rescue StandardError => e warn "Waiting to try again after exception while connecting: #{e}" # Just try again after a delay for any other exceptions end base_sleep_time = ([@reconnect_timeout * (2 ** reconnect_attempts), MAX_RECONNECT_TIME].min).to_f sleep_time = (base_sleep_time / 2) + reconnect_jitter_rand.rand(base_sleep_time / 2) sleep sleep_time reconnect_attempts += 1 end end
process_stream()
click to toggle source
# File lib/celluloid/eventsource.rb, line 233
# Parses the incoming lines into events and dispatches each one to the handler
# registered for its type, if any, then records the event's id in @last_event_id.
def process_stream
  # Callback handed to the parser so it can replace the read timeout.
  update_read_timeout = ->(timeout) { @read_timeout = timeout }
  parser = EventParser.new(read_lines, @chunked, update_read_timeout)

  parser.each do |event|
    handler = @on[event.type]
    handler.call(event) if handler
    @last_event_id = event.id
  end
end
read_chunked_lines(socket)
click to toggle source
# File lib/celluloid/eventsource.rb, line 204
# Lazily yields the lines of one HTTP chunk read from +socket+.
#
# The first line read is the chunk header, parsed as a hexadecimal size. Lines
# are then yielded until at least that many bytes have been consumed.
#
# @param socket [#readline] the socket to read from
# @return [Enumerator<String>] the lines of the chunk
def read_chunked_lines(socket)
  Enumerator.new do |lines|
    chunk_header = readline_with_timeout(socket)
    bytes_to_read = chunk_header.to_i(16)
    bytes_read = 0
    while bytes_read < bytes_to_read
      # Read from the socket we were given, not from @socket.
      line = readline_with_timeout(socket)
      # Chunk sizes are in bytes, so count bytes rather than characters;
      # otherwise multi-byte text makes the loop read past the chunk.
      bytes_read += line.bytesize
      lines << line
    end
  end
end
read_lines()
click to toggle source
# File lib/celluloid/eventsource.rb, line 217
# Lazily yields lines from @socket until the source is closed.
#
# In chunked mode lines come from read_chunked_lines, one chunk per pass;
# otherwise one line is read directly per pass.
#
# @return [Enumerator<String>] the incoming lines
def read_lines
  Enumerator.new do |yielder|
    loop do
      break if closed?

      if chunked?
        read_chunked_lines(@socket).each do |chunk_line|
          # Stop mid-chunk as soon as the source has been closed.
          break if closed?
          yielder << chunk_line
        end
      else
        yielder << readline_with_timeout(@socket)
      end
    end
  end
end
readline_with_timeout(socket)
click to toggle source
# File lib/celluloid/eventsource.rb, line 241
# Reads one line from +socket+, enforcing @read_timeout when it is positive.
#
# @param socket [#readline] the socket to read from
# @return [String] the line that was read
# @raise [ReadTimeout] when the read exceeds @read_timeout; the :error handler
#   is called first
def readline_with_timeout(socket)
  # A non-positive timeout means "wait indefinitely".
  return socket.readline unless @read_timeout > 0

  begin
    timeout(@read_timeout) { socket.readline }
  rescue Celluloid::TaskTimeout
    @on[:error].call({body: "Read timeout, will attempt reconnection"})
    raise ReadTimeout
  end
end
request_string()
click to toggle source
# File lib/celluloid/eventsource.rb, line 256
# Builds the HTTP GET request for the event stream.
#
# @return [String] request line plus one "Name: value" line per entry in
#   @headers, CRLF-separated and terminated by a blank line
def request_string
  request_line = "GET #{url.request_uri} HTTP/1.1"
  header_lines = @headers.map { |name, value| "#{name}: #{value}" }
  ([request_line] + header_lines).join("\r\n") + "\r\n\r\n"
end
ssl?()
click to toggle source
# File lib/celluloid/eventsource.rb, line 117
# @return [Boolean] true when the stream URL uses the https scheme
def ssl?
  url.scheme == 'https'
end