class Etzetera::Client

Constants

API_VERSION
KEYS_PREFIX
LEADER_PREFIX
LOCK_PREFIX
STATS_PREFIX

Attributes

servers[RW]

Public Class Methods

new(servers = ['http://127.0.0.1:4001'], default_options = {}) click to toggle source
# File lib/etzetera/client.rb, line 19
# Builds a client for the given etcd servers.
#
# servers         - Array of base URLs; a copy is stored so the caller's array
#                   is not touched by later reordering.
# default_options - Hash merged over the built-in HTTP defaults. The keys
#                   :election_timeout (default 200) and :heartbeat_interval
#                   (default 50) are taken out of it and kept in @etcd_opts
#                   (presumably milliseconds - TODO confirm).
def initialize(servers = ['http://127.0.0.1:4001'], default_options = {})
  self.servers = servers.dup

  overrides = default_options.dup

  http_defaults = {
    :headers          => {:accept => 'application/json'},
    :response         => :object,
    :socket_class     => Celluloid::IO::TCPSocket,
    :ssl_socket_class => Celluloid::IO::SSLSocket
  }

  # The etcd-specific settings are removed from the overrides so that only
  # HTTP options remain to be merged below.
  @etcd_opts = {
    :election_timeout   => overrides.delete(:election_timeout) { 200 },
    :heartbeat_interval => overrides.delete(:heartbeat_interval) { 50 }
  }

  @default_options = ::HTTP::Options.new(http_defaults.merge(overrides))
end

Public Instance Methods

acquire_lock(name, ttl) click to toggle source
# File lib/etzetera/client.rb, line 91
# Issues a POST under LOCK_PREFIX for the lock +name+.
#
# name - lock name appended to LOCK_PREFIX.
# ttl  - value sent as the +ttl+ form field.
#
# Returns the result of #request.
def acquire_lock(name, ttl)
  lock_form = { ttl: ttl }
  request(:post, LOCK_PREFIX, name, form: lock_form)
end
compareAndDelete(key, prevValue) click to toggle source
# File lib/etzetera/client.rb, line 87
# Issues a DELETE for +key+ under KEYS_PREFIX, supplying +prevValue+ as the
# +prevValue+ request parameter.
#
# Returns the result of #request.
def compareAndDelete(key, prevValue)
  condition = { prevValue: prevValue }
  request(:delete, KEYS_PREFIX, key, params: condition)
end
compareAndSwap(key, prevValue, form) click to toggle source
# File lib/etzetera/client.rb, line 83
# Issues a PUT for +key+ under KEYS_PREFIX with +form+ as the form payload and
# +prevValue+ as the +prevValue+ request parameter.
#
# Returns the result of #request.
def compareAndSwap(key, prevValue, form)
  condition = { prevValue: prevValue }
  request(:put, KEYS_PREFIX, key, form: form, params: condition)
end
create(key, form, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 63
# Issues a PUT for +key+ under KEYS_PREFIX with +form+ as the form payload.
# The request parameters are +params+ with :prevExist forced to false (any
# :prevExist supplied by the caller is overridden).
#
# Returns the result of #request.
def create(key, form, params = {})
  create_params = params.merge(prevExist: false)
  request(:put, KEYS_PREFIX, key, form: form, params: create_params)
end
delete(key, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 46
# Issues a DELETE for +key+ under KEYS_PREFIX with +params+ as the request
# parameters.
#
# Returns the result of #request.
def delete(key, params = {})
  request(:delete, KEYS_PREFIX, key, params: params)
end
delete_leader(clustername, name) click to toggle source
# File lib/etzetera/client.rb, line 115
# Issues a DELETE for +clustername+ under LEADER_PREFIX, sending +name+ as the
# +name+ form field.
#
# Returns the result of #request.
def delete_leader(clustername, name)
  leader_form = { name: name }
  request(:delete, LEADER_PREFIX, clustername, form: leader_form)
end
dir(dir, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 75
# Issues a GET for +dir+ under KEYS_PREFIX. The request parameters are
# +params+ with :recursive forced to true (a caller-supplied :recursive is
# overridden).
#
# Returns the result of #request.
def dir(dir, params = {})
  listing_params = params.merge(recursive: true)
  request(:get, KEYS_PREFIX, dir, params: listing_params)
end
get(key, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 38
# Issues a GET for +key+ under KEYS_PREFIX with +params+ as the request
# parameters.
#
# Returns the result of #request.
def get(key, params = {})
  request(:get, KEYS_PREFIX, key, params: params)
end
get_leader(clustername, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 111
# Issues a GET for +clustername+ under LEADER_PREFIX with +params+ as the
# request parameters.
#
# Returns the result of #request.
def get_leader(clustername, params = {})
  request(:get, LEADER_PREFIX, clustername, params: params)
end
mkdir(dir) click to toggle source
# File lib/etzetera/client.rb, line 71
# Issues a PUT for +dir+ under KEYS_PREFIX with the single request parameter
# :dir => true.
#
# Returns the result of #request.
def mkdir(dir)
  dir_params = { dir: true }
  request(:put, KEYS_PREFIX, dir, params: dir_params)
end
release_lock(name, form) click to toggle source
# File lib/etzetera/client.rb, line 99
# Issues a DELETE for the lock +name+ under LOCK_PREFIX with +form+ as the
# form payload.
#
# Returns the result of #request.
def release_lock(name, form)
  request(:delete, LOCK_PREFIX, name, form: form)
end
renew_lock(name, form) click to toggle source
# File lib/etzetera/client.rb, line 95
# Issues a PUT for the lock +name+ under LOCK_PREFIX with +form+ as the form
# payload.
#
# Returns the result of #request.
def renew_lock(name, form)
  request(:put, LOCK_PREFIX, name, form: form)
end
retrieve_lock(name, params) click to toggle source
# File lib/etzetera/client.rb, line 103
# Issues a GET for the lock +name+ under LOCK_PREFIX.
#
# name   - lock name appended to LOCK_PREFIX.
# params - request parameters; optional, defaulting to an empty Hash like the
#          other read methods (#get, #get_leader, #dir).
#
# Returns the result of #request.
def retrieve_lock(name, params = {})
  request(:get, LOCK_PREFIX, name, :params => params)
end
rmdir(dir, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 79
# Issues a DELETE for +dir+ under KEYS_PREFIX. :recursive => true is only a
# default here: entries in +params+ are merged on top, so a caller-supplied
# :recursive wins.
#
# Returns the result of #request.
def rmdir(dir, params = {})
  removal_params = { recursive: true }.merge(params)
  request(:delete, KEYS_PREFIX, dir, params: removal_params)
end
set(key, form, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 42
# Issues a PUT for +key+ under KEYS_PREFIX with +form+ as the form payload and
# +params+ as the request parameters.
#
# Returns the result of #request.
def set(key, form, params = {})
  request(:put, KEYS_PREFIX, key, :form => form, :params => params)
end
set_leader(clustername, name, ttl) click to toggle source
# File lib/etzetera/client.rb, line 107
# Issues a PUT for +clustername+ under LEADER_PREFIX, sending +name+ and +ttl+
# as the +name+ and +ttl+ form fields.
#
# Returns the result of #request.
def set_leader(clustername, name, ttl)
  leader_form = { name: name, ttl: ttl }
  request(:put, LEADER_PREFIX, clustername, form: leader_form)
end
stats(type) click to toggle source
# File lib/etzetera/client.rb, line 119
# Issues a GET for +type+ under STATS_PREFIX, with no extra request options.
#
# Returns the result of #request.
def stats(type)
  stats_path = type
  request(:get, STATS_PREFIX, stats_path)
end
update(key, form, params = {}) click to toggle source
# File lib/etzetera/client.rb, line 67
# Issues a PUT for +key+ under KEYS_PREFIX with +form+ as the form payload.
# The request parameters are +params+ with :prevExist forced to true (any
# :prevExist supplied by the caller is overridden).
#
# Returns the result of #request.
def update(key, form, params = {})
  update_params = params.merge(prevExist: true)
  request(:put, KEYS_PREFIX, key, form: form, params: update_params)
end
wait(key, callback = nil, params = {}) { |response| ... } click to toggle source
# File lib/etzetera/client.rb, line 50
# Issues a GET for +key+ under KEYS_PREFIX with :wait => true merged over
# +params+, then hands the result to the caller.
#
# key      - key appended to KEYS_PREFIX.
# callback - optional object responding to #call; used only when no block is given.
# params   - extra request parameters.
#
# Returns the block's value when a block is given, otherwise the callback's
# return value when a callback is given, otherwise the result of #request.
def wait(key, callback = nil, params = {})
  watch_params = params.merge(wait: true)
  result = request(:get, KEYS_PREFIX, key, params: watch_params)

  return yield(result) if block_given?

  #sleep (@etcd_opts[:heartbeat_interval] / 1000.0)
  return callback.call(result) if callback

  result
end

Private Instance Methods

request(verb, prefix, path, options = {}) click to toggle source
# File lib/etzetera/client.rb, line 124
# Performs one HTTP request against the configured servers and decodes the reply.
#
# verb    - HTTP verb symbol handed to the HTTP client (:get, :put, ...).
# prefix  - String placed between the server URL and +path+. When it is empty
#           and +path+ already starts with a URL scheme (as happens when a
#           redirect's Location header is followed), +path+ is requested as is.
# path    - key / resource path, or a complete URL (see +prefix+).
# options - per-request options merged over @default_options.
#
# Returns the decoded JSON body; the raw body as a String when a 2xx reply is
# not valid JSON; the result of the follow-up request for a 3xx reply with a
# Location header; nil for a 3xx reply without one.
#
# Failures are handed to +abort+:
# * a JSON reply carrying an errorCode -> the class mapped in Error::CODES, or
#   Error::EtzeteraError when the code has no mapping;
# * a non-JSON 4xx / 5xx reply -> Error::HttpClientError / Error::HttpServerError;
# * any other non-JSON reply -> Error::EtzeteraError wrapping the parse error;
# * an IOError once no further server is left to try -> that IOError.
#
# Side effect: after an IOError the server that is tried next is moved to the
# position of the failed one in +servers+, so later requests start with it.
def request(verb, prefix, path, options = {})
  opts = @default_options.merge(options)

  # A non-Hash form payload is wrapped as {:value => payload}.
  if opts[:form] && !opts[:form].is_a?(Hash)
    opts = opts.with_form({:value => opts[:form]})
  end

  client   = ::HTTP::Client.new(opts)
  server   = servers.first
  retries  = servers.count - 1
  http_res = nil

  # Redirect targets are passed back in with an empty prefix; if such a path is
  # already an absolute URL it must not be glued onto the current server URL.
  absolute = prefix.to_s.empty? && !(path.to_s =~ %r{\A[a-z][a-z0-9+.\-]*://}i).nil?

  begin
    url      = absolute ? path.to_s : "#{server}#{prefix}#{path}"
    http_res = client.request(verb, url)
    payload  = MultiJson.load(http_res.body)

    # Only a JSON object can carry an errorCode; arrays and scalars are
    # returned as decoded instead of being indexed with a String.
    if payload.is_a?(Hash) && payload['errorCode']
      # Unknown codes fall back to the generic error class rather than
      # failing with NoMethodError on nil.
      error_class = Error::CODES[payload['errorCode']] || Error::EtzeteraError
      abort error_class.new(payload['message'])
    else
      payload
    end
  rescue IOError => e
    candidates = servers.reject { |candidate| candidate == server }
    abort e if retries < 1 || candidates.empty?

    #sleep (@etcd_opts[:election_timeout] / 1000.0)

    # Would be nice if you could get the host:port combination of the new leader directly.
    failed = server
    server = candidates.sample

    # Swap the failed server with the one tried next (plain parallel
    # assignment, no Array extension needed).
    from = servers.index(failed)
    to   = servers.index(server)
    servers[from], servers[to] = servers[to], servers[from]

    retries -= 1

    retry
  rescue MultiJson::LoadError => e
    # etcd is inconsistent in the way it handles http responses.
    # Instead of adopting their buggy behaviour (all 5** errors are text, 4** errors are json,
    # but both respond with text/plain content-type) exceptions are used for flow control here.
    case http_res.code
    when 200..299
      http_res.body.to_s
    when 300..399
      location = http_res.headers['Location']
      if location
        debug location
        request(verb, '', location, opts)
      end
    when 400..499
      abort Error::HttpClientError.new("#{http_res.reason}\n\t#{http_res.body}")
    when 500..599
      abort Error::HttpServerError.new("#{http_res.reason}\n\t#{http_res.body}")
    else
      abort Error::EtzeteraError.new(e)
    end
  end
end