class Futex
Futex
(file mutex) is a fine-grained mutex that uses a file, not an entire thread, like Mutex
does. Use it like this:
require 'futex' Futex.new('/tmp/my-file.txt').open |f| IO.write(f, 'Hello, world!') end
The file /tmp/my-file.txt.lock<tt> will be created and used as an entrance lock. If the file is already locked by another thread or another process, exception <tt>Futex::CantLock
will be raised.
If you are not planning to write to the file, to speed things up, you may want to get a non-exclusive access to it, by providing false
to the method open()
:
require 'futex' Futex.new('/tmp/my-file.txt').open(false) |f| IO.read(f) end
For more information read README file.
- Author
-
Yegor Bugayenko (yegor256@gmail.com)
- Copyright
-
Copyright © 2018 Yegor Bugayenko
- License
-
MIT
Constants
- COUNTS
Global file for locks counting
Public Class Methods
Creates a new instance of the class.
# File lib/futex.rb, line 74 def initialize(path, log: STDOUT, timeout: 16, sleep: 0.005, lock: path + '.lock', logging: false) @path = path @log = log @logging = logging @timeout = timeout @sleep = sleep @lock = lock end
Public Instance Methods
Open the file. By default the file will be locked for exclusive access, which means that absolutely no other process will be able to do the same. This type of access (exclusive) is supposed to be used when you are making changes to the file. However, very often you may need just to read it and it's OK to let many processes do the reading at the same time, provided none of them do the writing. In that case you should call this method open()
with false
first argument, which will mean “shared” access. Many threads and processes may have shared access to the same lock file, but they all will stop and wait if one of them will require an “exclusive” access. This mechanism is inherited from POSIX, read about it <a href=“hereman7.org/linux/man-pages/man2/flock.2.html”>here>.
# File lib/futex.rb, line 95 def open(exclusive = true) FileUtils.mkdir_p(File.dirname(@lock)) step = (1 / @sleep).to_i start = Time.now prefix = exclusive ? '' : 'non-' b = badge(exclusive) Thread.current.thread_variable_set(:futex_lock, @lock) Thread.current.thread_variable_set(:futex_badge, b) open_synchronized(@lock) do |f| cycle = 0 loop do if f.flock((exclusive ? File::LOCK_EX : File::LOCK_SH) | File::LOCK_NB) Thread.current.thread_variable_set(:futex_cycle, nil) Thread.current.thread_variable_set(:futex_time, nil) break end sleep(@sleep) cycle += 1 Thread.current.thread_variable_set(:futex_cycle, cycle) Thread.current.thread_variable_set(:futex_time, Time.now - start) if Time.now - start > @timeout raise CantLock.new("#{b} can't get #{prefix}exclusive access \ to the file #{@path} because of the lock at #{@lock}, after #{age(start)} \ of waiting: #{IO.read(@lock)} (modified #{age(File.mtime(@lock))} ago)", File.mtime(@lock)) end next unless (cycle % step).zero? && Time.now - start > @timeout / 2 debug("#{b} still waiting for #{prefix}exclusive \ access to #{@path}, #{age(start)} already: #{IO.read(@lock)} \ (modified #{age(File.mtime(@lock))} ago)") end debug("Locked by #{b} in #{age(start)}, #{prefix}exclusive: \ #{@path} (attempt no.#{cycle})") IO.write(@lock, b) acq = Time.now res = block_given? ? yield(@path) : nil debug("Unlocked by #{b} in #{age(acq)}, #{prefix}exclusive: #{@path}") res end ensure Thread.current.thread_variable_set(:futex_cycle, nil) Thread.current.thread_variable_set(:futex_time, nil) Thread.current.thread_variable_set(:futex_lock, nil) Thread.current.thread_variable_set(:futex_badge, nil) end
Private Instance Methods
# File lib/futex.rb, line 149 def age(time) sec = Time.now - time return "#{(sec * 1_000_000).round}μs" if sec < 0.001 return "#{(sec * 1000).round}ms" if sec < 1 return "#{sec.round(2)}s" if sec < 60 return "#{(sec / 60).round}m" if sec < 60 * 60 "#{(sec / 3600).round}h" end
# File lib/futex.rb, line 143 def badge(exclusive) tname = Thread.current.name tname = 'nil' if tname.nil? "##{Process.pid}-#{exclusive ? 'ex' : 'sh'}/#{tname}[#{caller(2..2).first}]" end
# File lib/futex.rb, line 158 def debug(msg) return unless @logging if @log.respond_to?(:debug) @log.debug(msg) elsif @log.respond_to?(:puts) @log.puts(msg) end end
# File lib/futex.rb, line 201 def deserialize(data) data.empty? ? {} : JSON.parse(data) rescue JSON::ParserError {} end
# File lib/futex.rb, line 167 def open_synchronized(path) path = File.absolute_path(path) file = nil synchronized do |counts| file = File.open(path, File::CREAT | File::RDWR) refs = deserialize(IO.read(counts.path)) refs[path] = (refs[path] || 0) + 1 IO.write(counts.path, serialize(refs)) end yield file ensure synchronized do |counts| file&.close refs = deserialize(IO.read(counts.path)) refs[path] = (refs[path] || 1) - 1 if refs[path].zero? FileUtils.rm(path, force: true) refs.delete(path) end IO.write(counts.path, serialize(refs)) end end
# File lib/futex.rb, line 197 def serialize(data) data.to_json end
# File lib/futex.rb, line 190 def synchronized File.open(COUNTS, File::CREAT | File::RDWR) do |f| f.flock(File::LOCK_EX) yield f end end