class Monga::Connections::EMConnection

Attributes

host[R]
port[R]
responses[R]

Public Class Methods

connect(host, port, timeout) click to toggle source
# File lib/monga/connections/em_connection.rb, line 17
def self.connect(host, port, timeout)
  EM.connect(host, port, self, host, port, timeout)
end
new(host, port, timeout) click to toggle source
# File lib/monga/connections/em_connection.rb, line 9
def initialize(host, port, timeout)
  @host = host
  @port = port
  @timeout = timeout
  @reactor_running = true
  @responses = {}
end

Public Instance Methods

close() click to toggle source
# File lib/monga/connections/em_connection.rb, line 96
def close
  Monga.logger.debug("EventMachine is stopped, closing connection")
  @reactor_running = false
end
connected?() click to toggle source
# File lib/monga/connections/em_connection.rb, line 72
def connected?
  reconnect unless @connected
  @connected || false
end
connection_completed() click to toggle source
# File lib/monga/connections/em_connection.rb, line 42
def connection_completed
  Monga.logger.debug("Connection is established #{@host}:#{@port}")

  EM.add_shutdown_hook do
    close
  end

  unless @reactor_running
    EM.add_periodic_timer(Monga::Cursor::CLOSE_TIMEOUT){ Monga::Cursor.batch_kill(self) }
  end

  @connected = true
  @pending_for_reconnect = false
  @buffer = Buffer.new
  @reactor_running = true

  succeed
end
is_master?() { |nil| ... } click to toggle source
# File lib/monga/connections/em_connection.rb, line 105
def is_master?
  reconnect unless @connected
  req = Monga::Protocol::Query.new(self, "admin", "$cmd", query: {"isMaster" => 1}, limit: 1)
  command = req.command
  request_id = req.request_id
  blk = proc do |data|
    err, resp = req.parse_response(data)
    if Exception === err
      @primary = false
      yield nil
    else
      @primary = resp.last.first["ismaster"]
      yield @primary ? :primary : :secondary
    end
  end
  @responses[request_id] = blk
  send_data command
end
primary?() click to toggle source
# File lib/monga/connections/em_connection.rb, line 101
def primary?
  @primary || false
end
receive_data(data) click to toggle source
# File lib/monga/connections/em_connection.rb, line 33
def receive_data(data)
  @buffer.append(data)
  @buffer.each do |message|
    request_id = message[2]
    cb = @responses.delete request_id
    cb.call(message)  if cb
  end
end
reconnect() click to toggle source
Calls superclass method
# File lib/monga/connections/em_connection.rb, line 61
def reconnect
  unless @connected && @pending_for_reconnect
    if @reactor_running
      super(@host, @port)
    else
      EM.schedule{ super(@host, @port) }
    end
    @pending_for_reconnect = true
  end
end
send_command(msg, request_id=nil, &cb) click to toggle source
# File lib/monga/connections/em_connection.rb, line 21
def send_command(msg, request_id=nil, &cb)
  # Reconnect is a hack for testing.
  # We are stopping EvenMachine for each test.
  # This hack reconnects to Mongo on first query
  reconnect unless @connected

  callback do
    send_data msg
  end
  @responses[request_id] = cb  if cb
end
unbind() click to toggle source
# File lib/monga/connections/em_connection.rb, line 77
def unbind
  @connected = false
  Monga.logger.debug("Lost connection #{@host}:#{@port}")

  @responses.keys.each do |k|
    cb = @responses.delete k
    err = Monga::Exceptions::Disconnected.new("Disconnected from #{@host}:#{@port}")
    cb.call(err)
  end

  @primary = false
  @pending_for_reconnect = false
  set_deferred_status(nil)

  if @reactor_running
    EM.add_timer(0.1){ reconnect }
  end
end