class EventMachine::Protocols::Couchbase::ConfigurationListener

Attributes

config_stream[R]
options[R]

Public Class Methods

new() click to toggle source
# File lib/em-couchbase/configuration.rb, line 47
def initialize
  @parser = Yajl::Parser.new
  @parser.on_parse_complete = lambda do |object|
    if @on_upgrade
      config = build_config(object)
      if config
        @on_upgrade.call(config)
      end
    end
  end
end

Public Instance Methods

build_config(json) click to toggle source
# File lib/em-couchbase/configuration.rb, line 95
def build_config(json)
  return unless json.is_a?(Hash)

  bucket_name = json.fetch("name")
  bucket_type = json.fetch("bucketType")
  sasl_password = json.fetch("saslPassword")
  node_locator = json.fetch("nodeLocator")
  if node_locator == "vbucket"
    server_map = json.fetch("vBucketServerMap")
    num_replicas = server_map.fetch("numReplicas")
    hash_algorithm = server_map.fetch("hashAlgorithm")
    vbucket_map = server_map.fetch("vBucketMap")
    vbucket_map_forward = server_map["vBucketMapForward"]
  end
  nodes = json.fetch("nodes")
  if nodes.empty?
    raise ArgumentError, "empty list of nodes"
  end
  nodes = nodes.map do |node|
    admin = node.fetch("hostname")
    ports = node.fetch("ports")
    {
      :admin => admin,
      :proxy => admin.sub(/:\d+$/, ":#{ports.fetch("proxy")}"),
      :direct => admin.sub(/:\d+$/, ":#{ports.fetch("direct")}"),
      :couch => node["couchApiBase"], # nil for 1.8.x series
      :status => node.fetch("status"),
      :version => node.fetch("version")
    }
  end.sort_by{|node| node[:direct]}
  Configuration.new(:bucket_name => bucket_name,
                    :bucket_type => bucket_type,
                    :sasl_password => sasl_password,
                    :node_locator => node_locator,
                    :num_replicas => num_replicas,
                    :hash_algorithm => hash_algorithm,
                    :vbucket_map => vbucket_map,
                    :vbucket_map_forward => vbucket_map_forward,
                    :nodes => nodes)
end
listen(options = {}) click to toggle source
# File lib/em-couchbase/configuration.rb, line 70
def listen(options = {})
  @options = {
    :hostname => "localhost",
    :port => 8091,
    :pool => "default",
    :bucket => "default"
  }.merge(options)
  params = {:head => {}}
  (username, password) = auth = @options.values_at(:username, :password)
  if username && password
    params[:head][:authorization] = auth
  end
  uri = sprintf("http://%s:%d/pools/%s/bucketsStreaming/%s/",
                *@options.values_at(:hostname, :port, :pool, :bucket))
  @config_stream = EM::HttpRequest.new(URI.parse(uri),
                                       :inactivity_timeout => 0).get params
  @config_stream.errback do |http|
    @on_error.call(self, http.error) if @on_error && http.error
  end
  @config_stream.stream do |chunk|
    @parser << chunk
  end
  @config_stream
end
on_error(&callback) click to toggle source
# File lib/em-couchbase/configuration.rb, line 63
def on_error(&callback)
  @on_error = callback
  if @config_stream
    @config_stream.errback(&callback)
  end
end
on_upgrade(&callback) click to toggle source
# File lib/em-couchbase/configuration.rb, line 59
def on_upgrade(&callback)
  @on_upgrade = callback
end