# File lib/monga/connections/em_connection.rb, line 101
# Reports the primary flag held in @primary.
#
# Returns the stored @primary value when it is truthy, and false otherwise
# (including when @primary has never been assigned).
def primary?
  return false unless @primary

  @primary
end
class Monga::Connections::EMConnection
Attributes
host[R]
port[R]
responses[R]
Public Class Methods
connect(host, port, timeout)
click to toggle source
# File lib/monga/connections/em_connection.rb, line 17
# Opens an EventMachine connection to host:port with this class as the
# connection handler.
#
# host, port and timeout are listed a second time after the handler; those
# trailing arguments match the parameters of #initialize.
#
# Returns whatever EM.connect returns.
def self.connect(host, port, timeout)
  handler_args = [host, port, timeout]
  EM.connect(host, port, self, *handler_args)
end
new(host, port, timeout)
click to toggle source
# File lib/monga/connections/em_connection.rb, line 9
# Stores the connection target and sets up every piece of per-connection
# state the other methods read.
#
# host    - host to connect to.
# port    - port to connect to.
# timeout - stored in @timeout.
def initialize(host, port, timeout)
  @host = host
  @port = port
  @timeout = timeout

  # Response callbacks, keyed by request id.
  @responses = {}

  # The reactor is treated as running from the start; #close flips this.
  @reactor_running = true

  # State flags are given explicit defaults so that readers such as
  # #connected?, #primary? and #reconnect never see an unset variable.
  @connected = false
  @primary = false
  @pending_for_reconnect = false
end
Public Instance Methods
close()
click to toggle source
# File lib/monga/connections/em_connection.rb, line 96
# Records that the EventMachine reactor has stopped.
#
# Writes a debug line through Monga.logger and clears @reactor_running.
# Returns false (the value assigned to @reactor_running).
def close
  message = "EventMachine is stopped, closing connection"
  Monga.logger.debug(message)
  @reactor_running = false
end
connected?()
click to toggle source
# File lib/monga/connections/em_connection.rb, line 72
# Reports whether the connection is currently established.
#
# When @connected is not set, #reconnect is invoked first, and the flag is
# read again afterwards.
#
# Returns the truthy @connected value, or false.
def connected?
  return @connected if @connected

  reconnect
  @connected || false
end
connection_completed()
click to toggle source
# File lib/monga/connections/em_connection.rb, line 42
# Runs the bookkeeping for a newly established connection.
#
# Logs the event, registers a shutdown hook that calls #close, resets the
# connection state flags, installs a fresh Buffer and finally calls
# `succeed`.
def connection_completed
  Monga.logger.debug("Connection is established #{@host}:#{@port}")

  EM.add_shutdown_hook { close }

  # The periodic cursor clean-up is only installed when the reactor flag was
  # cleared beforehand; the flag is set again a few lines below, so this
  # check has to come first.
  unless @reactor_running
    EM.add_periodic_timer(Monga::Cursor::CLOSE_TIMEOUT) do
      Monga::Cursor.batch_kill(self)
    end
  end

  @connected = true
  @pending_for_reconnect = false
  @buffer = Buffer.new
  @reactor_running = true

  succeed
end
is_master?() { |nil| ... }
click to toggle source
# File lib/monga/connections/em_connection.rb, line 105
# Sends an isMaster command to the "admin" database and reports the result
# to the given block.
#
# The block receives nil when parsing the response yields an Exception,
# :primary when the reply's "ismaster" field is truthy, and :secondary
# otherwise. @primary is updated to match.
#
# Calls #reconnect first when the connection is not marked as connected.
def is_master?
  reconnect unless @connected

  query = Monga::Protocol::Query.new(self, "admin", "$cmd", query: {"isMaster" => 1}, limit: 1)
  payload = query.command
  id = query.request_id

  # Registered under the request id so that #receive_data (or #unbind) can
  # route the reply, or the disconnect error, back to this handler.
  @responses[id] = proc do |data|
    err, resp = query.parse_response(data)
    case err
    when Exception
      @primary = false
      yield nil
    else
      @primary = resp.last.first["ismaster"]
      yield(@primary ? :primary : :secondary)
    end
  end

  send_data payload
end
primary?()
click to toggle source
receive_data(data)
click to toggle source
# File lib/monga/connections/em_connection.rb, line 33
# Feeds incoming bytes into @buffer and dispatches each message the buffer
# yields.
#
# The element at index 2 of a message is used as the request id. The
# callback registered under that id in @responses is removed and called
# with the message. Messages without a registered callback are skipped.
#
# data - the chunk handed to @buffer.append.
def receive_data(data)
  @buffer.append(data)
  @buffer.each do |message|
    handler = @responses.delete(message[2])
    next unless handler

    handler.call(message)
  end
end
reconnect()
click to toggle source
Calls superclass method
# File lib/monga/connections/em_connection.rb, line 61
# Starts a reconnect to @host:@port unless one is unnecessary.
#
# Nothing happens when the connection is already established or when a
# previous reconnect attempt is still pending. The earlier guard used `&&`,
# which is only false when both flags are set at once, so it never
# suppressed duplicate attempts and could reconnect a handler that was
# already connected.
#
# The actual reconnect is delegated to the superclass implementation
# (presumably EventMachine::Connection#reconnect; confirm against the class
# definition). When the reactor is flagged as not running, the call is
# deferred through EM.schedule instead of being made directly.
#
# Returns true when an attempt was started, nil when it was skipped.
def reconnect
  return if @connected || @pending_for_reconnect

  if @reactor_running
    super(@host, @port)
  else
    EM.schedule { super(@host, @port) }
  end
  @pending_for_reconnect = true
end
send_command(msg, request_id=nil, &cb)
click to toggle source
# File lib/monga/connections/em_connection.rb, line 21
# Queues a message for sending and optionally registers a response handler.
#
# msg        - payload passed to send_data.
# request_id - key under which the handler is stored in @responses.
# cb         - optional block stored as the response handler.
#
# Returns the stored handler, or nil when no block was given.
def send_command(msg, request_id=nil, &cb)
  # The reconnect is a testing hack: EventMachine is stopped after every
  # test, so the first query afterwards has to re-establish the connection.
  reconnect unless @connected

  # The send is wrapped in `callback` rather than being issued directly.
  callback { send_data(msg) }

  return unless cb

  @responses[request_id] = cb
end
unbind()
click to toggle source
# File lib/monga/connections/em_connection.rb, line 77
# Handles the loss of the connection.
#
# Marks the connection as down, calls every callback still registered in
# @responses with a Monga::Exceptions::Disconnected error, resets the
# primary and pending-reconnect flags and the deferred status, and, while
# the reactor is flagged as running, schedules a #reconnect 0.1 seconds
# later.
def unbind
  @connected = false
  Monga.logger.debug("Lost connection #{@host}:#{@port}")

  # Iterate over a snapshot of the keys, because entries are removed from
  # the hash while walking it.
  @responses.keys.each do |request_id|
    handler = @responses.delete(request_id)
    handler.call(Monga::Exceptions::Disconnected.new("Disconnected from #{@host}:#{@port}"))
  end

  @primary = false
  @pending_for_reconnect = false
  set_deferred_status(nil)

  EM.add_timer(0.1) { reconnect } if @reactor_running
end