class OMGF::VerifyPaths
This module makes HEAD requests with reusable HTTP connections to verify paths. This is faster than having the mogilefsd tracker verifying paths, and the client could have broken routing to some the storage nodes the mogilefsd tracker/monitor can see.
Public Class Methods
new(logger)
click to toggle source
# File lib/omgf/verify_paths.rb, line 95 def initialize(logger) @pool = Hash.new { |hash,host_port| hash[host_port] = [] } @logger = logger @finishq = Queue.new @finisher = nil @lock = Mutex.new @pid = $$ end
Public Instance Methods
error(msg)
click to toggle source
# File lib/omgf/verify_paths.rb, line 104 def error(msg) @logger.error(msg) if @logger end
finisher(timeout = 10000)
click to toggle source
this runs in a background thread to cleanup all the requests that didn't finish quickly enough
# File lib/omgf/verify_paths.rb, line 137 def finisher(timeout = 10000) begin pollset = @finishq.pop # park here when idle while ready = Kgio.poll(pollset.dup, timeout) ready.each_key do |sock| sock.retry_ok = false # try to return good sockets back to the pool iter_check([], sock, pollset) end # try to stuff the pollset as much as possible for further looping while more = (@finishq.pop(true) rescue nil) pollset.merge!(more) end end # connections timed out, kill them pollset.each_key { |sock| sock.close } rescue => err error("#{err.message} (#{err.class})") end while true end
iter_check(ok, sock, pollset)
click to toggle source
# File lib/omgf/verify_paths.rb, line 108 def iter_check(ok, sock, pollset) rv = sock.poll_iter(pollset) case rv when Symbol # in progress when Array code = rv[0].to_i if 200 == code ok << sock.uri elsif code >= 100 && code <= 999 error("HEAD #{sock.uri} returned HTTP code: #{code}") else error("HEAD #{sock.uri} returned #{rv.inspect} (kcar bug?)") end sock_put(sock) when nil # premature EOF error("HEAD #{sock.uri} hit socket EOF") else # exception or some other error return value... if rv.respond_to?(:message) error("HEAD #{sock.uri} error: #{rv.message} (#{rv.class})") else error("HEAD #{sock.uri} error (#{rv.class}): #{rv.inspect}") end end rv end
key_for(uri)
click to toggle source
returns a string key for the connection pool
# File lib/omgf/verify_paths.rb, line 197 def key_for(uri) "#{uri.host}:#{uri.port}" end
sock_get(uri)
click to toggle source
initializes a cached connection for uri
or creates a new one
# File lib/omgf/verify_paths.rb, line 202 def sock_get(uri) key = key_for(uri) # detect forks and prevent sharing of connected sockets across processes @lock.synchronize do if @pid != $$ @pid = $$ @pool.clear end end while sock = @lock.synchronize { @pool[key].pop } begin # check if sock is still alive and idle # :wait_readable is good here break if sock.kgio_tryread(1) == :wait_readable rescue # ignore socket errors, we'll just give them a new socket # socket should've been idle, but it was not (or EOFed on us) # give them a new one end sock.close end sock ||= HeadSock.start(uri) sock.http_init(uri) sock rescue # we'll return nil on any errors end
sock_put(sock)
click to toggle source
returns an idle socket to the pool
# File lib/omgf/verify_paths.rb, line 234 def sock_put(sock) key = key_for(sock.uri) sock.http_reusable? and @lock.synchronize { @pool[key] << sock } rescue => err error("HTTP reuse check failed: #{err.message} (#{err.class})") end
verify(uris, count, timeout)
click to toggle source
reorders URIs based on response time This is the main method of this class
# File lib/omgf/verify_paths.rb, line 163 def verify(uris, count, timeout) tout = (timeout * 1000).to_i pollset = {} ok = [] uris.each do |uri| sock = sock_get(uri) and iter_check(ok, sock, pollset) end while ok.size < count && tout > 0 && ! pollset.empty? t0 = Time.now ready = Kgio.poll(pollset.dup, tout) or break tout -= ((Time.now - t0) * 1000).to_i ready.each_key do |sock| iter_check(ok, sock, pollset) end end finish(pollset) unless pollset.empty? [ok, uris - ok] # good URLs first end