class Roma::Storage::BasicStorage
Constants
- PACK_HEADER_TEMPLATE
- 0.. 3
-
vn
- 4.. 7
-
physical clock (unix time)
- 8..11
-
logical clock
- 12..15
-
exptime(unix time)
- 16..
-
value data
- PACK_TEMPLATE
Attributes
cleanup_regexp[RW]
dbs[R]
divnum[RW]
do_each_vn_dump[RW]
each_clean_up_sleep[RW]
each_vn_dump_sleep[RW]
each_vn_dump_sleep_count[RW]
error_message[R]
ext_name[R]
hdb[R]
hdiv[R]
logic_clock_expire[RW]
option[W]
st_class[RW]
storage_path[W]
vn_list[W]
Public Class Methods
new()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 32
# Builds an unopened storage object populated with default settings.
# Nothing is opened here; only instance state is prepared.
def initialize
  # Per-division slots: main database handles, cache database handles,
  # and the status symbol of each division.
  @hdb, @hdbc, @dbs = [], [], []
  @log_fd = nil

  # vnodes that each_vn_dump is currently walking
  @each_vn_dump_vnodes = []

  # vnode id => division index; ids that were never registered read as 0
  @hdiv = Hash.new(0)

  @ext_name = 'db'
  @st_class = nil
  @divnum = 10

  # pacing values passed to sleep by the dump / clean-up loops
  @each_vn_dump_sleep = 0.001
  @each_vn_dump_sleep_count = 100
  @each_clean_up_sleep = 0.01

  @cleanup_regexp = nil
  @logic_clock_expire = 300

  @each_cache_lock = Mutex.new
  @each_clean_up_lock = Mutex.new
  @stat_lock = Mutex.new
end
Public Instance Methods
add(vn, k, d, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 225
# Stores +v+ under +k+ unless an unexpired entry is already present.
#
# vn::   vnode id
# k::    key
# d::    not referenced by this method
# expt:: expire time (unix time) for the new entry
# v::    value
#
# Returns [vn, physical clock, logical clock, expt, v] when written;
# nil when a live entry exists or db_put reports failure.
def add(vn, k, d, expt, v)
  stored = db_get(vn, k)
  clk = 0
  if stored
    vn, _last, old_clk, old_expt, _old_val = unpack_data(stored)
    # an entry that has not expired yet blocks the add
    return nil if old_expt >= Time.now.to_i
    # continue the logical clock of the expired entry (32-bit wrap)
    clk = (old_clk + 1) & 0xffffffff
  end

  record = [vn, Time.now.to_i, clk, expt, v]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
append(vn, k, d, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 254
# Appends +v+ to the value of an existing, unexpired entry.
#
# vn:: vnode id   k:: key   d:: not referenced here
# expt:: new expire time (unix time)   v:: data to append
#
# Returns the written [vn, physical clock, logical clock, expt, value],
# or nil when the key is missing, expired, or db_put fails.
def append(vn, k, d, expt, v)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, old_expt, old_val = unpack_data(stored)
  return nil if old_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff # 32-bit logical clock
  record = [vn, Time.now.to_i, next_clk, expt, old_val + v]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
cache_file_name(dn)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 661
# Builds the path of the cache database file for division +dn+:
# <storage_path>/<dn>.cache.<ext_name>
def cache_file_name(dn)
  format('%s/%s.cache.%s', @storage_path, dn, @ext_name)
end
cas(vn, k, d, clk, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 182
# Compare-and-swap: overwrites the entry only when the caller's logical
# clock equals the stored one.
#
# vn:: vnode id   k:: key   d:: not referenced here
# clk:: logical clock the caller last saw
# expt:: new expire time (unix time)   v:: new value
#
# Returns :not_found when the key is absent or expired, :exists when the
# clocks differ, the written record on success, nil when db_put fails.
def cas(vn, k, d, clk, expt, v)
  stored = db_get(vn, k)
  return :not_found unless stored

  now = Time.now.to_i
  _vn, _last, cur_clk, cur_expt = unpack_data(stored)
  return :not_found if cur_expt < now
  return :exists unless clk == cur_clk

  record = [vn, now, (cur_clk + 1) & 0xffffffff, expt, v]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
close_log()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 757
# Closes the status log handle when one is open and clears @log_fd.
def close_log
  @log_fd && @log_fd.close
  @log_fd = nil
end
closedb()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 117
# Shuts the storage down: signals clean-up to stop, closes every main and
# cache database handle, then closes the status log.
# The handle arrays are swapped for empty ones before closing begins.
def closedb
  stop_clean_up

  main_handles, @hdb = @hdb, []
  main_handles.each { |handle| close_db(handle) if handle }

  cache_handles, @hdbc = @hdbc, []
  cache_handles.each { |handle| close_db(handle) if handle }

  close_log
end
db_get(vn, k)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 149
# Reads the raw packed record for key +k+ of vnode +vn+.
#
# In :normal status only the main database is consulted. In any other
# status the cache database is consulted too: the cache record is returned
# when it exists, except during :cachecleaning, where the main record wins
# if its logical clock is newer than the cached one.
def db_get(vn, k)
  idx = @hdiv[vn]
  main_rec = @hdb[idx].get(k)
  return main_rec if @dbs[idx] == :normal

  cache_rec = @hdbc[idx].get(k)
  return main_rec unless cache_rec

  if main_rec && @dbs[idx] == :cachecleaning
    # both databases hold the key: compare logical clocks
    main_clk = unpack_header(main_rec)[2]
    cache_clk = unpack_header(cache_rec)[2]
    return main_rec if cmp_clk(main_clk, cache_clk) > 0
  end

  cache_rec
end
db_put(vn, k, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 166
# Writes the packed record +v+ for key +k+ of vnode +vn+.
# While the division is :safecopy_flushing or :safecopy_flushed the write
# goes to the cache database; otherwise it goes to the main database.
# Returns whatever the underlying put returns.
def db_put(vn, k, v)
  idx = @hdiv[vn]
  target =
    case @dbs[idx]
    when :safecopy_flushing, :safecopy_flushed then @hdbc[idx]
    else @hdb[idx]
    end
  target.put(k, v)
end
decr(vn, k, d, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 374
# Subtracts +v+ from the integer value stored under +k+.
# The result is floored at 0 and masked to 64 bits; the stored expire time
# is kept as it was.
#
# vn:: vnode id   k:: key   d:: not referenced here   v:: amount
#
# Returns the written record (value as a decimal string), or nil when the
# key is missing, expired, or db_put fails.
def decr(vn, k, d, v)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, cur_expt, cur_val = unpack_data(stored)
  return nil if cur_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff
  counter = [cur_val.to_i - v, 0].max & 0xffffffffffffffff
  record = [vn, Time.now.to_i, next_clk, cur_expt, counter.to_s]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
delete(vn, k, d)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 329
# Replaces the entry for +k+ with a delete mark: a header-only record
# whose expire time is 0.
#
# vn:: vnode id   k:: key   d:: not referenced here
#
# Returns :deletemark when the stored entry already has expire time 0;
# otherwise [vn, physical clock, logical clock, 0, old_value], where
# old_value is the previous value if it was non-empty and unexpired, else
# nil. Returns nil when db_put fails.
def delete(vn, k, d)
  stored = db_get(vn, k)
  clk = 0
  live_val = nil
  if stored
    vn, _last, old_clk, old_expt, old_val = unpack_data(stored)
    return :deletemark if old_expt == 0
    clk = (old_clk + 1) & 0xffffffff
    live_val = old_val if old_val && old_val.length != 0 && Time.now.to_i <= old_expt
  end

  # header layout: vn, physical clock, logical clock, exptime (=> 0)
  mark = [vn, Time.now.to_i, clk, 0]
  db_put(vn, k, pack_header(*mark)) ? mark + [live_val] : nil
end
dump(vn)
click to toggle source
Returns the vnode dump.
# File lib/roma/storage/basic_storage.rb, line 530
# Serialises the key => packed-record hash of vnode +vn+ with Marshal.
# Returns nil when the vnode has no entries.
def dump(vn)
  entries = get_vnode_hash(vn)
  entries.length == 0 ? nil : Marshal.dump(entries)
end
each_cache_by_keys(dn, keys) { |vn, last, clk, expt, k, val| ... }
click to toggle source
Calls the given block once per key, passing the matching cache (@hdbc) element.
dn
-
number of database
keys
-
key list
# File lib/roma/storage/basic_storage.rb, line 606
# Yields the unpacked cache (@hdbc) record of every key in +keys+.
#
# dn::   division (database) number
# keys:: list of keys to look up
#
# The block receives one array: [vn, last, clk, expt, k, val].
# Keys that are not present in the cache are skipped instead of raising
# NoMethodError on the nil lookup result.
def each_cache_by_keys(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    next unless v # nothing stored for this key

    vn, last, clk, expt, val = unpack_data(v)
    yield [vn, last, clk, expt, k, val]
  end
end
each_cache_dump_pack(dn, keys) { |vn_dump_pack(vn, last, clk, expt, k, val)| ... }
click to toggle source
Calls the given block once per key, passing the matching cache (@hdbc) element packed in the spushv command data format.
dn
-
number of database
keys
-
key list
# File lib/roma/storage/basic_storage.rb, line 618
# Yields every listed cache (@hdbc) record, packed by vn_dump_pack.
#
# dn::   division (database) number
# keys:: list of keys to look up
#
# Keys that are not present in the cache are skipped instead of raising
# NoMethodError on the nil lookup result.
def each_cache_dump_pack(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    next unless v # nothing stored for this key

    vn, last, clk, expt, val = unpack_data(v)
    yield vn_dump_pack(vn, last, clk, expt, k, val)
  end
end
each_clean_up(t, vnhash) { |k, vn| ... }
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 411
# Walks every division in :normal status and removes entries that are due
# for clean-up, asking the block for permission before each removal.
#
# t::      time threshold (unix time); delete marks, and entries of vnodes
#          missing from +vnhash+, are candidates when their physical
#          clock is older than this
# vnhash:: vnode id => status; :primary and nil (absent) are acted on
#
# The block receives (key, vnode id); the entry is removed only when the
# block returns a truthy value and the stored record is still unchanged.
# When @cleanup_regexp is set and @storage_path exists, matching keys are
# also written to <storage_path>/klist.txt as "key,last,clk" lines, and
# @cleanup_regexp is cleared afterwards.
#
# Returns nil straight away, without touching any state, when another
# clean-up holds the lock. The lock is taken before anything else so that
# a losing caller neither truncates klist.txt, nor re-enables a clean-up
# that stop_clean_up just cancelled, nor unlocks a mutex it does not own
# (which raises ThreadError).
def each_clean_up(t, vnhash)
  return unless @each_clean_up_lock.try_lock

  @do_clean_up = true
  f = nil
  if @cleanup_regexp && File.exist?(@storage_path)
    f = File.open(@storage_path + '/klist.txt', 'w')
  end
  nt = Time.now.to_i
  @divnum.times do |i|
    next if @dbs[i] != :normal
    hdb = @hdb[i]
    hdb.each do |k, v|
      return unless @do_clean_up # 1st check
      vn, last, clk, expt = unpack_header(v)
      vn_stat = vnhash[vn]
      if f && @cleanup_regexp && k =~ /#{@cleanup_regexp}/
        # write klist
        f.puts("#{k},#{last},#{clk}") if hdb.get(k) == v
      end
      if vn_stat == :primary &&
         ((expt != 0 && nt > expt) || (expt == 0 && t > last))
        # expired entry, or a delete mark older than the threshold
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      elsif vn_stat.nil? && t > last
        # vnode is not listed in vnhash
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      end
      return unless @do_clean_up # 2nd check
      sleep @each_clean_up_sleep
    end
  end
ensure
  # release only a lock this thread actually acquired
  @each_clean_up_lock.unlock if @each_clean_up_lock.owned?
  if f
    @cleanup_regexp = nil
    f.close
  end
end
each_hdb_dump(i,except_vnh = nil) { |[vn, last, clk, expt, length, k, length, val].pack("NNNNNa#{length}Na#{length}")| ... }
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 581
# Yields each entry of main database +i+ as a packed binary string:
# vn, last, clk, expt, key length, key, value length, value.
#
# i::          division (database) number
# except_vnh:: optional hash; entries whose vnode id is a key of it are
#              skipped
#
# Entries whose expire time is in the past are skipped as well. The loop
# sleeps after every yielded entry and after every
# @each_vn_dump_sleep_count skipped entries.
def each_hdb_dump(i,except_vnh = nil)
  skipped = 0
  @hdb[i].each do |k, v|
    vn, last, clk, expt, val = unpack_data(v)
    excluded = except_vnh && except_vnh.key?(vn)
    if excluded || Time.now.to_i > expt
      skipped += 1
      sleep @each_vn_dump_sleep if (skipped % @each_vn_dump_sleep_count) == 0
    else
      template = "NNNNNa#{k.length}Na#{val.length}"
      yield [vn, last, clk, expt, k.length, k, val.length, val].pack(template)
      sleep @each_vn_dump_sleep
    end
  end
end
each_vn_dump(target_vn) { |vn_dump_pack(vn, last, clk, expt, k, val)| ... }
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 536
# Yields every entry of vnode +target_vn+, packed by vn_dump_pack.
#
# Returns false without yielding when the vnode's division is not in
# :normal status or the vnode is already being dumped; true after a
# complete walk; nil when the walk is cut short because @do_each_vn_dump
# was turned off.
# The vnode is registered in @each_vn_dump_vnodes for the duration of the
# walk and always removed afterwards.
def each_vn_dump(target_vn)
  idx = @hdiv[target_vn]
  @stat_lock.synchronize do
    busy = @dbs[idx] != :normal || @each_vn_dump_vnodes.include?(target_vn)
    return false if busy
    @each_vn_dump_vnodes << target_vn
  end

  begin
    @do_each_vn_dump = true
    each_unpacked_db(target_vn, @hdb) do |vn, last, clk, expt, k, val|
      return unless @do_each_vn_dump
      yield vn_dump_pack(vn, last, clk, expt, k, val)
    end
  ensure
    @each_vn_dump_vnodes.delete(target_vn)
  end

  true
end
flush_db(dn)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 657
# Calls sync on the main database handle of division +dn+ and returns
# its result.
def flush_db(dn)
  handle = @hdb[dn]
  handle.sync
end
get(vn, k, d)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 282
# Returns the value stored under +k+, or nil when the key is missing or
# its expire time has passed.
#
# vn:: vnode id   k:: key   d:: not referenced here
def get(vn, k, d)
  stored = db_get(vn, k)
  return nil unless stored

  _vn, _last, _clk, expt, value = unpack_data(stored)
  Time.now.to_i > expt ? nil : value
end
get_context(vn, k, d)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 176
# Returns the unpacked header of the record stored under +k+, or nil when
# nothing is stored. The expire time is not checked.
#
# vn:: vnode id   k:: key   d:: not referenced here
def get_context(vn, k, d)
  stored = db_get(vn, k)
  stored ? unpack_header(stored) : nil
end
get_keys_in_cache(dn, kn=100)
click to toggle source
Returns a key array in a cache(@hdbc).
dn
-
number of database
kn
-
maximum number of keys to return
# File lib/roma/storage/basic_storage.rb, line 629
# Collects keys from the cache database (@hdbc) of division +dn+.
#
# dn:: division (database) number
# kn:: iteration stops once this many keys have been collected
#
# Returns nil while @do_each_vn_dump is set, an empty array when the
# division has no cache database, otherwise the collected keys.
def get_keys_in_cache(dn, kn=100)
  return nil if @do_each_vn_dump

  keys = []
  return keys unless @hdbc[dn]

  @each_cache_lock.synchronize do
    @hdbc[dn].each do |k, _v|
      keys << k
      # checked after the push, so at least one key is always taken
      break if keys.length >= kn
    end
  end
  keys
end
get_logfile_list()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 718 def get_logfile_list l={} files=Dir.glob("#{@storage_path}/status.log.*") files.each{ |file| if /$.+status\.log\.(\d+)$/=~file l[$1.to_i]=$& end } # sorted by old order l.to_a.sort{|a,b| a[0]<=>b[0]} end
get_raw(vn, k, d)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 291
# Returns the fully unpacked record stored under +k+ (header fields
# followed by the value), or nil when nothing is stored.
# The expire time is not checked.
#
# vn:: vnode id   k:: key   d:: not referenced here
def get_raw(vn, k, d)
  stored = db_get(vn, k)
  stored ? unpack_data(stored) : nil
end
get_raw2(k)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 298
# Searches every main database, in order, for key +k+ and returns the
# unpacked record of the first hit; nil when no database holds the key.
# Cache databases are not consulted.
def get_raw2(k)
  found = nil
  @hdb.find { |handle| found = handle.get(k) }
  found ? unpack_data(found) : nil
end
get_stat()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 61
# Returns a hash of 'storage.*' status entries describing this storage.
#
# 'storage.each_vn_dump_files' reports @each_vn_dump_vnodes, the list that
# each_vn_dump maintains. It used to read @each_vn_dump_files, which
# nothing shown here assigns, so the entry was always "nil".
#
# Raises if @storage_path is nil or if @st_class does not stringify to a
# name under Roma::Storage:: .
def get_stat
  {
    'storage.storage_path' => File.expand_path(@storage_path),
    # class name with the Roma::Storage:: namespace stripped
    'storage.st_class' => @st_class.to_s.match(/Roma::Storage::(.*)/)[1],
    'storage.divnum' => @divnum,
    'storage.option' => @option,
    'storage.each_vn_dump_sleep' => @each_vn_dump_sleep,
    'storage.each_vn_dump_sleep_count' => @each_vn_dump_sleep_count,
    'storage.each_vn_dump_files' => @each_vn_dump_vnodes.inspect,
    'storage.each_clean_up_sleep' => @each_clean_up_sleep,
    'storage.cleanup_regexp' => @cleanup_regexp,
    'storage.logic_clock_expire' => @logic_clock_expire,
    'storage.safecopy_stats' => @dbs.inspect
  }
end
incr(vn, k, d, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 356
# Adds +v+ to the integer value stored under +k+.
# The result is floored at 0 and masked to 64 bits; the stored expire time
# is kept as it was.
#
# vn:: vnode id   k:: key   d:: not referenced here   v:: amount
#
# Returns the written record (value as a decimal string), or nil when the
# key is missing, expired, or db_put fails.
def incr(vn, k, d, v)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, cur_expt, cur_val = unpack_data(stored)
  return nil if cur_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff
  counter = [cur_val.to_i + v, 0].max & 0xffffffffffffffff
  record = [vn, Time.now.to_i, next_clk, cur_expt, counter.to_s]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
load(dmp)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 465
# Merges a Marshal-encoded key => packed-record hash into the main
# databases and returns the number of records written.
#
# A remote record is written when the key is absent locally, or when it is
# not considered stale. It is stale when its physical clock is less than
# @logic_clock_expire seconds ahead of the local one and its logical clock
# is not newer.
#
# Raises RuntimeError when a record names a vnode that is not in @hdiv.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects; +dmp+ is
# presumably produced by a trusted peer. Confirm before exposing this to
# untrusted input.
def load(dmp)
  written = 0
  Marshal.load(dmp).each_pair do |k, v|
    # remote data
    r_vn, r_last, r_clk, _r_expt = unpack_header(v)
    raise "An invalid vnode number is include.key=#{k} vn=#{r_vn}" unless @hdiv.key?(r_vn)

    local = @hdb[@hdiv[r_vn]].get(k)
    take_remote =
      if local == nil
        true
      else
        # local data
        _l_vn, l_last, l_clk, _l_expt = unpack_data(local)
        !(r_last - l_last < @logic_clock_expire && cmp_clk(r_clk, l_clk) <= 0)
      end

    if take_remote
      written += 1
      @hdb[@hdiv[r_vn]].put(k, v)
    end
    sleep @each_vn_dump_sleep
  end
  written
end
load_stream_dump(vn, last, clk, expt, k, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 490
# Applies one streamed record through db_get / db_put.
#
# vn, last, clk, expt:: header fields of the incoming record
# k:: key   v:: value
#
# The record is rejected (nil) when a stored record exists whose physical
# clock is less than @logic_clock_expire seconds behind +last+ and whose
# logical clock is not older than +clk+. A record with expt == 0 is stored
# header-only, as a delete mark.
# Returns [vn, last, clk, expt, v] when written, nil otherwise.
def load_stream_dump(vn, last, clk, expt, k, v)
  stored = db_get(vn, k)
  if stored
    _vn, cur_last, cur_clk = unpack_header(stored)
    return nil if last - cur_last < @logic_clock_expire && cmp_clk(clk, cur_clk) <= 0
  end

  record = [vn, last, clk, expt, v]
  # expt == 0 marks a deleted entry: keep the header only
  packed = expt == 0 ? pack_header(*record[0..3]) : pack_data(*record)
  db_put(vn, k, packed) ? record : nil
end
load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 509
# Applies one streamed record directly to the main database of the
# vnode's division, bypassing db_get / db_put.
#
# vn, last, clk, expt:: header fields of the incoming record
# k:: key   v:: value
#
# The record is rejected (nil) when the main database holds a record whose
# physical clock is less than @logic_clock_expire seconds behind +last+
# and whose logical clock is not older than +clk+. A record with expt == 0
# is stored header-only, as a delete mark.
# Returns [vn, last, clk, expt, v] when written, nil otherwise.
def load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
  main_db = @hdb[@hdiv[vn]]
  stored = main_db.get(k)
  if stored
    _vn, cur_last, cur_clk = unpack_header(stored)
    return nil if last - cur_last < @logic_clock_expire && cmp_clk(clk, cur_clk) <= 0
  end

  record = [vn, last, clk, expt, v]
  # expt == 0 marks a deleted entry: keep the header only
  packed = expt == 0 ? pack_header(*record[0..3]) : pack_data(*record)
  main_db.put(k, packed) ? record : nil
end
open_log()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 730
# Opens the status log for appending and records its path in @log_name.
#
# With no existing log the file is <storage_path>/status.log.1. Otherwise
# the highest-numbered log is reused while it is still empty, and the next
# number is started once it has content.
#
# The numbered paths are built from @storage_path. They used to be built
# from @fname, which nothing shown here assigns, so the lookup would have
# been for ".N" in the current directory.
def open_log
  base = "#{@storage_path}/status.log"
  logs = get_logfile_list
  if logs.length == 0
    @log_name = "#{base}.1"
  else
    newest = logs.last[0]
    current = "#{base}.#{newest}"
    @log_name = File.stat(current).size == 0 ? current : "#{base}.#{newest + 1}"
  end
  @log_fd = File.open(@log_name, 'a')
end
opendb()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 99
# Opens every division of the storage.
#
# Builds the vnode => division map, creates @storage_path if needed, and
# opens one main database per division. A division whose cache file
# exists also gets its cache database opened and is put in
# :safecopy_flushed status; the others start as :normal with no cache
# handle. Finishes by opening the status log.
def opendb
  create_div_hash
  FileUtils.mkdir_p(@storage_path)

  (0...@divnum).each do |i|
    @hdb[i] = open_db("#{@storage_path}/#{i}.#{@ext_name}")

    if File.exist?(cache_file_name(i))
      # a cache file is already on disk for this division
      @hdbc[i] = open_db(cache_file_name(i))
      stop_clean_up { @dbs[i] = :safecopy_flushed }
    else
      @dbs[i] = :normal
      @hdbc[i] = nil
    end
  end

  open_log
end
out(vn, k, d)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 352
# Physically removes key +k+ from the main database of vnode +vn+'s
# division and returns the handle's result. No delete mark is written.
#
# vn:: vnode id   k:: key   d:: not referenced here
def out(vn, k, d)
  main_db = @hdb[@hdiv[vn]]
  main_db.out(k)
end
out_cache(dn, key)
click to toggle source
Remove a key for the cache(@hdbc).
dn
-
number of database
key
-
key
# File lib/roma/storage/basic_storage.rb, line 598
# Removes +key+ from the cache database (@hdbc) of division +dn+ and
# returns the handle's result.
def out_cache(dn, key)
  cache_db = @hdbc[dn]
  cache_db.out(key)
end
prepend(vn, k, d, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 268
# Prepends +v+ to the value of an existing, unexpired entry.
#
# vn:: vnode id   k:: key   d:: not referenced here
# expt:: new expire time (unix time)   v:: data to put in front
#
# Returns the written [vn, physical clock, logical clock, expt, value],
# or nil when the key is missing, expired, or db_put fails.
def prepend(vn, k, d, expt, v)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, old_expt, old_val = unpack_data(stored)
  return nil if old_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff # 32-bit logical clock
  record = [vn, Time.now.to_i, next_clk, expt, v + old_val]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
rdelete(vn, k, d, clk)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 306
# Writes a delete mark (header-only record, expire time 0) carrying the
# caller-supplied logical clock.
#
# vn:: vnode id   k:: key   d:: not referenced here
# clk:: logical clock to store
#
# Refused (nil, with @error_message set) when the stored record was
# written less than @logic_clock_expire seconds ago and its logical clock
# is not older than +clk+.
# Returns [vn, physical clock, clk, 0] when written, nil when db_put fails.
def rdelete(vn, k, d, clk)
  stored = db_get(vn, k)
  now = Time.now.to_i
  if stored
    _vn, cur_last, cur_clk = unpack_header(stored)
    age = now - cur_last
    if age < @logic_clock_expire && cmp_clk(clk, cur_clk) <= 0
      @error_message = "error:#{age} < #{@logic_clock_expire} && cmp_clk(#{clk},#{cur_clk})<=0"
      return nil
    end
  end

  # header layout: vn, physical clock, logical clock, exptime (=> 0)
  mark = [vn, now, clk, 0]
  db_put(vn, k, pack_header(*mark)) ? mark : nil
end
replace(vn, k, d, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 240
# Overwrites the value of an existing, unexpired entry.
#
# vn:: vnode id   k:: key   d:: not referenced here
# expt:: new expire time (unix time)   v:: new value
#
# Returns the written [vn, physical clock, logical clock, expt, v], or nil
# when the key is missing, expired, or db_put fails.
def replace(vn, k, d, expt, v)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, old_expt, _old_val = unpack_data(stored)
  return nil if old_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff # 32-bit logical clock
  record = [vn, Time.now.to_i, next_clk, expt, v]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
rset(vn, k, d, clk, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 195
# Stores a value together with a caller-supplied logical clock.
#
# vn:: vnode id   k:: key   d:: not referenced here
# clk:: logical clock to store
# expt:: expire time (unix time)   v:: value
#
# Refused (nil) when the stored record was written less than
# @logic_clock_expire seconds ago and its logical clock is not older than
# +clk+. On refusal or db_put failure @error_message says why.
# Returns [vn, physical clock, clk, expt, v] when written.
def rset(vn, k, d, clk, expt, v)
  stored = db_get(vn, k)
  now = Time.now.to_i
  if stored
    _vn, cur_last, cur_clk = unpack_data(stored)
    age = now - cur_last
    if age < @logic_clock_expire && cmp_clk(clk, cur_clk) <= 0
      @error_message = "error:#{age} < #{@logic_clock_expire} && cmp_clk(#{clk},#{cur_clk})<=0"
      return nil
    end
  end

  record = [vn, now, clk, expt, v]
  return record if db_put(vn, k, pack_data(*record))

  @error_message = "error:put"
  nil
end
set(vn, k, d, expt, v)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 212
# Stores +v+ under +k+ unconditionally.
# The logical clock starts at 0 for a new key and is the stored clock plus
# one (32-bit wrap) otherwise, whether or not the old entry has expired.
#
# vn:: vnode id   k:: key   d:: not referenced here
# expt:: expire time (unix time)   v:: value
#
# Returns [vn, physical clock, logical clock, expt, v], or nil when db_put
# fails.
def set(vn, k, d, expt, v)
  stored = db_get(vn, k)
  clk = stored ? (unpack_data(stored)[2] + 1) & 0xffffffff : 0

  record = [vn, Time.now.to_i, clk, expt, v]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
set_db_stat(dn, stat)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 665
# Moves division +dn+ to status +stat+ when that transition is allowed.
#
# Allowed transitions:
#   :normal            -> :safecopy_flushing (opens the cache database)
#   :safecopy_flushing -> :safecopy_flushed
#   :safecopy_flushed  -> :cachecleaning
#   :cachecleaning     -> :safecopy_flushing
#   :cachecleaning     -> :normal (closes and deletes the cache database)
#
# Every accepted transition is recorded with write_log. Leaving :normal is
# refused while a vnode of this division is being dumped by each_vn_dump.
# Returns +stat+ when the transition was made, false otherwise.
def set_db_stat(dn, stat)
  @stat_lock.synchronize do
    case [@dbs[dn], stat]
    when [:normal, :safecopy_flushing]
      return false if @each_vn_dump_vnodes.any? { |vn| dn == @hdiv[vn] }

      # open cache
      @hdbc[dn] = open_db(cache_file_name(dn))
      stop_clean_up { @dbs[dn] = stat }
      write_log("#{dn} #{stat.to_s}")
      stat
    when [:safecopy_flushing, :safecopy_flushed],
         [:safecopy_flushed, :cachecleaning],
         [:cachecleaning, :safecopy_flushing]
      write_log("#{dn} #{stat.to_s}")
      @dbs[dn] = stat
    when [:cachecleaning, :normal]
      write_log("#{dn} #{stat.to_s}")
      @dbs[dn] = stat
      # remove cache
      close_db(@hdbc[dn])
      @hdbc[dn] = nil
      if File.exist?("#{@storage_path}/#{dn}.cache.#{@ext_name}")
        File.unlink("#{@storage_path}/#{dn}.cache.#{@ext_name}")
      end
      stat
    else
      false
    end
  end
end
set_expt(vn, k, d, expt)
click to toggle source
set expire time
# File lib/roma/storage/basic_storage.rb, line 393
# Changes the expire time of an existing, unexpired entry, keeping its
# value.
#
# vn:: vnode id   k:: key   d:: not referenced here
# expt:: new expire time (unix time)
#
# Returns the written [vn, physical clock, logical clock, expt, value], or
# nil when the key is missing, expired, or db_put fails.
def set_expt(vn, k, d, expt)
  stored = db_get(vn, k)
  return nil unless stored

  vn, _last, clk, cur_expt, value = unpack_data(stored)
  return nil if cur_expt < Time.now.to_i

  next_clk = (clk + 1) & 0xffffffff # 32-bit logical clock
  record = [vn, Time.now.to_i, next_clk, expt, value]
  db_put(vn, k, pack_data(*record)) ? record : nil
end
stop_clean_up(&block)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 453
# Signals a running each_clean_up to stop by clearing @do_clean_up.
#
# With a block, additionally takes the clean-up lock, runs the block while
# holding it, and releases the lock even if the block raises; the block's
# value is returned. Without a block, returns nil.
def stop_clean_up(&block)
  @do_clean_up = false
  return unless block

  @each_clean_up_lock.synchronize(&block)
end
true_length()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 405
# Returns the sum of rnum over every main database handle.
def true_length
  @hdb.inject(0) { |total, handle| total + handle.rnum }
end
write_log(line)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 744
# Appends "<timestamp> <line>" to the status log and flushes it.
# Does nothing when no log has been opened (@log_name unset).
#
# Once the current file is larger than 1000 KiB the log is reopened
# through close_log / open_log before writing.
#
# The timestamp is YYYY-MM-DDTHH:MM:SS.uuuuuu with the microseconds
# zero-padded to six digits. They used to be printed unpadded, so 5000 us
# came out as ".5000", which is indistinguishable from half a second.
def write_log(line)
  return unless @log_name

  # log rotation
  if File.stat(@log_name).size > 1000 * 1024
    close_log
    open_log
  end

  tstr = Time.now.strftime('%Y-%m-%dT%H:%M:%S.%6N')
  @log_fd.write("#{tstr} #{line}\n")
  @log_fd.flush
end
Protected Instance Methods
create_div_hash()
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 92
# Fills @hdiv with the division index of every vnode in @vn_list:
# the SHA1 hex digest of the vnode id's string form, read as an integer,
# modulo @divnum.
def create_div_hash
  @vn_list.each do |vn|
    digest = Digest::SHA1.hexdigest(vn.to_s)
    @hdiv[vn] = digest.hex % @divnum
  end
end
Private Instance Methods
cmp_clk(clk1, clk2)
click to toggle source
Compares two logical clocks.
Returns -1, 0 or 1 as clk1
is older than, equal to, or newer than clk2.
When the two values are 2^31 or more apart, the numeric ordering is reversed,
because the logical clock space is a 32-bit ring.
# File lib/roma/storage/basic_storage.rb, line 83
# Orders two logical clocks on a 32-bit ring.
# Returns -1, 0 or 1 as +clk1+ is older than, equal to, or newer than
# +clk2+. Values 2^31 or more apart are taken to have wrapped, so their
# numeric order is reversed.
def cmp_clk(clk1, clk2)
  wrapped = (clk1 - clk2).abs >= 0x80000000 # 1<<31
  wrapped ? (clk2 <=> clk1) : (clk1 <=> clk2)
end
each_unpacked_db(target_vn, db) { |vn, last, clk, expt, k, val| ... }
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 566
# Yields (vn, last, clk, expt, key, value) for every record of vnode
# +target_vn+ in its division of +db+.
#
# target_vn:: vnode id to select
# db::        array of database handles indexed by division
#
# Records of other vnodes are skipped, as are records whose non-zero
# expire time had passed when the walk started. Records with expire time
# 0 are yielded. The loop sleeps once per @each_vn_dump_sleep_count
# skipped records.
def each_unpacked_db(target_vn, db)
  skipped = 0
  started_at = Time.now.to_i
  db[@hdiv[target_vn]].each do |k, v|
    vn, last, clk, expt, val = unpack_data(v)
    if vn == target_vn && (expt == 0 || started_at <= expt)
      yield vn, last, clk, expt, k, val
    else
      skipped += 1
      sleep @each_vn_dump_sleep if skipped % @each_vn_dump_sleep_count == 0
    end
  end
end
get_vnode_hash(vn)
click to toggle source
Create vnode dump.
# File lib/roma/storage/basic_storage.rb, line 644
# Builds a key => packed-record hash of every record of vnode +vn+ found
# in its division's main database. Expire times are not checked.
# The loop sleeps once per @each_vn_dump_sleep_count visited records.
def get_vnode_hash(vn)
  picked = {}
  visited = 0
  @hdb[@hdiv[vn]].each do |k, v|
    visited += 1
    sleep @each_vn_dump_sleep if visited % @each_vn_dump_sleep_count == 0
    picked[k] = v if unpack_data(v)[0] == vn
  end
  picked
end
pack_data(vn, physical_clock, logical_clock, expire,value)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 141
# Packs the four header fields and the value into one record string using
# PACK_TEMPLATE.
def pack_data(vn, physical_clock, logical_clock, expire,value)
  fields = [vn, physical_clock, logical_clock, expire, value]
  fields.pack(PACK_TEMPLATE)
end
pack_header(vn, physical_clock, logical_clock, expire)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 135
# Packs the four header fields (vnode id, physical clock, logical clock,
# expire time) into a header-only record using PACK_HEADER_TEMPLATE.
def pack_header(vn, physical_clock, logical_clock, expire)
  fields = [vn, physical_clock, logical_clock, expire]
  fields.pack(PACK_HEADER_TEMPLATE)
end
unpack_data(str)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 144
# Splits a packed record into its fields using PACK_TEMPLATE
# (header fields followed by the value).
def unpack_data(str)
  fields = str.unpack(PACK_TEMPLATE)
  fields
end
unpack_header(str)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 138
# Extracts the header fields of a packed record using
# PACK_HEADER_TEMPLATE; any value bytes after the header are ignored.
def unpack_header(str)
  fields = str.unpack(PACK_HEADER_TEMPLATE)
  fields
end
vn_dump_pack(vn, last, clk, expt, k, val)
click to toggle source
# File lib/roma/storage/basic_storage.rb, line 557
# Packs one record for a vnode dump stream:
# vn, last, clk, expt, key length, key, value length[, value]
# (all integers are 32-bit big-endian). When +val+ is nil the value length
# is written as 0 and no value bytes follow.
def vn_dump_pack(vn, last, clk, expt, k, val)
  head = [vn, last, clk, expt, k.length, k]
  if val
    (head + [val.length, val]).pack("NNNNNa#{k.length}Na#{val.length}")
  else
    (head + [0]).pack("NNNNNa#{k.length}N")
  end
end