class QuartzTorrent::Reactor
This class implements the Reactor pattern. The Reactor listens for activity on IO objects and calls methods on an associated Handler object when activity is detected. Callers can use listen, connect or open to register IO objects with the reactor.
This Reactor is implemented using Fibers in such a way that when activity is detected on an IO, the handler can perform reads of N bytes without blocking and without needing to buffer. For example, the handler may call:
msg = io.read(300)
when it knows it must read 300 bytes. If only 100 are available, the handler is cooperatively preempted and later resumed when more bytes are available, so that the read seems atomic while also not blocking.
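The cooperative read described above can be illustrated with a small standalone sketch. It does not use the reactor's own classes: `SketchFiberReader` and `sketch_run` are hypothetical names introduced only for this example, and the buffering shown here is an assumption about one way to get the same effect, not a description of how the reactor's IO facade works internally.

```ruby
require 'fiber' # needed for Fiber#alive? on older Rubies; harmless on newer ones

# Hypothetical stand-in for an IO facade: read(len) returns only once len
# bytes have arrived, suspending the calling fiber while it would block.
class SketchFiberReader
  def initialize
    @buffer = String.new # binary string, so slicing is byte-based
  end

  # Called by the driving loop when bytes arrive.
  def feed(bytes)
    @buffer << bytes
  end

  # Must be called inside a Fiber. Yields until len bytes are buffered.
  def read(len)
    Fiber.yield while @buffer.bytesize < len
    @buffer.slice!(0, len)
  end
end

# Runs the handler block in a fiber, feeding it one chunk per resume.
# Returns the handler's result, or nil if it never got enough data.
def sketch_run(chunks, &handler)
  reader = SketchFiberReader.new
  result = nil
  fiber = Fiber.new { result = handler.call(reader) }
  fiber.resume                 # handler runs until its first blocking read
  chunks.each do |chunk|
    break unless fiber.alive?
    reader.feed(chunk)
    fiber.resume               # handler continues where it paused
  end
  result
end

msg = sketch_run(['a' * 100, 'b' * 200]) { |io| io.read(300) }
puts msg.bytesize
```

From the handler's point of view the read is a single call, even though the data arrived in two pieces and control returned to the driving loop in between.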
Public Class Methods
Create a new reactor that uses the passed handler.
# File lib/quartz_torrent/reactor.rb, line 393
def initialize(handler, logger = nil)
  raise "Reactor.new called with nil handler. Handler can't be nil" if handler.nil?
  @stopped = false
  @handler = handler
  @handler.reactor = self
  # Hash of IOInfo objects, keyed by io.
  @ioInfo = {}
  @timerManager = TimerManager.new(logger)
  @currentIoInfo = nil
  @logger = logger
  @listenBacklog = 10
  @eventRead, @eventWrite = IO.pipe
  @currentEventPipeChars = 0
  @currentHandlerCallback = nil
  @userEvents = []
end
Public Instance Methods
Add a generic event. This event will be processed on the next pass through the event loop.
# File lib/quartz_torrent/reactor.rb, line 460
def addUserEvent(event)
  @userEvents.push event
end
Meant to be called from the handler. Cancels the timer scheduled with scheduleTimer.
# File lib/quartz_torrent/reactor.rb, line 512
def cancelTimer(timerInfo)
  @timerManager.cancel timerInfo
end
Meant to be called from the handler. Closes the passed io, or if it’s nil, closes the current io
# File lib/quartz_torrent/reactor.rb, line 537
def close(io = nil)
  if ! io
    disposeIo @currentIoInfo if @currentIoInfo
  else
    disposeIo io
  end
end
Create a TCP connection to the specified host and port. If a timeout is given and the connection is still in progress when this method returns, a connect timer is scheduled for it. Note that this method may raise exceptions; for example, ‘Too many open files’ might be raised if the process is using too many file descriptors.
# File lib/quartz_torrent/reactor.rb, line 422
def connect(addr, port, metainfo, timeout = nil)
  ioInfo = startConnection(port, addr, metainfo)
  @ioInfo[ioInfo.io] = ioInfo
  if timeout && ioInfo.state == :connecting
    ioInfo.connectTimeout = timeout
    ioInfo.connectTimer = scheduleTimer(timeout, InternalTimerInfo.new(:connect_timeout, ioInfo), false)
  end
end
Meant to be called from the handler. Returns the current io
# File lib/quartz_torrent/reactor.rb, line 546
def currentIo
  @currentIoInfo.readFiberIoFacade
end
Meant to be called from the handler. Find an IO by metainfo. The == operator is used to match the metainfo.
# File lib/quartz_torrent/reactor.rb, line 569
def findIoByMetainfo(metainfo)
  @ioInfo.each_value do |info|
    if info.metainfo == metainfo
      io = info.readFiberIoFacade
      # Don't allow read calls from timer handlers. This is to prevent a complex situation.
      # See the processTimer call in eventLoopBody for more info
      io = WriteOnlyIoFacade.new(info) if @currentHandlerCallback == :timer
      return io
    end
  end
  nil
end
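Because the lookup relies on `==`, the choice of metainfo type matters: value objects match any equal probe, while plain objects match only themselves. The sketch below illustrates this with a plain Hash standing in for the reactor's IO table. `SketchPeerKey`, `SketchOpaque` and `sketch_find_by_metainfo` are hypothetical names that exist only in this example.

```ruby
# Hypothetical metainfo type. Struct provides value-based ==.
SketchPeerKey = Struct.new(:host, :port)

# Hypothetical metainfo type with default (identity-based) ==.
class SketchOpaque
  def initialize(host)
    @host = host
  end
end

# Stand-in for the lookup: returns the first key whose metainfo == the probe.
def sketch_find_by_metainfo(table, metainfo)
  table.each { |io, info| return io if info == metainfo }
  nil
end

table = {
  io_a: SketchPeerKey.new('10.0.0.1', 6881),
  io_b: SketchOpaque.new('10.0.0.2')
}

# A freshly built, equal-valued Struct finds the entry.
p sketch_find_by_metainfo(table, SketchPeerKey.new('10.0.0.1', 6881))
# A freshly built plain object does not, even with the same contents.
p sketch_find_by_metainfo(table, SketchOpaque.new('10.0.0.2'))
```

If the metainfo is a custom class, defining `==` on it (or reusing the original object as the probe) keeps such lookups predictable.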
Create a TCP server that listens for connections on the specified port. Note that the code below does not use the addr argument; the socket is bound to 0.0.0.0 (all interfaces).
# File lib/quartz_torrent/reactor.rb, line 433
def listen(addr, port, metainfo)
  listener = Socket.new( AF_INET, SOCK_STREAM, 0 )
  sockaddr = Socket.pack_sockaddr_in( port, "0.0.0.0" )
  listener.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
  listener.bind( sockaddr )
  @logger.debug "listening on port #{port}" if @logger
  listener.listen( @listenBacklog )
  info = IOInfo.new(listener, metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :listening
  @ioInfo[info.io] = info
end
Open the specified file for the specified mode.
# File lib/quartz_torrent/reactor.rb, line 448
def open(path, mode, metainfo, useErrorhandler = true)
  file = File.open(path, mode)
  info = IOInfo.new(file, metainfo, true)
  info.useErrorhandler = useErrorhandler
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :connected
  @ioInfo[info.io] = info
end
Meant to be called from the handler. Read ‘len’ bytes from the current IO.
# File lib/quartz_torrent/reactor.rb, line 527
def read(len)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.read(len)
  else
    raise "Reactor.read called with no current io. Was it called from a timer handler?"
  end
end
Schedule a timer. ‘duration’ is the timer duration in seconds. ‘metainfo’ is caller information passed to the handler when the timer expires. ‘recurring’ should be true if the timer repeats, or false if it expires only once. ‘immed’, when true, makes the timer expire immediately (and again after each duration if recurring); when false, the timer first expires only after its duration elapses. Returns the timer info object, which can later be passed to cancelTimer.
# File lib/quartz_torrent/reactor.rb, line 499
def scheduleTimer(duration, metainfo = nil, recurring = true, immed = false)
  timerInfo = @timerManager.add(duration, metainfo, recurring, immed)
  # This timer may expire sooner than the current sleep we are doing in select(). To make
  # sure we will write to the event pipe to break out of select().
  if @currentEventPipeChars == 0
    @eventWrite.write 'x'
    @currentEventPipeChars += 1
    @eventWrite.flush
  end
  timerInfo
end
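To make the interaction of ‘recurring’ and ‘immed’ concrete, the following sketch lists when a timer would fire, measured in seconds after scheduling. It is derived only from the parameter description above, not from the TimerManager source, so treat it as an assumption about the intended behavior. `sketch_expiry_offsets` is a hypothetical helper.

```ruby
# Returns the offsets (seconds after scheduling) of the first `count` expiries
# of a timer, as implied by the documented meaning of the parameters.
def sketch_expiry_offsets(duration, recurring: true, immed: false, count: 3)
  first = immed ? 0 : duration           # immed fires right away
  return [first] unless recurring        # one-shot timers fire once
  (0...count).map { |i| first + i * duration }
end

p sketch_expiry_offsets(5)                                 # recurring, delayed start
p sketch_expiry_offsets(5, immed: true)                    # recurring, immediate start
p sketch_expiry_offsets(5, recurring: false)               # one-shot after 5 seconds
p sketch_expiry_offsets(5, recurring: false, immed: true)  # one-shot, immediate
```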
Meant to be called from the handler. Sets the meta info for the current io
# File lib/quartz_torrent/reactor.rb, line 551
def setMetaInfo(metainfo)
  @currentIoInfo.metainfo = metainfo
end
Meant to be called from the handler. Sets the max rate at which the current io can read.
# File lib/quartz_torrent/reactor.rb, line 556
def setReadRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.readRateLimit = rate
end
Meant to be called from the handler. Sets the max rate at which the current io can be written to.
# File lib/quartz_torrent/reactor.rb, line 562
def setWriteRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.writeRateLimit = rate
end
Run the event loop. The loop runs until eventLoopBody returns :halt, after which all registered IOs are closed.
# File lib/quartz_torrent/reactor.rb, line 465
def start
  while true
    begin
      break if eventLoopBody == :halt
    rescue
      @logger.error "Unexpected exception in reactor event loop: #{$!}" if @logger
      @logger.error $!.backtrace.join "\n" if @logger
    end
  end
  @logger.info "Reactor shutting down" if @logger
  # Event loop finished
  @ioInfo.each do |k,v|
    k.close
  end
end
Stop the event loop.
# File lib/quartz_torrent/reactor.rb, line 485
def stop
  @stopped = true
  return if @currentEventPipeChars > 0
  @eventWrite.write 'x'
  @currentEventPipeChars += 1
  @eventWrite.flush
end
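Both stop and scheduleTimer write a single byte to an internal pipe so that a blocked select() call returns early. The general technique can be shown in isolation with the standard `IO.pipe` and `IO.select` calls. `sketch_wait` is a hypothetical helper written for this example only.

```ruby
# Waits on the read end of a pipe for up to `timeout` seconds.
# Returns :woken if a wake-up byte was pending, :timeout otherwise.
def sketch_wait(read_end, timeout)
  ready = IO.select([read_end], nil, nil, timeout)
  return :timeout if ready.nil?
  read_end.read(1) # drain the wake-up byte so the next select can block again
  :woken
end

reader, writer = IO.pipe

# Another part of the program signals the loop by writing one byte.
writer.write 'x'
writer.flush

puts sketch_wait(reader, 5)    # returns promptly because a byte is pending
puts sketch_wait(reader, 0.05) # nothing pending now, so select times out
```

Tracking how many wake-up bytes are outstanding, as the reactor does with @currentEventPipeChars, avoids filling the pipe when many wake-ups are requested between passes.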
Returns true if the reactor is stopped
# File lib/quartz_torrent/reactor.rb, line 415
def stopped?
  @stopped
end
Meant to be called from the handler. Adds the specified data to the outgoing queue for the current io
# File lib/quartz_torrent/reactor.rb, line 517
def write(data)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.write(data)
  else
    raise "Reactor.write called with no current io. Was it called from a timer handler?"
  end
end
Private Instance Methods
Inner function in disposeIo.
# File lib/quartz_torrent/reactor.rb, line 781
def closeIo(io)
  begin
    io.close if !io.closed?
  rescue
    # The logger is optional, so guard the calls as the rest of the class does.
    @logger.warn "Closing IO failed with exception #{$!}" if @logger
    @logger.debug $!.backtrace.join("\n") if @logger
  end
end
# File lib/quartz_torrent/reactor.rb, line 779
def disposeIo(io)
  # Inner function in disposeIo.
  def closeIo(io)
    begin
      io.close if !io.closed?
    rescue
      # The logger is optional, so guard the calls as the rest of the class does.
      @logger.warn "Closing IO failed with exception #{$!}" if @logger
      @logger.debug $!.backtrace.join("\n") if @logger
    end
  end
  if io.is_a?(IOInfo)
    # Flush any output
    begin
      @logger.debug "disposeIo: flushing data" if @logger
      io.outputBuffer.flush
    rescue
    end
    closeIo(io.io)
    @ioInfo.delete io.io
  elsif io.is_a?(IoFacade)
    closeIo(io)
    io.removeFromIOHash(@ioInfo)
  else
    closeIo(io)
    @ioInfo.delete io
  end
end
Returns :continue or :halt to the caller, specifying whether to continue the event loop or halt.
# File lib/quartz_torrent/reactor.rb, line 585
def eventLoopBody
  # 1. Check timers
  timer, selectTimeout = processTimers
  readset = []
  writeset = []
  outputBufferNotEmptyCount = 0
  ioToRemove = []
  @ioInfo.each do |k,v|
    if k.closed?
      ioToRemove.push k
      next
    end
    readset.push k if v.state != :connecting && ! @stopped && (v.readRateLimit.nil? || v.readRateLimit.avail >= 1.0)
    @logger.debug "eventloop: IO metainfo=#{v.metainfo} added to read set" if @logger
    writeset.push k if (!v.outputBuffer.empty? || v.state == :connecting) && v.state != :listening && (v.writeRateLimit.nil? || v.writeRateLimit.avail >= 1.0)
    @logger.debug "eventloop: IO metainfo=#{v.metainfo} added to write set" if @logger
    outputBufferNotEmptyCount += 1 if !v.outputBuffer.empty?
  end
  readset.push @eventRead
  # Only exit the event loop once we've written all pending data.
  return :halt if @stopped && outputBufferNotEmptyCount == 0
  # 2. Check user events
  @userEvents.each{ |event| @handler.userEvent event } if ! @stopped
  # 3. Call Select. Ignore exception set: apparently this is for OOB data, or terminal things.
  selectResult = nil
  while true
    begin
      if readset.length > 1024 || writeset.length > 1024
        @logger.error "Too many file descriptors to pass to select! Trimming them. Some fds may starve!" if @logger
        readset = readset.first(1024)
        writeset = writeset.first(1024)
      end
      @logger.debug "eventloop: Calling select" if @logger
      selectResult = IO.select(readset, writeset, nil, selectTimeout)
      @logger.debug "eventloop: select done. result: #{selectResult.inspect}" if @logger
      break
    rescue
      # Exception occurred. Probably EINTR.
      @logger.warn "Select raised exception; will retry. Reason: #{$!}" if @logger
    end
  end
  if timer
    # Calling processTimer in withReadFiber here is not correct. What if at this point the fiber was already paused in a read, and we
    # want to process a timer? In that case we will resume the read and it will possibly finish, but we'll never
    # call the timer handler. For this reason we must prevent read calls in timerHandlers.
    # Process timer
    @logger.debug "eventloop: processing timer" if @logger
    processTimer(timer)
  end
  if ! selectResult.nil?
    readable, writeable = selectResult
    # If we are stopped, then ignore reads; we only care about completing our writes that were pending when we were stopped.
    readable = [] if @stopped
    readable.each do |io|
      # This could be the eventRead pipe, which we use to signal shutdown or to reloop.
      if io == @eventRead
        @logger.debug "Event received on the eventRead pipe." if @logger
        @eventRead.read 1
        @currentEventPipeChars -= 1
        next
      end
      @currentIoInfo = @ioInfo[io]
      # The IOInfo associated with this io could have been closed by the timer handler processed above.
      next if @currentIoInfo.nil?
      if @currentIoInfo.state == :listening
        @logger.debug "eventloop: calling handleAccept for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        # Replace the currentIoInfo with the accepted socket
        listenerMetainfo = @currentIoInfo.metainfo
        @currentIoInfo, addr, port = handleAccept(@currentIoInfo)
        withReadFiber(@currentIoInfo) do
          @currentHandlerCallback = :serverinit
          @handler.serverInit(listenerMetainfo, addr, port)
        end
      else
        @logger.debug "eventloop: calling handleRead for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        #handleRead(@currentIoInfo)
        withReadFiber(@currentIoInfo) do
          @currentHandlerCallback = :recv_data
          @handler.recvData @currentIoInfo.metainfo
        end
      end
    end
    writeable.each do |io|
      @currentIoInfo = @ioInfo[io]
      # Check if there is still ioInfo for this io. This can happen if this io was also ready for read, and
      # the read had an error (for example connection failure) and the ioinfo was removed when handling the error.
      next if ! @currentIoInfo
      if @currentIoInfo.state == :connecting
        @logger.debug "eventloop: calling finishConnection for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        finishConnection(@currentIoInfo)
      else
        @logger.debug "eventloop: calling writeOutputBuffer for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        writeOutputBuffer(@currentIoInfo)
      end
    end
  end
  ioToRemove.each do |io|
    ioInfo = @ioInfo.delete io
    # The logger is optional, so guard the call as the rest of the method does.
    @logger.warn "Detected an IO that was closed but still in the list of selectable IO. Metadata = #{ioInfo.metainfo}" if @logger
  end
  :continue
end
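The general shape of a select-based pass (build the read set, call select with a timeout, dispatch on whatever became readable) can be sketched without any of the reactor's bookkeeping. The loop below is a simplified illustration using only standard IO calls. It omits timers, write sets, rate limits and fibers, and `sketch_event_loop` is a hypothetical name.

```ruby
# Runs a tiny select loop over the given IOs, yielding each chunk that is
# read, until every IO has reached end of file. Returns the number of passes.
def sketch_event_loop(ios, timeout = 0.1)
  pending = ios.dup
  passes = 0
  until pending.empty?
    passes += 1
    readable, = IO.select(pending, nil, nil, timeout)
    next if readable.nil? # timeout: nothing was ready, so loop again
    readable.each do |io|
      begin
        yield io, io.read_nonblock(4096)
      rescue IO::WaitReadable
        # Spurious wake-up; the IO stays in the set for the next pass.
      rescue EOFError
        io.close
        pending.delete(io)
      end
    end
  end
  passes
end

reader, writer = IO.pipe
writer.write 'hello'
writer.close # closing the writer lets the reader see end of file

received = String.new
sketch_event_loop([reader]) { |_io, data| received << data }
puts received
```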
# File lib/quartz_torrent/reactor.rb, line 739
def finishConnection(ioInfo)
  # Socket was connecting and is now writable. Check if there was a connection failure
  # This uses the getpeername method. See http://cr.yp.to/docs/connect.html
  begin
    ioInfo.io.getpeername
    ioInfo.state = :connected
    if ioInfo.connectTimer
      @logger.debug "cancelling connect timer for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
      @timerManager.cancel ioInfo.connectTimer
    end
    @currentHandlerCallback = :client_init
    @handler.clientInit(ioInfo.metainfo)
  rescue
    # Connection failed.
    @logger.debug "connection failed for IO metainfo=#{@currentIoInfo.metainfo}: #{$!}" if @logger
    @currentHandlerCallback = :connect_error
    @handler.connectError(ioInfo.metainfo, $!)
    disposeIo(ioInfo)
  end
end
Given the ioInfo for a listening socket, call accept and return the new ioInfo for the client’s socket
# File lib/quartz_torrent/reactor.rb, line 811
def handleAccept(ioInfo)
  socket, clientAddr = ioInfo.io.accept
  info = IOInfo.new(socket, ioInfo.metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :connected
  @ioInfo[info.io] = info
  # Unpack the peer address unconditionally so that addr and port are returned even when no logger is set.
  port, addr = Socket.unpack_sockaddr_in(clientAddr)
  @logger.debug "Accepted connection from #{addr}:#{port}" if @logger
  [info, addr, port]
end
# File lib/quartz_torrent/reactor.rb, line 825
def handleRead(ioInfo)
  if ioInfo.readFiber.nil? || ! ioInfo.readFiber.alive?
    ioInfo.readFiber = Fiber.new do |ioInfo|
      @currentHandlerCallback = :recv_data
      @handler.recvData ioInfo.metainfo
    end
  end
  # Allow handler to read some data.
  # This call will return either if:
  # 1. the handler needs more data but it isn't available yet,
  # 2. if it's read all the data it wanted to read for the current message it's building
  # 3. if a read error occurred.
  #
  # In case 2 the fiber will be dead. In cases 1 and 2, we should select on the socket
  # until data is ready. For case 3, the state of the ioInfo is set to error and the io should be
  # removed.
  ioInfo.readFiber.resume(ioInfo)
  if ioInfo.state == :error
    @currentHandlerCallback = :error
    @handler.error(ioInfo.metainfo, ioInfo.lastReadError)
    disposeIo(ioInfo)
  end
end
# File lib/quartz_torrent/reactor.rb, line 760
def processTimer(timer)
  begin
    # Check for internal timers first.
    if timer.metainfo && timer.metainfo.is_a?(InternalTimerInfo)
      if timer.metainfo.type == :connect_timeout
        @currentHandlerCallback = :error
        @handler.error(timer.metainfo.data.metainfo, "Connection timed out")
        disposeIo(timer.metainfo.data)
      end
    else
      @currentHandlerCallback = :timer
      @handler.timerExpired(timer.metainfo)
    end
  rescue
    @logger.error "Exception in timer event handler: #{$!}" if @logger
    @logger.error $!.backtrace.join "\n" if @logger
  end
end
# File lib/quartz_torrent/reactor.rb, line 702
def processTimers
  selectTimeout = nil
  timer = nil
  while true && ! @stopped
    secondsUntilExpiry = nil
    timer = @timerManager.next{ |s| secondsUntilExpiry = s }
    break if ! timer
    if secondsUntilExpiry > 0
      selectTimeout = secondsUntilExpiry
      break
    end
    # Process timer now; its firing time has already passed.
    processTimer(timer)
  end
  [timer, selectTimeout]
end
# File lib/quartz_torrent/reactor.rb, line 719
def startConnection(port, addr, metainfo)
  socket = Socket.new(AF_INET, SOCK_STREAM, 0)
  addr = Socket.pack_sockaddr_in(port, addr)
  info = IOInfo.new(socket, metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  begin
    socket.connect_nonblock(addr)
    info.state = :connected
    @currentHandlerCallback = :client_init
    # Use the local variable info here; no variable named ioInfo exists in this method.
    @handler.clientInit(info.metainfo)
  rescue Errno::EINPROGRESS
    # Connection is ongoing.
    info.state = :connecting
  end
  info
end
Call the passed block in the context of the read Fiber. Basically the passed block is run as normal, but if the block performs a read from an io and that read would block, the block is paused, and withReadFiber returns. The next time withReadFiber is called the block will be resumed at the point of the read.
# File lib/quartz_torrent/reactor.rb, line 854
def withReadFiber(ioInfo)
  if ioInfo.readFiber.nil? || ! ioInfo.readFiber.alive?
    ioInfo.readFiber = Fiber.new do |ioInfo|
      yield ioInfo.readFiberIoFacade
    end
  end
  # Allow handler to read some data.
  # This call will return either if:
  # 1. the handler needs more data but it isn't available yet,
  # 2. if it's read all the data it wanted to read for the current message it's building
  # 3. if a read error occurred.
  #
  # In case 2 the fiber will be dead. In cases 1 and 2, we should select on the socket
  # until data is ready. For case 3, the state of the ioInfo is set to error and the io should be
  # removed.
  ioInfo.readFiber.resume(ioInfo)
  if ioInfo.state == :error
    @currentHandlerCallback = :error
    @handler.error(ioInfo.metainfo, ioInfo.lastReadError)
    disposeIo(ioInfo)
  end
end
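The create-or-resume behavior described for withReadFiber can be seen in a reduced form below: a new fiber is started only when none is in flight, and otherwise the paused one continues from where it stopped. `SketchResumable` is a hypothetical class for illustration, and a bare `Fiber.yield` stands in for a read that would block.

```ruby
require 'fiber' # needed for Fiber#alive? on older Rubies; harmless on newer ones

# Holds at most one in-flight fiber and either starts or resumes it.
class SketchResumable
  def initialize
    @fiber = nil
  end

  # Starts the block in a new fiber if none is in flight; otherwise ignores
  # the block and resumes the paused fiber. Returns true once the block has
  # run to completion.
  def with_fiber(&block)
    @fiber = Fiber.new(&block) if @fiber.nil? || !@fiber.alive?
    @fiber.resume
    !@fiber.alive?
  end
end

log = []
step = proc do
  log << :before_read
  Fiber.yield # stands in for a read that would block
  log << :after_read
end

task = SketchResumable.new
task.with_fiber(&step) # runs up to the pause
task.with_fiber(&step) # resumes after the pause; the block is not restarted
p log
```

Note that on the second call the block argument is ignored, which mirrors the description above: the earlier block is resumed at the point of its read rather than a new one being started.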
# File lib/quartz_torrent/reactor.rb, line 879
def writeOutputBuffer(ioInfo)
  begin
    @logger.debug "writeOutputBuffer: flushing data" if @logger
    if !ioInfo.writeRateLimit
      ioInfo.outputBuffer.flush
    else
      avail = ioInfo.writeRateLimit.avail
      if avail < ioInfo.outputBuffer.size
        if avail > 0
          ioInfo.writeRateLimit.withdraw avail
          ioInfo.outputBuffer.flush avail
        end
      else
        ioInfo.writeRateLimit.withdraw ioInfo.outputBuffer.size
        ioInfo.outputBuffer.flush
      end
    end
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
    # Need to wait to write more.
    @logger.debug "writeOutputBuffer: write failed with retryable exception #{$!}" if @logger
  rescue
    # Write failure occurred
    @logger.debug "writeOutputBuffer: write failed with unexpected exception #{$!}" if @logger
    if ioInfo.useErrorhandler
      @currentHandlerCallback = :error
      @handler.error(ioInfo.metainfo, "Write error: #{$!}")
    else
      raise $!
    end
  end
end
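The rate-limited branch above writes at most as many bytes as the limit currently allows and withdraws that amount from it. A standalone sketch of the same idea follows. `SketchBudget` is a hypothetical stand-in that borrows only the `avail` and `withdraw` names seen in the code above. It is not the library's RateLimit class, and how the real limit is replenished is not shown.

```ruby
# Hypothetical fixed byte budget exposing avail and withdraw.
class SketchBudget
  attr_reader :avail

  def initialize(avail)
    @avail = avail
  end

  def withdraw(n)
    @avail -= n
  end
end

# Moves as many bytes from `buffer` to `out` as the budget allows.
# Returns the number of bytes written; the rest stays in `buffer`.
def sketch_limited_flush(buffer, out, budget)
  n = [buffer.bytesize, budget.avail.floor].min
  return 0 if n <= 0
  budget.withdraw(n)
  out << buffer.byteslice(0, n)
  buffer.replace(buffer.byteslice(n, buffer.bytesize - n))
  n
end

buffer = String.new('0123456789')
out = String.new
budget = SketchBudget.new(4)

p sketch_limited_flush(buffer, out, budget) # writes only what the budget allows
p out
p buffer                                    # the remainder waits for a later pass
p sketch_limited_flush(buffer, out, budget) # budget exhausted, nothing written
```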