class EventMachine::Protocols::Couchbase::ConfigurationListener
Attributes
config_stream[R]
options[R]
Public Class Methods
new()
click to toggle source
# File lib/em-couchbase/configuration.rb, line 47 def initialize @parser = Yajl::Parser.new @parser.on_parse_complete = lambda do |object| if @on_upgrade config = build_config(object) if config @on_upgrade.call(config) end end end end
Public Instance Methods
build_config(json)
click to toggle source
# File lib/em-couchbase/configuration.rb, line 95 def build_config(json) return unless json.is_a?(Hash) bucket_name = json.fetch("name") bucket_type = json.fetch("bucketType") sasl_password = json.fetch("saslPassword") node_locator = json.fetch("nodeLocator") if node_locator == "vbucket" server_map = json.fetch("vBucketServerMap") num_replicas = server_map.fetch("numReplicas") hash_algorithm = server_map.fetch("hashAlgorithm") vbucket_map = server_map.fetch("vBucketMap") vbucket_map_forward = server_map["vBucketMapForward"] end nodes = json.fetch("nodes") if nodes.empty? raise ArgumentError, "empty list of nodes" end nodes = nodes.map do |node| admin = node.fetch("hostname") ports = node.fetch("ports") { :admin => admin, :proxy => admin.sub(/:\d+$/, ":#{ports.fetch("proxy")}"), :direct => admin.sub(/:\d+$/, ":#{ports.fetch("direct")}"), :couch => node["couchApiBase"], # nil for 1.8.x series :status => node.fetch("status"), :version => node.fetch("version") } end.sort_by{|node| node[:direct]} Configuration.new(:bucket_name => bucket_name, :bucket_type => bucket_type, :sasl_password => sasl_password, :node_locator => node_locator, :num_replicas => num_replicas, :hash_algorithm => hash_algorithm, :vbucket_map => vbucket_map, :vbucket_map_forward => vbucket_map_forward, :nodes => nodes) end
listen(options = {})
click to toggle source
# File lib/em-couchbase/configuration.rb, line 70 def listen(options = {}) @options = { :hostname => "localhost", :port => 8091, :pool => "default", :bucket => "default" }.merge(options) params = {:head => {}} (username, password) = auth = @options.values_at(:username, :password) if username && password params[:head][:authorization] = auth end uri = sprintf("http://%s:%d/pools/%s/bucketsStreaming/%s/", *@options.values_at(:hostname, :port, :pool, :bucket)) @config_stream = EM::HttpRequest.new(URI.parse(uri), :inactivity_timeout => 0).get params @config_stream.errback do |http| @on_error.call(self, http.error) if @on_error && http.error end @config_stream.stream do |chunk| @parser << chunk end @config_stream end
on_error(&callback)
click to toggle source
# File lib/em-couchbase/configuration.rb, line 63 def on_error(&callback) @on_error = callback if @config_stream @config_stream.errback(&callback) end end
on_upgrade(&callback)
click to toggle source
# File lib/em-couchbase/configuration.rb, line 59 def on_upgrade(&callback) @on_upgrade = callback end