module Garcon::Stash

Public Instance Methods

dump(records, dump = '') click to toggle source

Return database dump as string

# File lib/garcon/stash/journal.rb, line 151
# Build a dump string from a collection of records.
#
# Each yielded record has its second slot overwritten with the
# serializer's output for its last element, is then encoded by @format,
# and the result is appended to +dump+.
#
# @param records [#each] records to encode (the worker passes a Hash,
#   whose iteration yields key/value pairs)
# @param dump [String] buffer that is appended to in place
# @return [String] the same +dump+ object that was passed in
def dump(records, dump = '')
  records.each_with_object(dump) do |entry, out|
    serialized = @serializer.dump(entry.last)
    entry[1] = serialized
    out << @format.dump(entry)
  end
end
open() click to toggle source

Open or reopen file

# File lib/garcon/stash/journal.rb, line 121
# (Re)open the journal file in binary append/read mode.
#
# Any previously held descriptor is closed first. After opening, the
# file's inode is remembered in @inode, the format header is written if
# the file is empty, and @pos is cleared so the next read starts over.
#
# @return [nil]
def open
  @fd.close if @fd
  @fd = File.open(@file, 'ab+')
  # Not every platform/IO object offers #advise, hence the capability check
  @fd.advise(:sequential) if @fd.respond_to?(:advise)

  info = @fd.stat
  # @inode must be recorded before #write runs
  @inode = info.ino
  brand_new = info.size.zero?
  write(@format.header) if brand_new
  @pos = nil
end
read() click to toggle source

Read new file content

# File lib/garcon/stash/journal.rb, line 133
# Read whatever has been added to the file since the last read.
#
# Runs under a shared lock. When @pos is unset, the file is rewound, the
# header is consumed by @format, @size is reset to 0 and @emit is called
# with nil (presumably telling the consumer to start from scratch - confirm
# against the @emit definition). Otherwise reading resumes at @pos.
#
# @return [String] everything from the starting position to end of file
def read
  with_flock(File::LOCK_SH) do
    if @pos
      @fd.pos = @pos
    else
      @fd.pos = 0
      @format.read_header(@fd)
      @size = 0
      @emit.call(nil)
    end

    chunk = @fd.read
    # Remember where we stopped so the next call only sees newer data
    @pos = @fd.pos
    chunk
  end
end
with_flock(mode) { ... } click to toggle source

Block with file lock

# File lib/garcon/stash/journal.rb, line 197
# Run the given block while holding a file lock of the requested mode.
#
# If @locked is already set the block is simply yielded to, so nested
# calls neither re-acquire nor release the lock. Otherwise the lock is
# taken (passing control to other threads while flock reports failure),
# and the descriptor is checked: if the file has no remaining links or its
# inode differs from @inode, the file is reopened via #open and locking is
# attempted again. The lock is released and @locked cleared on the way out,
# whether or not the block raised.
#
# @param mode [Integer] a File::LOCK_* constant
# @return [Object] the block's return value
def with_flock(mode)
  if @locked
    yield
  else
    begin
      loop do
        Thread.pass until @fd.flock(mode)
        info = @fd.stat
        still_current = info.nlink > 0 && info.ino == @inode
        break if still_current
        open
      end
      @locked = true
      yield
    ensure
      @fd.flock(File::LOCK_UN)
      @locked = false
    end
  end
end
with_tmpfile() { |path, file| ... } click to toggle source

Open temporary file and pass it to the block

# File lib/garcon/stash/journal.rb, line 216
# Open a temporary file next to the journal and hand it to the block.
#
# The temporary path is @file followed by the base-36 process id and the
# base-36 object id of the current thread. The file is opened for binary
# writing, yielded together with its path, and afterwards closed (if still
# open) and removed (if it still exists at that path).
#
# @yieldparam path [String] path of the temporary file
# @yieldparam file [File] the open temporary file
# @return [Object] the block's return value
# @raise [SystemCallError] if the temporary file cannot be opened
def with_tmpfile
  path = [@file, Process.pid.to_s(36), Thread.current.object_id.to_s(36)].join
  file = File.open(path, 'wb')
  yield(path, file)
ensure
  # `file` is nil when File.open itself failed; guarding keeps the original
  # error from being replaced by a NoMethodError raised in cleanup.
  file.close if file && !file.closed?
  # File.exists? was removed in Ruby 3.2; File.exist? works everywhere.
  File.unlink(path) if path && File.exist?(path)
end
worker() click to toggle source

Worker thread

# File lib/garcon/stash/journal.rb, line 161
# Worker loop: drain queued records into the journal file.
#
# Keeps taking the record returned by #first until it returns a falsy
# value. A Hash record is encoded with #dump and counts as +record.size+
# entries; any other record has its second element serialized (when it has
# more than one element), is encoded by @format, and counts as one entry.
# A failed attempt is reported with +warn+ and retried up to three times;
# the record is popped from the queue once it is written or finally given
# up on. An error that survives the retries ends the loop and is reported
# with +warn+ rather than raised.
#
# @return [nil]
def worker
  while (record = first)
    tries = 0
    payload = nil
    begin
      # Encode at most once and on a copy: re-running the serializer on a
      # retry would serialize the already-serialized value a second time.
      payload ||= if record.is_a?(Hash)
                    dump(record)
                  else
                    entry = record.dup
                    entry[1] = @serializer.dump(entry.last) if entry.size > 1
                    @format.dump(entry)
                  end
      write(payload)
      @size += record.is_a?(Hash) ? record.size : 1
    rescue StandardError => e
      # Only ordinary errors are worth retrying; SystemExit, Interrupt,
      # NoMemoryError and the like fall straight through to the handler below.
      tries += 1
      warn "Stash worker, try #{tries}: #{e.message}"
      tries <= 3 ? retry : raise
    ensure
      # Does not run on `retry`, only once the record is done with
      pop
    end
  end
rescue Exception => e
  # Kept as broad as the original: last-resort report before the loop ends
  warn "Stash worker terminated: #{e.message}"
end
write(dump) click to toggle source

Write data to output stream and advance @pos

# File lib/garcon/stash/journal.rb, line 187
# Append data to the journal under an exclusive lock and flush it.
#
# Afterwards, if @pos is set and the descriptor's position is exactly
# @pos plus the number of bytes just written, @pos is moved up to that
# position; otherwise @pos is left alone (presumably so a later #read
# still picks up data appended by someone else - confirm with callers).
#
# @param dump [String] encoded bytes to append
# @return [Integer, nil] the new @pos when it was advanced, otherwise nil
def write(dump)
  with_flock(File::LOCK_EX) do
    @fd.write(dump)
    @fd.flush
  end

  return unless @pos

  current = @fd.pos
  @pos = current if current == @pos + dump.bytesize
end