class Futex

Futex (file mutex) is a fine-grained mutex that uses a file, not an entire thread, like Mutex does. Use it like this:

require 'futex'
Futex.new('/tmp/my-file.txt').open |f|
  IO.write(f, 'Hello, world!')
end

The file /tmp/my-file.txt.lock<tt> will be created and used as an entrance lock. If the file is already locked by another thread or another process, exception <tt>Futex::CantLock will be raised.

If you are not planning to write to the file, to speed things up, you may want to get a non-exclusive access to it, by providing false to the method open():

require 'futex'
Futex.new('/tmp/my-file.txt').open(false) |f|
  IO.read(f)
end

For more information read README file.

Author

Yegor Bugayenko (yegor256@gmail.com)

Copyright

Copyright © 2018 Yegor Bugayenko

License

MIT

Constants

COUNTS

Global file for locks counting

Public Class Methods

new(path, log: STDOUT, timeout: 16, sleep: 0.005, lock: path + '.lock', logging: false) click to toggle source

Creates a new instance of the class.

# File lib/futex.rb, line 74
def initialize(path, log: STDOUT, timeout: 16, sleep: 0.005,
  lock: path + '.lock', logging: false)
  @path = path
  @log = log
  @logging = logging
  @timeout = timeout
  @sleep = sleep
  @lock = lock
end

Public Instance Methods

open(exclusive = true) { |path| ... } click to toggle source

Open the file. By default the file will be locked for exclusive access, which means that absolutely no other process will be able to do the same. This type of access (exclusive) is supposed to be used when you are making changes to the file. However, very often you may need just to read it and it's OK to let many processes do the reading at the same time, provided none of them do the writing. In that case you should call this method open() with false first argument, which will mean “shared” access. Many threads and processes may have shared access to the same lock file, but they all will stop and wait if one of them will require an “exclusive” access. This mechanism is inherited from POSIX, read about it <a href=“hereman7.org/linux/man-pages/man2/flock.2.html”>here>.

# File lib/futex.rb, line 95
  def open(exclusive = true)
    FileUtils.mkdir_p(File.dirname(@lock))
    step = (1 / @sleep).to_i
    start = Time.now
    prefix = exclusive ? '' : 'non-'
    b = badge(exclusive)
    Thread.current.thread_variable_set(:futex_lock, @lock)
    Thread.current.thread_variable_set(:futex_badge, b)
    open_synchronized(@lock) do |f|
      cycle = 0
      loop do
        if f.flock((exclusive ? File::LOCK_EX : File::LOCK_SH) | File::LOCK_NB)
          Thread.current.thread_variable_set(:futex_cycle, nil)
          Thread.current.thread_variable_set(:futex_time, nil)
          break
        end
        sleep(@sleep)
        cycle += 1
        Thread.current.thread_variable_set(:futex_cycle, cycle)
        Thread.current.thread_variable_set(:futex_time, Time.now - start)
        if Time.now - start > @timeout
          raise CantLock.new("#{b} can't get #{prefix}exclusive access \
to the file #{@path} because of the lock at #{@lock}, after #{age(start)} \
of waiting: #{IO.read(@lock)} (modified #{age(File.mtime(@lock))} ago)",
          File.mtime(@lock))
        end
        next unless (cycle % step).zero? && Time.now - start > @timeout / 2
        debug("#{b} still waiting for #{prefix}exclusive \
access to #{@path}, #{age(start)} already: #{IO.read(@lock)} \
(modified #{age(File.mtime(@lock))} ago)")
      end
      debug("Locked by #{b} in #{age(start)}, #{prefix}exclusive: \
#{@path} (attempt no.#{cycle})")
      IO.write(@lock, b)
      acq = Time.now
      res = block_given? ? yield(@path) : nil
      debug("Unlocked by #{b} in #{age(acq)}, #{prefix}exclusive: #{@path}")
      res
    end
  ensure
    Thread.current.thread_variable_set(:futex_cycle, nil)
    Thread.current.thread_variable_set(:futex_time, nil)
    Thread.current.thread_variable_set(:futex_lock, nil)
    Thread.current.thread_variable_set(:futex_badge, nil)
  end

Private Instance Methods

age(time) click to toggle source
# File lib/futex.rb, line 149
def age(time)
  sec = Time.now - time
  return "#{(sec * 1_000_000).round}μs" if sec < 0.001
  return "#{(sec * 1000).round}ms" if sec < 1
  return "#{sec.round(2)}s" if sec < 60
  return "#{(sec / 60).round}m" if sec < 60 * 60
  "#{(sec / 3600).round}h"
end
badge(exclusive) click to toggle source
# File lib/futex.rb, line 143
def badge(exclusive)
  tname = Thread.current.name
  tname = 'nil' if tname.nil?
  "##{Process.pid}-#{exclusive ? 'ex' : 'sh'}/#{tname}[#{caller(2..2).first}]"
end
debug(msg) click to toggle source
# File lib/futex.rb, line 158
def debug(msg)
  return unless @logging
  if @log.respond_to?(:debug)
    @log.debug(msg)
  elsif @log.respond_to?(:puts)
    @log.puts(msg)
  end
end
deserialize(data) click to toggle source
# File lib/futex.rb, line 201
def deserialize(data)
  data.empty? ? {} : JSON.parse(data)
rescue JSON::ParserError
  {}
end
open_synchronized(path) { |file| ... } click to toggle source
# File lib/futex.rb, line 167
def open_synchronized(path)
  path = File.absolute_path(path)
  file = nil
  synchronized do |counts|
    file = File.open(path, File::CREAT | File::RDWR)
    refs = deserialize(IO.read(counts.path))
    refs[path] = (refs[path] || 0) + 1
    IO.write(counts.path, serialize(refs))
  end
  yield file
ensure
  synchronized do |counts|
    file&.close
    refs = deserialize(IO.read(counts.path))
    refs[path] = (refs[path] || 1) - 1
    if refs[path].zero?
      FileUtils.rm(path, force: true)
      refs.delete(path)
    end
    IO.write(counts.path, serialize(refs))
  end
end
serialize(data) click to toggle source
# File lib/futex.rb, line 197
def serialize(data)
  data.to_json
end
synchronized() { |f| ... } click to toggle source
# File lib/futex.rb, line 190
def synchronized
  File.open(COUNTS, File::CREAT | File::RDWR) do |f|
    f.flock(File::LOCK_EX)
    yield f
  end
end