module Couchbase::Async

Public Instance Methods

async() click to toggle source
# File lib/couchbase/async.rb, line 28
def async
  Thread.current[:bucket_async] ||= @async
end
async=(val) click to toggle source
# File lib/couchbase/async.rb, line 32
def async=(val)
  Thread.current[:bucket_async] = val
end
async?() click to toggle source
# File lib/couchbase/async.rb, line 24
def async?
  !!async
end
async_queue() click to toggle source
# File lib/couchbase/async.rb, line 48
def async_queue
  Thread.current[:bucket_async_queue] ||= Couchbase::Async::Queue.new(self)
end
end_async_queue() click to toggle source
# File lib/couchbase/async.rb, line 52
def end_async_queue
  Thread.current[:bucket_async_queue] = nil
end
run(options = {}) { |self| ... } click to toggle source

Run the event loop.

@since 1.0.0

@param [Hash] options The options for operation for connection @option options [Fixnum] :send_threshold (0) if the internal command

buffer will exceeds this value, then the library will start network
interaction and block the current thread until all scheduled commands
will be completed.

@yieldparam [Bucket] bucket the bucket instance

@example Use block to run the loop

c = Couchbase.new
c.run do
  c.get("foo") {|ret| puts ret.value}
end

@example Use lambda to run the loop

c = Couchbase.new
operations = lambda do |c|
  c.get("foo") {|ret| puts ret.value}
end
c.run(&operations)

@example Use threshold to send out commands automatically

c = Couchbase.connect
sent = 0
c.run(:send_threshold => 8192) do  # 8Kb
  c.set("foo1", "x" * 100) {|r| sent += 1}
  # 128 bytes buffered, sent is 0 now
  c.set("foo2", "x" * 10000) {|r| sent += 1}
  # 10028 bytes added, sent is 2 now
  c.set("foo3", "x" * 100) {|r| sent += 1}
end
# all commands were executed and sent is 3 now

@example Use {Couchbase::Bucket#run} without block for async connection

c = Couchbase.new(:async => true)
c.run      # ensure that instance connected
c.set("foo", "bar"){|r| puts r.cas}
c.run

@return [nil]

@raise [Couchbase::Error::Connect] if connection closed (see {Bucket#reconnect})

# File lib/couchbase/async.rb, line 105
def run(options = {})
  do_async_setup(block_given?)
  yield(self)
  async_queue.join

  # TODO: deal with exceptions
  nil
ensure
  do_async_ensure
end
run_async(options = {}) { |self| ... } click to toggle source
# File lib/couchbase/async.rb, line 116
def run_async(options = {})
  do_async_setup(block_given?)
  yield(self)
  nil
ensure
  do_async_ensure
end
running() click to toggle source
# File lib/couchbase/async.rb, line 40
def running
  Thread.current[:bucket_running] ||= false
end
running=(val) click to toggle source
# File lib/couchbase/async.rb, line 44
def running=(val)
  Thread.current[:bucket_running] = val
end
running?() click to toggle source
# File lib/couchbase/async.rb, line 36
def running?
  !!running
end

Private Instance Methods

do_async_ensure() click to toggle source
# File lib/couchbase/async.rb, line 136
def do_async_ensure
  self.async   = false
  self.running = false
  end_async_queue
end
do_async_setup(block_given) click to toggle source
# File lib/couchbase/async.rb, line 126
def do_async_setup(block_given)
  raise LocalJumpError.new('block required for async run') unless block_given
  # TODO: check for connection
  raise Error::Invalid.new('nested #run') if running?
  # TOOD: deal with thresholds

  self.async   = true
  self.running = true
end
register_callback(future, &block) click to toggle source
# File lib/couchbase/async.rb, line 151
def register_callback(future, &block)
  callback = Couchbase::Callback.new(:set, &block)
  future.addListener(callback)
end
register_future(future, options, &block) click to toggle source
# File lib/couchbase/async.rb, line 142
def register_future(future, options, &block)
  if async_queue
    async_queue.add_future(future, options, &block)
  else
    register_callback(future, &block)
  end
  future
end