module EventMachine::Bucketer::Ordered::Database::Redis

Private Instance Methods

add_item_to_db(bucket_id, item, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 33
# Appends +item+, serialized with Marshal.dump, to the Redis list stored
# under redis_key(bucket_id) and then records the bucket id through
# add_to_known_buckets.
#
# Returns an EM::Completion which succeeds (with no arguments) once both
# steps have succeeded, and fails with the error reported by whichever step
# failed. When a block is given it is registered as a success callback on
# that completion before any Redis command is issued.
def add_item_to_db(bucket_id, item, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  # Both Redis steps report failure the same way.
  forward_failure = proc { |err| completion.fail err }

  # Runs only after the push succeeded.
  register_bucket = proc do
    add_to_known_buckets(bucket_id)
      .callback { completion.succeed }
      .errback(&forward_failure)
  end

  redis.rpush(redis_key(bucket_id), Marshal.dump(item))
       .callback(&register_bucket)
       .errback(&forward_failure)
  completion
end
add_to_known_buckets(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 103
# Adds +bucket_id+ to the Redis set stored under redis_known_buckets_key.
#
# The optional block is handed straight to redis.sadd, and whatever sadd
# returns is returned to the caller (callers in this file treat it as a
# deferrable).
def add_to_known_buckets(bucket_id, &blk)
  client = redis
  client.sadd(redis_known_buckets_key, bucket_id, &blk)
end
bucket_size_from_db(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 22
# Asks Redis (LLEN) for the length of the list stored under
# redis_key(bucket_id).
#
# Returns an EM::Completion which succeeds with the reply converted via
# #to_i, or fails with the error reported by the Redis call. When a block is
# given it is registered as a success callback on that completion.
def bucket_size_from_db(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  redis.llen(redis_key(bucket_id))
       .callback { |length| completion.succeed(length.to_i) }
       .errback { |err| completion.fail(err) }
  completion
end
empty_bucket_in_db(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 84
# Deletes the Redis key for +bucket_id+ and then drops the bucket id through
# remove_from_known_buckets.
#
# Returns an EM::Completion which succeeds (with no arguments) once both
# steps have succeeded, and fails with the error reported by whichever step
# failed. When a block is given it is registered as a success callback on
# that completion.
def empty_bucket_in_db(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  # Both Redis steps report failure the same way.
  forward_failure = proc { |err| completion.fail err }

  # Runs only after the delete succeeded.
  forget_bucket = proc do
    remove_from_known_buckets(bucket_id)
      .callback { completion.succeed }
      .errback(&forward_failure)
  end

  redis.del(redis_key(bucket_id))
       .callback(&forget_bucket)
       .errback(&forward_failure)
  completion
end
get_bucket_from_db(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 72
# Reads the whole Redis list stored under redis_key(bucket_id)
# (LRANGE 0..-1) without removing anything and deserializes every entry with
# Marshal.load.
#
# Returns an EM::Completion which succeeds with the array of deserialized
# items, or fails with the error reported by the Redis call. When a block is
# given it is registered as a success callback on that completion.
#
# NOTE(review): Marshal.load is unsafe on untrusted bytes; this presumes the
# Redis instance only holds data written by this library -- confirm.
def get_bucket_from_db(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  redis.lrange(redis_key(bucket_id), 0, -1)
       .callback { |raw_items| completion.succeed(raw_items.map { |raw| Marshal.load(raw) }) }
       .errback { |err| completion.fail(err) }
  completion
end
known_buckets(&blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 99
# Fetches the members of the Redis set stored under redis_known_buckets_key.
#
# The optional block is handed straight to redis.smembers, and whatever
# smembers returns is returned to the caller.
def known_buckets(&blk)
  client = redis
  client.smembers(redis_known_buckets_key, &blk)
end
pop_all_from_db(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 48
# Invokes the registered +lpopa+ script with redis_key(bucket_id) as its
# only key and deserializes every returned entry with Marshal.load.
#
# Returns an EM::Completion which succeeds with the array of deserialized
# items, or fails with the error reported by the script call. When a block
# is given it is registered as a success callback on that completion.
def pop_all_from_db(bucket_id, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  redis.lpopa([redis_key(bucket_id)])
       .callback { |raw_items| completion.succeed(raw_items.map { |raw| Marshal.load(raw) }) }
       .errback { |err| completion.fail(err) }
  completion
end
pop_count_from_db(bucket_id, count, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 60
# Invokes the registered +lpopn+ script with redis_key(bucket_id) as its
# only key and +count+ as its argument, then deserializes every returned
# entry with Marshal.load.
#
# Returns an EM::Completion which succeeds with the array of deserialized
# items, or fails with the error reported by the script call. When a block
# is given it is registered as a success callback on that completion.
def pop_count_from_db(bucket_id, count, &blk)
  completion = EM::Completion.new
  completion.callback(&blk) if blk

  redis.lpopn([redis_key(bucket_id)], count)
       .callback { |raw_items| completion.succeed(raw_items.map { |raw| Marshal.load(raw) }) }
       .errback { |err| completion.fail(err) }
  completion
end
redis_key(bucket_id) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 111
# Builds the Redis key under which the list for +bucket_id+ is stored:
# the fixed "em_bucketer_ordered" namespace, then redis_prefix, then the
# bucket id, separated by colons.
def redis_key(bucket_id)
  prefix = redis_prefix
  "em_bucketer_ordered:#{prefix}:#{bucket_id}"
end
redis_known_buckets_key() click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 115
# Builds the Redis key of the set that tracks known bucket ids: the fixed
# "em_bucketer_ordered_known_buckets" namespace followed by redis_prefix.
def redis_known_buckets_key
  prefix = redis_prefix
  "em_bucketer_ordered_known_buckets:#{prefix}"
end
remove_from_known_buckets(bucket_id, &blk) click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 107
# Removes +bucket_id+ from the Redis set stored under
# redis_known_buckets_key.
#
# The optional block is handed straight to redis.srem, and whatever srem
# returns is returned to the caller (callers in this file treat it as a
# deferrable).
def remove_from_known_buckets(bucket_id, &blk)
  client = redis
  client.srem(redis_known_buckets_key, bucket_id, &blk)
end
setup_db() click to toggle source
# File lib/em-bucketer/ordered/database/redis.rb, line 9
      # Registers the two Lua scripts this module calls on the redis client
      # (as redis.lpopn and redis.lpopa).
      #
      # * +lpopn+ -- KEYS[1] is a list key, ARGV[1] a count. Returns the first
      #   ARGV[1] entries of the list and trims them off the list.
      # * +lpopa+ -- KEYS[1] is a list key. Returns every entry of the list and
      #   deletes the key.
      #
      # Returns whatever the last register_script call returns.
      def setup_db
        # A count <= 0 must be a no-op. Without the guard, a count of 0 turned
        # into LRANGE key 0 -1 (the whole list) followed by LTRIM key 0 -1
        # (removes nothing), so every item was reported as popped while
        # staying in the bucket; negative counts popped an arbitrary slice.
        # A non-numeric count still raises a script error at `n - 1`.
        redis.register_script(:lpopn, <<-END)
          local n = tonumber(ARGV[1])
          if n and n <= 0 then
            return {}
          end
          local r = redis.call('lrange', KEYS[1], 0, n - 1)
          redis.call('ltrim', KEYS[1], ARGV[1], -1)
          return r
        END
        redis.register_script(:lpopa, <<-END)
          local r = redis.call('lrange', KEYS[1], 0, - 1)
          redis.call('del', KEYS[1])
          return r
        END
      end