module Couchbase::Async
Public Instance Methods
async()
click to toggle source
# File lib/couchbase/async.rb, line 28
# Returns the async flag held in Thread.current[:bucket_async].
# When that slot is nil or false it is (re)filled from @async and the
# resulting value is returned.
def async
  Thread.current[:bucket_async] = @async unless Thread.current[:bucket_async]
  Thread.current[:bucket_async]
end
async=(val)
click to toggle source
# File lib/couchbase/async.rb, line 32
# Stores +val+ in Thread.current[:bucket_async].
def async=(val)
  current = Thread.current
  current[:bucket_async] = val
end
async?()
click to toggle source
# File lib/couchbase/async.rb, line 24
# Boolean form of #async: true when #async is truthy, false otherwise.
def async?
  async ? true : false
end
async_queue()
click to toggle source
# File lib/couchbase/async.rb, line 48
# Returns the queue kept in Thread.current[:bucket_async_queue], creating a
# Couchbase::Async::Queue for this object the first time the slot is empty.
def async_queue
  slot = :bucket_async_queue
  Thread.current[slot] || (Thread.current[slot] = Couchbase::Async::Queue.new(self))
end
end_async_queue()
click to toggle source
# File lib/couchbase/async.rb, line 52
# Clears Thread.current[:bucket_async_queue] by setting it to nil.
def end_async_queue
  current = Thread.current
  current[:bucket_async_queue] = nil
end
run(options = {}) { |self| ... }
click to toggle source
Run the event loop.
@since 1.0.0
@param [Hash] options The options for the operation for the connection @option options [Fixnum] :send_threshold (0) if the internal command
buffer exceeds this value, then the library will start network interaction and block the current thread until all scheduled commands are completed.
@yieldparam [Bucket] bucket the bucket instance
@example Use block to run the loop
c = Couchbase.new c.run do c.get("foo") {|ret| puts ret.value} end
@example Use lambda to run the loop
c = Couchbase.new operations = lambda do |c| c.get("foo") {|ret| puts ret.value} end c.run(&operations)
@example Use threshold to send out commands automatically
c = Couchbase.connect sent = 0 c.run(:send_threshold => 8192) do # 8Kb c.set("foo1", "x" * 100) {|r| sent += 1} # 128 bytes buffered, sent is 0 now c.set("foo2", "x" * 10000) {|r| sent += 1} # 10028 bytes added, sent is 2 now c.set("foo3", "x" * 100) {|r| sent += 1} end # all commands were executed and sent is 3 now
@example Use {Couchbase::Bucket#run} without block for async connection
c = Couchbase.new(:async => true) c.run # ensure that instance connected c.set("foo", "bar"){|r| puts r.cas} c.run
@return [nil]
@raise [Couchbase::Error::Connect] if connection closed (see {Bucket#reconnect})
# File lib/couchbase/async.rb, line 105
# Runs the given block in async mode, then waits on the async queue.
#
# Setup is deliberately performed before the protected region: when
# do_async_setup raises (no block given, or a nested #run), the teardown
# must not run, otherwise it would reset the async/running flags and the
# queue of an outer #run that is still in progress.
#
# @param [Hash] options accepted for API compatibility; not read by this method
# @yieldparam [Bucket] bucket the receiver (self)
# @return [nil]
def run(options = {})
  do_async_setup(block_given?)
  begin
    yield(self)
    async_queue.join
    # TODO: deal with exceptions
    nil
  ensure
    do_async_ensure
  end
end
run_async(options = {}) { |self| ... }
click to toggle source
# File lib/couchbase/async.rb, line 116
# Runs the given block in async mode without waiting on the async queue.
#
# Setup is deliberately performed before the protected region: when
# do_async_setup raises (no block given, or a nested run), the teardown
# must not run, otherwise it would reset the async/running flags and the
# queue of an outer run that is still in progress.
#
# @param [Hash] options accepted for API compatibility; not read by this method
# @yieldparam bucket the receiver (self)
# @return [nil]
def run_async(options = {})
  do_async_setup(block_given?)
  begin
    yield(self)
    nil
  ensure
    do_async_ensure
  end
end
running()
click to toggle source
# File lib/couchbase/async.rb, line 40
# Returns the value in Thread.current[:bucket_running]; when that slot is
# nil or false it is set to false and false is returned.
def running
  current = Thread.current
  current[:bucket_running] || (current[:bucket_running] = false)
end
running=(val)
click to toggle source
# File lib/couchbase/async.rb, line 44
# Stores +val+ in Thread.current[:bucket_running].
def running=(val)
  current = Thread.current
  current[:bucket_running] = val
end
running?()
click to toggle source
# File lib/couchbase/async.rb, line 36
# Boolean form of #running: true when #running is truthy, false otherwise.
def running?
  running ? true : false
end
Private Instance Methods
do_async_ensure()
click to toggle source
# File lib/couchbase/async.rb, line 136
# Teardown counterpart of do_async_setup: switches the async and running
# flags off, then drops the async queue via end_async_queue (whose return
# value becomes this method's return value).
def do_async_ensure
  self.async = false
  self.running = false
  end_async_queue
end
do_async_setup(block_given)
click to toggle source
# File lib/couchbase/async.rb, line 126
# Validates that an async run may start and switches the async and running
# flags on.
#
# @param block_given whether the calling method received a block
# @raise [LocalJumpError] when +block_given+ is falsy
# @raise [Error::Invalid] when running? is already true (nested #run)
def do_async_setup(block_given)
  raise LocalJumpError, 'block required for async run' unless block_given
  # TODO: check for connection
  raise Error::Invalid, 'nested #run' if running?
  # TODO: deal with thresholds
  self.async = true
  self.running = true
end
register_callback(future, &block)
click to toggle source
# File lib/couchbase/async.rb, line 151
# Wraps +block+ in a Couchbase::Callback (created with :set) and attaches it
# to +future+ through future.addListener. Returns whatever addListener returns.
def register_callback(future, &block)
  future.addListener(Couchbase::Callback.new(:set, &block))
end
register_future(future, options, &block)
click to toggle source
# File lib/couchbase/async.rb, line 142
# Hands +future+ to the async queue when one is available, otherwise attaches
# +block+ to it directly via register_callback. Always returns +future+.
#
# NOTE(review): async_queue, as defined in this module, creates a queue on
# demand, so it looks like the fallback branch can never be taken - confirm
# whether the condition was meant to test async?/running? instead.
def register_future(future, options, &block)
  queue = async_queue
  if queue
    queue.add_future(future, options, &block)
  else
    register_callback(future, &block)
  end
  future
end