class EventMachine::Twitter::Connection

Constants

STALL_TIMEOUT
STALL_TIMER

Attributes

client[R]
headers[R]
host[R]
options[R]
port[R]
reconnector[RW]

Public Class Methods

new(client, host, port) click to toggle source
# File lib/em-twitter/connection.rb, line 26
def initialize(client, host, port)
  @client             = client
  @host               = host
  @port               = port
  @options            = @client.options
  @on_inited_callback = @options.delete(:on_inited)

  if verify_peer?
    @certificate_store  = OpenSSL::X509::Store.new
    @certificate_store.add_file(@options[:ssl][:cert_chain_file])
  end

  @network_reconnector     = EM::Twitter::Reconnectors::NetworkFailure.new
  @application_reconnector = EM::Twitter::Reconnectors::ApplicationFailure.new
end

Public Instance Methods

auto_reconnect?() click to toggle source

Returns the current state of the auto_reconnect flag.

# File lib/em-twitter/connection.rb, line 122
def auto_reconnect?
  @auto_reconnect
end
auto_reconnect_on_close?() click to toggle source

Determines if the connection should reconnect if the connection closes

# File lib/em-twitter/connection.rb, line 117
def auto_reconnect_on_close?
  auto_reconnect? && !gracefully_closed?
end
connection_completed() click to toggle source

Called after the connection to the server is completed. Initiates a

# File lib/em-twitter/connection.rb, line 44
def connection_completed
  start_tls(@options[:ssl]) if ssl?

  reset_connection

  @request = Request.new(@options)
  send_data(@request)
end
gracefully_closed?() click to toggle source

Returns the current state of the gracefully_closed flag gracefully_closed is set to true when the connection is explicitly stopped using the stop method

# File lib/em-twitter/connection.rb, line 105
def gracefully_closed?
  @gracefully_closed
end
immediate_reconnect() click to toggle source

Immediately reconnects the connection

# File lib/em-twitter/connection.rb, line 84
def immediate_reconnect
  @immediate_reconnect  = true
  @gracefully_closed    = false
  close_connection
end
immediate_reconnect?() click to toggle source

Returns the current state of the immediate_reconnect flag immediate_reconnect is true when the immediate_reconnect method is invoked on the connection

# File lib/em-twitter/connection.rb, line 112
def immediate_reconnect?
  @immediate_reconnect
end
network_failure?() click to toggle source

Returns a status of the connection, if no response was ever received from the server, then we assume a network failure.

# File lib/em-twitter/connection.rb, line 98
def network_failure?
  @response_code == 0
end
post_init() click to toggle source
# File lib/em-twitter/connection.rb, line 53
def post_init
  @headers     = {}
  @reconnector = @network_reconnector

  @stall_timer = EM::PeriodicTimer.new(STALL_TIMER) do
    if gracefully_closed?
      @stall_timer.cancel
    elsif stalled?
      close_connection
      invoke_callback(@client.no_data_callback)
    end
  end

  invoke_callback(@on_inited_callback)
  set_comm_inactivity_timeout(@options[:timeout]) if @options[:timeout] > 0
end
receive_data(data) click to toggle source

Receives responses from the server and passes them on to the HttpParser

# File lib/em-twitter/connection.rb, line 72
def receive_data(data)
  @parser << data
end
stalled?() click to toggle source
# File lib/em-twitter/connection.rb, line 126
def stalled?
  @last_response ||= Response.new
  @last_response.older_than?(STALL_TIMEOUT)
end
stop() click to toggle source

Close the connection gracefully, without reconnecting

# File lib/em-twitter/connection.rb, line 77
def stop
  @auto_reconnect     = false
  @gracefully_closed  = true
  close_connection
end
unbind() click to toggle source

Called when a connection is disconnected

# File lib/em-twitter/connection.rb, line 91
def unbind
  schedule_reconnect if auto_reconnect_on_close?
  invoke_callback(@client.close_callback)
end
update(options={}) click to toggle source
# File lib/em-twitter/connection.rb, line 131
def update(options={})
  @options.merge!(options)
  immediate_reconnect
end

Protected Instance Methods

gzip?() click to toggle source
# File lib/em-twitter/connection.rb, line 246
def gzip?
  @headers['Content-Encoding'] && @headers['Content-Encoding'] == 'gzip'
end
handle_stream(data) click to toggle source
# File lib/em-twitter/connection.rb, line 138
def handle_stream(data)
  @last_response << (@decoder ||= BaseDecoder.new).decode(data)

  # handle empty lines, Site stream sometimes returns \r\n\r\n
  return if data.strip.empty?

  if @last_response.complete?
    invoke_callback(@client.each_item_callback, @last_response.body)
  end
end
invoke_callback(callback, *args) click to toggle source

A utility method used to invoke callback methods against the Client

# File lib/em-twitter/connection.rb, line 296
def invoke_callback(callback, *args)
  callback.call(*args) if callback
end
on_body(data) click to toggle source

HttpParser implementation, invoked when a body is received

# File lib/em-twitter/connection.rb, line 191
def on_body(data)
  begin
    @buffer.extract(data).each do |line|
      handle_stream(line)
    end
    @last_response.reset if @last_response.complete?
  rescue => e
    msg = "#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t")
    invoke_callback(@client.error_callback, msg)

    close_connection
    return
  end
end
on_headers_complete(headers) click to toggle source

HttpParser implementation, invoked after response headers are received

# File lib/em-twitter/connection.rb, line 150
def on_headers_complete(headers)
  @response_code  = @parser.status_code
  @headers        = headers

  # @decoder = BaseDecoder.new
  # TODO: Complete gzip support
  # detect gzip encoding and use a decoder for response bodies
  # gzip needs to be detected with the Content-Encoding header
  # @decoder = if gzip?
  #   GzipDecoder.new
  # else
  #   BaseDecoder.new
  # end

  # everything below here is error handling so return if we got a 200
  if @response_code.to_i == 200
    @network_reconnector.reset
    @application_reconnector.reset
    return
  end

  # since we got a response use the application failure reconnector
  # to handle reconnects
  @reconnector = @application_reconnector

  case @response_code
  when 401 then invoke_callback(@client.unauthorized_callback)
  when 403 then invoke_callback(@client.forbidden_callback)
  when 404 then invoke_callback(@client.not_found_callback)
  when 406 then invoke_callback(@client.not_acceptable_callback)
  when 413 then invoke_callback(@client.too_long_callback)
  when 416 then invoke_callback(@client.range_unacceptable_callback)
  when 420 then invoke_callback(@client.enhance_your_calm_callback)
  when 503 then invoke_callback(@client.service_unavailable_callback)
  else
    msg = "Unhandled status code: #{@response_code}."
    invoke_callback(@client.error_callback, msg)
  end
end
reconnect_after(reconnect_timeout) click to toggle source

Performs the reconnection after x seconds have passed. Reconnection is performed immediately if the argument passed is zero.

Otherwise it will create an EM::Timer that will reconnect

# File lib/em-twitter/connection.rb, line 280
def reconnect_after(reconnect_timeout)
  invoke_callback(@client.reconnect_callback,
                  @reconnector.reconnect_timeout,
                  @reconnector.reconnect_count)

  # don't assume a successful reconnect so use the network reconnector
  @reconnector = @network_reconnector

  if reconnect_timeout.zero?
    reconnect(@host, @port)
  else
    EM::Timer.new(reconnect_timeout) { reconnect(@host, @port) }
  end
end
reset_connection() click to toggle source

Resets the internals of the connection on initial connection and on reconnections. Clears the response buffer and resets internal state

# File lib/em-twitter/connection.rb, line 302
def reset_connection
  @buffer                     = BufferedTokenizer.new("\r")
  @parser                     = Http::Parser.new(self)
  @parser.on_body             = method(:on_body)
  @parser.on_headers_complete = method(:on_headers_complete)
  @last_response              = Response.new
  @response_code              = 0

  @gracefully_closed          = false
  @immediate_reconnect        = false
  @auto_reconnect             = @options[:auto_reconnect]
end
schedule_reconnect() click to toggle source

Handles reconnection to the server when a disconnect occurs. By using a reconnector, it will gradually increase the time between reconnects per Twitter’s reconnection guidelines.

# File lib/em-twitter/connection.rb, line 257
def schedule_reconnect
  if gracefully_closed?
    reconnect_after(0)
    @gracefully_closed = false
    return
  end

  begin
    @reconnector.increment do |timeout|
      reconnect_after(timeout)
    end
  rescue ReconnectLimitError => e
    invoke_callback(@client.max_reconnects_callback,
                    @reconnector.reconnect_timeout,
                    @reconnector.reconnect_count)
  end
end
ssl?() click to toggle source
# File lib/em-twitter/connection.rb, line 242
def ssl?
  @options[:ssl]
end
ssl_handshake_completed() click to toggle source
# File lib/em-twitter/connection.rb, line 231
def ssl_handshake_completed
  return true unless verify_peer?

  unless OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, @client.host)
    fail OpenSSL::OpenSSLError.new("the hostname '#{@client.host}' does not match the server certificate")
    false
  else
    true
  end
end
ssl_verify_peer(cert_string) click to toggle source

It’s important that we try to not add a certificate to the store that’s already in the store, because OpenSSL::X509::Store will raise an exception.

# File lib/em-twitter/connection.rb, line 208
def ssl_verify_peer(cert_string)
  cert = nil
  begin
    cert = OpenSSL::X509::Certificate.new(cert_string)
  rescue OpenSSL::X509::CertificateError
    return false
  end

  @last_seen_cert = cert

  if @certificate_store.verify(@last_seen_cert)
    begin
      @certificate_store.add_cert(@last_seen_cert)
    rescue OpenSSL::X509::StoreError => e
      raise e unless e.message == 'cert already in hash table'
    end
    true
  else
    raise OpenSSL::OpenSSLError.new("unable to verify the server certificate of #{@client.host}")
    false
  end
end
verify_peer?() click to toggle source
# File lib/em-twitter/connection.rb, line 250
def verify_peer?
  ssl? && @options[:ssl][:verify_peer]
end