class OMGF::VerifyPaths

This class makes HEAD requests over reusable HTTP connections to verify paths. This is faster than having the mogilefsd tracker verify paths, and it also catches the case where the client has broken routing to some of the storage nodes that the mogilefsd tracker/monitor can see.

Public Class Methods

new(logger) click to toggle source
# File lib/omgf/verify_paths.rb, line 95
# Sets up an empty connection pool plus the bookkeeping used by the other
# methods of this class.
#
# logger - object that #error forwards messages to; may be nil, in which
#          case #error does nothing
def initialize(logger)
  @logger = logger
  @pid = $$ # remembered so sock_get can notice a fork
  @lock = Mutex.new # guards @pool and @pid
  @finisher = nil
  @finishq = Queue.new # pollsets waiting to be drained by #finisher
  # pool key (see key_for) => array of idle sockets, created on first access
  @pool = Hash.new { |pool, endpoint| pool[endpoint] = [] }
end

Public Instance Methods

error(msg) click to toggle source
# File lib/omgf/verify_paths.rb, line 104
# Forwards +msg+ to @logger.error.
# Does nothing and returns nil when no logger is set.
def error(msg)
  return unless @logger

  @logger.error(msg)
end
finisher(timeout = 10000) click to toggle source

This runs in a background thread to clean up all the requests that didn't finish quickly enough.

# File lib/omgf/verify_paths.rb, line 137
# Endless loop that drains pollsets pushed onto @finishq.
#
# Each pass blocks on the queue for a pollset, keeps polling it until
# Kgio.poll returns a falsy value, then closes whatever sockets remain.
# Any StandardError raised during a pass is logged via #error and the loop
# starts over with the next queued pollset.
#
# timeout - passed straight to Kgio.poll (presumably milliseconds, matching
#           the value #verify computes for the same call -- confirm against
#           kgio docs)
#
# Never returns normally.
def finisher(timeout = 10000)
  while true
    begin
      pending = @finishq.pop # blocks here while there is nothing to do

      # poll is given a copy; the working set itself is only handed to
      # iter_check
      while (ready = Kgio.poll(pending.dup, timeout))
        ready.each_key do |sock|
          sock.retry_ok = false
          # results are discarded (fresh array); the point is to let good
          # sockets find their way back into the pool
          iter_check([], sock, pending)
        end

        # fold in every pollset queued meanwhile, without blocking;
        # a non-blocking pop on an empty queue raises, which ends the drain
        while (extra = (@finishq.pop(true) rescue nil))
          pending.merge!(extra)
        end
      end

      # poll gave up on these connections, so close them
      pending.each_key { |sock| sock.close }
    rescue => err
      error("#{err.message} (#{err.class})")
    end
  end
end
iter_check(ok, sock, pollset) click to toggle source
# File lib/omgf/verify_paths.rb, line 108
# Advances the HEAD request on +sock+ by one step and records the outcome.
#
# ok      - array that receives sock.uri when the response code is 200
# sock    - request socket; must respond to poll_iter and uri
# pollset - handed through to sock.poll_iter
#
# Every outcome other than "in progress" or a 200 response is reported via
# #error. A socket that produced a complete response (any status) is offered
# back to the pool via #sock_put.
#
# Returns whatever sock.poll_iter returned.
def iter_check(ok, sock, pollset)
  result = sock.poll_iter(pollset)

  case result
  when Symbol
    # request still in progress, nothing to record yet
  when Array
    status = result[0].to_i
    case status
    when 200
      ok << sock.uri
    when 100..999
      error("HEAD #{sock.uri} returned HTTP code: #{status}")
    else
      error("HEAD #{sock.uri} returned #{result.inspect} (kcar bug?)")
    end
    sock_put(sock)
  when nil
    # premature EOF
    error("HEAD #{sock.uri} hit socket EOF")
  else
    # an exception object or some other unexpected return value
    report = if result.respond_to?(:message)
      "HEAD #{sock.uri} error: #{result.message} (#{result.class})"
    else
      "HEAD #{sock.uri} error (#{result.class}): #{result.inspect}"
    end
    error(report)
  end

  result
end
key_for(uri) click to toggle source

returns a string key for the connection pool

# File lib/omgf/verify_paths.rb, line 197
# Builds the connection-pool key for +uri+: its host and port joined by a
# colon, e.g. "example.com:7500".
def key_for(uri)
  host = uri.host
  port = uri.port
  "#{host}:#{port}"
end
sock_get(uri) click to toggle source

initializes a cached connection for uri or creates a new one

# File lib/omgf/verify_paths.rb, line 202
# Returns a socket primed with a request for +uri+, reusing an idle pooled
# connection when one is still usable and opening a new one otherwise.
#
# uri - request target; must respond to whatever key_for, HeadSock.start
#       and http_init ask of it
#
# Returns the socket, or nil if anything went wrong (best effort: callers
# treat nil as "could not check this URI"). On failure any socket obtained
# along the way is closed rather than left open until garbage collection.
def sock_get(uri)
  sock = nil
  key = key_for(uri)

  # detect forks and prevent sharing of connected sockets across processes
  @lock.synchronize do
    if @pid != $$
      @pid = $$
      @pool.clear
    end
  end

  while sock = @lock.synchronize { @pool[key].pop }
    begin
      # an idle, live connection has nothing to read: :wait_readable is the
      # only good answer here
      break if sock.kgio_tryread(1) == :wait_readable
    rescue
      # the socket should have been idle but errored (or hit EOF);
      # fall through, discard it and try the next candidate
    end
    sock.close
  end

  sock ||= HeadSock.start(uri)
  sock.http_init(uri)
  sock
rescue
  # we'll return nil on any errors, but must not leak the socket
  if sock
    begin
      sock.close
    rescue
      # already closed or otherwise unusable; nothing more to do
    end
  end
  nil
end
sock_put(sock) click to toggle source

returns an idle socket to the pool

# File lib/omgf/verify_paths.rb, line 234
# Puts +sock+ back into the pool under the key for its URI, but only when
# sock.http_reusable? says the connection can be reused.
#
# Returns the pool array the socket was appended to, the falsy result of
# http_reusable? when the socket is not reusable, or the result of #error
# when the check itself raised.
def sock_put(sock)
  pool_key = key_for(sock.uri)
  reusable = sock.http_reusable?
  return reusable unless reusable

  @lock.synchronize { @pool[pool_key] << sock }
rescue => err
  error("HTTP reuse check failed: #{err.message} (#{err.class})")
end
verify(uris, count, timeout) click to toggle source

Reorders URIs based on response time. This is the main method of this class.

# File lib/omgf/verify_paths.rb, line 163
# Issues a HEAD check for every URI and waits until +count+ of them have
# answered with 200 or the time budget is used up.
#
# uris    - array of URIs to check
# count   - number of good URIs after which waiting stops
# timeout - time budget in seconds (Integer or Float) for the polling phase
#
# Returns a two-element array: the URIs that answered 200, in the order
# they completed, followed by all remaining URIs from +uris+.
#
# The remaining time is recomputed from a fixed deadline after every poll.
# Subtracting a per-iteration elapsed time truncated to whole milliseconds
# would lose every wake-up shorter than 1ms, letting many quick wake-ups
# stretch the wait well past +timeout+.
def verify(uris, count, timeout)
  tout = (timeout * 1000).to_i # Kgio.poll is given milliseconds
  pollset = {}
  ok = []

  # start (or resume) a request per URI; URIs for which no socket could be
  # obtained are skipped and end up in the second half of the result
  uris.each do |uri|
    sock = sock_get(uri)
    iter_check(ok, sock, pollset) if sock
  end

  deadline = Time.now + (tout / 1000.0)
  while ok.size < count && tout > 0 && !pollset.empty?
    # poll is given a copy; the working set itself is only handed to
    # iter_check
    ready = Kgio.poll(pollset.dup, tout)
    break unless ready # poll returned nothing (presumably a timeout)

    ready.each_key { |sock| iter_check(ok, sock, pollset) }
    tout = ((deadline - Time.now) * 1000).to_i
  end

  # requests still in flight are handed off to finish
  finish(pollset) unless pollset.empty?
  [ok, uris - ok] # good URLs first
end