class QuartzTorrent::PeerClientHandler

This class implements a Reactor Handler object. This Handler implements the PeerClient.

Attributes

torrentData[R]

PUBLIC API METHODS ################################################

Public Class Methods

new(baseDirectory, maxIncomplete = 5, maxActive = 10) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 258
def initialize(baseDirectory, maxIncomplete = 5, maxActive = 10)
  # Hash of TorrentData objects, keyed by torrent infoHash
  @torrentData = {}
  @torrentQueue = TorrentQueue.new(maxIncomplete, maxActive)

  @baseDirectory = baseDirectory

  @logger = LogManager.getLogger("peerclient")

  # Overall maximum number of peers (connected + disconnected)
  @maxPeerCount = 120
  # Number of peers we ideally want to try and be downloading/uploading with
  @targetActivePeerCount = 50
  @targetUnchokedPeerCount = 4
  @managePeersPeriod = 10 # Defined in bittorrent spec. Only unchoke peers every 10 seconds.
  @requestBlocksPeriod = 1
  @handshakeTimeout = 1
  @requestTimeout = 60
  @endgameBlockThreshold = 20
end

Public Instance Methods

addTrackerClient(infoHash, info, trackerclient) click to toggle source

Add a new tracker client. This effectively adds a new torrent to download. Returns the TorrentData object for the new torrent.

# File lib/quartz_torrent/peerclient.rb, line 285
def addTrackerClient(infoHash, info, trackerclient)
  raise "There is already a tracker registered for torrent #{QuartzTorrent.bytesToHex(infoHash)}" if @torrentData.has_key? infoHash
  torrentData = TorrentData.new(infoHash, info, trackerclient)
  trackerclient.alarms = torrentData.alarms
  @torrentData[infoHash] = torrentData
  torrentData.info = info
  torrentData.state = :initializing

  queue(torrentData)
  dequeue      

  torrentData
end
adjustBytesDownloaded(infoHash, adjustment) click to toggle source

Adjust the bytesDownloaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesDownloaded amount.

# File lib/quartz_torrent/peerclient.rb, line 404
def adjustBytesDownloaded(infoHash, adjustment)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
  
  runInReactorThread do
    torrentData.bytesDownloaded += adjustment
    torrentData.bytesDownloadedDataOnly += adjustment
  end
end
adjustBytesUploaded(infoHash, adjustment) click to toggle source

Adjust the bytesUploaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesUploaded amount.

# File lib/quartz_torrent/peerclient.rb, line 389
def adjustBytesUploaded(infoHash, adjustment)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
  
  runInReactorThread do
    torrentData.bytesUploaded += adjustment
    torrentData.bytesUploadedDataOnly += adjustment
  end
end
clientInit(peer) click to toggle source

Reactor method called when we have connected to a peer.

# File lib/quartz_torrent/peerclient.rb, line 553
def clientInit(peer)
  # We connected to a peer
  # Send handshake
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.warn "No tracker client found for peer #{peer}. Closing connection."
    close
    return
  end
  trackerclient = torrentData.trackerClient

  @logger.info "Connected to peer #{peer}. Sending handshake."
  msg = PeerHandshake.new
  msg.peerId = trackerclient.peerId
  msg.infoHash = peer.infoHash
  msg.serializeTo currentIo
  peer.state = :handshaking
  @reactor.scheduleTimer(@handshakeTimeout, [:handshake_timeout, peer], false)
  @logger.debug "Done sending handshake."

  # Send bitfield
  sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState

  setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit
  setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit
end
error(peer, details) click to toggle source

Reactor method called when an IO error occurs.

# File lib/quartz_torrent/peerclient.rb, line 714
def error(peer, details)
  # If a peer closes the connection during handshake before we determine their id, we don't have a completed
  # Peer object yet. In this case the peer parameter is the symbol :listener_socket
  if peer == :listener_socket
    @logger.info "Error with handshaking peer: #{details}. Closing connection."
  else
    @logger.info "Error with peer #{peer}: #{details}. Closing connection."
    setPeerDisconnected(peer)
  end
  # Close connection
  close
end
getDelegateTorrentData(infoHash = nil) click to toggle source

Get a hash of new TorrentDataDelegate objects keyed by torrent infohash. This method is meant to be called from a different thread than the one the reactor is running in. This method is not immediate but blocks until the data is prepared. If infoHash is passed, only that torrent data is returned (still in a hashtable; just one entry)

# File lib/quartz_torrent/peerclient.rb, line 422
def getDelegateTorrentData(infoHash = nil)
  # Use an immediate, non-recurring timer.
  result = {}
  return result if stopped?
  semaphore = Semaphore.new
  timer = @reactor.scheduleTimer(0, [:get_torrent_data, result, semaphore, infoHash], false, true)
  if semaphore.wait(3)
    result
  else
    @logger.warn "getDelegateTorrentData: Waiting on semaphore timed out"
    throw "Waiting on semaphore for timer #{timer.object_id} timed out"
  end
end
recvData(peer) click to toggle source

Reactor method called when there is data ready to be read from a socket

# File lib/quartz_torrent/peerclient.rb, line 581
def recvData(peer)
  msg = nil

  @logger.debug "Got data from peer #{peer}"

  if peer.state == :handshaking
    # Read handshake message
    begin
      @logger.debug "Reading handshake from #{peer}"
      msg = PeerHandshake.unserializeFrom currentIo
    rescue
      @logger.warn "Peer #{peer} failed handshake: #{$!}"
      setPeerDisconnected(peer)
      close
      return
    end
  else
    begin
      @logger.debug "Reading wire-message from #{peer}"
      msg = peer.peerMsgSerializer.unserializeFrom currentIo
      #msg = PeerWireMessage.unserializeFrom currentIo
    rescue EOFError
      @logger.info "Peer #{peer} disconnected."
      setPeerDisconnected(peer)
      close
      return
    rescue
      @logger.warn "Unserializing message from peer #{peer} failed: #{$!}"
      @logger.warn $!.backtrace.join "\n"
      setPeerDisconnected(peer)
      close
      return
    end

    peer.updateUploadRate msg
    torrentData = @torrentData[peer.infoHash]
    torrentData.bytesDownloaded += msg.length if torrentData
    @logger.debug "Peer #{peer} upload rate: #{peer.uploadRate.value}  data only: #{peer.uploadRateDataOnly.value}"
  end


  if msg.is_a? PeerHandshake
    # This is a remote peer that we connected to returning our handshake.
    processHandshake(msg, peer)
    peer.state = :established
    peer.amChoked = true
    peer.peerChoked = true
    peer.amInterested = false
    peer.peerInterested = false
  elsif msg.is_a? BitfieldMessage
    @logger.debug "Received bitfield message from peer."
    handleBitfield(msg, peer)
  elsif msg.is_a? Unchoke
    @logger.debug "Received unchoke message from peer."
    peer.amChoked = false
  elsif msg.is_a? Choke
    @logger.debug "Received choke message from peer."
    peer.amChoked = true
  elsif msg.is_a? Interested
    @logger.debug "Received interested message from peer."
    peer.peerInterested = true
  elsif msg.is_a? Uninterested
    @logger.debug "Received uninterested message from peer."
    peer.peerInterested = false
  elsif msg.is_a? Piece
    @logger.debug "Received piece message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}."
    handlePieceReceive(msg, peer)
  elsif msg.is_a? Request
    @logger.debug "Received request message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.blockLength}."
    handleRequest(msg, peer)
  elsif msg.is_a? Have
    @logger.debug "Received have message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex}"
    handleHave(msg, peer)
  elsif msg.is_a? KeepAlive
    @logger.debug "Received keep alive message from peer."
  elsif msg.is_a? ExtendedHandshake
    @logger.debug "Received extended handshake message from peer."
    handleExtendedHandshake(msg, peer)
  elsif msg.is_a? ExtendedMetaInfo
    @logger.debug "Received extended metainfo message from peer."
    handleExtendedMetainfo(msg, peer)
  else
    @logger.warn "Received a #{msg.class} message but handler is not implemented"
  end
end
removeTorrent(infoHash, deleteFiles = false) click to toggle source

Remove a torrent.

# File lib/quartz_torrent/peerclient.rb, line 300
def removeTorrent(infoHash, deleteFiles = false)
  # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead.
  @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to remove torrent. #{deleteFiles ? "Will" : "Wont"} delete downloaded files."
  @reactor.scheduleTimer(0, [:removetorrent, infoHash, deleteFiles], false, true)
end
serverInit(metadata, addr, port) click to toggle source

Reactor method called when a peer has connected to us.

# File lib/quartz_torrent/peerclient.rb, line 450
def serverInit(metadata, addr, port)
  # A peer connected to us
  # Read handshake message
  @logger.warn "Peer connection from #{addr}:#{port}"
  begin
    msg = PeerHandshake.unserializeExceptPeerIdFrom currentIo
  rescue
    @logger.warn "Peer failed handshake: #{$!}"
    close
    return
  end

  torrentData = torrentDataForHandshake(msg, "#{addr}:#{port}")
  # Are we tracking this torrent?
  if !torrentData
    @logger.warn "Peer sent handshake for unknown torrent"
    close
    return 
  end
  trackerclient = torrentData.trackerClient

  # If we already have too many connections, don't allow this connection.
  classifiedPeers = ClassifiedPeers.new torrentData.peers.all
  if classifiedPeers.establishedPeers.length > @targetActivePeerCount
    @logger.warn "Closing connection to peer from #{addr}:#{port} because we already have #{classifiedPeers.establishedPeers.length} active peers which is > the target count of #{@targetActivePeerCount} "
    close
    return 
  end  

  # Send handshake
  outgoing = PeerHandshake.new
  outgoing.peerId = trackerclient.peerId
  outgoing.infoHash = torrentData.infoHash
  outgoing.serializeTo currentIo

  # Send extended handshake if the peer supports extensions
  if (msg.reserved.unpack("C8")[5] & 0x10) != 0
    @logger.warn "Peer supports extensions. Sending extended handshake"
    extended = Extension.createExtendedHandshake torrentData.info
    extended.serializeTo currentIo
  end
 
  # Read incoming handshake's peerid
  msg.peerId = currentIo.read(PeerHandshake::PeerIdLen)

  if msg.peerId == trackerclient.peerId
    @logger.info "We got a connection from ourself. Closing connection."
    close
    return
  end
 
  peer = nil
  peers = torrentData.peers.findById(msg.peerId)
  if peers
    peers.each do |existingPeer|
      if existingPeer.state != :disconnected
        @logger.warn "Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection."
        close
        return
      else
        if existingPeer.trackerPeer.ip == addr && existingPeer.trackerPeer.port == port
          peer = existingPeer
        end
      end
    end
  end

  if ! peer
    peer = Peer.new(TrackerPeer.new(addr, port))
    updatePeerWithHandshakeInfo(torrentData, msg, peer)
    torrentData.peers.add peer
    if ! peers
      @logger.warn "Unknown peer with id #{msg.peerId} connected."
    else
      @logger.warn "Known peer with id #{msg.peerId} connected from new location."
    end
  else
    @logger.warn "Known peer with id #{msg.peerId} connected from known location."
  end

  @logger.info "Peer #{peer} connected to us. "

  peer.state = :established
  peer.amChoked = true
  peer.peerChoked = true
  peer.amInterested = false
  peer.peerInterested = false
  if torrentData.info
    peer.bitfield = Bitfield.new(torrentData.info.pieces.length)
  else
    peer.bitfield = EmptyBitfield.new
    @logger.info "We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield"
  end

  # Send bitfield
  sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState

  setMetaInfo(peer)
  setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit
  setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit
end
setDownloadRateLimit(infoHash, bytesPerSecond) click to toggle source

Set the download rate limit. Pass nil as the bytesPerSecond to disable the limit.

# File lib/quartz_torrent/peerclient.rb, line 314
def setDownloadRateLimit(infoHash, bytesPerSecond)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set download rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
 
  if bytesPerSecond
    if ! torrentData.downRateLimit
      torrentData.downRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0)
    else
      torrentData.downRateLimit.unitsPerSecond = bytesPerSecond
    end
  else
    torrentData.downRateLimit = nil
  end

  torrentData.peers.all.each do |peer|
    withPeersIo(peer, "setting download rate limit") do |io|
      io.readRateLimit = torrentData.downRateLimit
    end
  end
 
end
setPaused(infoHash, value) click to toggle source

Pause or unpause the specified torrent.

# File lib/quartz_torrent/peerclient.rb, line 307
def setPaused(infoHash, value)
  # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead.
  @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to #{value ? "pause" : "unpause"} torrent."
  @reactor.scheduleTimer(0, [:pausetorrent, infoHash, value], false, true)
end
setUploadDuration(infoHash, seconds) click to toggle source

Set the maximum amount of time (in seconds) that a torrent can be in the upload-only state before it is paused. Pass nil to disable.

# File lib/quartz_torrent/peerclient.rb, line 377
def setUploadDuration(infoHash, seconds)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload duration for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  torrentData.uploadDuration = seconds
end
setUploadRateLimit(infoHash, bytesPerSecond) click to toggle source

Set the upload rate limit. Pass nil as the bytesPerSecond to disable the limit.

# File lib/quartz_torrent/peerclient.rb, line 340
def setUploadRateLimit(infoHash, bytesPerSecond)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  if bytesPerSecond
    if ! torrentData.upRateLimit
      torrentData.upRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0)
    else
      torrentData.upRateLimit.unitsPerSecond = bytesPerSecond
    end
  else
    torrentData.upRateLimit = nil
  end

  torrentData.peers.all.each do |peer|
    withPeersIo(peer, "setting upload rate limit") do |io|
      io.writeRateLimit = torrentData.upRateLimit
    end
  end
end
setUploadRatio(infoHash, ratio) click to toggle source

Set the upload ratio. Pass nil to disable

# File lib/quartz_torrent/peerclient.rb, line 365
def setUploadRatio(infoHash, ratio)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload ratio limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  torrentData.ratio = ratio
end
timerExpired(metadata) click to toggle source

Reactor method called when a scheduled timer expires.

# File lib/quartz_torrent/peerclient.rb, line 668
def timerExpired(metadata)
  if metadata.is_a?(Array) && metadata[0] == :manage_peers
    managePeers(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :request_blocks
    requestBlocks(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :check_piece_manager
    checkPieceManagerResults(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :handshake_timeout
    handleHandshakeTimeout(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :removetorrent
    handleRemoveTorrent(metadata[1], metadata[2])
  elsif metadata.is_a?(Array) && metadata[0] == :pausetorrent
    handlePause(metadata[1], metadata[2])
  elsif metadata.is_a?(Array) && metadata[0] == :get_torrent_data
    @torrentData.each do |k,v|
      begin
        if metadata[3].nil? || k == metadata[3]
          v = TorrentDataDelegate.new(v, self)
          metadata[1][k] = v
        end
      rescue
        @logger.error "Error building torrent data response for user: #{$!}"
        @logger.error "#{$!.backtrace.join("\n")}"
      end
    end
    metadata[2].signal
  elsif metadata.is_a?(Array) && metadata[0] == :update_torrent_data
    delegate = metadata[1]
    if ! @torrentData.has_key?(infoHash)
      delegate.state = :deleted 
    else
      delegate.internalRefresh
    end
    metadata[2].signal
  elsif metadata.is_a?(Array) && metadata[0] == :request_metadata_pieces
    requestMetadataPieces(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :check_metadata_piece_manager
    checkMetadataPieceManagerResults(metadata[1])
  elsif metadata.is_a?(Array) && metadata[0] == :runproc
    metadata[1].call
  else
    @logger.info "Unknown timer #{metadata} expired."
  end
end
updateDelegateTorrentData(delegate) click to toggle source

Update the data stored in a TorrentDataDelegate to the latest information.

# File lib/quartz_torrent/peerclient.rb, line 437
def updateDelegateTorrentData(delegate)
  return if stopped?
  # Use an immediate, non-recurring timer.
  semaphore = Semaphore.new
  @reactor.scheduleTimer(0, [:update_torrent_data, delegate, semaphore], false, true)
  semaphore.wait
  result
end

Private Instance Methods

checkMetadataPieceManagerResults(infoHash) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1039
def checkMetadataPieceManagerResults(infoHash)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.error "Check metadata piece manager results: data for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end

  # We may not have completed the extended handshake with the peer which specifies the torrent size.
  # In this case torrentData.metainfoPieceState is not yet set.
  return if ! torrentData.metainfoPieceState

  results = torrentData.metainfoPieceState.checkResults
  results.each do |result|
    metaData = torrentData.pieceManagerMetainfoRequestMetadata.delete(result.requestId)
    if ! metaData
      @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Can't find metadata for PieceManager request #{result.requestId}"
      next
    end

    if metaData.type == :read && result.successful?
      # Send the piece to the peer.
      msg = ExtendedMetaInfo.new
      msg.msgType = :piece
      msg.piece = metaData.data.requestMsg.piece
      msg.data = result.data
      withPeersIo(metaData.data.peer, "sending extended metainfo piece message") do |io|
        @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Sending metainfo piece to #{metaData.data.peer}: piece #{msg.piece} with data length #{msg.data.length}"
        sendMessageToPeer msg, io, metaData.data.peer
      end
      result.data
    end
  end

  if torrentData.metainfoPieceState.complete? && torrentData.state == :downloading_metainfo
    @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Obtained all pieces of metainfo. Will begin checking existing pieces."
    torrentData.metainfoPieceState.flush
    # We don't need to download metainfo anymore.
    cancelTimer torrentData.metainfoRequestTimer if torrentData.metainfoRequestTimer
    info = MetainfoPieceState.downloaded(@baseDirectory, torrentData.infoHash)
    if info
      torrentData.info = info
      startCheckingPieces torrentData
    else
      @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Metadata download is complete but reading the metadata failed"
      torrentData.state = :error
    end
  end
end
checkPieceManagerResults(infoHash) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1220
def checkPieceManagerResults(infoHash)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.error "Request blocks peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end
 
  while true
    result = torrentData.pieceManager.nextResult
    break if ! result

    metaData = torrentData.pieceManagerRequestMetadata.delete(result.requestId)
    if ! metaData
      @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Can't find metadata for PieceManager request #{result.requestId}"
      next
    end
  
    if metaData.type == :write
      if result.successful?
        @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Block written to disk. "
        # Block successfully written!
        torrentData.blockState.setBlockCompleted metaData.data.pieceIndex, metaData.data.blockOffset, true do |pieceIndex|
          # The peice is completed! Check hash.
          @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Piece #{pieceIndex} is complete. Checking hash. "
          id = torrentData.pieceManager.checkPieceHash(metaData.data.pieceIndex)
          torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:hash, metaData.data.pieceIndex)
        end
      else
        # Block failed! Clear completed and requested state.
        torrentData.blockState.setBlockCompleted metaData.data.pieceIndex, metaData.data.blockOffset, false
        @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Writing block failed: #{result.error}"
      end
    elsif metaData.type == :read
      if result.successful?
        readRequestMetadata = metaData.data
        peer = readRequestMetadata.peer
        withPeersIo(peer, "sending piece message") do |io|
          msg = Piece.new
          msg.pieceIndex = readRequestMetadata.requestMsg.pieceIndex
          msg.blockOffset = readRequestMetadata.requestMsg.blockOffset
          msg.data = result.data
          @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending block to #{peer}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}"
          sendMessageToPeer msg, io, peer
          torrentData.bytesUploadedDataOnly += msg.data.length
          @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending piece to peer"
        end
      else
        @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Reading block failed: #{result.error}"
      end
    elsif metaData.type == :hash
      if result.successful?
        @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Hash of piece #{metaData.data} is correct"
        sendHaves(torrentData, metaData.data)
        sendUninterested(torrentData)
      else
        @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Hash of piece #{metaData.data} is incorrect. Marking piece as not complete."
        torrentData.blockState.setPieceCompleted metaData.data, false
      end
    elsif metaData.type == :check_existing
      handleCheckExistingResult(torrentData, result)
    end
  end
end
dequeue() click to toggle source

Dequeue any torrents that can now run based on available space

# File lib/quartz_torrent/peerclient.rb, line 1666
def dequeue
  torrents = @torrentQueue.dequeue(@torrentData.values)
  torrents.each do |torrentData|
    if torrentData.state == :initializing
      initTorrent torrentData
    else
      setFrozen torrentData, false if ! torrentData.paused
    end
  end
end
getPeersFromTracker(torrentData, infoHash) click to toggle source

Update our internal peer list for this torrent from the tracker client

# File lib/quartz_torrent/peerclient.rb, line 1516
def getPeersFromTracker(torrentData, infoHash)
  addPeer = Proc.new do |trackerPeer|
    peer = Peer.new(trackerPeer)
    peer.infoHash = infoHash
    torrentData.peers.add peer
    true
  end

  classifiedPeers = nil
  replaceDisconnectedPeer = Proc.new do |trackerPeer|
    classifiedPeers = ClassifiedPeers.new(torrentData.peers.all) if ! classifiedPeers
  
    if classifiedPeers.disconnectedPeers.size > 0
      torrentData.peers.delete classifiedPeers.disconnectedPeers.pop
      addPeer.call trackerPeer
      true
    else
      false
    end
  end

  trackerclient = torrentData.trackerClient 

  addProc = addPeer
  flipped = false
  trackerclient.peers.each do |p|
    if ! flipped && torrentData.peers.size >= @maxPeerCount
      addProc = replaceDisconnectedPeer
      flipped = true
    end
  
    # Don't treat ourself as a peer.
    next if p.id && p.id == trackerclient.peerId
        
    if ! torrentData.peers.findByAddr(p.ip, p.port)
      @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Adding tracker peer #{p} to peers list"
      break if ! addProc.call(p)
    end
  end
end
handleBitfield(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1159
def handleBitfield(msg, peer)
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Bitfield: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end

  peer.bitfield = msg.bitfield
  if torrentData.info
    peer.bitfield.length = torrentData.info.pieces.length
  else
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: A peer connected and sent a bitfield but we don't know the length of the torrent yet. Assuming number of pieces is divisible by 8"
  end

  if ! torrentData.blockState
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Bitfield: no blockstate yet."
    return
  end

  # If we are interested in something from this peer, let them know.
  needed = torrentData.blockState.completePieceBitfield.compliment
  needed.intersection!(peer.bitfield)
  if ! needed.allClear?
    if ! peer.amInterested
      @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Need some pieces from peer #{peer} so sending Interested message"
      msg = Interested.new
      sendMessageToPeer msg, currentIo, peer
      peer.amInterested = true
    end
  end
end
handleCheckExistingResult(torrentData, pieceManagerResult) click to toggle source

Handle the result of the PieceManager’s checkExisting (check which pieces we already have) operation. If the resukt is successful, this begins the actual download.

# File lib/quartz_torrent/peerclient.rb, line 1286
def handleCheckExistingResult(torrentData, pieceManagerResult)
  if pieceManagerResult.successful?
    existingBitfield = pieceManagerResult.data
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We already have #{existingBitfield.countSet}/#{existingBitfield.length} pieces." 

    info = torrentData.info
   
    torrentData.blockState = BlockState.new(info, existingBitfield)

    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Starting torrent. Information:"
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}:   piece length:     #{info.pieceLen}"
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}:   number of pieces: #{info.pieces.size}"
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}:   total length      #{info.dataLength}"

    startDownload torrentData
  else
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Checking existing pieces of torrent failed: #{pieceManagerResult.error}"
    torrentData.state = :error
  end
end
handleExtendedHandshake(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1390
def handleExtendedHandshake(msg, peer)
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Extended Handshake: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end

  metadataSize = msg.dict['metadata_size']
  if metadataSize
    # This peer knows the size of the metadata. If we haven't created our MetainfoPieceState yet, create it now.
    if ! torrentData.metainfoPieceState
      @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Extended Handshake: Learned that metadata size is #{metadataSize}. Creating MetainfoPieceState"
      torrentData.metainfoPieceState = MetainfoPieceState.new(@baseDirectory, torrentData.infoHash, metadataSize)
    end
  end

end
handleExtendedMetainfo(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1408
def handleExtendedMetainfo(msg, peer)
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Extended Handshake: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end

  if msg.msgType == :request
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo request for piece #{msg.piece}"
    # Build a response for this piece.
    if torrentData.metainfoPieceState.pieceCompleted? msg.piece
      @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Requesting extended metainfo piece #{msg.piece} from metainfoPieceState."
      id = torrentData.metainfoPieceState.readPiece msg.piece
      torrentData.pieceManagerMetainfoRequestMetadata[id] = 
        PieceManagerRequestMetadata.new(:read, ReadRequestMetadata.new(peer,msg))
    else
      reject = ExtendedMetaInfo.new
      reject.msgType = :reject
      reject.piece = msg.piece
      withPeersIo(peer, "sending extended metainfo reject message") do |io|
        @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending metainfo reject to #{peer}: piece #{msg.piece}"
        sendMessageToPeer reject, io, peer
      end
    end
  elsif msg.msgType == :piece
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo piece response for piece #{msg.piece} with data length #{msg.data.length}"
    if ! torrentData.metainfoPieceState.pieceCompleted? msg.piece
      id = torrentData.metainfoPieceState.savePiece msg.piece, msg.data
      torrentData.pieceManagerMetainfoRequestMetadata[id] = 
        PieceManagerRequestMetadata.new(:write, msg)
    end
  elsif msg.msgType == :reject
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo reject response for piece #{msg.piece}"
    # Mark this peer as bad.
    torrentData.metainfoPieceState.markPeerBad peer
    torrentData.metainfoPieceState.setPieceRequested(msg.piece, false)
  end
end
handleHandshakeTimeout(peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 818
def handleHandshakeTimeout(peer)
  if peer.state == :handshaking
    @logger.warn "Peer #{peer} failed handshake: handshake timed out after #{@handshakeTimeout} seconds."
    withPeersIo(peer, "handling handshake timeout") do |io|
      setPeerDisconnected(peer)
      close(io)
    end
  end
end
handleHave(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1191
def handleHave(msg, peer)
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Have: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end

  if msg.pieceIndex >= peer.bitfield.length
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer #{peer} sent Have message with invalid piece index"
    return
  end

  # Update peer's bitfield
  peer.bitfield.set msg.pieceIndex

  if ! torrentData.blockState
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Have: no blockstate yet."
    return
  end

  # If we are interested in something from this peer, let them know.
  if ! torrentData.blockState.completePieceBitfield.set?(msg.pieceIndex)
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer #{peer} just got a piece we need so sending Interested message"
    msg = Interested.new
    sendMessageToPeer msg, currentIo, peer
    peer.amInterested = true
  end
end
handlePause(infoHash, value) click to toggle source

Pause or unpause a torrent that we are downloading.

# File lib/quartz_torrent/peerclient.rb, line 1629
def handlePause(infoHash, value)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to pause a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  return if torrentData.paused == value

  torrentData.paused = value

  if !value
    # On unpause, queue the torrent since there might not be room for it to run.
    # Make sure it goes to the head of the queue.
    queue(torrentData, :unshift)
  end

  setFrozen infoHash, value if ! torrentData.queued

  dequeue
end
handlePieceReceive(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1088
def handlePieceReceive(msg, peer)
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Receive piece: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end

  if ! torrentData.blockState
    @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: no blockstate yet."
    return
  end

  blockInfo = torrentData.blockState.createBlockinfoByPieceResponse(msg.pieceIndex, msg.blockOffset, msg.data.length)

  if ! peer.requestedBlocks.has_key?(blockInfo.blockIndex) 
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: we either didn't request this piece, or it was already received due to endgame strategy. Ignoring this message."
    return
  end

  if torrentData.blockState.blockCompleted?(blockInfo)
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: we already have this block. Ignoring this message."
    return
  end
  peer.requestedBlocks.delete blockInfo.blockIndex

  # Block is marked as not requested when hash is confirmed
  torrentData.bytesDownloadedDataOnly += msg.data.length
  id = torrentData.pieceManager.writeBlock(msg.pieceIndex, msg.blockOffset, msg.data)
  torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:write, msg)

  if torrentData.isEndgame
    # Assume this block is correct. Send a Cancel message to all other peers from whom we requested
    # this piece.
    classifiedPeers = ClassifiedPeers.new torrentData.peers.all
    classifiedPeers.requestablePeers.each do |otherPeer|
      if otherPeer.requestedBlocks.has_key?(blockInfo.blockIndex) 
        withPeersIo(otherPeer, "when sending Cancel message") do |io|

          cancel = Cancel.new
          cancel.pieceIndex = msg.pieceIndex
          cancel.blockOffset = msg.blockOffset
          cancel.blockLength = msg.data.length
          sendMessageToPeer cancel, io, otherPeer
          @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Cancel message to peer #{peer}"
        end
      end
    end
  end

end
handleRemoveTorrent(infoHash, deleteFiles) click to toggle source

Remove a torrent that we are downloading.

# File lib/quartz_torrent/peerclient.rb, line 1558
def handleRemoveTorrent(infoHash, deleteFiles)
  torrentData = @torrentData.delete infoHash
  if ! torrentData
    @logger.warn "Asked to remove a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent.  #{deleteFiles ? "Will" : "Wont"} delete downloaded files."

  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.metainfoRequestTimer" if ! torrentData.metainfoRequestTimer
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.managePeersTimer" if ! torrentData.managePeersTimer 
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.checkMetadataPieceManagerTimer" if ! torrentData.checkMetadataPieceManagerTimer 
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.checkPieceManagerTimer" if ! torrentData.checkPieceManagerTimer 
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.requestBlocksTimer" if ! torrentData.requestBlocksTimer 


  # Stop all timers
  cancelTimer torrentData.metainfoRequestTimer if torrentData.metainfoRequestTimer
  cancelTimer torrentData.managePeersTimer if torrentData.managePeersTimer
  cancelTimer torrentData.checkMetadataPieceManagerTimer if torrentData.checkMetadataPieceManagerTimer
  cancelTimer torrentData.checkPieceManagerTimer if torrentData.checkPieceManagerTimer
  cancelTimer torrentData.requestBlocksTimer if torrentData.requestBlocksTimer

  torrentData.trackerClient.removePeersChangedListener(torrentData.peerChangeListener)

  # Remove all the peers for this torrent.
  torrentData.peers.all.each do |peer|
    if peer.state != :disconnected
      # Close socket
      withPeersIo(peer, "when removing torrent") do |io|
        setPeerDisconnected(peer)
        close(io)
        @logger.debug "Closing connection to peer #{peer}"
      end
    end
    torrentData.peers.delete peer
  end

  # Stop tracker client
  torrentData.trackerClient.stop if torrentData.trackerClient

  # Stop PieceManagers
  torrentData.pieceManager.stop if torrentData.pieceManager
  torrentData.metainfoPieceState.stop if torrentData.metainfoPieceState

  # Remove metainfo file if it exists
  begin
    torrentData.metainfoPieceState.remove if torrentData.metainfoPieceState
  rescue
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleting metainfo file for torrent #{QuartzTorrent.bytesToHex(infoHash)} failed: #{$!}"  
  end

  if deleteFiles
    if torrentData.info
      begin
        path = @baseDirectory + File::SEPARATOR + torrentData.info.name
        if File.exists? path
          FileUtils.rm_r path
          @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleted #{path}"
        else
          @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleting '#{path}' for torrent #{QuartzTorrent.bytesToHex(infoHash)} failed: #{$!}"  
        end
      rescue
        @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: When removing torrent, deleting '#{path}' failed because it doesn't exist"  
      end
    end
  end

  dequeue
end
handleRequest(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1139
def handleRequest(msg, peer)
  if peer.peerChoked
    @logger.warn "Request piece: peer #{peer} requested a block when they are choked."
    return
  end

  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.error "Request piece: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found."
    return
  end
  if msg.blockLength <= 0
    @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Request piece: peer requested block of length #{msg.blockLength} which is invalid."
    return
  end

  id = torrentData.pieceManager.readBlock(msg.pieceIndex, msg.blockOffset, msg.blockLength)
  torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:read, ReadRequestMetadata.new(peer,msg))
end
initTorrent(torrentData) click to toggle source

Take a torrent that is in the :initializing state and make it go.

# File lib/quartz_torrent/peerclient.rb, line 1335
def initTorrent(torrentData)
  # If we already have the metainfo info for this torrent, we can begin checking the pieces.
  # If we don't have the metainfo info then we need to get the metainfo first.
  if ! torrentData.info
    torrentData.info = MetainfoPieceState.downloaded(@baseDirectory, torrentData.infoHash)
  end

  if torrentData.info
    startCheckingPieces torrentData
  else
    # Request the metainfo from peers.
    torrentData.state = :downloading_metainfo

    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Downloading metainfo"

    # Schedule peer connection management. Recurring and immediate
    torrentData.managePeersTimer = 
      @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], true, true)

    # Schedule a timer for requesting metadata pieces from peers.
    torrentData.metainfoRequestTimer = 
      @reactor.scheduleTimer(@requestBlocksPeriod, [:request_metadata_pieces, torrentData.infoHash], true, false)

    # Schedule checking for metainfo PieceManager results (including when piece reading completes)
    torrentData.checkMetadataPieceManagerTimer =
      @reactor.scheduleTimer(@requestBlocksPeriod, [:check_metadata_piece_manager, torrentData.infoHash], true, false)
  end
end
managePeers(infoHash) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 828
def managePeers(infoHash)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.error "Manage peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end

  return if torrentData.paused || torrentData.queued

  trackerclient = torrentData.trackerClient

  # Update our internal peer list for this torrent from the tracker client
  getPeersFromTracker(torrentData, infoHash)

  classifiedPeers = ClassifiedPeers.new torrentData.peers.all

  manager = torrentData.peerManager
  if ! manager
    @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Manage peers: peer manager client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end

  toConnect = manager.manageConnections(classifiedPeers)
  toConnect.each do |peer|
    @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Connecting to peer #{peer}"
    connect peer.trackerPeer.ip, peer.trackerPeer.port, peer
  end

  manageResult = manager.managePeers(classifiedPeers)
  manageResult.unchoke.each do |peer|
    @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Unchoking peer #{peer}"
    withPeersIo(peer, "unchoking peer") do |io|
      msg = Unchoke.new
      sendMessageToPeer msg, io, peer
      peer.peerChoked = false
    end
  end

  manageResult.choke.each do |peer|
    @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Choking peer #{peer}"
    withPeersIo(peer, "choking peer") do |io|
      msg = Choke.new
      sendMessageToPeer msg, io, peer
      peer.peerChoked = true
    end
  end

end
processHandshake(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 749
def processHandshake(msg, peer)
  torrentData = torrentDataForHandshake(msg, peer)
  # Are we tracking this torrent?
  return false if !torrentData

  if msg.peerId == torrentData.trackerClient.peerId
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We connected to ourself. Closing connection."
    peer.isUs = true
    close
    return
  end

  peers = torrentData.peers.findById(msg.peerId)
  if peers
    peers.each do |existingPeer|
      if existingPeer.state == :connected
        @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection."
        torrentData.peers.delete existingPeer
        setPeerDisconnected(peer)
        close
        return
      end
    end
  end

  trackerclient = torrentData.trackerClient

  updatePeerWithHandshakeInfo(torrentData, msg, peer)
  if torrentData.info
    peer.bitfield = Bitfield.new(torrentData.info.pieces.length)
  else
    peer.bitfield = EmptyBitfield.new
    @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield"
  end

  # Send extended handshake if the peer supports extensions
  if (msg.reserved.unpack("C8")[5] & 0x10) != 0
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer supports extensions. Sending extended handshake"
    extended = Extension.createExtendedHandshake torrentData.info
    extended.serializeTo currentIo
  end

  true
end
queue(torrentData, mode = :queue) click to toggle source

Queue a torrent

# File lib/quartz_torrent/peerclient.rb, line 1652
def queue(torrentData, mode = :queue)
  return if torrentData.queued
  
  # Queue the torrent
  if mode == :unshift
    @torrentQueue.unshift torrentData
  else
    @torrentQueue.push torrentData
  end

  setFrozen torrentData, true if ! torrentData.paused
end
requestBlocks(infoHash) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 877
def requestBlocks(infoHash)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.error "Request blocks peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end

  return if torrentData.paused || torrentData.queued

  classifiedPeers = ClassifiedPeers.new torrentData.peers.all

  if ! torrentData.blockState
    @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Request blocks peers: no blockstate yet."
    return
  end

  if torrentData.state == :uploading && !torrentData.paused
    if torrentData.ratio
      if torrentData.bytesUploadedDataOnly >= torrentData.ratio*torrentData.blockState.totalLength
        @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Pausing torrent due to upload ratio limit." if torrentData.metainfoPieceState.complete?
        setPaused(infoHash, true)
        return
      end
    end
    if torrentData.uploadDuration && torrentData.downloadCompletedTime
      if Time.new > torrentData.downloadCompletedTime + torrentData.uploadDuration
        @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Pausing torrent due to upload duration being reached." if torrentData.metainfoPieceState.complete?
        setPaused(infoHash, true)
        return
      end
    end
  end

  # Should we switch to endgame mode?
  if torrentData.state == :running && !torrentData.isEndgame
    blocks = torrentData.blockState.completeBlockBitfield
    set = blocks.countSet
    if set >= blocks.length - @endgameBlockThreshold && set < blocks.length
      @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Entering endgame mode: blocks #{set}/#{blocks.length} complete."
      torrentData.isEndgame = true
    end
  elsif torrentData.isEndgame && torrentData.state != :running
    torrentData.isEndgame = false
  end

  # Delete any timed-out requests.
  classifiedPeers.establishedPeers.each do |peer|
    toDelete = []
    peer.requestedBlocks.each do |blockIndex, requestTime|
      toDelete.push blockIndex if (Time.new - requestTime) > @requestTimeout
    end
    toDelete.each do |blockIndex|
      @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Block #{blockIndex} request timed out."
      blockInfo = torrentData.blockState.createBlockinfoByBlockIndex(blockIndex)
      torrentData.blockState.setBlockRequested blockInfo, false
      peer.requestedBlocks.delete blockIndex
    end
  end

  # Update the allowed pending requests based on how well the peer did since last time.
  classifiedPeers.establishedPeers.each do |peer|
    if peer.requestedBlocksSizeLastPass
      if peer.requestedBlocksSizeLastPass == peer.maxRequestedBlocks
        downloaded = peer.requestedBlocksSizeLastPass - peer.requestedBlocks.size
        if downloaded > peer.maxRequestedBlocks*8/10
          peer.maxRequestedBlocks = peer.maxRequestedBlocks * 12 / 10
        elsif downloaded == 0
          peer.maxRequestedBlocks = peer.maxRequestedBlocks * 8 / 10
        end
        peer.maxRequestedBlocks = 10 if peer.maxRequestedBlocks < 10
      end
    end
  end

  # Request blocks
  blockInfos = torrentData.blockState.findRequestableBlocks(classifiedPeers, 100)
  blockInfos.each do |blockInfo|

    peersToRequest = []
    if torrentData.isEndgame
      # Since we are in endgame mode, request blocks from all elegible peers
      elegiblePeers = blockInfo.peers.find_all{ |p| p.requestedBlocks.length < p.maxRequestedBlocks }

      peersToRequest.concat elegiblePeers
    else
      # Pick one of the peers that has the piece to download it from. Pick one of the
      # peers with the top 3 upload rates.
      elegiblePeers = blockInfo.peers.find_all{ |p| p.requestedBlocks.length < p.maxRequestedBlocks }.sort{ |a,b| b.uploadRate.value <=> a.uploadRate.value}
      random = elegiblePeers[rand(blockInfo.peers.size)]
      peer = elegiblePeers.first(3).push(random).shuffle.first
      next if ! peer
      peersToRequest.push peer
    end

    peersToRequest.each do |peer|
      withPeersIo(peer, "requesting block") do |io|
        if ! peer.amInterested
          # Let this peer know that I'm interested if I haven't yet.
          msg = Interested.new
          sendMessageToPeer msg, io, peer
          peer.amInterested = true
        end
        @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Requesting block from #{peer}: piece #{blockInfo.pieceIndex} offset #{blockInfo.offset} length #{blockInfo.length}"
        msg = blockInfo.getRequest
        sendMessageToPeer msg, io, peer
        torrentData.blockState.setBlockRequested blockInfo, true
        peer.requestedBlocks[blockInfo.blockIndex] = Time.new
      end
    end
  end

  if blockInfos.size == 0
    if torrentData.state != :uploading && torrentData.blockState.completePieceBitfield.allSet?
      @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Download complete."
      torrentData.state = :uploading
      torrentData.downloadCompletedTime = Time.new

      dequeue
    end
  end

  classifiedPeers.establishedPeers.each { |peer| peer.requestedBlocksSizeLastPass = peer.requestedBlocks.length }
end
requestMetadataPieces(infoHash) click to toggle source

For a torrent where we don’t have the metainfo, request metainfo pieces from peers.

# File lib/quartz_torrent/peerclient.rb, line 1002
def requestMetadataPieces(infoHash)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.error "Request metadata pices: torrent data for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found."
    return
  end
  
  return if torrentData.paused || torrentData.queued

  # We may not have completed the extended handshake with the peer which specifies the torrent size.
  # In this case torrentData.metainfoPieceState is not yet set.
  return if ! torrentData.metainfoPieceState

  @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Obtained all pieces of metainfo." if torrentData.metainfoPieceState.complete?

  pieces = torrentData.metainfoPieceState.findRequestablePieces
  classifiedPeers = ClassifiedPeers.new torrentData.peers.all
  peers = torrentData.metainfoPieceState.findRequestablePeers(classifiedPeers)
  
  if peers.size > 0
    # For now, just request all pieces from the first peer.
    pieces.each do |pieceIndex|
      msg = ExtendedMetaInfo.new
      msg.msgType = :request
      msg.piece = pieceIndex
      withPeersIo(peers.first, "requesting metadata piece") do |io|
        sendMessageToPeer msg, io, peers.first
        torrentData.metainfoPieceState.setPieceRequested(pieceIndex, true)
        @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Requesting metainfo piece from #{peers.first}: piece #{pieceIndex}"
      end
    end
  else
    @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: No peers found that have metadata."
  end

end
runInReactorThread(&block) click to toggle source

Run the passed block in the Reactor’s thread. This allows manipulation of torrent data without race conditions. This method works by scheduling a non-recurring, immediate timer in the reactor that on expiry runs the passed block.

# File lib/quartz_torrent/peerclient.rb, line 1713
def runInReactorThread(&block)
  @reactor.scheduleTimer(0, [:runproc, block], false, true)
end
sendBitfield(io, bitfield) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1461
def sendBitfield(io, bitfield)
  if ! bitfield.allClear?
    @logger.debug "Sending bitfield of size #{bitfield.length}."
    msg = BitfieldMessage.new
    msg.bitfield = bitfield
    msg.serializeTo io
  end
end
sendHaves(torrentData, pieceIndex) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1470
def sendHaves(torrentData, pieceIndex)
  @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Have messages to all connected peers for piece #{pieceIndex}"
  torrentData.peers.all.each do |peer|
    next if peer.state != :established || peer.isUs
    withPeersIo(peer, "when sending Have message") do |io|
      msg = Have.new
      msg.pieceIndex = pieceIndex
      sendMessageToPeer msg, io, peer
    end
  end
end
sendMessageToPeer(msg, io, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1503
def sendMessageToPeer(msg, io, peer)
  peer.updateDownloadRate(msg)
  torrentData = @torrentData[peer.infoHash]
  torrentData.bytesUploaded += msg.length if torrentData

  begin
    peer.peerMsgSerializer.serializeTo(msg, io)
  rescue
    @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending message to peer #{peer} failed: #{$!.message}"
  end
end
sendUninterested(torrentData) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 1482
def sendUninterested(torrentData)
  # If we are no longer interested in peers once this piece has been completed, let them know
  return if ! torrentData.blockState
  needed = torrentData.blockState.completePieceBitfield.compliment
  
  classifiedPeers = ClassifiedPeers.new torrentData.peers.all
  classifiedPeers.establishedPeers.each do |peer|
    # Don't bother sending uninterested message if we are already uninterested.
    next if ! peer.amInterested || peer.isUs
    needFromPeer = needed.intersection(peer.bitfield)
    if needFromPeer.allClear?
      withPeersIo(peer, "when sending Uninterested message") do |io|
        msg = Uninterested.new
        sendMessageToPeer msg, io, peer
        peer.amInterested = false
        @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Uninterested message to peer #{peer}"
      end
    end
  end
end
setFrozen(torrent, value) click to toggle source

Freeze or unfreeze a torrent. If value is true, then we disconnect from all peers for this torrent and forget the peers. If value is false, we start reconnecting to peers. Parameter torrent can be an infoHash or TorrentData

# File lib/quartz_torrent/peerclient.rb, line 1680
def setFrozen(torrent, value)
  torrentData = torrent
  if ! torrent.is_a?(TorrentData)
    torrentData = @torrentData[torrent]
    if ! torrentData
      @logger.warn "Asked to freeze a non-existent torrent #{QuartzTorrent.bytesToHex(torrent)}"
      return
    end
  end

  if value
    # Disconnect from all peers so we won't reply to any messages.
    torrentData.peers.all.each do |peer|
      if peer.state != :disconnected
        # Close socket
        withPeersIo(peer, "when removing torrent") do |io|
          setPeerDisconnected(peer)
          close(io)
        end
      end
      torrentData.peers.delete peer
    end
  else
    # Get our list of peers and start connecting right away
    # Non-recurring and immediate timer
    torrentData.managePeersTimer =
      @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], false, true)
  end
end
setPeerDisconnected(peer) click to toggle source

PRIVATE METHODS ################################################

# File lib/quartz_torrent/peerclient.rb, line 729
def setPeerDisconnected(peer)
  peer.state = :disconnected
  peer.uploadRate.reset
  peer.downloadRate.reset
  peer.uploadRateDataOnly.reset
  peer.downloadRateDataOnly.reset

  torrentData = @torrentData[peer.infoHash]
  # Are we tracking this torrent?
  if torrentData && torrentData.blockState
    # For any outstanding requests, mark that we no longer have requested them
    peer.requestedBlocks.each do |blockIndex, b|
      blockInfo = torrentData.blockState.createBlockinfoByBlockIndex(blockIndex)
      torrentData.blockState.setBlockRequested blockInfo, false
    end
    peer.requestedBlocks.clear
  end

end
startCheckingPieces(torrentData) click to toggle source

Start checking which pieces we already have downloaded. This method schedules the necessary timers and changes the state to :checking_pieces. When the pieces are finished being checked the actual download will begin. Preconditions: The torrentData object already has it’s info member set.

# File lib/quartz_torrent/peerclient.rb, line 1311
def startCheckingPieces(torrentData)
  torrentData.pieceManager = QuartzTorrent::PieceManager.new(@baseDirectory, torrentData.info)

  torrentData.state = :checking_pieces
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Checking pieces of torrent #{QuartzTorrent.bytesToHex(torrentData.infoHash)} asynchronously."
  id = torrentData.pieceManager.findExistingPieces
  torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:check_existing, nil)

  if ! torrentData.metainfoPieceState
    torrentData.metainfoPieceState = MetainfoPieceState.new(@baseDirectory, torrentData.infoHash, nil, torrentData.info)
  end

  # Schedule checking for PieceManager results
  torrentData.checkPieceManagerTimer =
    @reactor.scheduleTimer(@requestBlocksPeriod, [:check_piece_manager, torrentData.infoHash], true, false)

  # Schedule checking for metainfo PieceManager results (including when piece reading completes)
  if ! torrentData.checkMetadataPieceManagerTimer
    torrentData.checkMetadataPieceManagerTimer =
      @reactor.scheduleTimer(@requestBlocksPeriod, [:check_metadata_piece_manager, torrentData.infoHash], true, false)
  end
end
startDownload(torrentData) click to toggle source

Start the actual torrent download. This method schedules the necessary timers and registers the necessary listeners and changes the state to :running. It is meant to be called after checking for existing pieces or downloading the torrent metadata (if this is a magnet link torrent)

# File lib/quartz_torrent/peerclient.rb, line 1367
def startDownload(torrentData)
  # Add a listener for when the tracker's peers change.
  torrentData.peerChangeListener = Proc.new do
    @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Managing peers on peer change event"
  
    # Non-recurring and immediate timer
    torrentData.managePeersTimer =
      @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], false, true)
  end
  torrentData.trackerClient.addPeersChangedListener torrentData.peerChangeListener

  # Schedule peer connection management. Recurring and immediate
  if ! torrentData.managePeersTimer 
    torrentData.managePeersTimer =
      @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], true, true)
  end

  # Schedule requesting blocks from peers. Recurring and not immediate
  torrentData.requestBlocksTimer =
    @reactor.scheduleTimer(@requestBlocksPeriod, [:request_blocks, torrentData.infoHash], true, false)
  torrentData.state = :running
end
torrentDataForHandshake(msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 794
def torrentDataForHandshake(msg, peer)
  torrentData = @torrentData[msg.infoHash]
  # Are we tracking this torrent?
  if !torrentData
    if peer.is_a?(Peer)
      @logger.info "Peer #{peer} failed handshake: we are not managing torrent #{QuartzTorrent.bytesToHex(msg.infoHash)}"
      setPeerDisconnected(peer)
    else
      @logger.info "Incoming peer #{peer} failed handshake: we are not managing torrent #{QuartzTorrent.bytesToHex(msg.infoHash)}"
    end
    close
    return nil
  end
  torrentData
end
updatePeerWithHandshakeInfo(torrentData, msg, peer) click to toggle source
# File lib/quartz_torrent/peerclient.rb, line 810
def updatePeerWithHandshakeInfo(torrentData, msg, peer)
  @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: peer #{peer} sent valid handshake for torrent #{QuartzTorrent.bytesToHex(torrentData.infoHash)}"
  peer.infoHash = msg.infoHash
  # If this was a peer we got from a tracker that had no id then we only learn the id on handshake.
  peer.trackerPeer.id = msg.peerId
  torrentData.peers.idSet peer
end
withPeersIo(peer, what = nil) { |io| ... } click to toggle source

Find the io associated with the peer and yield it to the passed block. If no io is found an error is logged.

# File lib/quartz_torrent/peerclient.rb, line 1450
def withPeersIo(peer, what = nil)
  io = findIoByMetainfo(peer)
  if io
    yield io
  else
    s = ""
    s = "when #{what}" if what
    @logger.warn "Couldn't find the io for peer #{peer} #{what}"
  end
end