class EventMachine::Protocols::Couchbase::Client
Attributes
config[R]
nodes[R]
Public Class Methods
new()
click to toggle source
# File lib/em-couchbase/client.rb, line 29
# Sets up the empty bookkeeping state of a client that has not connected yet.
def initialize
  # Request bookkeeping: last issued opaque and the per-opaque packet table.
  @opaque = 0
  @packets = {}

  # Cluster topology, filled in by the configuration listener in #connect.
  @nodes = []
  @admin_ports = []

  # Packets waiting to be re-sent; drained by the retry lambda in #connect.
  @upgrade_queue = EM::Queue.new
end
Public Instance Methods
connect(options = {})
click to toggle source
@param options [Hash] @option options [String] :hostname (“localhost”) @option options [Fixnum] :port (8091) @option options [String] :pool (“default”) @option options [String] :bucket (“default”) @option options [String] :username (nil) @option options [String] :password (nil)
# File lib/em-couchbase/client.rb, line 44 def connect(options = {}) @config_listener = ConfigurationListener.new @config_listener.on_error do |listener, error| if @admin_ports.empty? @on_error.call(self, error) if @on_error else options = options.merge(@admin_ports.shuffle!.pop) @config_listener.listen(options) end end @config_listener.on_upgrade do |config| @config = config config.nodes.each_with_index do |nn, ii| if nodes[ii] != nn nodes[ii].close_connection if nodes[ii] nodes[ii] = Node.connect(nn.merge(:client => self)) end end @admin_ports = nodes.map do |node| host, port = node.admin.split(':') {:hostname => host, :port => port} end do_retry = lambda do |payload| opaque, packet = payload key, handler, raw = packet.values_at(:key, :handler, :raw) register_handler(opaque, key, handler) vbucket = raw[6..7].unpack("n").first if @config.vbucket_map_forward @config.vbucket_map[vbucket] = @config.vbucket_map_forward[vbucket].dup end node = @nodes[@config.vbucket_map[vbucket][0]] register_packet(opaque, raw) node.callback do node.send_data(raw) end @upgrade_queue.pop(&do_retry) end @upgrade_queue.pop(&do_retry) succeed end @config_listener.listen(options) self end
decr(key, options = {}, &block)
click to toggle source
# File lib/em-couchbase/client.rb, line 178
# Queues a :decr arithmetic request for +key+ on the node that owns it.
#
# @param key the key to decrement
# @param options [Hash] passed to the node's +arithm+ call unchanged
# @param block handler registered for the response to this request
def decr(key, options = {}, &block)
  callback do
    ticket = opaque_inc
    register_handler(ticket, key, block)
    vbucket, owner = locate(key)
    # Send once the owning node's own callback fires.
    owner.callback { owner.arithm(:decr, ticket, vbucket, key, options) }
  end
end
get(*keys, &block)
click to toggle source
# File lib/em-couchbase/client.rb, line 146
# Queues a get request for one or more keys, batching the keys per owning node.
#
# @param keys key names; a trailing Hash, if present, is removed from the
#   list and passed to each node's +get+ call as options (nil when absent)
# @param block handler registered for the response to every key
def get(*keys, &block)
  callback do
    # Take the options hash off the key list. The previous code called #pop
    # on the Hash itself, which raised NoMethodError and left the hash in
    # the list of keys.
    options = keys.pop if keys.last.is_a?(Hash)

    # Group [opaque, vbucket, key] tuples by the node that owns each key.
    groups = keys.each_with_object({}) do |key, acc|
      opaque = opaque_inc
      register_handler(opaque, key, block)
      vbucket, node = locate(key)
      (acc[node] ||= []) << [opaque, vbucket, key]
    end

    # One request per node, sent once that node's own callback fires.
    groups.each do |node, tuples|
      node.callback do
        node.get(tuples, options)
      end
    end
  end
end
incr(key, options = {}, &block)
click to toggle source
# File lib/em-couchbase/client.rb, line 167
# Queues an :incr arithmetic request for +key+ on the node that owns it.
#
# @param key the key to increment
# @param options [Hash] passed to the node's +arithm+ call unchanged
# @param block handler registered for the response to this request
def incr(key, options = {}, &block)
  callback do
    ticket = opaque_inc
    register_handler(ticket, key, block)
    vbucket, owner = locate(key)
    # Send once the owning node's own callback fires.
    owner.callback { owner.arithm(:incr, ticket, vbucket, key, options) }
  end
end
locate(key)
click to toggle source
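Locate the node responsible for a key using the vBucket distribution. @return [Array] a two-element array: the vBucket index computed for the key, followed by the node listed first for that vBucket in the current configuration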
# File lib/em-couchbase/client.rb, line 128
# Maps a key to its vbucket and to the node listed first for that vbucket.
#
# @param key hashed via its string form
# @return [Array] the vbucket index followed by the matching entry of @nodes
def locate(key)
  vbucket_map = @config.vbucket_map
  # The map size minus one is used as a bit mask on the hash
  # (presumably the size is a power of two -- confirm).
  vbucket = Couchbase::Util.crc32_hash(key.to_s) & (vbucket_map.size - 1)
  node_index = vbucket_map[vbucket][0]
  [vbucket, @nodes[node_index]]
end
on_error(&callback)
click to toggle source
# File lib/em-couchbase/client.rb, line 90
# Stores the block to invoke when an error has to be reported. #connect calls
# it with the client and the error when the configuration listener fails and
# no admin endpoint is left to try.
#
# @param callback the handler block; nil when no block is given
# @return the stored handler
def on_error(&callback)
  @on_error = callback
end
register_handler(opaque, key, handler)
click to toggle source
# File lib/em-couchbase/client.rb, line 101
# Records the key and response handler for a request in the packet table,
# creating the table entry for +opaque+ if it does not exist yet.
#
# @param opaque request identifier used as the table key
# @param key key the request refers to
# @param handler object to call with the result (may be nil)
# @return the handler
def register_handler(opaque, key, handler)
  entry = (@packets[opaque] ||= {})
  entry.merge!(:key => key, :handler => handler)
  handler
end
register_packet(opaque, packet)
click to toggle source
# File lib/em-couchbase/client.rb, line 94
# Stores the raw bytes of a request in the packet table so that it can be
# re-sent later, creating the table entry for +opaque+ if needed.
#
# @param opaque request identifier used as the table key
# @param packet raw request; re-tagged in place as binary when it supports
#   +force_encoding+
# @return the packet
def register_packet(opaque, packet)
  packet.force_encoding(Encoding::BINARY) if packet.respond_to?(:force_encoding)
  entry = (@packets[opaque] ||= {})
  entry[:raw] = packet
end
retry(reason, opaque)
click to toggle source
# File lib/em-couchbase/client.rb, line 107
# Removes the request identified by +opaque+ from the packet table and, when
# the reason is :not_my_vbucket, queues it for re-sending. For any other
# reason the removed entry is not queued.
#
# @param reason [Symbol] why the request has to be retried
# @param opaque request identifier
def retry(reason, opaque)
  pending = @packets.delete(opaque)
  return unless pending

  @upgrade_queue.push([opaque, pending]) if :not_my_vbucket === reason
end
run_callback(opaque, result)
click to toggle source
# File lib/em-couchbase/client.rb, line 117
# Delivers a result to the handler registered for +opaque+ and removes the
# request from the packet table.
#
# @param opaque request identifier
# @param result result object; its +key+ is set to the registered key before
#   the handler is called
# @return the handler's return value, or nil when nothing was called
def run_callback(opaque, result)
  packet = @packets.delete(opaque)
  # No entry for this opaque (e.g. it was already removed by #retry or by an
  # earlier response): nothing to deliver. Previously this raised
  # NoMethodError on nil.
  return unless packet

  key, handler = packet.values_at(:key, :handler)
  return unless handler.respond_to?(:call)

  result.key = key
  handler.call(result)
end
set(key, val, options = {}, &block)
click to toggle source
# File lib/em-couchbase/client.rb, line 135
# Queues a set request storing +val+ under +key+ on the node that owns the key.
#
# @param key the key to store
# @param val the value to store
# @param options [Hash] passed to the node's +set+ call unchanged
# @param block handler registered for the response to this request
def set(key, val, options = {}, &block)
  callback do
    ticket = opaque_inc
    register_handler(ticket, key, block)
    vbucket, owner = locate(key)
    # Send once the owning node's own callback fires.
    owner.callback { owner.set(ticket, vbucket, key, val, options) }
  end
end
Protected Instance Methods
opaque_inc()
click to toggle source
# File lib/em-couchbase/client.rb, line 197
# Advances the request counter and returns the new value. The counter is kept
# within 32 bits, so it wraps from 0xffffffff back to 0.
#
# @return [Integer] the next opaque value
def opaque_inc
  @opaque = (@opaque + 1) & 0xffffffff # 32 bits
end
unbind()
click to toggle source
# File lib/em-couchbase/client.rb, line 191
# Disconnects every node the client holds.
#
# The previous body called +disconnect+ on the +nodes+ array itself instead of
# on each element, which raised NoMethodError as soon as the list was non-empty.
def unbind
  nodes.each do |node|
    node.disconnect
  end
end