class Riak::Multiget

Coordinates a parallel fetch operation for multiple values.

Attributes

client[R]

@return [Riak::Client] the associated client

fetch_list[R]

@return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch

finished[R]

@return [Boolean] finished if the fetch operation has completed

result_hash[RW]

@return [Hash<fetch_list_entry, RObject>] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances

thread_count[RW]

@return [Integer] The number of threads to use

Public Class Methods

get_all(client, fetch_list) click to toggle source

Perform a Riak Multiget operation. @param [Client] client the {Riak::Client} that will perform the multiget @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch @return [Hash<fetch_list_entry, RObject>] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances

# File lib/riak/multiget.rb, line 28
# Convenience entry point: builds a multiget, starts it, and returns the
# results once they are available.
#
# @param client the client handed to +new+
# @param fetch_list the fetch list handed to +new+
# @return whatever +results+ returns for the newly built operation
def self.get_all(client, fetch_list)
  new(client, fetch_list).tap(&:fetch).results
end
new(client, fetch_list) click to toggle source

Create a Riak Multiget operation. @param [Client] client the {Riak::Client} that will perform the multiget @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch

# File lib/riak/multiget.rb, line 37
# Builds a multiget operation without starting it.
#
# @param [Riak::Client] client the client that will perform the multiget
# @param [Array] fetch_list the entries to fetch; duplicates are dropped
# @raise [ArgumentError] when +client+ is not a Riak::Client, when
#   +fetch_list+ is not an Array, or when validate_fetch_list rejects it
def initialize(client, fetch_list)
  unless client.is_a?(Riak::Client)
    raise ArgumentError, t('client_type', :client => client.inspect)
  end
  unless fetch_list.is_a?(Array)
    raise ArgumentError, t('array_type', :array => fetch_list.inspect)
  end

  validate_fetch_list(fetch_list)

  # Compute the de-duplicated list before touching any instance state.
  unique_entries = fetch_list.uniq
  @client = client
  @fetch_list = unique_entries
  self.result_hash = {}
  @finished = false
  # The worker count defaults to the client's configured value.
  self.thread_count = client.multiget_threads
end

Public Instance Methods

fetch() click to toggle source

Starts the parallelized fetch operation. @raise [ArgumentError] when thread_count is not a positive Integer

# File lib/riak/multiget.rb, line 50
# Starts the worker threads that perform the fetch. Does not wait for them.
#
# Each worker repeatedly takes the next entry off a shared copy of
# fetch_list, fetches it through attempt_fetch, and stores the outcome in
# result_hash under that entry. A worker stops when the shared list is empty.
#
# @raise [ArgumentError] when thread_count is not a positive Integer
# @return [Array<Thread>] the started workers (also kept in @threads)
def fetch
  pending = fetch_list.dup
  pending_lock = Mutex.new
  results_lock = Mutex.new

  raise ArgumentError, t("invalid_multiget_thread_count") unless thread_count.is_a?(Integer) && thread_count > 0

  @threads = Array.new(thread_count) do
    Thread.new do
      # shift returns nil once the shared list is drained, which ends the worker.
      until (pair = pending_lock.synchronize { pending.shift }).nil?
        value = attempt_fetch(*pair)
        results_lock.synchronize { result_hash[pair] = value }
      end
    end
  end
end
finished?() click to toggle source
# File lib/riak/multiget.rb, line 82
# Reports whether the fetch operation has completed.
#
# Calls set_finished_for_thread_liveness first, so the flag reflects the
# current state of the worker threads, then returns the +finished+ value.
#
# @return [Boolean] the +finished+ value after the refresh
def finished?
  set_finished_for_thread_liveness
  finished
end
results() click to toggle source
# File lib/riak/multiget.rb, line 77
# Blocks (via wait_for_finish) until the fetch is done, then returns
# result_hash.
#
# @return the current result_hash
def results
  wait_for_finish
  result_hash
end
wait_for_finish() click to toggle source
# File lib/riak/multiget.rb, line 87
# Blocks until every worker thread has ended, then marks the operation as
# finished. Does nothing when finished? already reports completion.
#
# @return [true, nil] true after joining the workers, nil when already finished
def wait_for_finish
  unless finished?
    @threads.each(&:join)
    @finished = true
  end
end

Private Instance Methods

attempt_fetch(bucket, key) click to toggle source
# File lib/riak/multiget.rb, line 95
# Looks up +key+ in +bucket+ through +bucket[key]+.
#
# A Riak::FailedRequest that reports not_found? is turned into a nil result.
# Any other Riak::FailedRequest is raised again.
#
# @param bucket the object indexed with +key+
# @param key the key to look up
# @return the value of +bucket[key]+, or nil when the request failed as not found
# @raise [Riak::FailedRequest] when the failure is not a not-found failure
def attempt_fetch(bucket, key)
  begin
    bucket[key]
  rescue Riak::FailedRequest => failure
    failure.not_found? ? nil : raise(failure)
  end
end
set_finished_for_thread_liveness() click to toggle source
# File lib/riak/multiget.rb, line 102
# Marks the operation as finished once none of the worker threads is alive.
#
# Leaves @finished untouched when it is already set, when no workers have
# been started yet (@threads is nil), or while any worker is still running.
#
# @return [nil]
def set_finished_for_thread_liveness
  return if @finished # already done
  # No workers exist before the fetch is started, so there is nothing to
  # inspect; without this guard the nil @threads raises NoMethodError.
  return if @threads.nil?
  return if @threads.any?(&:alive?) # still working

  @finished = true
  nil
end
validate_fetch_list(fetch_list) click to toggle source
# File lib/riak/multiget.rb, line 112
# Ensures every fetch list entry is a pair of a Bucket and a String key.
#
# Entries are checked one by one and the first invalid one is reported.
# Checking each entry directly (rather than searching for an offending
# value) means entries that are themselves nil or false are rejected too,
# instead of being mistaken for "no invalid entry found".
#
# @param fetch_list the entries to check
# @raise [ArgumentError] for the first entry whose first element is not a
#   Bucket or whose second element is not a String
# @return [nil] when every entry is valid
def validate_fetch_list(fetch_list)
  fetch_list.each do |entry|
    bucket, key = entry
    next if bucket.is_a?(Bucket) && key.is_a?(String)

    raise ArgumentError, t('fetch_list_type', :problem => entry)
  end

  nil
end