class Riak::Multiget
Coordinates a parallel fetch operation for multiple values.
Attributes
@return [Riak::Client] client
the associated client
@return [Array<Bucket, String>] fetch_list
an {Array} of {Bucket} and {String} key pairs to fetch
@return [Boolean] finished
whether the fetch operation has completed
@return [Hash<fetch_list_entry, RObject>] result_hash
a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
@return [Integer] thread_count
the number of threads to use
Public Class Methods
Perform a Riak Multiget operation.
@param [Client] client the {Riak::Client} that will perform the multiget
@param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
@return [Hash<fetch_list_entry, RObject>] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
# File lib/riak/multiget.rb, line 28
def self.get_all(client, fetch_list)
  multi = new client, fetch_list
  multi.fetch
  multi.results
end
Create a Riak Multiget operation.
@param [Client] client the {Riak::Client} that will perform the multiget
@param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
# File lib/riak/multiget.rb, line 37
def initialize(client, fetch_list)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array
  validate_fetch_list fetch_list
  @client, @fetch_list = client, fetch_list.uniq
  self.result_hash = Hash.new
  @finished = false
  self.thread_count = client.multiget_threads
end
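The constructor deduplicates fetch_list with uniq, and results are later stored under the same [bucket, key] pairs. The following is a minimal sketch of that data shape only. StubBucket is a hypothetical stand-in for Riak::Bucket, and nothing here talks to a Riak server.

```ruby
# StubBucket is a hypothetical stand-in for Riak::Bucket, used only to
# illustrate the shape of fetch_list and result_hash.
StubBucket = Struct.new(:name)

users = StubBucket.new("users")

# Each entry is a two-element [bucket, key] pair. Duplicates are harmless
# on input because the constructor calls uniq on the list.
fetch_list = [[users, "alice"], [users, "bob"], [users, "alice"]]
unique_pairs = fetch_list.uniq

# Results are keyed by the pair itself. A key that was not found maps to nil.
result_hash = {}
unique_pairs.each { |pair| result_hash[pair] = nil }
result_hash[[users, "bob"]] = "stub value for bob"

# Looking a result up uses an equal [bucket, key] pair as the hash key.
looked_up = result_hash[[users, "bob"]]
```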
Public Instance Methods
Starts the parallelized fetch operation.
@raise [ArgumentError] when a non-positive-Integer count is given
# File lib/riak/multiget.rb, line 50
def fetch
  queue = fetch_list.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new

  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, t("invalid_multiget_thread_count")
  end

  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end

        break if pair.nil?

        found = attempt_fetch(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end
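The fetch method follows a common worker-pool pattern: a shared work list guarded by one mutex, a shared result hash guarded by another, and a fixed number of threads that pull pairs until the list is empty. The sketch below reproduces that pattern outside the library so it can be run on its own. The names parallel_fetch and fake_fetch are hypothetical, and fake_fetch stands in for attempt_fetch.

```ruby
# fake_fetch is a hypothetical stand-in for attempt_fetch; it performs
# no network request and just builds a string from its arguments.
def fake_fetch(bucket_name, key)
  "#{bucket_name}/#{key}"
end

# Hypothetical helper that mirrors the structure of Multiget#fetch.
def parallel_fetch(pairs, thread_count)
  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, "thread_count must be a positive Integer"
  end

  queue = pairs.dup          # work list, consumed by the workers
  queue_mutex = Mutex.new    # guards queue.shift
  result_mutex = Mutex.new   # guards writes to results
  results = {}

  threads = Array.new(thread_count) do
    Thread.new do
      loop do
        pair = queue_mutex.synchronize { queue.shift }
        break if pair.nil?   # nothing left to fetch

        found = fake_fetch(*pair)
        result_mutex.synchronize { results[pair] = found }
      end
    end
  end

  threads.each(&:join)       # block until every worker has finished
  results
end

pairs = [["users", "alice"], ["users", "bob"], ["logs", "2024"]]
results = parallel_fetch(pairs, 2)
```

Unlike fetch, this sketch joins the threads before returning. In the class itself that waiting is done separately by wait_for_finish.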
# File lib/riak/multiget.rb, line 82
def finished?
  set_finished_for_thread_liveness
  finished
end
# File lib/riak/multiget.rb, line 77
def results
  wait_for_finish
  result_hash
end
# File lib/riak/multiget.rb, line 87
def wait_for_finish
  return if finished?
  @threads.each {|t| t.join }
  @finished = true
end
Private Instance Methods
# File lib/riak/multiget.rb, line 95
def attempt_fetch(bucket, key)
  bucket[key]
rescue Riak::FailedRequest => e
  raise e unless e.not_found?
  nil
end
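attempt_fetch treats a not-found failure as a nil result and re-raises any other failure. A minimal, runnable sketch of that rescue pattern follows. StubFailedRequest is a hypothetical class standing in for Riak::FailedRequest, and attempt is a hypothetical helper that takes a block instead of a bucket and key.

```ruby
# Hypothetical stand-in for Riak::FailedRequest. It only models the
# not_found? predicate that attempt_fetch relies on.
class StubFailedRequest < StandardError
  def initialize(message, not_found)
    super(message)
    @not_found = not_found
  end

  def not_found?
    @not_found
  end
end

# Runs the block and converts a not-found failure into nil.
# Any other StubFailedRequest is re-raised to the caller.
def attempt
  yield
rescue StubFailedRequest => e
  raise e unless e.not_found?
  nil
end

hit  = attempt { "stored value" }
miss = attempt { raise StubFailedRequest.new("no such key", true) }
```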
# File lib/riak/multiget.rb, line 102
def set_finished_for_thread_liveness
  return if @finished # already done

  all_dead = @threads.none? {|t| t.alive? }
  return unless all_dead # still working

  @finished = true
  return
end
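The liveness check above is a non-blocking poll, whereas wait_for_finish blocks on join. The sketch below contrasts the two using plain Ruby threads. The gate queue is an assumption added only to hold the workers open so the "still working" state can be observed deterministically.

```ruby
# gate keeps each worker blocked until a token is pushed, so the threads
# are guaranteed to be alive when first polled.
gate = Queue.new
threads = Array.new(3) { Thread.new { gate.pop } }

# Non-blocking poll while the workers are still waiting on the gate.
still_running = threads.any?(&:alive?)

# Release the workers, then block until each one has terminated.
3.times { gate.push(:go) }
threads.each(&:join)

# The same poll now reports that no thread is alive.
all_dead = threads.none?(&:alive?)
```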
# File lib/riak/multiget.rb, line 112
def validate_fetch_list(fetch_list)
  return unless erroneous = fetch_list.detect do |e|
    bucket, key = e
    next true unless bucket.is_a? Bucket
    next true unless key.is_a? String
  end

  raise ArgumentError, t('fetch_list_type', :problem => erroneous)
end
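The validation uses detect to find the first entry that is not a [Bucket, String] pair and raises if one exists. A self-contained sketch of the same check is shown here. SketchBucket and validate_pairs! are hypothetical names, and the error message is illustrative rather than the library's translated message.

```ruby
# SketchBucket is a hypothetical stand-in for Riak::Bucket.
SketchBucket = Struct.new(:name)

# Raises ArgumentError for the first entry that is not a
# [SketchBucket, String] pair; returns nil when every entry is valid.
def validate_pairs!(fetch_list)
  erroneous = fetch_list.detect do |entry|
    bucket, key = entry
    !(bucket.is_a?(SketchBucket) && key.is_a?(String))
  end
  return if erroneous.nil?

  raise ArgumentError, "invalid fetch_list entry: #{erroneous.inspect}"
end

bucket = SketchBucket.new("users")
valid_result = validate_pairs!([[bucket, "alice"], [bucket, "bob"]])

symbol_key_rejected = begin
  validate_pairs!([[bucket, "alice"], [bucket, :bob]])
  false
rescue ArgumentError
  true
end
```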