module Bluepill::ProcessJournal

Attributes

journal_base_dir[R]
logger[R]

Public Class Methods

base_dir=(base_dir) click to toggle source
# File lib/bluepill/process_journal.rb, line 15
def base_dir=(base_dir)
  @journal_base_dir ||= File.join(base_dir, 'journals')
  FileUtils.mkdir_p(@journal_base_dir) unless File.exist?(@journal_base_dir)
  FileUtils.chmod(0777, @journal_base_dir)
end
logger=(new_logger) click to toggle source
# File lib/bluepill/process_journal.rb, line 11
def logger=(new_logger)
  @logger ||= new_logger
end

Public Instance Methods

acquire_atomic_fs_lock(name) { || ... } click to toggle source

atomic operation on POSIX filesystems, since f.flock(File::LOCK_SH) is not available on all platforms

# File lib/bluepill/process_journal.rb, line 32
def acquire_atomic_fs_lock(name)
  times = 0
  name += '.lock'
  Dir.mkdir name, 0700
  logger.debug("Acquired lock #{name}")
  yield
rescue Errno::EEXIST
  times += 1
  logger.debug("Waiting for lock #{name}")
  sleep 1
  if times < 10
    retry
  else
    logger.info("Timeout waiting for lock #{name}")
    raise "Timeout waiting for lock #{name}"
  end
ensure
  clear_atomic_fs_lock(name)
end
append_pgid_to_journal(journal_name, pgid) click to toggle source
# File lib/bluepill/process_journal.rb, line 169
def append_pgid_to_journal(journal_name, pgid)
  if skip_pgid?(pgid)
    logger.debug("Skipping invalid pgid #{pgid} (class #{pgid.class})")
    return
  end

  filename = pgid_journal_filename(journal_name)
  acquire_atomic_fs_lock(filename) do
    if pgid_journal(filename).include?(pgid)
      logger.debug("Skipping duplicate pgid #{pgid} already in journal #{journal_name}")
    else
      logger.debug("Saving pgid #{pgid} to process journal #{journal_name}")
      File.open(filename, 'a+', 0600) { |f| f.puts(pgid) }
      logger.info("Saved pgid #{pgid} to journal #{journal_name}")
      logger.debug("Journal now = #{File.open(filename, 'r').read}")
    end
  end
end
append_pid_to_journal(journal_name, pid) click to toggle source
# File lib/bluepill/process_journal.rb, line 188
def append_pid_to_journal(journal_name, pid)
  begin
    append_pgid_to_journal(journal_name, ::Process.getpgid(pid))
  rescue Errno::ESRCH
  end
  if skip_pid?(pid)
    logger.debug("Skipping invalid pid #{pid} (class #{pid.class})")
    return
  end

  filename = pid_journal_filename(journal_name)
  acquire_atomic_fs_lock(filename) do
    if pid_journal(filename).include?(pid)
      logger.debug("Skipping duplicate pid #{pid} already in journal #{journal_name}")
    else
      logger.debug("Saving pid #{pid} to process journal #{journal_name}")
      File.open(filename, 'a+', 0600) { |f| f.puts(pid) }
      logger.info("Saved pid #{pid} to journal #{journal_name}")
      logger.debug("Journal now = #{File.open(filename, 'r').read}")
    end
  end
end
clear_all_atomic_fs_locks() click to toggle source
# File lib/bluepill/process_journal.rb, line 52
def clear_all_atomic_fs_locks
  Dir['.*.lock'].each do |f|
    System.delete_if_exists(f) if File.directory?(f)
  end
end
clear_atomic_fs_lock(name) click to toggle source
# File lib/bluepill/process_journal.rb, line 84
def clear_atomic_fs_lock(name)
  return unless File.directory?(name)
  Dir.rmdir(name)
  logger.debug("Cleared lock #{name}")
end
kill_all_from_all_journals() click to toggle source
# File lib/bluepill/process_journal.rb, line 90
def kill_all_from_all_journals
  pids = Dir['.bluepill_pids_journal.*'].collect { |p| p.sub(/^\.bluepill_pids_journal\./, '') }
  pids.reject! { |p| p =~ /\.lock$/ }
  pids.each do |journal_name|
    kill_all_from_journal(journal_name)
  end
end
kill_all_from_journal(journal_name) click to toggle source
# File lib/bluepill/process_journal.rb, line 98
def kill_all_from_journal(journal_name)
  kill_all_pids_from_journal(journal_name)
  kill_all_pgids_from_journal(journal_name)
end
kill_all_pgids_from_journal(journal_name) click to toggle source
# File lib/bluepill/process_journal.rb, line 103
def kill_all_pgids_from_journal(journal_name)
  filename = pgid_journal_filename(journal_name)
  j = pgid_journal(filename)
  if !j.empty?
    acquire_atomic_fs_lock(filename) do
      j.each do |pgid|
        begin
          ::Process.kill('TERM', -pgid)
          logger.info("Termed old process group #{pgid}")
        rescue Errno::ESRCH
          logger.debug("Unable to term missing process group #{pgid}")
        end
      end

      if j.count { |pgid| System.pid_alive?(pgid) } > 1
        sleep(1)
        j.each do |pgid|
          begin
            ::Process.kill('KILL', -pgid)
            logger.info("Killed old process group #{pgid}")
          rescue Errno::ESRCH
            logger.debug("Unable to kill missing process group #{pgid}")
          end
        end
      end
      System.delete_if_exists(filename) # reset journal
      logger.debug('Journal cleanup completed')
    end
  else
    logger.debug('No previous process journal - Skipping cleanup')
  end
end
kill_all_pids_from_journal(journal_name) click to toggle source
# File lib/bluepill/process_journal.rb, line 136
def kill_all_pids_from_journal(journal_name)
  filename = pid_journal_filename(journal_name)
  j = pid_journal(filename)
  if !j.empty?
    acquire_atomic_fs_lock(filename) do
      j.each do |pid|
        begin
          ::Process.kill('TERM', pid)
          logger.info("Termed old process #{pid}")
        rescue Errno::ESRCH
          logger.debug("Unable to term missing process #{pid}")
        end
      end

      if j.count { |pid| System.pid_alive?(pid) } > 1
        sleep(1)
        j.each do |pid|
          begin
            ::Process.kill('KILL', pid)
            logger.info("Killed old process #{pid}")
          rescue Errno::ESRCH
            logger.debug("Unable to kill missing process #{pid}")
          end
        end
      end
      System.delete_if_exists(filename) # reset journal
      logger.debug('Journal cleanup completed')
    end
  else
    logger.debug('No previous process journal - Skipping cleanup')
  end
end
pgid_journal(filename) click to toggle source
# File lib/bluepill/process_journal.rb, line 75
def pgid_journal(filename)
  logger.debug("pgid journal file: #{filename}")
  result = File.open(filename, 'r').readlines.collect(&:to_i).reject { |pgid| skip_pgid?(pgid) }
  logger.debug("pgid journal = #{result.join(' ')}")
  result
rescue Errno::ENOENT
  []
end
pgid_journal_filename(journal_name) click to toggle source
# File lib/bluepill/process_journal.rb, line 62
def pgid_journal_filename(journal_name)
  File.join(@journal_base_dir, ".bluepill_pgids_journal.#{journal_name}")
end
pid_journal(filename) click to toggle source
# File lib/bluepill/process_journal.rb, line 66
def pid_journal(filename)
  logger.debug("pid journal file: #{filename}")
  result = File.open(filename, 'r').readlines.collect(&:to_i).reject { |pid| skip_pid?(pid) }
  logger.debug("pid journal = #{result.join(' ')}")
  result
rescue Errno::ENOENT
  []
end
pid_journal_filename(journal_name) click to toggle source
# File lib/bluepill/process_journal.rb, line 58
def pid_journal_filename(journal_name)
  File.join(@journal_base_dir, ".bluepill_pids_journal.#{journal_name}")
end
skip_pgid?(pgid) click to toggle source
# File lib/bluepill/process_journal.rb, line 26
def skip_pgid?(pgid)
  !pgid.is_a?(Integer) || pgid <= 1
end
skip_pid?(pid) click to toggle source
# File lib/bluepill/process_journal.rb, line 22
def skip_pid?(pid)
  !pid.is_a?(Integer) || pid <= 1
end