class EventMachine::Protocols::Couchbase::Client

Attributes

config[R]
nodes[R]

Public Class Methods

new() click to toggle source
# File lib/em-couchbase/client.rb, line 29
# Build an unconnected client: the opaque counter starts at zero, no nodes
# or admin endpoints are known, no packets are in flight, and the queue of
# packets waiting for a configuration upgrade is empty.
def initialize
  @opaque = 0
  @nodes, @admin_ports = [], []
  @packets = {}
  @upgrade_queue = EM::Queue.new
end

Public Instance Methods

connect(options = {}) click to toggle source

@param options [Hash] connection settings
@option options [String] :hostname ("localhost")
@option options [Fixnum] :port (8091)
@option options [String] :pool ("default")
@option options [String] :bucket ("default")
@option options [String] :username (nil)
@option options [String] :password (nil)

# File lib/em-couchbase/client.rb, line 44
# Start listening for cluster configuration and (re)connect to the data
# nodes every time a new configuration arrives.
#
# @param options [Hash] handed to ConfigurationListener#listen; on a
#   listener error it is merged with one of the known admin endpoints
#   (:hostname / :port) before listening again
# @return [self]
def connect(options = {})
  @config_listener = ConfigurationListener.new
  @config_listener.on_error do |listener, error|
    if @admin_ports.empty?
      # No alternative admin endpoint left: report to the handler
      # registered through on_error, if there is one.
      @on_error.call(self, error) if @on_error
    else
      # Pick a random known admin endpoint; shuffle!.pop also removes it
      # from @admin_ports, so it is not tried again until the list is
      # rebuilt by the next upgrade.
      options = options.merge(@admin_ports.shuffle!.pop)
      @config_listener.listen(options)
    end
  end
  @config_listener.on_upgrade do |config|
    @config = config
    # Replace only the node slots whose entry differs from the new config.
    config.nodes.each_with_index do |nn, ii|
      if nodes[ii] != nn
        nodes[ii].close_connection if nodes[ii]
        nodes[ii] = Node.connect(nn.merge(:client => self))
      end
    end
    # Rebuild the fallback admin endpoints from each node's "host:port"
    # admin string. The port stays a String, as returned by split.
    @admin_ports = nodes.map do |node|
      host, port = node.admin.split(':')
      {:hostname => host, :port => port}
    end
    # Re-sends one packet that #retry queued, then re-arms itself for the
    # next queued packet.
    do_retry = lambda do |payload|
      opaque, packet = payload
      key, handler, raw = packet.values_at(:key, :handler, :raw)
      # #retry removed the entry from @packets, so register it again.
      register_handler(opaque, key, handler)

      # Bytes 6..7 of the raw packet, read as a 16-bit big-endian unsigned
      # integer, are used as the vbucket index.
      vbucket = raw[6..7].unpack("n").first
      if @config.vbucket_map_forward
        # Overwrite this vbucket's entry with a copy of the forward map's.
        @config.vbucket_map[vbucket] = @config.vbucket_map_forward[vbucket].dup
      end
      # The first index in the vbucket's map entry selects the target node.
      node = @nodes[@config.vbucket_map[vbucket][0]]

      register_packet(opaque, raw)
      node.callback do
        node.send_data(raw)
      end
      @upgrade_queue.pop(&do_retry)
    end
    @upgrade_queue.pop(&do_retry)
    # NOTE(review): presumably EM::Deferrable#succeed, which would release
    # the operations deferred with `callback` in get/set/incr/decr - confirm.
    succeed
  end
  @config_listener.listen(options)
  self
end
decr(key, options = {}, &block) click to toggle source
# File lib/em-couchbase/client.rb, line 178
# Schedule a decrement of the value stored under +key+.
#
# The work is deferred through +callback+; inside it a fresh opaque is
# allocated, +block+ is registered as the handler for that opaque, the key
# is mapped to its vbucket and node, and the node is asked to run
# arithm(:decr, ...) from its own +callback+.
#
# @param key the key to decrement
# @param options [Hash] forwarded unchanged to the node's arithm call
# @param block handler registered for the response (may be nil)
def decr(key, options = {}, &block)
  callback do
    request_id = opaque_inc
    register_handler(request_id, key, block)
    vbucket_id, target = locate(key)
    target.callback { target.arithm(:decr, request_id, vbucket_id, key, options) }
  end
end
get(*keys, &block) click to toggle source
# File lib/em-couchbase/client.rb, line 146
# Schedule a fetch of one or more keys.
#
# The work is deferred through +callback+. Every key gets its own opaque
# and handler registration, keys are grouped by the node that +locate+
# returns for them, and each node receives a single get call carrying all
# of its [opaque, vbucket, key] tuples.
#
# @param keys the keys to fetch; a trailing Hash is treated as options and
#   is removed from the key list
# @param block handler registered for every requested key (may be nil)
def get(*keys, &block)
  callback do
    # Pop the options hash off the key list. The previous code called
    # `pop` on the Hash itself, which raised NoMethodError.
    # +options+ stays nil when no trailing Hash was given.
    options = keys.pop if keys.last.is_a?(Hash)
    groups = keys.each_with_object({}) do |key, acc|
      opaque = opaque_inc
      register_handler(opaque, key, block)
      vbucket, node = locate(key)
      (acc[node] ||= []) << [opaque, vbucket, key]
    end
    groups.each do |node, tuples|
      node.callback { node.get(tuples, options) }
    end
  end
end
incr(key, options = {}, &block) click to toggle source
# File lib/em-couchbase/client.rb, line 167
# Schedule an increment of the value stored under +key+.
#
# The work is deferred through +callback+; inside it a fresh opaque is
# allocated, +block+ is registered as the handler for that opaque, the key
# is mapped to its vbucket and node, and the node is asked to run
# arithm(:incr, ...) from its own +callback+.
#
# @param key the key to increment
# @param options [Hash] forwarded unchanged to the node's arithm call
# @param block handler registered for the response (may be nil)
def incr(key, options = {}, &block)
  callback do
    request_id = opaque_inc
    register_handler(request_id, key, block)
    vbucket_id, target = locate(key)
    target.callback { target.arithm(:incr, request_id, vbucket_id, key, options) }
  end
end
locate(key) click to toggle source

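Locate the node for a key using the vbucket distribution. @return [Array] a two-element array: the vbucket index (Fixnum) and the node listed first for that vbucket in the vbucket map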

# File lib/em-couchbase/client.rb, line 128
# Map a key to its vbucket and to the node listed first for that vbucket.
#
# The vbucket index is the CRC32-based hash of the key's string form,
# masked with (vbucket map size - 1). The mask can only reach every
# vbucket when the map size is a power of two.
#
# @param key anything responding to to_s
# @return [Array] [vbucket index, node]
def locate(key)
  hashed = Couchbase::Util.crc32_hash(key.to_s)
  vbucket_map = @config.vbucket_map
  vbucket = hashed & (vbucket_map.size - 1)
  primary_index = vbucket_map[vbucket][0]
  [vbucket, @nodes[primary_index]]
end
on_error(&callback) click to toggle source
# File lib/em-couchbase/client.rb, line 90
# Store the block to be called on errors. #connect invokes it with
# (client, error) when the configuration listener fails and no alternative
# admin endpoint is left. Calling without a block clears the handler.
#
# @return [Proc, nil] the stored handler
def on_error(&handler)
  @on_error = handler
end
register_handler(opaque, key, handler) click to toggle source
# File lib/em-couchbase/client.rb, line 101
# Record the key and response handler for a request, creating the
# per-opaque entry in @packets when it does not exist yet. Other fields
# already stored for the same opaque are left untouched.
#
# @param opaque request identifier used as the @packets key
# @param key the key the request is about
# @param handler object to call with the result (may be nil)
# @return the handler that was stored
def register_handler(opaque, key, handler)
  @packets[opaque] ||= {}
  @packets[opaque][:key] = key
  @packets[opaque][:handler] = handler
end
register_packet(opaque, packet) click to toggle source
# File lib/em-couchbase/client.rb, line 94
# Remember the raw bytes of a request under its opaque so they can be
# re-sent later. The packet is re-tagged in place as binary when it
# supports force_encoding.
#
# @param opaque request identifier used as the @packets key
# @param packet raw request data
# @return the packet that was stored
def register_packet(opaque, packet)
  packet.force_encoding(Encoding::BINARY) if packet.respond_to?(:force_encoding)
  entry = (@packets[opaque] ||= {})
  entry[:raw] = packet
end
retry(reason, opaque) click to toggle source
# File lib/em-couchbase/client.rb, line 107
# Take a request out of the in-flight table and, when the reason is
# :not_my_vbucket, queue it to be re-sent after the next configuration
# upgrade. For any other reason the request is simply dropped from
# @packets; an unknown opaque is ignored.
#
# @param reason [Symbol] why the request has to be retried
# @param opaque request identifier
def retry(reason, opaque)
  pending = @packets.delete(opaque)
  return unless pending

  @upgrade_queue.push([opaque, pending]) if :not_my_vbucket == reason
end
run_callback(opaque, result) click to toggle source
# File lib/em-couchbase/client.rb, line 117
# Deliver a result to the handler registered for +opaque+ and forget the
# request.
#
# The entry is removed from @packets. If its handler responds to call, the
# result's key is set to the registered key and the handler is invoked
# with the result.
#
# @param opaque request identifier
# @param result object with a +key=+ writer, passed to the handler
# @return the handler's return value, or nil when nothing was called
def run_callback(opaque, result)
  packet = @packets.delete(opaque)
  # An opaque with no entry (for example one already removed by #retry)
  # used to raise NoMethodError on nil; ignore it, as #retry does.
  return unless packet

  key, handler = packet.values_at(:key, :handler)
  return unless handler.respond_to?(:call)

  result.key = key
  handler.call(result)
end
set(key, val, options = {}, &block) click to toggle source
# File lib/em-couchbase/client.rb, line 135
# Schedule storing +val+ under +key+.
#
# The work is deferred through +callback+; inside it a fresh opaque is
# allocated, +block+ is registered as the handler for that opaque, the key
# is mapped to its vbucket and node, and the node's set is invoked from
# the node's own +callback+.
#
# @param key the key to store under
# @param val the value to store
# @param options [Hash] forwarded unchanged to the node's set call
# @param block handler registered for the response (may be nil)
def set(key, val, options = {}, &block)
  callback do
    request_id = opaque_inc
    register_handler(request_id, key, block)
    vbucket_id, target = locate(key)
    target.callback { target.set(request_id, vbucket_id, key, val, options) }
  end
end

Protected Instance Methods

opaque_inc() click to toggle source
# File lib/em-couchbase/client.rb, line 197
# Advance the request counter and return the new value. The counter is
# kept within 32 bits, so it wraps from 0xffffffff back to 0.
#
# @return [Integer] the next opaque
def opaque_inc
  @opaque = (@opaque + 1) & 0xffffffff # 32 bits
end
unbind() click to toggle source
# File lib/em-couchbase/client.rb, line 191
# Disconnect every known node.
#
# The loop previously called +disconnect+ on the +nodes+ array instead of
# on each element, which raised NoMethodError for any non-empty list.
# NOTE(review): assumes node objects respond to +disconnect+; #connect
# closes replaced nodes with +close_connection+ - confirm which is meant.
def unbind
  nodes.each do |node|
    node.disconnect
  end
end