class Daybreak::Journal
Daybreak::Journal
Handles background IO, compaction, and is the arbiter of multiprocess safety. @api private
Attributes
file[R]
size[R]
Public Class Methods
new(file, format, serializer, &block)
click to toggle source
Calls superclass method
# File lib/daybreak/journal.rb, line 8
# Set up the journal: remember collaborators, open the file, start the
# background writer thread and load the existing journal contents.
def initialize(file, format, serializer, &block)
  super()
  @file = file
  @format = format
  @serializer = serializer
  @emit = block
  open
  @worker = Thread.new(&method(:worker))
  # Run the background writer below application threads
  @worker.priority = -1
  load
end
Public Instance Methods
bytesize()
click to toggle source
Return byte size of journal
# File lib/daybreak/journal.rb, line 85
# Size of the journal file on disk, in bytes.
def bytesize
  file_stat = @fd.stat
  file_stat.size
end
clear()
click to toggle source
Clear the database log and yield
# File lib/daybreak/journal.rb, line 51
# Wipe the journal: atomically swap in a database file that contains
# only the format header, then reopen it.
def clear
  flush
  with_tmpfile do |tmp_path, tmp_file|
    tmp_file.write(@format.header)
    tmp_file.close
    # Just like compaction, clearing replaces the database file
    # under an exclusive lock.
    with_flock(File::LOCK_EX) { File.rename(tmp_path, @file) }
  end
  open
end
close()
click to toggle source
Clear the queue and close the file handler
Calls superclass method
# File lib/daybreak/journal.rb, line 23 def close self << nil @worker.join @fd.close super end
closed?()
click to toggle source
Is the journal closed?
# File lib/daybreak/journal.rb, line 18
# Whether the journal's file handle has already been closed (see #close).
def closed?
  @fd.closed?
end
compact() { || ... }
click to toggle source
Compact the logfile to represent the in-memory state
# File lib/daybreak/journal.rb, line 65 def compact load with_tmpfile do |path, file| # Compactified database has the same size -> return return self if @pos == file.write(dump(yield, @format.header)) with_flock(File::LOCK_EX) do # Database was replaced (cleared or compactified) in the meantime if @pos != nil # Append changed journal records if the database changed during compactification file.write(read) file.close File.rename(path, @file) end end end open replay end
load()
click to toggle source
Load new journal entries
# File lib/daybreak/journal.rb, line 31
# Push pending writes to disk, then replay any journal entries
# appended since the last read.
def load
  flush
  replay
end
lock() { || ... }
click to toggle source
Lock the logfile across thread and process boundaries
# File lib/daybreak/journal.rb, line 37
# Lock the logfile across thread and process boundaries for the duration
# of the block; returns the block's result.
def lock
  # Flush first so we start from a clean state and the @locked flag
  # used inside with_flock is protected.
  flush
  with_flock(File::LOCK_EX) do
    replay
    value = yield
    flush
    value
  end
end
Private Instance Methods
dump(records, dump = '')
click to toggle source
Return database dump as string
# File lib/daybreak/journal.rb, line 127
# Serialize the given records into journal format, appending to +dump+
# in place (the records' values are also serialized in place).
def dump(records, dump = '')
  # Plain #each outperforms #inject here
  records.each do |rec|
    rec[1] = @serializer.dump(rec.last)
    dump << @format.dump(rec)
  end
  dump
end
open()
click to toggle source
Open or reopen file
# File lib/daybreak/journal.rb, line 98
# Open (or reopen) the journal file in append mode and remember its
# inode so lock acquisition can detect file replacement.
def open
  @fd.close if @fd
  @fd = File.open(@file, 'ab+')
  @fd.advise(:sequential) if @fd.respond_to?(:advise)
  stat = @fd.stat
  @inode = stat.ino
  # Brand-new file: start it off with the format header
  write(@format.header) if stat.size == 0
  # A nil position tells #read that the file was freshly (re)opened
  @pos = nil
end
read()
click to toggle source
Read new file content
# File lib/daybreak/journal.rb, line 109
# Read journal content appended since the last read position, under a
# shared lock. On a freshly (re)opened file (@pos is nil) the header is
# consumed first and a nil record is emitted so the consumer resets its
# in-memory state. Returns the raw bytes read and advances @pos.
def read
  with_flock(File::LOCK_SH) do
    # `unless ... else` replaced with positive `if ... else` for clarity
    if @pos
      # Continue from where the previous read stopped
      @fd.pos = @pos
    else
      # File was just opened (or replaced): skip the header and tell
      # the consumer to reset its state (nil record).
      @fd.pos = 0
      @format.read_header(@fd)
      @size = 0
      @emit.call(nil)
    end
    buf = @fd.read
    @pos = @fd.pos
    buf
  end
end
replay()
click to toggle source
Emit records as we parse them
# File lib/daybreak/journal.rb, line 92
# Parse freshly read journal bytes, emitting each record via @emit, and
# grow the running record count; returns the updated @size.
def replay
  @size += @format.parse(read, &@emit)
end
with_flock(mode) { || ... }
click to toggle source
Block with file lock
# File lib/daybreak/journal.rb, line 174 def with_flock(mode) return yield if @locked begin loop do # HACK: JRuby returns false if the process is already hold by the same process # see https://github.com/jruby/jruby/issues/496 Thread.pass until @fd.flock(mode) # Check if database was replaced (cleared or compactified) in the meantime # break if not stat = @fd.stat break if stat.nlink > 0 && stat.ino == @inode open end @locked = true yield ensure @fd.flock(File::LOCK_UN) @locked = false end end
with_tmpfile() { |path, file| ... }
click to toggle source
Open temporary file and pass it to the block
# File lib/daybreak/journal.rb, line 196
# Create a uniquely named temporary file next to the database and yield
# its path and handle; the file is always closed and, if still present
# (a successful compaction renames it away), removed afterwards.
# The name embeds the pid and thread id in base 36 to avoid collisions.
def with_tmpfile
  path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
  file = File.open(path, 'wb')
  yield(path, file)
ensure
  # +file+ is nil when File.open itself raised — guard against NoMethodError
  file.close if file && !file.closed?
  # File.exists? is deprecated and removed in Ruby 3.2; use File.exist?
  File.unlink(path) if File.exist?(path)
end
worker()
click to toggle source
Worker thread
# File lib/daybreak/journal.rb, line 137 def worker while (record = first) tries = 0 begin if Hash === record # Write batch update write(dump(record)) @size += record.size else # Write single record record[1] = @serializer.dump(record.last) if record.size > 1 write(@format.dump(record)) @size += 1 end rescue Exception => ex tries += 1 warn "Daybreak worker, try #{tries}: #{ex.message}" tries <= 3 ? retry : raise ensure pop end end rescue Exception => ex warn "Daybreak worker terminated: #{ex.message}" end
write(dump)
click to toggle source
Write data to output stream and advance @pos
# File lib/daybreak/journal.rb, line 164
# Append +dump+ to the journal under an exclusive lock, advancing @pos
# only when the write landed exactly where we expected it to.
def write(dump)
  with_flock(File::LOCK_EX) do
    @fd.write(dump)
    # Flush so other processes see the new records immediately
    @fd.flush
  end
  expected = @pos && @pos + dump.bytesize
  @pos = @fd.pos if expected && @fd.pos == expected
end