class Daybreak::Journal

Daybreak::Journal handles background I/O and compaction, and is the arbiter of multiprocess safety. @api private

Attributes

file[R]
size[R]

Public Class Methods

new(file, format, serializer, &block) click to toggle source
Calls superclass method
# File lib/daybreak/journal.rb, line 8
# Set up the journal: store the collaborators, open the file, start the
# background writer thread and replay what is already in the file.
#
# @param file [String] path of the journal file (handed to File.open)
# @param format [Object] provides the header and record encoding/parsing
# @param serializer [Object] its #dump is applied to record values
# @yield stored as @emit; called with nil when the file is (re)read from the
#   start and passed to @format.parse for the parsed records
def initialize(file, format, serializer, &block)
  super()
  @file = file
  @format = format
  @serializer = serializer
  @emit = block
  open
  # The writer thread is started with a lowered priority.
  @worker = Thread.new(&method(:worker)).tap { |thread| thread.priority = -1 }
  load
end

Public Instance Methods

bytesize() click to toggle source

Return byte size of journal

# File lib/daybreak/journal.rb, line 85
# Size of the journal file in bytes, taken from the open file descriptor.
#
# @return [Integer]
def bytesize
  info = @fd.stat
  info.size
end
clear() click to toggle source

Clear the database log by replacing the journal file with one that contains only the format header

# File lib/daybreak/journal.rb, line 51
# Replace the journal file with a fresh one that holds only the format
# header, then reopen it.
def clear
  flush
  with_tmpfile do |tmp_path, tmp|
    tmp.write(@format.header)
    tmp.close
    # Swapping in the new file is done under an exclusive lock, the same
    # way a compaction replaces the database file.
    with_flock(File::LOCK_EX) { File.rename(tmp_path, @file) }
  end
  open
end
close() click to toggle source

Drain the queue and close the file handle

Calls superclass method
# File lib/daybreak/journal.rb, line 23
# Shut the journal down: stop the worker thread, close the file and then
# run the superclass's close.
def close
  # Enqueue nil. The worker loop only continues while `first` returns a
  # truthy record, so this presumably serves as the stop marker once all
  # earlier records are written (NOTE(review): confirm against the queue
  # superclass).
  self << nil
  # Wait for the worker thread to finish before closing the file under it.
  @worker.join
  @fd.close
  super
end
closed?() click to toggle source

Is the journal closed?

# File lib/daybreak/journal.rb, line 18
# Whether the journal is closed, as reported by the underlying file handle.
#
# @return [Boolean]
def closed?
  @fd.closed?
end
compact() { || ... } click to toggle source

Compact the logfile to represent the in-memory state

# File lib/daybreak/journal.rb, line 65
# Rewrite the journal so that it contains only the records yielded by the
# caller (the current in-memory state), written after the format header.
#
# @yield must return the records to dump (anything #dump can iterate)
# @return [self] when the compacted dump has the same size as what has been
#   read so far; otherwise the result of the final #replay
def compact
  load
  with_tmpfile do |path, file|
    # Compactified database has the same size -> return
    # (file.write returns the number of bytes written; @pos is the journal
    # read position. The early return still runs with_tmpfile's ensure,
    # which closes and removes the temporary file.)
    return self if @pos == file.write(dump(yield, @format.header))
    with_flock(File::LOCK_EX) do
      # with_flock reopens the file when it was replaced (cleared or
      # compactified) in the meantime, and reopening resets @pos to nil.
      # Only install our compacted file when that did NOT happen.
      if @pos != nil
        # Append changed journal records if the database changed during compactification
        file.write(read)
        file.close
        File.rename(path, @file)
      end
    end
  end
  # Reopen (the path may now point at a different file) and re-read it.
  open
  replay
end
load() click to toggle source

Load new journal entries

# File lib/daybreak/journal.rb, line 31
# Load new journal entries: flush first, then parse and emit whatever has
# been appended to the file since the last read.
def load
  flush
  replay
end
lock() { || ... } click to toggle source

Lock the logfile across thread and process boundaries

# File lib/daybreak/journal.rb, line 37
# Run the given block while holding an exclusive lock on the journal file.
#
# @yield executed under the lock, after pending file content was replayed
# @return [Object] whatever the block returned
def lock
  # Start from a clean state; flushing first also protects the @locked
  # variable.
  flush

  with_flock(File::LOCK_EX) do
    replay
    # Flush again after the block, still under the lock, and hand back the
    # block's own result.
    yield.tap { flush }
  end
end

Private Instance Methods

dump(records, dump = '') click to toggle source

Return database dump as string

# File lib/daybreak/journal.rb, line 127
# Build the on-disk representation of a set of records.
#
# Each yielded record has its element at index 1 replaced in place by the
# serialized form of its last element before it is encoded by @format.
#
# @param records [#each] records to encode (each one an Array-like pair)
# @param dump [String] buffer the encoded records are appended to
# @return [String] the same buffer that was passed in (or a new one)
def dump(records, dump = '')
  # A plain each with an appending buffer (each is faster than inject).
  records.each do |entry|
    serialized = @serializer.dump(entry.last)
    entry[1] = serialized
    dump << @format.dump(entry)
  end
  dump
end
open() click to toggle source

Open or reopen file

# File lib/daybreak/journal.rb, line 98
# Open the journal file, or reopen it when a handle already exists.
#
# Remembers the inode of the opened file, writes the format header into an
# empty file and resets the read position so the next read starts over.
def open
  previous = @fd
  previous.close if previous

  handle = File.open(@file, 'ab+')
  @fd = handle
  handle.advise(:sequential) if handle.respond_to?(:advise)

  info = handle.stat
  @inode = info.ino
  write(@format.header) if info.size.zero?
  # nil marks "freshly opened": nothing has been read from this handle yet.
  @pos = nil
end
read() click to toggle source

Read new file content

# File lib/daybreak/journal.rb, line 109
# Read the file content that has not been read yet, under a shared lock.
#
# @return [String] the newly read bytes
def read
  with_flock(File::LOCK_SH) do
    if @pos
      # Continue where the previous read stopped.
      @fd.pos = @pos
    else
      # File was (re)opened: start from the beginning, consume the header,
      # reset the record counter and signal the restart by emitting nil.
      @fd.pos = 0
      @format.read_header(@fd)
      @size = 0
      @emit.call(nil)
    end
    data = @fd.read
    @pos = @fd.pos
    data
  end
end
replay() click to toggle source

Emit records as we parse them

# File lib/daybreak/journal.rb, line 92
# Parse newly read file content, emitting records through @emit, and add
# the value returned by @format.parse to @size.
def replay
  # Keep read as a separate statement: it resets @size to 0 when the file
  # was freshly (re)opened, so it has to run before `@size +=` evaluates
  # the current @size.
  buf = read
  @size += @format.parse(buf, &@emit)
end
with_flock(mode) { || ... } click to toggle source

Block with file lock

# File lib/daybreak/journal.rb, line 174
# Run the block while holding a file lock of the given mode on the journal.
#
# When a lock is already held (@locked is set) the block is simply yielded
# to, so nested calls do not try to lock again.
#
# @param mode [Integer] flock mode, e.g. File::LOCK_SH or File::LOCK_EX
# @return [Object] the block's result
def with_flock(mode)
  return yield if @locked
  begin
    loop do
      # HACK: JRuby returns false if the lock is already held by the same process
      # see https://github.com/jruby/jruby/issues/496
      Thread.pass until @fd.flock(mode)
      # Check if database was replaced (cleared or compactified) in the meantime
      # break if not: the locked handle must still be linked and still be
      # the inode recorded when the file was opened.
      stat = @fd.stat
      break if stat.nlink > 0 && stat.ino == @inode
      # Otherwise reopen the path (which also resets @pos) and lock again.
      open
    end
    @locked = true
    yield
  ensure
    # Always release the lock and clear the flag, even if the block raised.
    @fd.flock(File::LOCK_UN)
    @locked = false
  end
end
with_tmpfile() { |path, file| ... } click to toggle source

Open temporary file and pass it to the block

# File lib/daybreak/journal.rb, line 196
# Open a temporary file next to the journal and pass it to the block.
#
# The temporary path is the journal path with the process id and the
# current thread's object id (both base 36) appended. Afterwards the file
# is closed if the block left it open, and removed if it still exists
# (i.e. the block did not rename it away).
#
# @yieldparam path [String] path of the temporary file
# @yieldparam file [File] the temporary file, opened for binary writing
# @return [Object] the block's result
# @raise [SystemCallError] if the temporary file cannot be opened
def with_tmpfile
  path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
  file = File.open(path, 'wb')
  yield(path, file)
ensure
  # file is nil when File.open itself failed; do not mask that error.
  file.close if file && !file.closed?
  # File.exists? was removed in Ruby 3.2; File.exist? works everywhere.
  File.unlink(path) if path && File.exist?(path)
end
worker() click to toggle source

Worker thread

# File lib/daybreak/journal.rb, line 137
# Worker thread body: takes queued records one at a time and appends them
# to the journal file until `first` returns a falsy value.
#
# A Hash record is written as a batch via #dump. Any other record is written
# as a single entry; when it has more than one element its last element is
# serialized first. A failing write is retried up to three times; after that
# the error is re-raised, which ends the worker with a warning.
def worker
  while (record = first)
    tries = 0
    begin
      if Hash === record
        # Write batch update
        write(dump(record))
        @size += record.size
      else
        # Write single record. Serialize into a copy: this block is re-run
        # on retry, and serializing the queued record in place would
        # serialize an already serialized value again.
        entry = record
        if record.size > 1
          entry = record.dup
          entry[1] = @serializer.dump(record.last)
        end
        write(@format.dump(entry))
        @size += 1
      end
    rescue Exception => ex
      tries += 1
      warn "Daybreak worker, try #{tries}: #{ex.message}"
      tries <= 3 ? retry : raise
    ensure
      # Remove the handled record from the queue, whether the write
      # succeeded or the retries were exhausted.
      pop
    end
  end
rescue Exception => ex
  warn "Daybreak worker terminated: #{ex.message}"
end
write(dump) click to toggle source

Write data to output stream and advance @pos

# File lib/daybreak/journal.rb, line 164
# Append data to the journal file under an exclusive lock and, when
# possible, advance the read position past it.
#
# @param dump [String] encoded bytes to append
# @return [Integer, nil] the new @pos when it was advanced, otherwise nil
def write(dump)
  with_flock(File::LOCK_EX) do
    @fd.write(dump)
    # Flush to make sure the file is really updated
    @fd.flush
  end

  # Without a read position (file freshly opened) there is nothing to advance.
  return unless @pos

  # Only skip over our own data when the file position is exactly the old
  # read position plus the bytes just written.
  current = @fd.pos
  @pos = current if current == @pos + dump.bytesize
end