class Roma::Storage::BasicStorage

Constants

PACK_HEADER_TEMPLATE
0.. 3

vn

4.. 7

physical clock (unix time)

8..11

logical clock

12..15

exptime(unix time)

16..

value data

PACK_TEMPLATE

Attributes

cleanup_regexp[RW]
dbs[R]
divnum[RW]
do_each_vn_dump[RW]
each_clean_up_sleep[RW]
each_vn_dump_sleep[RW]
each_vn_dump_sleep_count[RW]
error_message[R]
ext_name[R]
hdb[R]
hdiv[R]
logic_clock_expire[RW]
option[W]
st_class[RW]
storage_path[W]
vn_list[W]

Public Class Methods

new() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 32
def initialize
  # database handler
  @hdb = []
  # database cache handler
  @hdbc = []
  # status of a database
  @dbs = []
  @log_fd = nil
  # file number list of a each_vn_dump while
  @each_vn_dump_vnodes = []

  @hdiv = Hash.new(0)

  @ext_name = 'db'

  @st_class = nil
  @divnum = 10

  @each_vn_dump_sleep = 0.001
  @each_vn_dump_sleep_count = 100
  @each_clean_up_sleep = 0.01
  @cleanup_regexp = nil
  @logic_clock_expire = 300

  @each_cache_lock = Mutex::new
  @each_clean_up_lock = Mutex::new
  @stat_lock = Mutex::new
end

Public Instance Methods

add(vn, k, d, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 225
def add(vn, k, d, expt, v)
  buf = db_get(vn, k)
  clk = 0
  if buf
    vn, t, clk, expt2, v2 = unpack_data(buf)
    return nil if Time.now.to_i <= expt2
    clk = (clk + 1) & 0xffffffff
  end

  # not exist
  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
append(vn, k, d, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 254
def append(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v2 + v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
cache_file_name(dn) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 661
def cache_file_name(dn)
  "#{@storage_path}/#{dn}.cache.#{@ext_name}"
end
cas(vn, k, d, clk, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 182
def cas(vn, k, d, clk, expt, v)
  buf = db_get(vn ,k)
  return :not_found unless buf
  t = Time.now.to_i
  data = unpack_data(buf)
  return :not_found if t > data[3]
  return :exists if clk != data[2]
  clk = (data[2] + 1) & 0xffffffff
  ret = [vn, t, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
close_log() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 757
def close_log
  @log_fd.close if @log_fd
  @log_fd = nil
end
closedb() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 117
def closedb
  stop_clean_up
  buf = @hdb; @hdb = []
  buf.each{ |h| close_db(h) if h }
  buf = @hdbc; @hdbc = []
  buf.each{ |h| close_db(h) if h }
  close_log
end
db_get(vn, k) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 149
def db_get(vn, k)
  n = @hdiv[vn]
  d = @hdb[n].get(k)
  return d if @dbs[n] == :normal

  c = @hdbc[n].get(k)
  return d unless c # in case of out of :normal status

  if @dbs[n] == :cachecleaning && d
    # in case of existing value is both @hdb and @hdbc
    vn, lat, clk, expt = unpack_header(d)
    cvn, clat, cclk, cexpt = unpack_header(c)
    return d if cmp_clk(clk, cclk) > 0 # if @hdb newer than @hdbc
  end
  c
end
db_put(vn, k, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 166
def db_put(vn, k, v)
  n = @hdiv[vn]
  if @dbs[n] == :safecopy_flushing || @dbs[n] == :safecopy_flushed
    ret = @hdbc[n].put(k, v)
  else
    ret = @hdb[n].put(k, v)
  end
  ret
end
decr(vn, k, d, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 374
def decr(vn, k, d, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  v = (v2.to_i - v)
  v = 0 if v < 0
  v = v & 0xffffffffffffffff

  ret = [vn, Time.now.to_i, clk, expt2, v.to_s]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
delete(vn, k, d) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 329
def delete(vn, k, d)
  buf = db_get(vn, k)
  v = ret = nil
  clk = 0
  if buf
    vn, t, clk, expt, v2 = unpack_data(buf)
    return :deletemark if expt == 0
    clk = (clk + 1) & 0xffffffff
    v = v2 if v2 && v2.length != 0 && Time.now.to_i <= expt
  end

  # [ 0.. 3] vn
  # [ 4.. 7] physical clock(unix time)
  # [ 8..11] logical clock
  # [12..15] exptime(unix time) => 0
  ret = [vn, Time.now.to_i, clk, 0, v]
  if db_put(vn, k, pack_header(*ret[0..-2]))
    return ret
  else
    return nil
  end
end
dump(vn) click to toggle source

Returns the vnode dump.

# File lib/roma/storage/basic_storage.rb, line 530
def dump(vn)
  buf = get_vnode_hash(vn)
  return nil if buf.length == 0
  Marshal.dump(buf)
end
each_cache_by_keys(dn, keys) { |vn, last, clk, expt, k, val| ... } click to toggle source

Calls the geven block, passes the cache(@hdbc) element.

dn

number of database

keys

key list

# File lib/roma/storage/basic_storage.rb, line 606
def each_cache_by_keys(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    vn, last, clk, expt, val = unpack_data(v)
    yield [vn, last, clk, expt, k, val]
  end
end
each_cache_dump_pack(dn, keys) { |vn_dump_pack(vn, last, clk, expt, k, val)| ... } click to toggle source

Calls the geven block, passes the cache(@hdbc) element as the spushv command data format.

dn

number of database

keys

key list

# File lib/roma/storage/basic_storage.rb, line 618
def each_cache_dump_pack(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    vn, last, clk, expt, val = unpack_data(v)
    yield vn_dump_pack(vn, last, clk, expt, k, val)
  end
end
each_clean_up(t, vnhash) { |k, vn| ... } click to toggle source
# File lib/roma/storage/basic_storage.rb, line 411
def each_clean_up(t, vnhash)
  @do_clean_up = true

  f = nil;
  if @cleanup_regexp && File.exist?(@storage_path)
    f = open(@storage_path + '/klist.txt','w')
  end

  return unless @each_clean_up_lock.try_lock
  nt = Time.now.to_i
  @divnum.times do |i|
    next if @dbs[i] != :normal
    hdb = @hdb[i]
    hdb.each do |k, v|
      return unless @do_clean_up # 1st check
      vn, last, clk, expt = unpack_header(v)
      vn_stat = vnhash[vn]
      if f && @cleanup_regexp && k =~ /#{@cleanup_regexp}/
        # write klist
        f.puts("#{k},#{last},#{clk}") if hdb.get(k) == v
      end
      if vn_stat == :primary && ( (expt != 0 && nt > expt) || (expt == 0 && t > last) )
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      elsif vn_stat == nil && t > last
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      end
      return unless @do_clean_up # 2nd ckeck
      sleep @each_clean_up_sleep
    end
  end
ensure
  @each_clean_up_lock.unlock if @each_clean_up_lock.locked?
  if f
    @cleanup_regexp = nil
    f.close
  end
end
each_hdb_dump(i,except_vnh = nil) { |[vn, last, clk, expt, length, k, length, val].pack("NNNNNa#{length}Na#{length}")| ... } click to toggle source
# File lib/roma/storage/basic_storage.rb, line 581
def each_hdb_dump(i,except_vnh = nil)
  count = 0
  @hdb[i].each{|k,v|
    vn, last, clk, expt, val = unpack_data(v)
    if except_vnh && except_vnh.key?(vn) || Time.now.to_i > expt
      count += 1
      sleep @each_vn_dump_sleep if count % @each_vn_dump_sleep_count == 0
    else
      yield [vn, last, clk, expt, k.length, k, val.length, val].pack("NNNNNa#{k.length}Na#{val.length}")
      sleep @each_vn_dump_sleep
    end
  }
end
each_vn_dump(target_vn) { |vn_dump_pack(vn, last, clk, expt, k, val)| ... } click to toggle source
# File lib/roma/storage/basic_storage.rb, line 536
def each_vn_dump(target_vn)
  n = @hdiv[target_vn]
  @stat_lock.synchronize do
    return false if @dbs[n] != :normal
    return false if @each_vn_dump_vnodes.include?(target_vn)
    @each_vn_dump_vnodes << target_vn
  end

  begin
    @do_each_vn_dump = true
    each_unpacked_db(target_vn, @hdb) do |vn, last, clk, expt, k, val|
      return unless @do_each_vn_dump
      yield vn_dump_pack(vn, last, clk, expt, k, val)
    end
  ensure
    @each_vn_dump_vnodes.delete(target_vn)
  end

  true
end
flush_db(dn) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 657
def flush_db(dn)
  @hdb[dn].sync
end
get(vn, k, d) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 282
def get(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf
  vn, t, clk, expt, v = unpack_data(buf)

  return nil if Time.now.to_i > expt
  v
end
get_context(vn, k, d) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 176
def get_context(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf
  unpack_header(buf)
end
get_keys_in_cache(dn, kn=100) click to toggle source

Returns a key array in a cache(@hdbc).

dn

number of database

kn

number of keys which is return value

# File lib/roma/storage/basic_storage.rb, line 629
def get_keys_in_cache(dn, kn=100)
  return nil if @do_each_vn_dump
  ret = []
  return ret unless @hdbc[dn]
  count = 0
  @each_cache_lock.synchronize do
    @hdbc[dn].each do |k, v|
      ret << k
      break if (count+=1) >= kn
    end
  end
  ret
end
get_logfile_list() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 718
def get_logfile_list
  l={}
  files=Dir.glob("#{@storage_path}/status.log.*")
  files.each{ |file|
    if /$.+status\.log\.(\d+)$/=~file
      l[$1.to_i]=$&
    end
  }
  # sorted by old order
  l.to_a.sort{|a,b| a[0]<=>b[0]}
end
get_raw(vn, k, d) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 291
def get_raw(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf

  unpack_data(buf)
end
get_raw2(k) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 298
def get_raw2(k)
  @hdb.each{|hdb|
    buf = hdb.get(k)
    return unpack_data(buf) if buf
  }
  nil
end
get_stat() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 61
def get_stat
  ret = {}
  ret['storage.storage_path'] = File.expand_path(@storage_path)
  ret['storage.st_class'] = @st_class.to_s.match(/Roma::Storage::(.*)/)[1]
  ret['storage.divnum'] = @divnum
  ret['storage.option'] = @option
  ret['storage.each_vn_dump_sleep'] = @each_vn_dump_sleep
  ret['storage.each_vn_dump_sleep_count'] = @each_vn_dump_sleep_count
  ret['storage.each_vn_dump_files'] = @each_vn_dump_files.inspect
  ret['storage.each_clean_up_sleep'] = @each_clean_up_sleep
  ret['storage.cleanup_regexp'] = @cleanup_regexp
  ret['storage.logic_clock_expire'] = @logic_clock_expire
  ret['storage.safecopy_stats'] = @dbs.inspect
  ret
end
incr(vn, k, d, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 356
def incr(vn, k, d, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  v = (v2.to_i + v)
  v = 0 if v < 0
  v = v & 0xffffffffffffffff

  ret = [vn, Time.now.to_i, clk, expt2, v.to_s]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
load(dmp) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 465
def load(dmp)
  n = 0
  h = Marshal.load(dmp)
  h.each_pair{ |k, v|
    # remort data
    r_vn, r_last, r_clk, r_expt = unpack_header(v)
    raise "An invalid vnode number is include.key=#{k} vn=#{r_vn}" unless @hdiv.key?(r_vn)
    local = @hdb[@hdiv[r_vn]].get(k)
    if local == nil
      n += 1
      @hdb[@hdiv[r_vn]].put(k, v)
    else
      # local data
      l_vn, l_last, l_clk, l_expt = unpack_data(local)
      if r_last - l_last < @logic_clock_expire && cmp_clk(r_clk,l_clk) <= 0
      else # remort is newer.
        n += 1
        @hdb[@hdiv[r_vn]].put(k, v)
      end
    end
    sleep @each_vn_dump_sleep
  }
  n
end
load_stream_dump(vn, last, clk, expt, k, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 490
def load_stream_dump(vn, last, clk, expt, k, v)
  buf = db_get(vn, k)
  if buf
    data = unpack_header(buf)
    if last - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      return nil
    end
  end

  ret = [vn, last, clk, expt, v]
  if expt == 0
    # for the deleted mark
    return ret if db_put(vn, k, pack_header(*ret[0..3]))
  else
    return ret if db_put(vn, k, pack_data(*ret))
  end
  nil
end
load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 509
def load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
  n = @hdiv[vn]
  buf = @hdb[n].get(k)
  if buf
    data = unpack_header(buf)
    if last - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      return nil
    end
  end

  ret = [vn, last, clk, expt, v]
  if expt == 0
    # for the deleted mark
    return ret if @hdb[n].put(k, pack_header(*ret[0..3]))
  else
    return ret if @hdb[n].put(k, pack_data(*ret))
  end
  nil
end
open_log() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 730
def open_log
  logs = get_logfile_list
  if logs.length == 0
    @log_name="#{@storage_path}/status.log.1"
  else
    if File::stat("#{@fname}.#{logs.last[0]}").size == 0
      @log_name="#{@fname}.#{logs.last[0]}"
    else
      @log_name="#{@fname}.#{logs.last[0]+1}"
    end
  end
  @log_fd=File.open(@log_name,"a")
end
opendb() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 99
def opendb
  create_div_hash
  FileUtils.mkdir_p(@storage_path)
  @divnum.times do |i|
    # open database file
    @hdb[i] = open_db("#{@storage_path}/#{i}.#{@ext_name}")
    # check cache file
    if File.exist?(cache_file_name(i))
      @hdbc[i] = open_db(cache_file_name(i))
      stop_clean_up { @dbs[i] = :safecopy_flushed }
    else
      @dbs[i] = :normal
      @hdbc[i] = nil
    end
  end
  open_log
end
out(vn, k, d) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 352
def out(vn, k, d)
  @hdb[@hdiv[vn]].out(k)
end
out_cache(dn, key) click to toggle source

Remove a key for the cache(@hdbc).

dn

number of database

key

key

# File lib/roma/storage/basic_storage.rb, line 598
def out_cache(dn, key)
  @hdbc[dn].out(key)
end
prepend(vn, k, d, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 268
def prepend(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v + v2]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
rdelete(vn, k, d, clk) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 306
def rdelete(vn, k, d, clk)
  buf = db_get(vn, k)
  t = Time.now.to_i
  if buf
    data = unpack_header(buf)
    if t - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      @error_message = "error:#{t-data[1]} < #{@logic_clock_expire} && cmp_clk(#{clk},#{data[2]})<=0"
      return nil
    end
  end

  # [ 0.. 3] vn
  # [ 4.. 7] physical clock(unix time)
  # [ 8..11] logical clock
  # [12..15] exptime(unix time) => 0
  ret = [vn, t, clk, 0]
  if db_put(vn, k, pack_header(*ret))
    return ret
  else
    return nil
  end
end
replace(vn, k, d, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 240
def replace(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end
rset(vn, k, d, clk, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 195
def rset(vn, k, d, clk, expt, v)
  buf = db_get(vn, k)
  t = Time.now.to_i
  if buf
    data = unpack_data(buf)
    if t - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      @error_message = "error:#{t-data[1]} < #{@logic_clock_expire} && cmp_clk(#{clk},#{data[2]})<=0"
      return nil
    end
  end

  ret = [vn, t, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  @error_message = "error:put"
  nil
end
set(vn, k, d, expt, v) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 212
def set(vn, k, d, expt, v)
  buf = db_get(vn, k)
  clk = 0
  if buf
    data = unpack_data(buf)
    clk = (data[2] + 1) & 0xffffffff
  end

  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn , k, pack_data(*ret))
  nil
end
set_db_stat(dn, stat) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 665
def set_db_stat(dn, stat)
  @stat_lock.synchronize do
    case @dbs[dn]
    when :normal
      @each_vn_dump_vnodes.each do |vn|
        return false if dn == @hdiv[vn]
      end
      if stat == :safecopy_flushing
        # open cache
        @hdbc[dn] = open_db(cache_file_name(dn))
        stop_clean_up { @dbs[dn] = stat }
        write_log("#{dn} #{stat.to_s}")
        stat
      else
        false
      end
    when :safecopy_flushing
      if stat == :safecopy_flushed
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    when :safecopy_flushed
      if stat == :cachecleaning
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    when :cachecleaning
      if stat == :normal
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
        # remove cache
        close_db(@hdbc[dn])
        @hdbc[dn] = nil
        if File.exist?("#{@storage_path}/#{dn}.cache.#{@ext_name}")
          File.unlink("#{@storage_path}/#{dn}.cache.#{@ext_name}")
        end
        stat
      elsif stat == :safecopy_flushing
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    else
      false
    end
  end
end
set_expt(vn, k, d, expt) click to toggle source

set expire time

# File lib/roma/storage/basic_storage.rb, line 393
def set_expt(vn, k, d, expt)
  buf = db_get(vn, k)
  if buf
    vn, t, clk, expt2, v = unpack_data(buf)
    return nil if Time.now.to_i > expt2
    clk = (clk + 1) & 0xffffffff
    ret = [vn, Time.now.to_i, clk, expt, v]
    return ret if db_put(vn, k, pack_data(*ret))
  end
  nil
end
stop_clean_up(&block) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 453
def stop_clean_up(&block)
  @do_clean_up = false
  if block
    @each_clean_up_lock.lock
    begin
      block.call
    ensure
      @each_clean_up_lock.unlock
    end
  end
end
true_length() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 405
def true_length
  res = 0
  @hdb.each{ |hdb| res += hdb.rnum }
  res
end
write_log(line) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 744
def write_log(line)
  return unless @log_name
  # log rotation
  if File::stat(@log_name).size > 1000 * 1024
    close_log
    open_log
  end
  t = Time.now
  tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}"
  @log_fd.write("#{tstr} #{line}\n")
  @log_fd.flush
end

Protected Instance Methods

create_div_hash() click to toggle source
# File lib/roma/storage/basic_storage.rb, line 92
def create_div_hash
  @vn_list.each{ |vn|
    @hdiv[vn] = Digest::SHA1.hexdigest(vn.to_s).hex % @divnum
  }
end

Private Instance Methods

cmp_clk(clk1, clk2) click to toggle source

Compare this clock with the specified.

-1, 0 or 1 as clk1 is numerically less than, equal to, or greater than the clk2 given as the parameter.

logical clock space is a 32bit ring.

# File lib/roma/storage/basic_storage.rb, line 83
def cmp_clk(clk1, clk2)
  if (clk1-clk2).abs < 0x80000000 # 1<<31
    clk1 <=> clk2
  else
    clk2 <=> clk1
  end
end
each_unpacked_db(target_vn, db) { |vn, last, clk, expt, k, val| ... } click to toggle source
# File lib/roma/storage/basic_storage.rb, line 566
def each_unpacked_db(target_vn, db)
  count = 0
  tn =  Time.now.to_i
  db[@hdiv[target_vn]].each do |k,v|
    vn, last, clk, expt, val = unpack_data(v)
    if vn != target_vn || (expt != 0 && tn > expt)
      count += 1
      sleep @each_vn_dump_sleep if count % @each_vn_dump_sleep_count == 0
      next
    end
    yield vn, last, clk, expt, k, val
  end
end
get_vnode_hash(vn) click to toggle source

Create vnode dump.

# File lib/roma/storage/basic_storage.rb, line 644
def get_vnode_hash(vn)
  buf = {}
  count = 0
  @hdb[@hdiv[vn]].each{ |k, v|
    count += 1
    sleep @each_vn_dump_sleep if count % @each_vn_dump_sleep_count == 0
    dat = unpack_data(v) #v.unpack('NNNN')
    buf[k] = v if dat[0] == vn
  }
  return buf
end
pack_data(vn, physical_clock, logical_clock, expire,value) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 141
def pack_data(vn, physical_clock, logical_clock, expire,value)
  [vn,physical_clock, logical_clock, expire, value].pack(PACK_TEMPLATE)
end
pack_header(vn, physical_clock, logical_clock, expire) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 135
def pack_header(vn, physical_clock, logical_clock, expire)
  [vn,physical_clock, logical_clock, expire].pack(PACK_HEADER_TEMPLATE)
end
unpack_data(str) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 144
def unpack_data(str)
  str.unpack(PACK_TEMPLATE)
end
unpack_header(str) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 138
def unpack_header(str)
  str.unpack(PACK_HEADER_TEMPLATE)
end
vn_dump_pack(vn, last, clk, expt, k, val) click to toggle source
# File lib/roma/storage/basic_storage.rb, line 557
def vn_dump_pack(vn, last, clk, expt, k, val)
    if val
      return [vn, last, clk, expt, k.length, k, val.length, val].pack("NNNNNa#{k.length}Na#{val.length}")
    else
      return [vn, last, clk, expt, k.length, k, 0].pack("NNNNNa#{k.length}N")
    end
end