class Servent::EventSource

Constants

DEFAULT_HEADERS

Attributes

ready_state[R]
uri[R]

Public Class Methods

new(url, net_http_options: { read_timeout: 600 }) { |proxy_config| ... } click to toggle source
# File lib/servent/event_source.rb, line 11
def initialize(url, net_http_options: { read_timeout: 600 })
  @uri              = URI(url)
  @net_http_options = net_http_options
  @ready_state      = Servent::CONNECTING

  @open_blocks    = []
  @message_blocks = []
  @error_blocks   = []

  @proxy_config = ProxyConfig.new
  yield @proxy_config if block_given?
end

Public Instance Methods

close() click to toggle source
# File lib/servent/event_source.rb, line 43
def close
  @ready_state = Servent::CLOSED
  @thread.kill unless @thread.nil?
end
listen(http_starter = Net::HTTP) click to toggle source
# File lib/servent/event_source.rb, line 39
def listen(http_starter = Net::HTTP)
  start(http_starter).join
end
on_error(&error_block) click to toggle source
# File lib/servent/event_source.rb, line 56
def on_error(&error_block)
  @error_blocks << error_block
end
on_message(&message_block) click to toggle source
# File lib/servent/event_source.rb, line 52
def on_message(&message_block)
  @message_blocks << message_block
end
on_open(&open_block) click to toggle source
# File lib/servent/event_source.rb, line 48
def on_open(&open_block)
  @open_blocks << open_block
end
start(http_starter = Net::HTTP) { |http, get| ... } click to toggle source
# File lib/servent/event_source.rb, line 24
def start(http_starter = Net::HTTP)
  @http_starter ||= http_starter
  params = HTTPStartParams.new(@uri, @proxy_config, @net_http_options)

  @thread = Thread.new {
    @http_starter.start(*params.parameterize) do |http|
      get = Net::HTTP::Get.new @uri
      DEFAULT_HEADERS.each { |header, value| get[header] = value }
      yield http, get if block_given?

      perform_request http, get
    end
  }
end

Private Instance Methods

fail_connection(response) click to toggle source
# File lib/servent/event_source.rb, line 92
def fail_connection(response)
  @ready_state = Servent::CLOSED
  @error_blocks.each { |block| block.call response, :wrong_mime_type }
end
open_connection(response) click to toggle source
# File lib/servent/event_source.rb, line 72
def open_connection(response)
  @ready_state = Servent::OPEN
  @open_blocks.each { |block| block.call response }
  response.read_body do |chunk|
    # FIXME: use the same stream object to parse
    #        different chunks.
    stream = Stream.new chunk
    events = stream.parse
    events.each do |event|
      @message_blocks.each { |block| block.call event }
    end
  end
end
perform_request(http, type) click to toggle source
# File lib/servent/event_source.rb, line 62
def perform_request(http, type)
  http.request type do |response|
    return fail_connection response if should_fail? response
    return schedule_reconnection if should_reconnect? response
    store_new_parmanent_url response

    open_connection response
  end
end
schedule_reconnection() click to toggle source
# File lib/servent/event_source.rb, line 101
def schedule_reconnection
  start
end
should_fail?(response) click to toggle source
# File lib/servent/event_source.rb, line 86
def should_fail?(response)
  return false if Servent::REDIRECT_STATUSES.include?(response.code.to_i)
  (response["Content-Type"] != "text/event-stream") ||
    !Servent::KNOWN_STATUSES.include?(response.code.to_i)
end
should_reconnect?(response) click to toggle source
# File lib/servent/event_source.rb, line 97
def should_reconnect?(response)
  Servent::RECONNECTION_STATUSES.include? response.code.to_i
end
store_new_parmanent_url(response) click to toggle source
# File lib/servent/event_source.rb, line 105
def store_new_parmanent_url(response)
  return unless response.code.to_i == 301
  @original_uri = @uri
  @uri = URI(response["Location"])
end