class Moneta::Adapters::Couch

CouchDB backend

You can store hashes directly using this adapter.

@example Store hashes

db = Moneta::Adapters::Couch.new
db['key'] = {a: 1, b: 2}

@api public

Public Class Methods

new(options = {}) click to toggle source

@param [Hash] options
@option options [String] :host ('127.0.0.1') Couch host
@option options [String] :port (5984) Couch port
@option options [String] :db ('moneta') Couch database
@option options [String] :scheme ('http') HTTP scheme to use
@option options [String] :value_field ('value') Document field to store value
@option options [String] :type_field ('type') Document field to store value type
@option options [String] :login Login name to use for HTTP basic authentication
@option options [String] :password Password to use for HTTP basic authentication
@option options [Symbol] :adapter Adapter to use with Faraday
@option options [Faraday::Connection] :backend Use existing backend instance
@option options Other options passed to {Faraday::new} (unless the :backend option is provided).
Calls superclass method Moneta::Adapter::new
# File lib/moneta/adapters/couch.rb, line 57
# Builds the adapter: runs the superclass initializer, optionally configures
# HTTP basic authentication on the backend, sets up the revision cache and
# creates the database unless that step is disabled.
#
# @param [Hash] options forwarded unchanged to the superclass initializer
def initialize(options = {})
  super

  # Credentials are only applied when both a login and a password are configured
  if config.login && config.password
    # Faraday 1.x had a `basic_auth` function
    if backend.respond_to? :basic_auth
      backend.basic_auth(config.login, config.password)
    else
      # Otherwise basic auth is registered through `backend.request :authorization`
      backend.request :authorization, :basic, config.login, config.password
    end
  end

  # Moneta store (:LRUHash adapter wrapped with :Lock) in which the other
  # methods of this class keep document revisions, keyed by document key
  @rev_cache = Moneta.build do
    use :Lock
    adapter :LRUHash
  end
  # Database creation is skipped when the `skip_create_db` config value is truthy
  create_db unless config.skip_create_db
end

Public Instance Methods

clear(options = {}) click to toggle source

(see Proxy#clear)
@option options [Boolean] :compact (false) Whether to compact the database after clearing
@option options [Boolean] :await_compact (false) Whether to wait for compaction to complete before returning.
@option options [Boolean] :full_commit (nil) Set to `true` or `false` to override the server's {https://docs.couchdb.org/en/stable/config/couchdb.html#couchdb/delayed_commits commit policy}
# File lib/moneta/adapters/couch.rb, line 148
# Deletes every document in the database, optionally compacting afterwards.
#
# Documents are listed in pages of up to 10,000 rows; each page is turned
# into `_deleted` stubs and sent as one bulk request. This repeats until
# the listing comes back empty.
#
# @param [Hash] options
# @option options [Boolean] :compact when truthy, posts to `_compact`
#   (expecting a 202) once the deletions are done
# @option options [Boolean] :await_compact when truthy (and :compact is
#   set), polls the database info until `compact_running` is falsy
# @option options [Boolean] :full_commit forwarded to `bulk_docs`
# @return [self]
def clear(options = {})
  until (rows = all_docs(limit: 10_000)['rows']).empty?
    tombstones = rows.map do |row|
      { _id: row['id'], _rev: row['value']['rev'], _deleted: true }
    end
    bulk_docs(tombstones, full_commit: options[:full_commit])
  end

  # Compaction only happens when explicitly requested
  return self unless options[:compact]

  post('_compact', expect: 202)

  # Waiting for compaction to finish is opt-in as well; pause one second
  # between polls of the database info
  if options[:await_compact]
    sleep 1 while get('', expect: 200)['compact_running']
  end

  self
end
create(key, value, options = {}) click to toggle source

(see Proxy#create) @option (see key?)

# File lib/moneta/adapters/couch.rb, line 179
# Stores `value` under `key` only if no document exists for that key yet.
#
# The document is PUT without a revision. A 201 response means it was
# created and a 409 means a document already exists. Any other status
# raises HTTPError; the whole operation is then retried, up to 10 attempts
# in total, before the error is re-raised.
#
# @param key document key
# @param value value to store (converted with `value_to_doc`)
# @param [Hash] options
# @option options [Boolean] :cache_rev (true) forwarded to `put` unless
#   explicitly `false`
# @return [Boolean] true when created, false on conflict
# @raise [HTTPError] when an unexpected status persists after the retries
def create(key, value, options = {})
  cache_rev = options[:cache_rev] != false
  doc = value_to_doc(value, nil)
  response = put(key, doc, cache_rev: cache_rev, returns: :response)
  case response.status
  when 201
    true
  when 409
    false
  else
    # Use `build_url`, the same URL builder `request` relies on, so that this
    # path raises HTTPError (and is retried) instead of failing on an
    # undefined backend method.
    raise HTTPError.new(response.status, :put, @backend.build_url(key))
  end
rescue HTTPError
  tries ||= 0
  (tries += 1) < 10 ? retry : raise
end
delete(key, options = {}) click to toggle source

(see Proxy#delete)
@option options [Boolean] :batch (false) Whether to do a {https://docs.couchdb.org/en/stable/api/database/common.html#api-doc-batch-writes batch mode write}
@option options [Boolean] :full_commit (nil) Set to `true` or `false` to override the server's {https://docs.couchdb.org/en/stable/config/couchdb.html#couchdb/delayed_commits commit policy}
# File lib/moneta/adapters/couch.rb, line 121
# Deletes the document stored under `key` and returns its previous value.
#
# The document is fetched first so that its value and revision are known.
# When the fetch is not successful nothing is deleted and nil is returned.
# Otherwise a DELETE is issued for that revision and the cached revision
# for `key` is dropped. If HTTPError is raised the whole operation is
# retried, up to 10 attempts in total.
#
# @param key document key
# @param [Hash] options
# @option options [Boolean] :batch when truthy, adds `batch=ok` to the
#   query and expects a 202 instead of a 200
# @option options [Boolean] :full_commit passed to `full_commit_header`
# @return the previously stored value, or nil when the fetch failed
# @raise [HTTPError] when the error persists after the retries
def delete(key, options = {})
  lookup = get(key, returns: :response)
  return unless lookup.success?

  previous = body_to_value(lookup.body)
  batch = options[:batch]
  params = { rev: parse_rev(lookup) }
  params[:batch] = 'ok' if batch
  request(:delete, key,
          headers: full_commit_header(options[:full_commit]),
          query: params,
          expect: batch ? 202 : 200)
  delete_cached_rev(key)
  previous
rescue HTTPError
  attempts = attempts.to_i + 1
  raise if attempts >= 10

  retry
end
each_key() { |key| ... } click to toggle source

(see Proxy#each_key)

# File lib/moneta/adapters/couch.rb, line 197
# Yields every document id in the database, page by page.
#
# Pages of up to 1000 rows are requested from `all_docs`, skipping the rows
# already seen, until a page comes back empty. The revision of each listed
# document is stored in the revision cache before its id is yielded.
#
# @yieldparam key the id of a document
# @return [Enumerator] when no block is given
# @return [self] when a block is given
def each_key
  return enum_for(:each_key) unless block_given?

  page_size = 1000
  offset = 0
  until (rows = all_docs(limit: page_size, skip: offset)['rows']).empty?
    # advance the offset before yielding, as the rows are already fetched
    offset += rows.length
    rows.each do |row|
      id = row['id']
      @rev_cache[id] = row['value']['rev']
      yield id
    end
  end
  self
end
key?(key, options = {}) click to toggle source

(see Proxy#key?)
@option options [Boolean] :cache_rev (true) Whether to cache the rev of the document for faster updating
# File lib/moneta/adapters/couch.rb, line 79
# Checks for a document under `key` with a HEAD request.
#
# @param key document key
# @param [Hash] options
# @option options [Boolean] :cache_rev (true) passed to `head`; only an
#   explicit `false` disables it
# @return the result of `head` for this key
def key?(key, options = {})
  head(key, cache_rev: options[:cache_rev] != false)
end
load(key, options = {}) click to toggle source

(see Proxy#load) @option (see key?)

# File lib/moneta/adapters/couch.rb, line 86
# Fetches the document under `key` and converts it back to a value.
#
# @param key document key
# @param [Hash] options
# @option options [Boolean] :cache_rev (true) passed to `get`; only an
#   explicit `false` disables it
# @return the stored value, or nil when `get` returns no document
def load(key, options = {})
  doc = get(key, cache_rev: options[:cache_rev] != false)
  doc_to_value(doc) if doc
end
merge!(pairs, options = {}) { |key, existing, new_value| ... } click to toggle source

(see Proxy#merge!)
@option options [Boolean] :full_commit (nil) Set to `true` or `false` to override the server's {https://docs.couchdb.org/en/stable/config/couchdb.html#couchdb/delayed_commits commit policy}
# File lib/moneta/adapters/couch.rb, line 235
# Stores several key/value pairs with a single bulk request.
#
# Revisions for keys not yet in the revision cache are looked up first.
# When a block is given, values for keys that already exist (according to
# `slice`) are replaced by the block's result. Rows reported as conflicts
# have their cached revision dropped and are retried through a recursive
# call (without the block, so the already-merged values are reused).
#
# @param pairs enumerable of `[key, value]` pairs
# @param [Hash] options
# @option options [Boolean] :full_commit forwarded to `bulk_docs`
# @yieldparam key the key of an existing entry
# @yieldparam existing the currently stored value
# @yieldparam new_value the value being merged in
# @return [self]
# @raise [RuntimeError] when a bulk row is neither ok nor a conflict
def merge!(pairs, options = {})
  keys = pairs.map { |key, _| key }.to_a
  # Nothing to store: avoid issuing any request at all
  return self if keys.empty?

  # Only ask the server for revisions when at least one key lacks a cached rev
  uncached = keys.reject { |key| @rev_cache[key] }
  cache_revs(*uncached) unless uncached.empty?

  if block_given?
    existing = slice(*keys, **options).to_h
    pairs = pairs.map do |key, new_value|
      [key, existing.key?(key) ? yield(key, existing[key], new_value) : new_value]
    end
  end

  docs = pairs.map { |key, value| value_to_doc(value, @rev_cache[key], key) }.to_a
  results = bulk_docs(docs, full_commit: options[:full_commit], returns: :doc)
  conflicts = results.each_with_object([]) do |row, pending|
    ok, id = row.values_at('ok', 'id')
    if ok
      @rev_cache[id] = row['rev']
    elsif row['error'] == 'conflict'
      # The cached rev is stale; drop it so the retry looks it up again
      delete_cached_rev(id)
      pending << pairs.find { |key,| key == id }
    else
      raise "Unrecognised response: #{row}"
    end
  end

  # Recursive call with all conflicts
  conflicts.empty? ? self : merge!(conflicts, options)
end
slice(*keys, **options) click to toggle source

(see Proxy#slice)

# File lib/moneta/adapters/couch.rb, line 222
# Loads the values for several keys with one `all_docs` request.
#
# Rows without a `doc` entry are left out of the result.
#
# @param keys document keys to look up
# @param options accepted for interface compatibility; not used here
# @return [Array<Array>] `[id, value]` pairs for the rows that carry a document
def slice(*keys, **options)
  rows = all_docs(keys: keys, include_docs: true)['rows']
  rows.each_with_object([]) do |row, found|
    document = row['doc']
    found << [row['id'], doc_to_value(document)] if document
  end
end
store(key, value, options = {}) click to toggle source

(see Proxy#store) @option (see key?)
@option options [Boolean] :batch (false) Whether to do a {https://docs.couchdb.org/en/stable/api/database/common.html#api-doc-batch-writes batch mode write}
@option options [Boolean] :full_commit (nil) Set to `true` or `false` to override the server's {https://docs.couchdb.org/en/stable/config/couchdb.html#couchdb/delayed_commits commit policy}
# File lib/moneta/adapters/couch.rb, line 101
# Writes `value` under `key`, updating the document if it already exists.
#
# The document is built with the revision returned by `rev(key)` and PUT
# to the server. If HTTPError is raised the whole operation (including the
# revision lookup) is retried, up to 10 attempts in total.
#
# @param key document key
# @param value value to store (converted with `value_to_doc`)
# @param [Hash] options
# @option options [Boolean] :batch when truthy, adds `batch=ok` to the
#   query and expects a 202 instead of a 201
# @option options [Boolean] :full_commit passed to `full_commit_header`
# @option options [Boolean] :cache_rev (true) passed to `put`; only an
#   explicit `false` disables it
# @return the value that was passed in
# @raise [HTTPError] when the error persists after the retries
def store(key, value, options = {})
  batch = options[:batch]
  doc = value_to_doc(value, rev(key))
  put(key, doc,
      headers: full_commit_header(options[:full_commit]),
      query: batch ? { batch: 'ok' } : {},
      cache_rev: options[:cache_rev] != false,
      expect: batch ? 202 : 201)
  value
rescue HTTPError
  attempts = attempts.to_i + 1
  raise if attempts >= 10

  retry
end
values_at(*keys, **options) click to toggle source

(see Proxy#values_at)

# File lib/moneta/adapters/couch.rb, line 216
# Loads the values for several keys, in the order the keys were given.
#
# @param keys document keys to look up
# @param options forwarded to `slice`
# @return [Array] one entry per key; nil where `slice` returned no pair
def values_at(*keys, **options)
  found = slice(*keys, **options).to_h
  found.values_at(*keys)
end

Private Instance Methods

all_docs(sorted: false, **params) click to toggle source
# File lib/moneta/adapters/couch.rb, line 418
# Queries the database's `_all_docs` view.
#
# All parameters except `keys` are encoded with `encode_query` and sent as
# the query string. When `keys` is given they are POSTed in the request
# body; otherwise a plain GET is used. Both variants expect a 200.
#
# @param sorted value sent as the `sorted` query parameter (default false)
# @param params further query parameters; `:keys` is taken out and sent
#   in the body
# @return the result of the `post` or `get` call
def all_docs(sorted: false, **params)
  keys = params.delete(:keys)
  query = encode_query(params.merge(sorted: sorted))
  return get('_all_docs', query: query, expect: 200) unless keys

  post('_all_docs', { keys: keys }, query: query, expect: 200, returns: :doc)
end
body_to_value(body) click to toggle source
# File lib/moneta/adapters/couch.rb, line 281
# Parses a JSON response body and converts the resulting document to a value.
#
# @param body JSON text handed to `MultiJson.load`
# @return the result of `doc_to_value` for the parsed document
def body_to_value(body)
  parsed = MultiJson.load(body)
  doc_to_value(parsed)
end
bulk_docs(docs, returns: :success, full_commit: nil) click to toggle source
# File lib/moneta/adapters/couch.rb, line 431
# Sends several documents to `_bulk_docs` in one POST, expecting a 201.
#
# @param docs documents to send, wrapped as `{ docs: docs }`
# @param returns result format, forwarded to `post` (default :success)
# @param full_commit passed to `full_commit_header` to build the headers
# @return the result of the `post` call
def bulk_docs(docs, returns: :success, full_commit: nil)
  payload = { docs: docs }
  commit_headers = full_commit_header(full_commit)
  post('_bulk_docs', payload, headers: commit_headers, returns: returns, expect: 201)
end
cache_response_rev(key, response) click to toggle source
# File lib/moneta/adapters/couch.rb, line 348
# Updates the revision cache for `key` from an HTTP response.
#
# For a 200 or 201 the revision parsed from the response is cached. For
# any other status the cached revision for `key` is removed.
#
# @param key document key
# @param response object responding to `status`; also handed to `parse_rev`
# @return the cached revision, or nil when the entry was removed
def cache_response_rev(key, response)
  if [200, 201].include?(response.status)
    @rev_cache[key] = parse_rev(response)
  else
    delete_cached_rev(key)
    nil
  end
end
cache_revs(*keys) click to toggle source
# File lib/moneta/adapters/couch.rb, line 336
# Looks up the revisions of several keys and stores them in the revision cache.
#
# Rows that have no `value`, or whose value is flagged `deleted`, are skipped.
#
# @param keys document keys to look up via `all_docs`
# @return the rows returned by `all_docs`
def cache_revs(*keys)
  all_docs(keys: keys)['rows'].each do |row|
    info = row['value']
    @rev_cache[row['id']] = info['rev'] if info && !info['deleted']
  end
end
create_db() click to toggle source
# File lib/moneta/adapters/couch.rb, line 315
# Creates the database, retrying once per second until it is known to exist.
#
# A PUT to the database root is attempted. A 201 ends the loop. A 412 ends
# the loop only if a HEAD on the database root succeeds. Any other status
# raises HTTPError.
#
# @return [self]
# @raise [HTTPError] when the PUT returns a status other than 201 or 412
def create_db
  loop do
    status = put('', returns: :response).status
    break if status == 201
    raise HTTPError.new(status, :put, '') unless status == 412

    # Make sure the database really does exist
    # See https://github.com/apache/couchdb/issues/2073
    break if head('')

    # Wait before trying again
    sleep 1
  end

  self
end
delete_cached_rev(key) click to toggle source
# File lib/moneta/adapters/couch.rb, line 358
# Removes the entry for `key` from the revision cache.
#
# @param key document key whose cached revision should be dropped
# @return whatever `@rev_cache.delete` returns
def delete_cached_rev(key)
  @rev_cache.delete(key)
end
doc_to_value(doc) click to toggle source
# File lib/moneta/adapters/couch.rb, line 285
# Converts a document back into the value that was stored.
#
# Documents whose type field is 'Hash' are returned as a copy without the
# `_id`, `_rev` and type fields. For every other document the content of
# the value field is returned.
#
# @param [Hash] doc document as loaded from the server
# @return the stored value
def doc_to_value(doc)
  return doc[config.value_field] unless doc[config.type_field] == 'Hash'

  # Work on a copy so the caller's document is left untouched
  copy = doc.dup
  ['_id', '_rev', config.type_field].each { |field| copy.delete(field) }
  copy
end
encode_query(query) click to toggle source
# File lib/moneta/adapters/couch.rb, line 369
# JSON-encodes every value of a set of query parameters.
#
# @param query enumerable of `[name, value]` pairs (e.g. a Hash)
# @return [Array<Array>] `[name, json]` pairs in the original order
def encode_query(query)
  query.each_with_object([]) do |(name, value), encoded|
    encoded << [name, MultiJson.dump(value)]
  end
end
full_commit_header(full_commit) click to toggle source
# File lib/moneta/adapters/couch.rb, line 277
# Builds the request headers for the `full_commit` option.
#
# @param full_commit nil for no header; any other value is reduced to its
#   truthiness
# @return [Hash] empty for nil, otherwise `X-Couch-Full-Commit` set to
#   'true' or 'false'
def full_commit_header(full_commit)
  return {} if full_commit.nil?

  { 'X-Couch-Full-Commit' => (full_commit ? 'true' : 'false') }
end
get(key, returns: :doc, **options) click to toggle source
# File lib/moneta/adapters/couch.rb, line 400
# Issues a GET for `key` through `request`.
#
# @param key passed to `request` as the request key
# @param returns result format, forwarded to `request` (defaults to :doc)
# @param options remaining keyword arguments, forwarded unchanged to `request`
# @return whatever `request` returns for the chosen `returns` value
def get(key, returns: :doc, **options)
  request(:get, key, returns: returns, **options)
end
head(key, returns: :success, **options) click to toggle source
# File lib/moneta/adapters/couch.rb, line 404
# Issues a HEAD for `key` through `request`.
#
# @param key passed to `request` as the request key
# @param returns result format, forwarded to `request` (defaults to :success)
# @param options remaining keyword arguments, forwarded unchanged to `request`
# @return whatever `request` returns for the chosen `returns` value
def head(key, returns: :success, **options)
  request(:head, key, returns: returns, **options)
end
parse_rev(response) click to toggle source
# File lib/moneta/adapters/couch.rb, line 344
# Extracts the revision from a response's `etag` header.
#
# The first and last character of the header value are dropped.
# NOTE(review): presumably these are the surrounding double quotes of the
# ETag; confirm against the server's responses.
#
# @param response object whose `['etag']` returns the header value
# @return [String] the header value without its first and last character
def parse_rev(response)
  etag = response['etag']
  etag[1...-1]
end
post(key, doc = nil, returns: :success, **options) click to toggle source
# File lib/moneta/adapters/couch.rb, line 413
# Issues a POST for `key` through `request`.
#
# @param key passed to `request` as the request key
# @param doc optional document; JSON-encoded as the body, or an empty
#   body when nil
# @param returns result format, forwarded to `request` (defaults to :success)
# @param options remaining keyword arguments, forwarded unchanged to `request`
# @return whatever `request` returns for the chosen `returns` value
def post(key, doc = nil, returns: :success, **options)
  payload = ''
  payload = MultiJson.dump(doc) unless doc.nil?
  request(:post, key, payload, returns: returns, **options)
end
put(key, doc = nil, returns: :success, **options) click to toggle source
# File lib/moneta/adapters/couch.rb, line 408
# Issues a PUT for `key` through `request`.
#
# @param key passed to `request` as the request key
# @param doc optional document; JSON-encoded as the body, or an empty
#   body when nil
# @param returns result format, forwarded to `request` (defaults to :success)
# @param options remaining keyword arguments, forwarded unchanged to `request`
# @return whatever `request` returns for the chosen `returns` value
def put(key, doc = nil, returns: :success, **options)
  payload = ''
  payload = MultiJson.dump(doc) unless doc.nil?
  request(:put, key, payload, returns: returns, **options)
end
request(method, key, body = nil, returns: :nil, cache_rev: false, expect: nil, query: nil, headers: {}) click to toggle source
# File lib/moneta/adapters/couch.rb, line 373
# Runs a single HTTP request against the backend and shapes the result.
#
# @param method HTTP method as a symbol (e.g. :get, :put)
# @param key request key, turned into a URL by the backend's `build_url`
# @param body request body; an empty string is sent when nil
# @param returns one of :response (the raw response), :success (the
#   response's `success?`), :doc (the parsed JSON body, or nil when the
#   response is not successful) or :nil (always nil)
# @param cache_rev when truthy, the response is passed to
#   `cache_response_rev` for `key`
# @param expect when given, the status the response must have
# @param query query parameters handed to `build_url`
# @param [Hash] headers request headers; this hash is never modified
# @return see `returns`
# @raise [HTTPError] when `expect` is given and the status differs
# @raise [RuntimeError] when `returns` is not one of the known values
def request(method, key, body = nil, returns: :nil, cache_rev: false, expect: nil, query: nil, headers: {})
  url = @backend.build_url(key, query)
  # Build a new hash rather than writing into the caller's `headers`, which
  # may be frozen or reused for other requests.
  headers = headers.merge('Content-Type' => 'application/json') if %i[put post].include?(method)
  response = @backend.run_request(method, url, body || '', headers)

  # The rev cache is updated before the status check so that a failed
  # request also gets to invalidate a stale revision.
  cache_response_rev(key, response) if cache_rev

  if expect && response.status != expect
    raise HTTPError.new(response.status, method, url)
  end

  case returns
  when :response
    response
  when :success
    response.success?
  when :doc
    response.success? ? MultiJson.load(response.body) : nil
  when :nil
    nil
  else
    raise "Unknown returns param: #{returns.inspect}"
  end
end
rev(key) click to toggle source
# File lib/moneta/adapters/couch.rb, line 362
# Returns the revision for `key`, preferring the revision cache.
#
# On a cache miss a HEAD request is made directly on the backend and the
# response is handed to `cache_response_rev`.
#
# @param key document key
# @return the cached revision, or the result of `cache_response_rev`
def rev(key)
  cached = @rev_cache[key]
  return cached if cached

  cache_response_rev(key, @backend.head(key))
end
value_to_doc(value, rev, id = nil) click to toggle source
# File lib/moneta/adapters/couch.rb, line 298
# Converts a value into the document that is sent to the server.
#
# Hashes are stored as the document itself, with the type field added.
# Strings and numbers are stored in the value field, tagged 'String' or
# 'Number' in the type field.
#
# @param value a Hash, String, Float or Integer
# @param rev revision to set as `_rev`; omitted when falsy
# @param id id to set as `_id`; omitted when falsy
# @return [Hash] the document
# @raise [ArgumentError] for any other kind of value
def value_to_doc(value, rev, id = nil)
  doc =
    if value.is_a?(Hash)
      value.merge(config.type_field => 'Hash')
    elsif value.is_a?(String)
      { config.value_field => value, config.type_field => 'String' }
    elsif value.is_a?(Float) || value.is_a?(Integer)
      { config.value_field => value, config.type_field => 'Number' }
    else
      raise ArgumentError, "Invalid value type: #{value.class}"
    end
  doc['_rev'] = rev if rev
  doc['_id'] = id if id
  doc
end