class QuartzTorrent::Reactor

This class implements the Reactor pattern. The Reactor listens for activity on IO objects and calls methods on an associated Handler object when activity is detected. Callers can use listen, connect or open to register IO objects with the reactor.

This Reactor is implemented using Fibers in such a way that when activity is detected on an IO, the handler can perform reads of N bytes without blocking and without needing to buffer. For example, the handler may call:

msg = io.read(300)

when it knows it must read 300 bytes. If only 100 are available, the handler is cooperatively preempted and later resumed when more bytes are available, so that the read seems atomic while also not blocking.
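
The cooperative read described above can be sketched with a plain Ruby Fiber. This is only an illustration under stated assumptions, not the library's implementation: BufferedFiberReader and its feed method are hypothetical names standing in for the reactor's fiber-aware IO facade, and the bytes are fed by hand instead of coming from select().

```ruby
# Hypothetical stand-in for a fiber-aware IO facade (not part of QuartzTorrent).
class BufferedFiberReader
  def initialize
    @pending = String.new # bytes that have arrived but were not yet consumed
  end

  # Called by the "event loop" side when bytes arrive.
  def feed(bytes)
    @pending << bytes
  end

  # Must be called inside a Fiber. Returns exactly len bytes, pausing the
  # calling fiber (instead of blocking) while fewer bytes are available.
  def read(len)
    Fiber.yield while @pending.bytesize < len
    chunk = @pending.byteslice(0, len)
    @pending = @pending.byteslice(len, @pending.bytesize - len)
    chunk
  end
end

reader = BufferedFiberReader.new
messages = []

# The handler code reads fixed-size messages as if the reads were atomic.
handler = Fiber.new do
  messages << reader.read(5)
  messages << reader.read(3)
end

handler.resume        # no data yet: the handler pauses inside read(5)
reader.feed("he")
handler.resume        # still only 2 of 5 bytes: pauses again
reader.feed("llowo")
handler.resume        # read(5) completes; read(3) pauses with 2 bytes buffered
reader.feed("w")
handler.resume        # read(3) completes and the fiber finishes
```

From the handler's point of view each read returns a complete message; the pauses are visible only to the code that resumes the fiber.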

Attributes

listenBacklog[RW]

Public Class Methods

new(handler, logger = nil)

Create a new reactor that uses the passed handler.

# File lib/quartz_torrent/reactor.rb, line 393
def initialize(handler, logger = nil)
  raise "Reactor.new called with nil handler. Handler can't be nil" if handler.nil?

  @stopped = false
  @handler = handler
  @handler.reactor = self

  # Hash of IOInfo objects, keyed by io.
  @ioInfo = {}
  @timerManager = TimerManager.new(logger)
  @currentIoInfo = nil
  @logger = logger
  @listenBacklog = 10
  @eventRead, @eventWrite = IO.pipe
  @currentEventPipeChars = 0
  @currentHandlerCallback = nil
  @userEvents = []
end

Public Instance Methods

addUserEvent(event)

Add a generic event. This event is processed on the next pass through the event loop.

# File lib/quartz_torrent/reactor.rb, line 460
def addUserEvent(event)
  @userEvents.push event
end

cancelTimer(timerInfo)

Meant to be called from the handler. Cancels the timer scheduled with scheduleTimer.

# File lib/quartz_torrent/reactor.rb, line 512
def cancelTimer(timerInfo)
  @timerManager.cancel timerInfo
end

close(io = nil)

Meant to be called from the handler. Closes the passed io, or if it’s nil, closes the current io.

# File lib/quartz_torrent/reactor.rb, line 537
def close(io = nil)
  if ! io
    disposeIo @currentIoInfo if @currentIoInfo
  else
    disposeIo io
  end
end

connect(addr, port, metainfo, timeout = nil)

Create a TCP connection to the specified host. Note that this method may raise exceptions. For example, ‘Too many open files’ might be raised if the process is using too many file descriptors.

# File lib/quartz_torrent/reactor.rb, line 422
def connect(addr, port, metainfo, timeout = nil)
  ioInfo = startConnection(port, addr, metainfo)
  @ioInfo[ioInfo.io] = ioInfo
  if timeout && ioInfo.state == :connecting
    ioInfo.connectTimeout = timeout
    ioInfo.connectTimer = scheduleTimer(timeout, InternalTimerInfo.new(:connect_timeout, ioInfo), false)
  end
end

currentIo()

Meant to be called from the handler. Returns the current io.

# File lib/quartz_torrent/reactor.rb, line 546
def currentIo
  @currentIoInfo.readFiberIoFacade
end

findIoByMetainfo(metainfo)

Meant to be called from the handler. Find an IO by metainfo. The == operator is used to match the metainfo.

# File lib/quartz_torrent/reactor.rb, line 569
def findIoByMetainfo(metainfo)
  @ioInfo.each_value do |info|
    if info.metainfo == metainfo
      io = info.readFiberIoFacade
      # Don't allow read calls from timer handlers. This is to prevent a complex situation.
      # See the processTimer call in eventLoopBody for more info
      io = WriteOnlyIoFacade.new(info) if @currentHandlerCallback == :timer
      return io
    end
  end
  nil
end
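
Because the lookup uses ==, the metainfo objects need value equality for a freshly built key to match a registered one. A minimal sketch, assuming a hypothetical PeerKey struct as the metainfo type and plain symbols standing in for IO objects:

```ruby
# Hypothetical metainfo type. Struct provides a member-wise == for free.
PeerKey = Struct.new(:info_hash, :addr, :port)

# Stand-in for the reactor's io => metainfo bookkeeping.
registry = {
  :io_a => PeerKey.new("abc", "10.0.0.1", 6881),
  :io_b => PeerKey.new("abc", "10.0.0.2", 6881)
}

# Linear scan that returns the first io whose metainfo == wanted, or nil.
def find_by_metainfo(registry, wanted)
  registry.each do |io, metainfo|
    return io if metainfo == wanted
  end
  nil
end

found   = find_by_metainfo(registry, PeerKey.new("abc", "10.0.0.2", 6881))
missing = find_by_metainfo(registry, PeerKey.new("abc", "10.0.0.9", 6881))
```

A metainfo class without a custom == falls back to object identity, so only the very same object that was registered would match.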

listen(addr, port, metainfo)

Create a TCP server that listens for connections on the specified port. Note that the addr argument is currently unused: the listener always binds to 0.0.0.0.

# File lib/quartz_torrent/reactor.rb, line 433
def listen(addr, port, metainfo)
  listener = Socket.new( AF_INET, SOCK_STREAM, 0 )
  sockaddr = Socket.pack_sockaddr_in( port, "0.0.0.0" )
  listener.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
  listener.bind( sockaddr )
  @logger.debug "listening on port #{port}" if @logger
  listener.listen( @listenBacklog )
             
  info = IOInfo.new(listener, metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :listening
  @ioInfo[info.io] = info
end

open(path, mode, metainfo, useErrorhandler = true)

Open the specified file in the specified mode.

# File lib/quartz_torrent/reactor.rb, line 448
def open(path, mode, metainfo, useErrorhandler = true)
  file = File.open(path, mode)

  info = IOInfo.new(file, metainfo, true)
  info.useErrorhandler = useErrorhandler
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :connected
  @ioInfo[info.io] = info
end

read(len)

Meant to be called from the handler. Read ‘len’ bytes from the current IO.

# File lib/quartz_torrent/reactor.rb, line 527
def read(len)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.read(len)
  else
    raise "Reactor.read called with no current io. Was it called from a timer handler?"
  end
end

scheduleTimer(duration, metainfo = nil, recurring = true, immed = false)

Schedule a timer. ‘duration’ is the timer duration in seconds. ‘metainfo’ is caller information passed to the handler when the timer expires. ‘recurring’ should be true if the timer repeats, or false if it expires only once. When ‘immed’ is true the timer expires immediately (and again after each duration if recurring); when false, the timer first expires only after its duration elapses. Returns the timer info object, which can be passed to cancelTimer.

# File lib/quartz_torrent/reactor.rb, line 499
def scheduleTimer(duration, metainfo = nil, recurring = true, immed = false)
  timerInfo = @timerManager.add(duration, metainfo, recurring, immed)
  # This timer may expire sooner than the current sleep we are doing in select(). To make
  # sure we wake up in time, write to the event pipe to break out of select().
  if @currentEventPipeChars == 0
    @eventWrite.write 'x'
    @currentEventPipeChars += 1
    @eventWrite.flush
  end
  timerInfo
end
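
The interaction of ‘recurring’ and ‘immed’ can be made concrete with a small calculation. The helper below is hypothetical and does not use TimerManager; it only lists the offsets, in seconds from scheduling, at which a timer would expire under the semantics described above, up to an assumed horizon.

```ruby
# Hypothetical helper: expiry offsets (seconds after scheduling) up to horizon.
def expiry_offsets(duration, recurring, immed, horizon)
  raise ArgumentError, "duration must be positive" unless duration > 0

  offsets = []
  t = immed ? 0 : duration # immed fires right away, otherwise after one duration
  while t <= horizon
    offsets << t
    break unless recurring # one-shot timers expire exactly once
    t += duration
  end
  offsets
end

recurring_delayed = expiry_offsets(2, true,  false, 6)
recurring_immed   = expiry_offsets(2, true,  true,  6)
oneshot_delayed   = expiry_offsets(2, false, false, 6)
oneshot_immed     = expiry_offsets(2, false, true,  6)
```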

setMetaInfo(metainfo)

Meant to be called from the handler. Sets the meta info for the current io.

# File lib/quartz_torrent/reactor.rb, line 551
def setMetaInfo(metainfo)
  @currentIoInfo.metainfo = metainfo
end

setReadRateLimit(rate)

Meant to be called from the handler. Sets the max rate at which the current io can read.

# File lib/quartz_torrent/reactor.rb, line 556
def setReadRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.readRateLimit = rate
end

setWriteRateLimit(rate)

Meant to be called from the handler. Sets the max rate at which the current io can be written to.

# File lib/quartz_torrent/reactor.rb, line 562
def setWriteRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.writeRateLimit = rate
end
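
start()

Run the event loop. Returns once the reactor has been stopped and all pending output has been written; any IOs still registered are then closed.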

# File lib/quartz_torrent/reactor.rb, line 465
def start
  while true
    begin
      break if eventLoopBody == :halt
    rescue
      @logger.error "Unexpected exception in reactor event loop: #{$!}" if @logger
      @logger.error $!.backtrace.join "\n" if @logger
    end
  end

  @logger.info "Reactor shutting down" if @logger

  # Event loop finished
  @ioInfo.each do |k,v|
    k.close
  end

end

stop()

Stop the event loop.

# File lib/quartz_torrent/reactor.rb, line 485
def stop
  @stopped = true
  return if @currentEventPipeChars > 0
  @eventWrite.write 'x'
  @currentEventPipeChars += 1
  @eventWrite.flush
end
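
Both stop and scheduleTimer write a byte to the event pipe so that a select() call that is currently sleeping returns early. A minimal sketch of this self-pipe technique on its own, using only IO.pipe and IO.select; the variable names and the lambda are illustrative and not taken from the library:

```ruby
# One end of the pipe is watched by select, the other is written to wake it.
wake_read, wake_write = IO.pipe
pending_wake_bytes = 0

# Request a wakeup. At most one byte is kept in flight, mirroring the
# @currentEventPipeChars check in the source.
request_wakeup = lambda do
  if pending_wake_bytes == 0
    wake_write.write 'x'
    wake_write.flush
    pending_wake_bytes += 1
  end
end

request_wakeup.call
request_wakeup.call # coalesced: a byte is already pending

# Without the pending byte this select would sleep for the full 5 seconds.
started = Time.now
ready, = IO.select([wake_read], nil, nil, 5)
elapsed = Time.now - started

woke_early = !ready.nil? && ready.include?(wake_read)
if woke_early
  wake_read.read(1) # drain the byte so the next select can sleep again
  pending_wake_bytes -= 1
end

# A zero-timeout select returns nil when nothing is left to read.
drained = IO.select([wake_read], nil, nil, 0).nil?
```

Capping the number of pending bytes keeps the pipe from filling up when many wakeups are requested between two passes of the loop.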

stopped?()

Returns true if the reactor is stopped.

# File lib/quartz_torrent/reactor.rb, line 415
def stopped?
  @stopped
end

write(data)

Meant to be called from the handler. Adds the specified data to the outgoing queue for the current io.

# File lib/quartz_torrent/reactor.rb, line 517
def write(data)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.write(data)
  else
    raise "Reactor.write called with no current io. Was it called from a timer handler?"
  end
end

Private Instance Methods

closeIo(io)

Inner function in disposeIo.

# File lib/quartz_torrent/reactor.rb, line 781
def closeIo(io)
  begin
    io.close if !io.closed?
  rescue
    @logger.warn "Closing IO failed with exception #{$!}" if @logger
    @logger.debug $!.backtrace.join("\n") if @logger
  end
end

disposeIo(io)

# File lib/quartz_torrent/reactor.rb, line 779
def disposeIo(io)
  # Inner function in disposeIo.
  def closeIo(io)
    begin
      io.close if !io.closed?
    rescue
      @logger.warn "Closing IO failed with exception #{$!}" if @logger
      @logger.debug $!.backtrace.join("\n") if @logger
    end
  end
  
  if io.is_a?(IOInfo)
    # Flush any output
    begin
      @logger.debug "disposeIo: flushing data" if @logger
      io.outputBuffer.flush
    rescue
    end
    
    closeIo(io.io)
    @ioInfo.delete io.io
  elsif io.is_a?(IoFacade)
    closeIo(io)
    io.removeFromIOHash(@ioInfo)
  else
    closeIo(io)
    @ioInfo.delete io
  end
end

eventLoopBody()

Returns :continue or :halt to the caller, specifying whether to continue the event loop or halt.

# File lib/quartz_torrent/reactor.rb, line 585
def eventLoopBody
  # 1. Check timers
  timer, selectTimeout = processTimers

  readset = []
  writeset = []
  outputBufferNotEmptyCount = 0
  ioToRemove = []
  @ioInfo.each do |k,v|
    if k.closed?
      ioToRemove.push k
      next
    end
    # Only log an IO as added to a set when it actually was added.
    if v.state != :connecting && ! @stopped && (v.readRateLimit.nil? || v.readRateLimit.avail >= 1.0)
      readset.push k
      @logger.debug "eventloop: IO metainfo=#{v.metainfo} added to read set" if @logger
    end
    if (!v.outputBuffer.empty? || v.state == :connecting) && v.state != :listening && (v.writeRateLimit.nil? || v.writeRateLimit.avail >= 1.0)
      writeset.push k
      @logger.debug "eventloop: IO metainfo=#{v.metainfo} added to write set" if @logger
    end
    outputBufferNotEmptyCount += 1 if !v.outputBuffer.empty?
  end
  readset.push @eventRead

  # Only exit the event loop once we've written all pending data.
  return :halt if @stopped && outputBufferNotEmptyCount == 0

  # 2. Check user events. Clear them once delivered so each event is processed only once.
  if ! @stopped
    @userEvents.each{ |event| @handler.userEvent event }
    @userEvents.clear
  end

  # 3. Call Select. Ignore exception set: apparently this is for OOB data, or terminal things.
  selectResult = nil
  while true
    begin
      if readset.length > 1024 || writeset.length > 1024
        @logger.error "Too many file descriptors to pass to select! Trimming them. Some fds may starve!" if @logger
        readset = readset.first(1024)
        writeset = writeset.first(1024)
      end
      @logger.debug "eventloop: Calling select" if @logger
      selectResult = IO.select(readset, writeset, nil, selectTimeout)
      @logger.debug "eventloop: select done. result: #{selectResult.inspect}" if @logger
      break
    rescue
      # Exception occurred. Probably EINTR.
      @logger.warn "Select raised exception; will retry. Reason: #{$!}" if @logger
    end
  end

  if timer
    # Calling processTimer in withReadFiber here is not correct. What if at this point the fiber was already paused in a read, and we
    # want to process a timer? In that case we will resume the read and it will possibly finish, but we'll never
    # call the timer handler. For this reason we must prevent read calls in timerHandlers.
    # Process timer
    @logger.debug "eventloop: processing timer" if @logger
    processTimer(timer)
  end

  if ! selectResult.nil?
    readable, writeable = selectResult
  
    # If we are stopped, then ignore reads; we only care about completing our writes that were pending when we were stopped.
    readable = [] if @stopped

    readable.each do |io|
      # This could be the eventRead pipe, which we use to signal shutdown or to reloop.
      if io == @eventRead
        @logger.debug "Event received on the eventRead pipe." if @logger
        @eventRead.read 1
        @currentEventPipeChars -= 1
        next
      end

      @currentIoInfo = @ioInfo[io]
 
      # The IOInfo associated with this io could have been closed by the timer handler processed above.
      next if @currentIoInfo.nil?

      if @currentIoInfo.state == :listening
        @logger.debug "eventloop: calling handleAccept for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        # Replace the currentIoInfo with the accepted socket
        listenerMetainfo = @currentIoInfo.metainfo
        @currentIoInfo, addr, port = handleAccept(@currentIoInfo)
        withReadFiber(@currentIoInfo) do 
          @currentHandlerCallback = :serverinit
          @handler.serverInit(listenerMetainfo, addr, port)
        end
      else
        @logger.debug "eventloop: calling handleRead for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        #handleRead(@currentIoInfo)
        withReadFiber(@currentIoInfo) do 
          @currentHandlerCallback = :recv_data
          @handler.recvData @currentIoInfo.metainfo
        end
      end
    end

    writeable.each do |io|
      @currentIoInfo = @ioInfo[io]
      # Check if there is still ioInfo for this io. This can happen if this io was also ready for read, and
      # the read had an error (for example connection failure) and the ioinfo was removed when handling the error.
      next if ! @currentIoInfo 
      if @currentIoInfo.state == :connecting
        @logger.debug "eventloop: calling finishConnection for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        finishConnection(@currentIoInfo)
      else
        @logger.debug "eventloop: calling writeOutputBuffer for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
        writeOutputBuffer(@currentIoInfo)
      end
    end
  end

  ioToRemove.each do |io|
    ioInfo = @ioInfo.delete io
    @logger.warn "Detected an IO that was closed but still in the list of selectable IO. Metadata = #{ioInfo.metainfo}" if @logger
  end

  :continue
end

finishConnection(ioInfo)

# File lib/quartz_torrent/reactor.rb, line 739
def finishConnection(ioInfo)
  # Socket was connecting and is now writable. Check if there was a connection failure
  # This uses the getpeername method. See http://cr.yp.to/docs/connect.html
  begin
    ioInfo.io.getpeername
    ioInfo.state = :connected
    if ioInfo.connectTimer 
      @logger.debug "cancelling connect timer for IO metainfo=#{@currentIoInfo.metainfo}" if @logger
      @timerManager.cancel ioInfo.connectTimer
    end
    @currentHandlerCallback = :client_init
    @handler.clientInit(ioInfo.metainfo)
  rescue
    # Connection failed.
    @logger.debug "connection failed for IO metainfo=#{@currentIoInfo.metainfo}: #{$!}" if @logger
    @currentHandlerCallback = :connect_error
    @handler.connectError(ioInfo.metainfo, $!)
    disposeIo(ioInfo)
  end
end
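
The connect sequence used by startConnection and finishConnection (start a non-blocking connect, wait for the socket to become writable, then call getpeername to find out whether the connection succeeded) can be sketched in isolation. The example assumes a throwaway listener on the loopback interface and a 5 second select timeout, both chosen only for this sketch.

```ruby
require 'socket'

# Throwaway listener on an ephemeral loopback port (assumption for the demo).
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
sockaddr = Socket.pack_sockaddr_in(port, '127.0.0.1')

# Step 1: start the connection without blocking.
state = begin
  sock.connect_nonblock(sockaddr)
  :connected # completed immediately
rescue IO::WaitWritable
  :connecting # EINPROGRESS: completion is signalled by writability
end

# Step 2: once the socket is writable, getpeername tells success from failure.
if state == :connecting
  IO.select(nil, [sock], nil, 5)
  begin
    sock.getpeername
    state = :connected
  rescue SystemCallError
    state = :failed # for example ENOTCONN after a refused connection
  end
end

peer_port = nil
if state == :connected
  peer_port, _peer_addr = Socket.unpack_sockaddr_in(sock.getpeername)
end

sock.close
server.close
```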

handleAccept(ioInfo)

Given the ioInfo for a listening socket, call accept and return the new ioInfo for the client’s socket, together with the client’s address and port.

# File lib/quartz_torrent/reactor.rb, line 811
def handleAccept(ioInfo)
  socket, clientAddr = ioInfo.io.accept
  info = IOInfo.new(socket, ioInfo.metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :connected
  @ioInfo[info.io] = info
  # Unpack unconditionally: addr and port are returned to the caller even when no logger is set.
  port, addr = Socket.unpack_sockaddr_in(clientAddr)
  @logger.debug "Accepted connection from #{addr}:#{port}" if @logger
  
  [info, addr, port]
end

handleRead(ioInfo)

# File lib/quartz_torrent/reactor.rb, line 825
def handleRead(ioInfo)
  if ioInfo.readFiber.nil? || ! ioInfo.readFiber.alive?
    ioInfo.readFiber = Fiber.new do |ioInfo|
      @currentHandlerCallback = :recv_data
      @handler.recvData ioInfo.metainfo
    end
  end     
  
  # Allow handler to read some data.
  # This call will return if:
  #  1. the handler needs more data but it isn't available yet,
  #  2. it has read all the data it wanted to read for the current message it's building, or
  #  3. a read error occurred.
  #
  # In case 2 the fiber will be dead. In cases 1 and 2, we should select on the socket
  # until data is ready. For case 3, the state of the ioInfo is set to error and the io should be
  # removed.
  ioInfo.readFiber.resume(ioInfo)
  if ioInfo.state == :error
    @currentHandlerCallback = :error
    @handler.error(ioInfo.metainfo, ioInfo.lastReadError)
    disposeIo(ioInfo)
  end
end

processTimer(timer)

# File lib/quartz_torrent/reactor.rb, line 760
def processTimer(timer)
  begin
    # Check for internal timers first.
    if timer.metainfo && timer.metainfo.is_a?(InternalTimerInfo)
      if timer.metainfo.type == :connect_timeout
        @currentHandlerCallback = :error
        @handler.error(timer.metainfo.data.metainfo, "Connection timed out")
        disposeIo(timer.metainfo.data)
      end
    else
      @currentHandlerCallback = :timer
      @handler.timerExpired(timer.metainfo)
    end
  rescue
    @logger.error "Exception in timer event handler: #{$!}" if @logger
    @logger.error $!.backtrace.join "\n" if @logger
  end
end

processTimers()

# File lib/quartz_torrent/reactor.rb, line 702
def processTimers
  selectTimeout = nil
  timer = nil
  while true && ! @stopped
    secondsUntilExpiry = nil
    timer = @timerManager.next{ |s| secondsUntilExpiry = s }
    break if ! timer
    if secondsUntilExpiry > 0
      selectTimeout = secondsUntilExpiry
      break
    end
    # Process timer now; its firing time has already passed.
    processTimer(timer)
  end
  [timer, selectTimeout]
end

startConnection(port, addr, metainfo)

# File lib/quartz_torrent/reactor.rb, line 719
def startConnection(port, addr, metainfo)
  socket = Socket.new(AF_INET, SOCK_STREAM, 0)
  addr = Socket.pack_sockaddr_in(port, addr)

  info = IOInfo.new(socket, metainfo)
  info.readFiberIoFacade.logger = @logger if @logger

  begin
    socket.connect_nonblock(addr)
    info.state = :connected
    @currentHandlerCallback = :client_init
    @handler.clientInit(info.metainfo)
  rescue Errno::EINPROGRESS
    # Connection is ongoing.
    info.state = :connecting
  end

  info
end

withReadFiber(ioInfo) { |readFiberIoFacade| ... }

Call the passed block in the context of the read Fiber. Basically the passed block is run as normal, but if the block performs a read from an io and that read would block, the block is paused, and withReadFiber returns. The next time withReadFiber is called the block will be resumed at the point of the read.

# File lib/quartz_torrent/reactor.rb, line 854
def withReadFiber(ioInfo)
  if ioInfo.readFiber.nil? || ! ioInfo.readFiber.alive?
    ioInfo.readFiber = Fiber.new do |ioInfo|
      yield ioInfo.readFiberIoFacade
    end
  end     
  
  # Allow handler to read some data.
  # This call will return if:
  #  1. the handler needs more data but it isn't available yet,
  #  2. it has read all the data it wanted to read for the current message it's building, or
  #  3. a read error occurred.
  #
  # In case 2 the fiber will be dead. In cases 1 and 2, we should select on the socket
  # until data is ready. For case 3, the state of the ioInfo is set to error and the io should be
  # removed.
  ioInfo.readFiber.resume(ioInfo)
  if ioInfo.state == :error
    @currentHandlerCallback = :error
    @handler.error(ioInfo.metainfo, ioInfo.lastReadError)
    disposeIo(ioInfo)
  end
  
end
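
The pause-and-resume behaviour of withReadFiber can be reproduced with a few lines of plain Ruby. ReadFiberSlot below is a hypothetical class that keeps only the fiber bookkeeping (create a fiber when none is alive, otherwise resume the paused one); an explicit Fiber.yield plays the part of a read that would block.

```ruby
require 'fiber' # Fiber#alive? needs this on Ruby versions before 3.0

# Hypothetical holder for one read fiber, mirroring the ioInfo.readFiber logic.
class ReadFiberSlot
  def run(&block)
    # Start a new fiber only when there is no paused one to continue.
    @fiber = Fiber.new(&block) if @fiber.nil? || !@fiber.alive?
    @fiber.resume
  end
end

log = []
slot = ReadFiberSlot.new

# First call: the block starts and pauses where a blocking read would be.
slot.run { log << :start; Fiber.yield; log << :finish }

# Second call: the paused block is resumed; the block passed here is not run.
slot.run { log << :ignored }

# Third call: the previous fiber is dead, so this block runs in a new fiber.
slot.run { log << :fresh }
```

As the second call shows, a block passed while an earlier one is still paused is dropped, so this scheme fits callers that pass an equivalent block every time.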

writeOutputBuffer(ioInfo)

# File lib/quartz_torrent/reactor.rb, line 879
def writeOutputBuffer(ioInfo)
  begin
    @logger.debug "writeOutputBuffer: flushing data" if @logger
    if !ioInfo.writeRateLimit
      ioInfo.outputBuffer.flush
    else
      avail = ioInfo.writeRateLimit.avail
      if avail < ioInfo.outputBuffer.size
        if avail > 0
          ioInfo.writeRateLimit.withdraw avail
          ioInfo.outputBuffer.flush avail 
        end
      else
        ioInfo.writeRateLimit.withdraw ioInfo.outputBuffer.size
        ioInfo.outputBuffer.flush
      end
    end
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
    # Need to wait to write more.
    @logger.debug "writeOutputBuffer: write failed with retryable exception #{$!}" if @logger
  rescue
    # Write failure occurred
    @logger.debug "writeOutputBuffer: write failed with unexpected exception #{$!}" if @logger
    if ioInfo.useErrorhandler
      @currentHandlerCallback = :error
      @handler.error(ioInfo.metainfo, "Write error: #{$!}")
    else
      raise $!
    end
  end

end
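
The rate-limited branch of writeOutputBuffer writes as much as the limit allows and leaves the remainder buffered for a later pass. A simplified sketch of that decision, assuming a hypothetical BudgetLimit class that exposes the same avail and withdraw calls used above, with plain strings as buffer and sink (the real RateLimit and output buffer classes are not shown in this section):

```ruby
# Hypothetical rate limit: a fixed budget that is only ever withdrawn from.
class BudgetLimit
  attr_reader :avail

  def initialize(avail)
    @avail = avail
  end

  def withdraw(n)
    @avail -= n
  end
end

# Move as many bytes from buffer to sink as the limit allows.
# Returns the number of bytes written. A nil limit means unlimited.
def flush_with_limit(buffer, limit, sink)
  allowed = limit.nil? ? buffer.bytesize : [limit.avail.to_i, buffer.bytesize].min
  return 0 if allowed <= 0

  limit.withdraw(allowed) if limit
  sink << buffer.byteslice(0, allowed)
  buffer.replace(buffer.byteslice(allowed, buffer.bytesize - allowed))
  allowed
end

buffer = +"0123456789"
sink = +""
limit = BudgetLimit.new(4)

first  = flush_with_limit(buffer, limit, sink) # budget allows 4 bytes
second = flush_with_limit(buffer, limit, sink) # budget exhausted: nothing written
third  = flush_with_limit(buffer, nil, sink)   # no limit: the rest is flushed
```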