class Servent::EventSource
Constants
- DEFAULT_HEADERS
Attributes
ready_state[R]
uri[R]
Public Class Methods
new(url, net_http_options: { read_timeout: 600 }) { |proxy_config| ... }
click to toggle source
# File lib/servent/event_source.rb, line 11
# Builds an event source for +url+ without opening any connection.
#
# @param url [String, URI] endpoint to stream from; converted with URI().
# @param net_http_options [Hash] options kept for the later HTTP start
#   (defaults to a 600 second read timeout).
# @yield [proxy_config] the freshly created ProxyConfig, so the caller can
#   adjust it before the connection is started.
def initialize(url, net_http_options: { read_timeout: 600 })
  @uri = URI(url)
  @net_http_options = net_http_options
  @ready_state = Servent::CONNECTING

  # One handler list per kind of event the source can report.
  @open_blocks = []
  @message_blocks = []
  @error_blocks = []

  @proxy_config = ProxyConfig.new
  return unless block_given?

  yield @proxy_config
end
Public Instance Methods
close()
click to toggle source
# File lib/servent/event_source.rb, line 43
# Marks the source as closed and kills the worker thread, if one was
# ever created by #start.
def close
  @ready_state = Servent::CLOSED
  @thread.kill if @thread
end
listen(http_starter = Net::HTTP)
click to toggle source
# File lib/servent/event_source.rb, line 39
# Starts the connection and blocks the caller until the worker thread
# finishes. No block is forwarded to #start from here.
#
# @param http_starter [#start] object used to open the HTTP session.
# @return [Thread] the joined worker thread.
def listen(http_starter = Net::HTTP)
  worker = start(http_starter)
  worker.join
end
on_error(&error_block)
click to toggle source
# File lib/servent/event_source.rb, line 56
# Registers a handler to be run when the connection fails.
#
# Handlers are called in registration order with the response and a
# reason symbol.
#
# @yield [response, reason] invoked on connection failure.
# @raise [ArgumentError] if no block is given. Storing +nil+ would only
#   surface later as a NoMethodError while handlers are being invoked.
# @return [Array<Proc>] the list of registered error handlers.
def on_error(&error_block)
  raise ArgumentError, "on_error requires a block" if error_block.nil?

  @error_blocks << error_block
end
on_message(&message_block)
click to toggle source
# File lib/servent/event_source.rb, line 52
# Registers a handler to be run for every event parsed from the stream.
#
# Handlers are called in registration order with the parsed event.
#
# @yield [event] invoked once per parsed event.
# @raise [ArgumentError] if no block is given. Storing +nil+ would only
#   surface later as a NoMethodError while handlers are being invoked.
# @return [Array<Proc>] the list of registered message handlers.
def on_message(&message_block)
  raise ArgumentError, "on_message requires a block" if message_block.nil?

  @message_blocks << message_block
end
on_open(&open_block)
click to toggle source
# File lib/servent/event_source.rb, line 48
# Registers a handler to be run when the connection is opened.
#
# Handlers are called in registration order with the HTTP response.
#
# @yield [response] invoked when the connection opens.
# @raise [ArgumentError] if no block is given. Storing +nil+ would only
#   surface later as a NoMethodError while handlers are being invoked.
# @return [Array<Proc>] the list of registered open handlers.
def on_open(&open_block)
  raise ArgumentError, "on_open requires a block" if open_block.nil?

  @open_blocks << open_block
end
start(http_starter = Net::HTTP) { |http, get| ... }
click to toggle source
# File lib/servent/event_source.rb, line 24
# Opens the HTTP session on a new thread and issues the GET request.
#
# The first +http_starter+ ever passed is remembered and reused on later
# calls (note the `||=`).
#
# @param http_starter [#start] object used to open the HTTP session.
# @yield [http, get] the session and the prepared GET request, before the
#   request is performed, so the caller can customise either.
# @return [Thread] the worker thread, also stored in @thread.
def start(http_starter = Net::HTTP)
  @http_starter ||= http_starter
  start_params = HTTPStartParams.new(@uri, @proxy_config, @net_http_options)

  @thread = Thread.new do
    @http_starter.start(*start_params.parameterize) do |http|
      request = Net::HTTP::Get.new(@uri)
      DEFAULT_HEADERS.each { |name, value| request[name] = value }

      yield http, request if block_given?
      perform_request(http, request)
    end
  end
end
Private Instance Methods
fail_connection(response)
click to toggle source
# File lib/servent/event_source.rb, line 92
# Marks the source as closed and notifies every error handler.
#
# The reason handed to the handlers is always :wrong_mime_type.
#
# @param response the HTTP response that caused the failure.
def fail_connection(response)
  @ready_state = Servent::CLOSED

  @error_blocks.each do |handler|
    handler.call(response, :wrong_mime_type)
  end
end
open_connection(response)
click to toggle source
# File lib/servent/event_source.rb, line 72
# Marks the source as open, notifies the open handlers, then reads the
# response body chunk by chunk, dispatching each parsed event to every
# message handler.
#
# @param response the HTTP response whose body is the event stream.
def open_connection(response)
  @ready_state = Servent::OPEN
  @open_blocks.each { |handler| handler.call(response) }

  response.read_body do |chunk|
    # FIXME: use the same stream object to parse
    # different chunks.
    Stream.new(chunk).parse.each do |event|
      @message_blocks.each { |handler| handler.call(event) }
    end
  end
end
perform_request(http, type)
click to toggle source
# File lib/servent/event_source.rb, line 62
# Sends the request and decides what to do with the response: fail,
# reconnect, or open the stream.
#
# @param http the started HTTP session.
# @param type the request object to send.
def perform_request(http, type)
  http.request(type) do |response|
    # `return` inside this block leaves perform_request itself, so
    # nothing below runs for failed or reconnecting responses.
    if should_fail?(response)
      return fail_connection(response)
    elsif should_reconnect?(response)
      return schedule_reconnection
    end

    store_new_parmanent_url(response)
    open_connection(response)
  end
end
schedule_reconnection()
click to toggle source
# File lib/servent/event_source.rb, line 101 def schedule_reconnection start end
should_fail?(response)
click to toggle source
# File lib/servent/event_source.rb, line 86
# Tells whether the response must cause the connection to fail.
#
# Redirect statuses never fail. Otherwise the response fails when its
# media type is not text/event-stream or its status is not a known one.
#
# Only the media type of Content-Type is compared, case-insensitively, so
# values carrying parameters such as "text/event-stream; charset=utf-8"
# are accepted. A missing header counts as a mismatch.
#
# @param response [#code, #[]] the HTTP response to inspect.
# @return [Boolean] true when the connection must be failed.
def should_fail?(response)
  status = response.code.to_i
  return false if Servent::REDIRECT_STATUSES.include?(status)

  media_type = response["Content-Type"].to_s.split(";").first.to_s.strip.downcase
  media_type != "text/event-stream" || !Servent::KNOWN_STATUSES.include?(status)
end
should_reconnect?(response)
click to toggle source
# File lib/servent/event_source.rb, line 97
# Tells whether the response status is one that triggers a reconnection.
#
# @param response [#code] the HTTP response to inspect.
# @return [Boolean] true when the status is in Servent::RECONNECTION_STATUSES.
def should_reconnect?(response)
  status = response.code.to_i
  Servent::RECONNECTION_STATUSES.include?(status)
end
store_new_parmanent_url(response)
click to toggle source
# File lib/servent/event_source.rb, line 105
# Adopts the Location of a 301 response as the new URI, keeping the
# previous one in @original_uri.
#
# Does nothing for any other status, or when the 301 carries no usable
# Location header (previously URI(nil) raised in that case). A relative
# Location is resolved against the current URI so the result always keeps
# a scheme and host.
#
# @param response [#code, #[]] the HTTP response to inspect.
# @raise [URI::InvalidURIError] if Location is not a valid URI reference.
def store_new_parmanent_url(response)
  return unless response.code.to_i == 301

  location = response["Location"].to_s.strip
  return if location.empty?

  @original_uri = @uri
  @uri = URI.join(@uri.to_s, location)
end