module EventMachine::Bucketer::Base

Public Instance Methods

add_item(bucket_id, item_id, item, &blk) click to toggle source

Adds an item to the specified bucket and calls the block when it is done.

@param bucket_id [String] the bucket id of the bucket to put the item in @param item_id [String] the item_id of the item (used to ensure uniqueness within a bucket) @param item [Object] the item to be placed in the bucket

# File lib/em-bucketer/base.rb, line 21
# Stores an item in a bucket and reports the outcome through a completion.
#
# Before the write is issued the bucket's timeout timer is armed via
# add_timer_if_first. The fullness check is triggered right after the write
# has been issued, not from the write's success callback.
#
# @param bucket_id [String] the bucket to put the item in
# @param item_id [String] id of the item within the bucket
# @param item [Object] the item to store
# @yield called with no arguments once the write has succeeded
# @return [EM::Completion] succeeds when add_bucket_to_db succeeds, fails
#   with the error that add_bucket_to_db reports
def add_item(bucket_id, item_id, item, &blk)
  add_timer_if_first(bucket_id)
  completion = EM::Completion.new
  completion.callback(&blk) if blk
  add_bucket_to_db(bucket_id, item_id, item)
    .callback { completion.succeed }
    .errback { |error| completion.fail(error) }
  check_bucket_full(bucket_id)
  completion
end
empty_bucket(bucket_id, &blk) click to toggle source

Empty a bucket

@param bucket_id [String] the bucket id of the bucket you want to empty

# File lib/em-bucketer/base.rb, line 140
# Empties a bucket and reports the outcome through a completion.
#
# @param bucket_id [String] the bucket to empty
# @yield called with no arguments once the bucket has been emptied
# @return [EM::Completion] succeeds after empty_bucket_in_db succeeds and
#   clear_timer has run for the bucket; fails with the error reported by
#   empty_bucket_in_db (clear_timer is not called in that case)
def empty_bucket(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk
  empty_bucket_in_db(bucket_id).callback do
    # Forget the timer marker first so observers of the completion see it gone.
    clear_timer(bucket_id)
    completion.succeed
  end.errback { |error| completion.fail(error) }
  completion
end
get_and_empty_bucket(bucket_id, &blk) click to toggle source

Get the contents of a bucket then empty it

@param bucket_id [String] the bucket id of the bucket you want to get @yield [Array] the items you put into the bucket

# File lib/em-bucketer/base.rb, line 88
# Fetches a bucket's contents and then empties the bucket.
#
# The bucket is only emptied after get_bucket has succeeded; the contents
# read at that point are what the completion succeeds with.
#
# @param bucket_id [String] the bucket to read and empty
# @yield [Array] the contents returned by get_bucket
# @return [EM::Completion] succeeds with the contents once empty_bucket has
#   succeeded; fails with the error from whichever step failed
def get_and_empty_bucket(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk
  relay_failure = proc { |error| completion.fail(error) }
  get_bucket(bucket_id).callback do |contents|
    empty_bucket(bucket_id)
      .callback { completion.succeed(contents) }
      .errback(&relay_failure)
  end.errback(&relay_failure)
  completion
end
get_and_remove(bucket_id, count, &blk) click to toggle source

Get at most `count` items from the bucket and remove them.

@example get 100 items from the bucket

bucketer.get_and_remove("1", 100) do |items|
  p "yay I got #{items.count} items"
  items.each do |i|
    p "got #{i}"
  end
end

@param bucket_id [String] the bucket id of the bucket you want to get @param count [Integer] the number of items you want from the bucket @yield [Array] the items you put into the bucket

# File lib/em-bucketer/base.rb, line 120
# Takes up to `count` items out of a bucket.
#
# The bucket is read from the DB and then emptied completely. The entries
# that were read are then walked with an EM::Iterator using the proc built by
# get_and_remove_iterator, which collects values into the result array.
#
# @param bucket_id [String] the bucket to take items from
# @param count [Integer] upper bound on the number of items returned
# @yield [Array] the collected items
# @return [EM::Completion] succeeds with the collected items when the
#   iteration finishes; fails with the error of the read or the empty step
#   (the iterator proc is also handed the completion)
def get_and_remove(bucket_id, count, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk
  relay_failure = proc { |error| completion.fail(error) }
  get_bucket_from_db(bucket_id).callback do |bucket|
    empty_bucket(bucket_id).callback do
      taken = []
      iterator = EM::Iterator.new(bucket)
      per_entry = get_and_remove_iterator(bucket_id, count, taken, completion)
      finished = -> { completion.succeed(taken) }
      iterator.each(per_entry, finished)
    end.errback(&relay_failure)
  end.errback(&relay_failure)
  completion
end
get_bucket(bucket_id, &blk) click to toggle source

Get the contents of a bucket.

@param bucket_id [String] the bucket id of the bucket you want to get @yield [Array] the items you put into the bucket

# File lib/em-bucketer/base.rb, line 71
# Reads the contents of a bucket without modifying it.
#
# @param bucket_id [String] the bucket to read
# @yield [Array] the values of the bucket returned by get_bucket_from_db
# @return [EM::Completion] succeeds with `bucket.values`; fails with the
#   error reported by get_bucket_from_db
def get_bucket(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk
  get_bucket_from_db(bucket_id)
    .callback { |bucket| completion.succeed(bucket.values) }
    .errback { |error| completion.fail(error) }
  completion
end
on_bucket_full(&blk) click to toggle source

Used to set a callback hook for when a bucket reaches the threshold size. It is IMPORTANT to note that the bucket will not be emptied automatically; you must call empty_bucket if you want the bucket to be emptied. Also, the callback will be called every time an item is added until the bucket is emptied.

@yield [String] The bucket id of the full bucket

# File lib/em-bucketer/base.rb, line 43
# Registers a hook that check_bucket_full calls with the bucket id whenever a
# bucket is found to be at or above the threshold size.
#
# @yield [String] the id of the full bucket
# @return [Array] the list of registered bucket-full hooks
def on_bucket_full(&blk)
  @on_bucket_full_callbacks.push(blk)
end
on_bucket_timeout(&blk) click to toggle source

Used to set a callback hook for when a bucket reaches the time limit. It is IMPORTANT to note that the bucket will not be emptied automatically; you must call empty_bucket if you want the bucket to be emptied.

This timer is started once the bucket gets its first item and is cleared only when the bucket is emptied. The callback will only be called once at this time and then not again unless you empty the bucket and add something again.

@yield [String] The bucket id of the bucket that timed out

# File lib/em-bucketer/base.rb, line 61
# Registers a hook that the timer armed by add_timer_if_first calls with the
# bucket id when the bucket's time limit elapses.
#
# @yield [String] the id of the bucket whose timer fired
# @return [Array] the list of registered bucket-timeout hooks
def on_bucket_timeout(&blk)
  @on_bucket_timeout_callbacks.push(blk)
end
setup(bucket_threshold_size, bucket_max_age) click to toggle source
# File lib/em-bucketer/base.rb, line 3
# Initialises the bucketer's state: the two limits, the (initially empty)
# hook lists and the set of bucket ids that currently have a timer marker.
#
# @param bucket_threshold_size [Integer] size at or above which a bucket is
#   reported as full (compared with `>=` in bucket_full?)
# @param bucket_max_age [Numeric, nil] delay handed to EM.add_timer; when
#   falsy, add_timer_if_first does not arm any timer
# @return [Set] the new, empty set of buckets with timers
def setup(bucket_threshold_size, bucket_max_age)
  @on_bucket_full_callbacks = []
  @on_bucket_timeout_callbacks = []
  @bucket_threshold_size = bucket_threshold_size
  @bucket_max_age = bucket_max_age
  @buckets_with_timers = Set.new
end

Private Instance Methods

add_timer_if_first(bucket_id) click to toggle source
# File lib/em-bucketer/base.rb, line 186
# Arms the timeout timer for a bucket unless it already has a timer marker.
#
# Does nothing when @bucket_max_age is falsy or when bucket_id is already in
# @buckets_with_timers. Otherwise the id is added to that set and an EM timer
# is scheduled that calls every hook in @on_bucket_timeout_callbacks with the
# bucket id.
#
# clear_timer only removes the id from the set; it does not cancel the EM
# timer. To keep such a stale timer from firing after the bucket was emptied
# (or firing early for a later refill), each arming records a unique token
# and the timer only runs the hooks if its token is still the current one and
# the bucket still has its marker.
#
# @param bucket_id [String] the bucket that just received an item
# @return [Object, nil] whatever EM.add_timer returns when a timer was armed,
#   nil otherwise
def add_timer_if_first(bucket_id)
  return unless @bucket_max_age
  return unless @buckets_with_timers.add?(bucket_id)

  # Lazily created so this method does not depend on setup knowing about it.
  @bucket_timer_tokens ||= {}
  token = Object.new
  @bucket_timer_tokens[bucket_id] = token

  EM.add_timer(@bucket_max_age) do
    # A different token means the bucket was emptied and refilled since this
    # timer was armed; the refill has its own timer.
    next unless @bucket_timer_tokens[bucket_id].equal?(token)

    # The token has served its purpose; drop it so the hash does not grow.
    @bucket_timer_tokens.delete(bucket_id)

    # Marker gone means the bucket was emptied and not refilled.
    next unless @buckets_with_timers.include?(bucket_id)

    @on_bucket_timeout_callbacks.each { |callback| callback.call(bucket_id) }
  end
end
bucket_full?(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/base.rb, line 170
# Asynchronously determines whether a bucket has reached the threshold.
#
# Registers a success callback on bucket_size_from_db; no failure handler is
# attached here.
#
# @param bucket_id [String] the bucket to check
# @yield [Boolean] true when the reported size is >= @bucket_threshold_size
# @return [Object] whatever `callback` on the size lookup returns
def bucket_full?(bucket_id, &blk)
  size_lookup = bucket_size_from_db(bucket_id)
  size_lookup.callback do |current_size|
    reached_threshold = current_size >= @bucket_threshold_size
    blk.call(reached_threshold)
  end
end
check_bucket_full(bucket_id) click to toggle source
# File lib/em-bucketer/base.rb, line 176
# Runs every registered bucket-full hook with the bucket id if bucket_full?
# reports the bucket as full; does nothing otherwise.
#
# @param bucket_id [String] the bucket to check
# @return [Object] whatever bucket_full? returns
def check_bucket_full(bucket_id)
  bucket_full?(bucket_id) do |is_full|
    next unless is_full

    @on_bucket_full_callbacks.each { |hook| hook.call(bucket_id) }
  end
end
clear_timer(bucket_id) click to toggle source
# File lib/em-bucketer/base.rb, line 197
# Removes the bucket's timer marker so that the next add_timer_if_first call
# for this bucket arms a timer again. Only the marker in
# @buckets_with_timers is touched; no timer already scheduled with
# EventMachine is cancelled here.
#
# @param bucket_id [String] the bucket whose marker is removed
# @return [Set] the set of buckets that still have a timer marker
def clear_timer(bucket_id)
  @buckets_with_timers.subtract([bucket_id])
end
get_and_remove_iterator(bucket_id, count, values, completion) click to toggle source
# File lib/em-bucketer/base.rb, line 154
# Builds the per-entry proc used by get_and_remove with EM::Iterator.
#
# For each [key, value] entry the proc either collects the value (while fewer
# than `count` values have been collected) or puts the entry back into the
# bucket with add_item. The iterator is advanced after a value is collected
# or after the re-add succeeds; if the re-add fails, the completion is failed
# with that error and the iterator is not advanced.
#
# @param bucket_id [String] the bucket leftover entries are re-added to
# @param count [Integer] maximum number of values to collect
# @param values [Array] accumulator the collected values are appended to
# @param completion [EM::Completion] failed when a re-add fails
# @return [Proc] proc taking (entry, iterator)
def get_and_remove_iterator(bucket_id, count, values, completion)
  proc do |tuple, iter|
    key = tuple[0]
    val = tuple[1]
    if values.count >= count
      # Quota reached: this entry goes back into the bucket.
      add_item(bucket_id, key, val)
        .callback { iter.next }
        .errback { |error| completion.fail(error) }
    else
      values << val
      iter.next
    end
  end
end