class QuartzTorrent::PeerClientHandler
This class implements a Reactor
Handler
object. This Handler
implements the PeerClient
.
Attributes
PUBLIC API METHODS ################################################
Public Class Methods
# File lib/quartz_torrent/peerclient.rb, line 258 def initialize(baseDirectory, maxIncomplete = 5, maxActive = 10) # Hash of TorrentData objects, keyed by torrent infoHash @torrentData = {} @torrentQueue = TorrentQueue.new(maxIncomplete, maxActive) @baseDirectory = baseDirectory @logger = LogManager.getLogger("peerclient") # Overall maximum number of peers (connected + disconnected) @maxPeerCount = 120 # Number of peers we ideally want to try and be downloading/uploading with @targetActivePeerCount = 50 @targetUnchokedPeerCount = 4 @managePeersPeriod = 10 # Defined in bittorrent spec. Only unchoke peers every 10 seconds. @requestBlocksPeriod = 1 @handshakeTimeout = 1 @requestTimeout = 60 @endgameBlockThreshold = 20 end
Public Instance Methods
Add a new tracker client. This effectively adds a new torrent to download. Returns the TorrentData
object for the new torrent.
# File lib/quartz_torrent/peerclient.rb, line 285 def addTrackerClient(infoHash, info, trackerclient) raise "There is already a tracker registered for torrent #{QuartzTorrent.bytesToHex(infoHash)}" if @torrentData.has_key? infoHash torrentData = TorrentData.new(infoHash, info, trackerclient) trackerclient.alarms = torrentData.alarms @torrentData[infoHash] = torrentData torrentData.info = info torrentData.state = :initializing queue(torrentData) dequeue torrentData end
Adjust the bytesDownloaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesDownloaded amount.
# File lib/quartz_torrent/peerclient.rb, line 404 def adjustBytesDownloaded(infoHash, adjustment) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end runInReactorThread do torrentData.bytesDownloaded += adjustment torrentData.bytesDownloadedDataOnly += adjustment end end
Adjust the bytesUploaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesUploaded amount.
# File lib/quartz_torrent/peerclient.rb, line 389 def adjustBytesUploaded(infoHash, adjustment) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end runInReactorThread do torrentData.bytesUploaded += adjustment torrentData.bytesUploadedDataOnly += adjustment end end
Reactor
method called when we have connected to a peer.
# File lib/quartz_torrent/peerclient.rb, line 553 def clientInit(peer) # We connected to a peer # Send handshake torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.warn "No tracker client found for peer #{peer}. Closing connection." close return end trackerclient = torrentData.trackerClient @logger.info "Connected to peer #{peer}. Sending handshake." msg = PeerHandshake.new msg.peerId = trackerclient.peerId msg.infoHash = peer.infoHash msg.serializeTo currentIo peer.state = :handshaking @reactor.scheduleTimer(@handshakeTimeout, [:handshake_timeout, peer], false) @logger.debug "Done sending handshake." # Send bitfield sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit end
Reactor
method called when an IO error occurs.
# File lib/quartz_torrent/peerclient.rb, line 714 def error(peer, details) # If a peer closes the connection during handshake before we determine their id, we don't have a completed # Peer object yet. In this case the peer parameter is the symbol :listener_socket if peer == :listener_socket @logger.info "Error with handshaking peer: #{details}. Closing connection." else @logger.info "Error with peer #{peer}: #{details}. Closing connection." setPeerDisconnected(peer) end # Close connection close end
Get a hash of new TorrentDataDelegate
objects keyed by torrent infohash. This method is meant to be called from a different thread than the one the reactor is running in. This method is not immediate but blocks until the data is prepared. If infoHash is passed, only that torrent data is returned (still in a hashtable; just one entry)
# File lib/quartz_torrent/peerclient.rb, line 422 def getDelegateTorrentData(infoHash = nil) # Use an immediate, non-recurring timer. result = {} return result if stopped? semaphore = Semaphore.new timer = @reactor.scheduleTimer(0, [:get_torrent_data, result, semaphore, infoHash], false, true) if semaphore.wait(3) result else @logger.warn "getDelegateTorrentData: Waiting on semaphore timed out" throw "Waiting on semaphore for timer #{timer.object_id} timed out" end end
Reactor
method called when there is data ready to be read from a socket
# File lib/quartz_torrent/peerclient.rb, line 581 def recvData(peer) msg = nil @logger.debug "Got data from peer #{peer}" if peer.state == :handshaking # Read handshake message begin @logger.debug "Reading handshake from #{peer}" msg = PeerHandshake.unserializeFrom currentIo rescue @logger.warn "Peer #{peer} failed handshake: #{$!}" setPeerDisconnected(peer) close return end else begin @logger.debug "Reading wire-message from #{peer}" msg = peer.peerMsgSerializer.unserializeFrom currentIo #msg = PeerWireMessage.unserializeFrom currentIo rescue EOFError @logger.info "Peer #{peer} disconnected." setPeerDisconnected(peer) close return rescue @logger.warn "Unserializing message from peer #{peer} failed: #{$!}" @logger.warn $!.backtrace.join "\n" setPeerDisconnected(peer) close return end peer.updateUploadRate msg torrentData = @torrentData[peer.infoHash] torrentData.bytesDownloaded += msg.length if torrentData @logger.debug "Peer #{peer} upload rate: #{peer.uploadRate.value} data only: #{peer.uploadRateDataOnly.value}" end if msg.is_a? PeerHandshake # This is a remote peer that we connected to returning our handshake. processHandshake(msg, peer) peer.state = :established peer.amChoked = true peer.peerChoked = true peer.amInterested = false peer.peerInterested = false elsif msg.is_a? BitfieldMessage @logger.debug "Received bitfield message from peer." handleBitfield(msg, peer) elsif msg.is_a? Unchoke @logger.debug "Received unchoke message from peer." peer.amChoked = false elsif msg.is_a? Choke @logger.debug "Received choke message from peer." peer.amChoked = true elsif msg.is_a? Interested @logger.debug "Received interested message from peer." peer.peerInterested = true elsif msg.is_a? Uninterested @logger.debug "Received uninterested message from peer." peer.peerInterested = false elsif msg.is_a? Piece @logger.debug "Received piece message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}." handlePieceReceive(msg, peer) elsif msg.is_a? Request @logger.debug "Received request message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.blockLength}." handleRequest(msg, peer) elsif msg.is_a? Have @logger.debug "Received have message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex}" handleHave(msg, peer) elsif msg.is_a? KeepAlive @logger.debug "Received keep alive message from peer." elsif msg.is_a? ExtendedHandshake @logger.debug "Received extended handshake message from peer." handleExtendedHandshake(msg, peer) elsif msg.is_a? ExtendedMetaInfo @logger.debug "Received extended metainfo message from peer." handleExtendedMetainfo(msg, peer) else @logger.warn "Received a #{msg.class} message but handler is not implemented" end end
Remove a torrent.
# File lib/quartz_torrent/peerclient.rb, line 300 def removeTorrent(infoHash, deleteFiles = false) # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead. @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to remove torrent. #{deleteFiles ? "Will" : "Wont"} delete downloaded files." @reactor.scheduleTimer(0, [:removetorrent, infoHash, deleteFiles], false, true) end
Reactor
method called when a peer has connected to us.
# File lib/quartz_torrent/peerclient.rb, line 450 def serverInit(metadata, addr, port) # A peer connected to us # Read handshake message @logger.warn "Peer connection from #{addr}:#{port}" begin msg = PeerHandshake.unserializeExceptPeerIdFrom currentIo rescue @logger.warn "Peer failed handshake: #{$!}" close return end torrentData = torrentDataForHandshake(msg, "#{addr}:#{port}") # Are we tracking this torrent? if !torrentData @logger.warn "Peer sent handshake for unknown torrent" close return end trackerclient = torrentData.trackerClient # If we already have too many connections, don't allow this connection. classifiedPeers = ClassifiedPeers.new torrentData.peers.all if classifiedPeers.establishedPeers.length > @targetActivePeerCount @logger.warn "Closing connection to peer from #{addr}:#{port} because we already have #{classifiedPeers.establishedPeers.length} active peers which is > the target count of #{@targetActivePeerCount} " close return end # Send handshake outgoing = PeerHandshake.new outgoing.peerId = trackerclient.peerId outgoing.infoHash = torrentData.infoHash outgoing.serializeTo currentIo # Send extended handshake if the peer supports extensions if (msg.reserved.unpack("C8")[5] & 0x10) != 0 @logger.warn "Peer supports extensions. Sending extended handshake" extended = Extension.createExtendedHandshake torrentData.info extended.serializeTo currentIo end # Read incoming handshake's peerid msg.peerId = currentIo.read(PeerHandshake::PeerIdLen) if msg.peerId == trackerclient.peerId @logger.info "We got a connection from ourself. Closing connection." close return end peer = nil peers = torrentData.peers.findById(msg.peerId) if peers peers.each do |existingPeer| if existingPeer.state != :disconnected @logger.warn "Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection." close return else if existingPeer.trackerPeer.ip == addr && existingPeer.trackerPeer.port == port peer = existingPeer end end end end if ! peer peer = Peer.new(TrackerPeer.new(addr, port)) updatePeerWithHandshakeInfo(torrentData, msg, peer) torrentData.peers.add peer if ! peers @logger.warn "Unknown peer with id #{msg.peerId} connected." else @logger.warn "Known peer with id #{msg.peerId} connected from new location." end else @logger.warn "Known peer with id #{msg.peerId} connected from known location." end @logger.info "Peer #{peer} connected to us. " peer.state = :established peer.amChoked = true peer.peerChoked = true peer.amInterested = false peer.peerInterested = false if torrentData.info peer.bitfield = Bitfield.new(torrentData.info.pieces.length) else peer.bitfield = EmptyBitfield.new @logger.info "We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield" end # Send bitfield sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState setMetaInfo(peer) setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit end
Set the download rate limit. Pass nil as the bytesPerSecond to disable the limit.
# File lib/quartz_torrent/peerclient.rb, line 314 def setDownloadRateLimit(infoHash, bytesPerSecond) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set download rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end if bytesPerSecond if ! torrentData.downRateLimit torrentData.downRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0) else torrentData.downRateLimit.unitsPerSecond = bytesPerSecond end else torrentData.downRateLimit = nil end torrentData.peers.all.each do |peer| withPeersIo(peer, "setting download rate limit") do |io| io.readRateLimit = torrentData.downRateLimit end end end
Pause or unpause the specified torrent.
# File lib/quartz_torrent/peerclient.rb, line 307 def setPaused(infoHash, value) # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead. @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to #{value ? "pause" : "unpause"} torrent." @reactor.scheduleTimer(0, [:pausetorrent, infoHash, value], false, true) end
Set the maximum amount of time (in seconds) that a torrent can be in the upload-only state before it is paused. Pass nil to disable.
# File lib/quartz_torrent/peerclient.rb, line 377 def setUploadDuration(infoHash, seconds) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload duration for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end torrentData.uploadDuration = seconds end
Set the upload rate limit. Pass nil as the bytesPerSecond to disable the limit.
# File lib/quartz_torrent/peerclient.rb, line 340 def setUploadRateLimit(infoHash, bytesPerSecond) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end if bytesPerSecond if ! torrentData.upRateLimit torrentData.upRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0) else torrentData.upRateLimit.unitsPerSecond = bytesPerSecond end else torrentData.upRateLimit = nil end torrentData.peers.all.each do |peer| withPeersIo(peer, "setting upload rate limit") do |io| io.writeRateLimit = torrentData.upRateLimit end end end
Set the upload ratio. Pass nil to disable
# File lib/quartz_torrent/peerclient.rb, line 365 def setUploadRatio(infoHash, ratio) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload ratio limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end torrentData.ratio = ratio end
Reactor
method called when a scheduled timer expires.
# File lib/quartz_torrent/peerclient.rb, line 668 def timerExpired(metadata) if metadata.is_a?(Array) && metadata[0] == :manage_peers managePeers(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :request_blocks requestBlocks(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :check_piece_manager checkPieceManagerResults(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :handshake_timeout handleHandshakeTimeout(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :removetorrent handleRemoveTorrent(metadata[1], metadata[2]) elsif metadata.is_a?(Array) && metadata[0] == :pausetorrent handlePause(metadata[1], metadata[2]) elsif metadata.is_a?(Array) && metadata[0] == :get_torrent_data @torrentData.each do |k,v| begin if metadata[3].nil? || k == metadata[3] v = TorrentDataDelegate.new(v, self) metadata[1][k] = v end rescue @logger.error "Error building torrent data response for user: #{$!}" @logger.error "#{$!.backtrace.join("\n")}" end end metadata[2].signal elsif metadata.is_a?(Array) && metadata[0] == :update_torrent_data delegate = metadata[1] if ! @torrentData.has_key?(infoHash) delegate.state = :deleted else delegate.internalRefresh end metadata[2].signal elsif metadata.is_a?(Array) && metadata[0] == :request_metadata_pieces requestMetadataPieces(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :check_metadata_piece_manager checkMetadataPieceManagerResults(metadata[1]) elsif metadata.is_a?(Array) && metadata[0] == :runproc metadata[1].call else @logger.info "Unknown timer #{metadata} expired." end end
Update the data stored in a TorrentDataDelegate
to the latest information.
# File lib/quartz_torrent/peerclient.rb, line 437 def updateDelegateTorrentData(delegate) return if stopped? # Use an immediate, non-recurring timer. semaphore = Semaphore.new @reactor.scheduleTimer(0, [:update_torrent_data, delegate, semaphore], false, true) semaphore.wait result end
Private Instance Methods
# File lib/quartz_torrent/peerclient.rb, line 1039 def checkMetadataPieceManagerResults(infoHash) torrentData = @torrentData[infoHash] if ! torrentData @logger.error "Check metadata piece manager results: data for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end # We may not have completed the extended handshake with the peer which specifies the torrent size. # In this case torrentData.metainfoPieceState is not yet set. return if ! torrentData.metainfoPieceState results = torrentData.metainfoPieceState.checkResults results.each do |result| metaData = torrentData.pieceManagerMetainfoRequestMetadata.delete(result.requestId) if ! metaData @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Can't find metadata for PieceManager request #{result.requestId}" next end if metaData.type == :read && result.successful? # Send the piece to the peer. msg = ExtendedMetaInfo.new msg.msgType = :piece msg.piece = metaData.data.requestMsg.piece msg.data = result.data withPeersIo(metaData.data.peer, "sending extended metainfo piece message") do |io| @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Sending metainfo piece to #{metaData.data.peer}: piece #{msg.piece} with data length #{msg.data.length}" sendMessageToPeer msg, io, metaData.data.peer end result.data end end if torrentData.metainfoPieceState.complete? && torrentData.state == :downloading_metainfo @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Obtained all pieces of metainfo. Will begin checking existing pieces." torrentData.metainfoPieceState.flush # We don't need to download metainfo anymore. cancelTimer torrentData.metainfoRequestTimer if torrentData.metainfoRequestTimer info = MetainfoPieceState.downloaded(@baseDirectory, torrentData.infoHash) if info torrentData.info = info startCheckingPieces torrentData else @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Metadata download is complete but reading the metadata failed" torrentData.state = :error end end end
# File lib/quartz_torrent/peerclient.rb, line 1220 def checkPieceManagerResults(infoHash) torrentData = @torrentData[infoHash] if ! torrentData @logger.error "Request blocks peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end while true result = torrentData.pieceManager.nextResult break if ! result metaData = torrentData.pieceManagerRequestMetadata.delete(result.requestId) if ! metaData @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Can't find metadata for PieceManager request #{result.requestId}" next end if metaData.type == :write if result.successful? @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Block written to disk. " # Block successfully written! torrentData.blockState.setBlockCompleted metaData.data.pieceIndex, metaData.data.blockOffset, true do |pieceIndex| # The peice is completed! Check hash. @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Piece #{pieceIndex} is complete. Checking hash. " id = torrentData.pieceManager.checkPieceHash(metaData.data.pieceIndex) torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:hash, metaData.data.pieceIndex) end else # Block failed! Clear completed and requested state. torrentData.blockState.setBlockCompleted metaData.data.pieceIndex, metaData.data.blockOffset, false @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Writing block failed: #{result.error}" end elsif metaData.type == :read if result.successful? readRequestMetadata = metaData.data peer = readRequestMetadata.peer withPeersIo(peer, "sending piece message") do |io| msg = Piece.new msg.pieceIndex = readRequestMetadata.requestMsg.pieceIndex msg.blockOffset = readRequestMetadata.requestMsg.blockOffset msg.data = result.data @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending block to #{peer}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}" sendMessageToPeer msg, io, peer torrentData.bytesUploadedDataOnly += msg.data.length @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending piece to peer" end else @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Reading block failed: #{result.error}" end elsif metaData.type == :hash if result.successful? @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Hash of piece #{metaData.data} is correct" sendHaves(torrentData, metaData.data) sendUninterested(torrentData) else @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Hash of piece #{metaData.data} is incorrect. Marking piece as not complete." torrentData.blockState.setPieceCompleted metaData.data, false end elsif metaData.type == :check_existing handleCheckExistingResult(torrentData, result) end end end
Dequeue any torrents that can now run based on available space
# File lib/quartz_torrent/peerclient.rb, line 1666 def dequeue torrents = @torrentQueue.dequeue(@torrentData.values) torrents.each do |torrentData| if torrentData.state == :initializing initTorrent torrentData else setFrozen torrentData, false if ! torrentData.paused end end end
Update our internal peer list for this torrent from the tracker client
# File lib/quartz_torrent/peerclient.rb, line 1516 def getPeersFromTracker(torrentData, infoHash) addPeer = Proc.new do |trackerPeer| peer = Peer.new(trackerPeer) peer.infoHash = infoHash torrentData.peers.add peer true end classifiedPeers = nil replaceDisconnectedPeer = Proc.new do |trackerPeer| classifiedPeers = ClassifiedPeers.new(torrentData.peers.all) if ! classifiedPeers if classifiedPeers.disconnectedPeers.size > 0 torrentData.peers.delete classifiedPeers.disconnectedPeers.pop addPeer.call trackerPeer true else false end end trackerclient = torrentData.trackerClient addProc = addPeer flipped = false trackerclient.peers.each do |p| if ! flipped && torrentData.peers.size >= @maxPeerCount addProc = replaceDisconnectedPeer flipped = true end # Don't treat ourself as a peer. next if p.id && p.id == trackerclient.peerId if ! torrentData.peers.findByAddr(p.ip, p.port) @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Adding tracker peer #{p} to peers list" break if ! addProc.call(p) end end end
# File lib/quartz_torrent/peerclient.rb, line 1159 def handleBitfield(msg, peer) torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Bitfield: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end peer.bitfield = msg.bitfield if torrentData.info peer.bitfield.length = torrentData.info.pieces.length else @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: A peer connected and sent a bitfield but we don't know the length of the torrent yet. Assuming number of pieces is divisible by 8" end if ! torrentData.blockState @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Bitfield: no blockstate yet." return end # If we are interested in something from this peer, let them know. needed = torrentData.blockState.completePieceBitfield.compliment needed.intersection!(peer.bitfield) if ! needed.allClear? if ! peer.amInterested @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Need some pieces from peer #{peer} so sending Interested message" msg = Interested.new sendMessageToPeer msg, currentIo, peer peer.amInterested = true end end end
Handle the result of the PieceManager’s checkExisting (check which pieces we already have) operation. If the resukt is successful, this begins the actual download.
# File lib/quartz_torrent/peerclient.rb, line 1286 def handleCheckExistingResult(torrentData, pieceManagerResult) if pieceManagerResult.successful? existingBitfield = pieceManagerResult.data @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We already have #{existingBitfield.countSet}/#{existingBitfield.length} pieces." info = torrentData.info torrentData.blockState = BlockState.new(info, existingBitfield) @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Starting torrent. Information:" @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: piece length: #{info.pieceLen}" @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: number of pieces: #{info.pieces.size}" @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: total length #{info.dataLength}" startDownload torrentData else @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Checking existing pieces of torrent failed: #{pieceManagerResult.error}" torrentData.state = :error end end
# File lib/quartz_torrent/peerclient.rb, line 1390 def handleExtendedHandshake(msg, peer) torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Extended Handshake: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end metadataSize = msg.dict['metadata_size'] if metadataSize # This peer knows the size of the metadata. If we haven't created our MetainfoPieceState yet, create it now. if ! torrentData.metainfoPieceState @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Extended Handshake: Learned that metadata size is #{metadataSize}. Creating MetainfoPieceState" torrentData.metainfoPieceState = MetainfoPieceState.new(@baseDirectory, torrentData.infoHash, metadataSize) end end end
# File lib/quartz_torrent/peerclient.rb, line 1408 def handleExtendedMetainfo(msg, peer) torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Extended Handshake: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end if msg.msgType == :request @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo request for piece #{msg.piece}" # Build a response for this piece. if torrentData.metainfoPieceState.pieceCompleted? msg.piece @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Requesting extended metainfo piece #{msg.piece} from metainfoPieceState." id = torrentData.metainfoPieceState.readPiece msg.piece torrentData.pieceManagerMetainfoRequestMetadata[id] = PieceManagerRequestMetadata.new(:read, ReadRequestMetadata.new(peer,msg)) else reject = ExtendedMetaInfo.new reject.msgType = :reject reject.piece = msg.piece withPeersIo(peer, "sending extended metainfo reject message") do |io| @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending metainfo reject to #{peer}: piece #{msg.piece}" sendMessageToPeer reject, io, peer end end elsif msg.msgType == :piece @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo piece response for piece #{msg.piece} with data length #{msg.data.length}" if ! torrentData.metainfoPieceState.pieceCompleted? msg.piece id = torrentData.metainfoPieceState.savePiece msg.piece, msg.data torrentData.pieceManagerMetainfoRequestMetadata[id] = PieceManagerRequestMetadata.new(:write, msg) end elsif msg.msgType == :reject @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Got extended metainfo reject response for piece #{msg.piece}" # Mark this peer as bad. torrentData.metainfoPieceState.markPeerBad peer torrentData.metainfoPieceState.setPieceRequested(msg.piece, false) end end
# File lib/quartz_torrent/peerclient.rb, line 818 def handleHandshakeTimeout(peer) if peer.state == :handshaking @logger.warn "Peer #{peer} failed handshake: handshake timed out after #{@handshakeTimeout} seconds." withPeersIo(peer, "handling handshake timeout") do |io| setPeerDisconnected(peer) close(io) end end end
# File lib/quartz_torrent/peerclient.rb, line 1191 def handleHave(msg, peer) torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Have: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end if msg.pieceIndex >= peer.bitfield.length @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer #{peer} sent Have message with invalid piece index" return end # Update peer's bitfield peer.bitfield.set msg.pieceIndex if ! torrentData.blockState @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Have: no blockstate yet." return end # If we are interested in something from this peer, let them know. if ! torrentData.blockState.completePieceBitfield.set?(msg.pieceIndex) @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer #{peer} just got a piece we need so sending Interested message" msg = Interested.new sendMessageToPeer msg, currentIo, peer peer.amInterested = true end end
Pause or unpause a torrent that we are downloading.
# File lib/quartz_torrent/peerclient.rb, line 1629 def handlePause(infoHash, value) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to pause a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end return if torrentData.paused == value torrentData.paused = value if !value # On unpause, queue the torrent since there might not be room for it to run. # Make sure it goes to the head of the queue. queue(torrentData, :unshift) end setFrozen infoHash, value if ! torrentData.queued dequeue end
# File lib/quartz_torrent/peerclient.rb, line 1088 def handlePieceReceive(msg, peer) torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Receive piece: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end if ! torrentData.blockState @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: no blockstate yet." return end blockInfo = torrentData.blockState.createBlockinfoByPieceResponse(msg.pieceIndex, msg.blockOffset, msg.data.length) if ! peer.requestedBlocks.has_key?(blockInfo.blockIndex) @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: we either didn't request this piece, or it was already received due to endgame strategy. Ignoring this message." return end if torrentData.blockState.blockCompleted?(blockInfo) @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Receive piece: we already have this block. Ignoring this message." return end peer.requestedBlocks.delete blockInfo.blockIndex # Block is marked as not requested when hash is confirmed torrentData.bytesDownloadedDataOnly += msg.data.length id = torrentData.pieceManager.writeBlock(msg.pieceIndex, msg.blockOffset, msg.data) torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:write, msg) if torrentData.isEndgame # Assume this block is correct. Send a Cancel message to all other peers from whom we requested # this piece. classifiedPeers = ClassifiedPeers.new torrentData.peers.all classifiedPeers.requestablePeers.each do |otherPeer| if otherPeer.requestedBlocks.has_key?(blockInfo.blockIndex) withPeersIo(otherPeer, "when sending Cancel message") do |io| cancel = Cancel.new cancel.pieceIndex = msg.pieceIndex cancel.blockOffset = msg.blockOffset cancel.blockLength = msg.data.length sendMessageToPeer cancel, io, otherPeer @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Cancel message to peer #{peer}" end end end end end
Remove a torrent that we are downloading.
# File lib/quartz_torrent/peerclient.rb, line 1558 def handleRemoveTorrent(infoHash, deleteFiles) torrentData = @torrentData.delete infoHash if ! torrentData @logger.warn "Asked to remove a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent. #{deleteFiles ? "Will" : "Wont"} delete downloaded files." @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.metainfoRequestTimer" if ! torrentData.metainfoRequestTimer @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.managePeersTimer" if ! torrentData.managePeersTimer @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.checkMetadataPieceManagerTimer" if ! torrentData.checkMetadataPieceManagerTimer @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.checkPieceManagerTimer" if ! torrentData.checkPieceManagerTimer @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Removing torrent: no torrentData.requestBlocksTimer" if ! torrentData.requestBlocksTimer # Stop all timers cancelTimer torrentData.metainfoRequestTimer if torrentData.metainfoRequestTimer cancelTimer torrentData.managePeersTimer if torrentData.managePeersTimer cancelTimer torrentData.checkMetadataPieceManagerTimer if torrentData.checkMetadataPieceManagerTimer cancelTimer torrentData.checkPieceManagerTimer if torrentData.checkPieceManagerTimer cancelTimer torrentData.requestBlocksTimer if torrentData.requestBlocksTimer torrentData.trackerClient.removePeersChangedListener(torrentData.peerChangeListener) # Remove all the peers for this torrent. torrentData.peers.all.each do |peer| if peer.state != :disconnected # Close socket withPeersIo(peer, "when removing torrent") do |io| setPeerDisconnected(peer) close(io) @logger.debug "Closing connection to peer #{peer}" end end torrentData.peers.delete peer end # Stop tracker client torrentData.trackerClient.stop if torrentData.trackerClient # Stop PieceManagers torrentData.pieceManager.stop if torrentData.pieceManager torrentData.metainfoPieceState.stop if torrentData.metainfoPieceState # Remove metainfo file if it exists begin torrentData.metainfoPieceState.remove if torrentData.metainfoPieceState rescue @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleting metainfo file for torrent #{QuartzTorrent.bytesToHex(infoHash)} failed: #{$!}" end if deleteFiles if torrentData.info begin path = @baseDirectory + File::SEPARATOR + torrentData.info.name if File.exists? path FileUtils.rm_r path @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleted #{path}" else @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Deleting '#{path}' for torrent #{QuartzTorrent.bytesToHex(infoHash)} failed: #{$!}" end rescue @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: When removing torrent, deleting '#{path}' failed because it doesn't exist" end end end dequeue end
# File lib/quartz_torrent/peerclient.rb, line 1139 def handleRequest(msg, peer) if peer.peerChoked @logger.warn "Request piece: peer #{peer} requested a block when they are choked." return end torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.error "Request piece: torrent data for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)} not found." return end if msg.blockLength <= 0 @logger.error "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Request piece: peer requested block of length #{msg.blockLength} which is invalid." return end id = torrentData.pieceManager.readBlock(msg.pieceIndex, msg.blockOffset, msg.blockLength) torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:read, ReadRequestMetadata.new(peer,msg)) end
Take a torrent that is in the :initializing state and make it go.
# File lib/quartz_torrent/peerclient.rb, line 1335 def initTorrent(torrentData) # If we already have the metainfo info for this torrent, we can begin checking the pieces. # If we don't have the metainfo info then we need to get the metainfo first. if ! torrentData.info torrentData.info = MetainfoPieceState.downloaded(@baseDirectory, torrentData.infoHash) end if torrentData.info startCheckingPieces torrentData else # Request the metainfo from peers. torrentData.state = :downloading_metainfo @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Downloading metainfo" # Schedule peer connection management. Recurring and immediate torrentData.managePeersTimer = @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], true, true) # Schedule a timer for requesting metadata pieces from peers. torrentData.metainfoRequestTimer = @reactor.scheduleTimer(@requestBlocksPeriod, [:request_metadata_pieces, torrentData.infoHash], true, false) # Schedule checking for metainfo PieceManager results (including when piece reading completes) torrentData.checkMetadataPieceManagerTimer = @reactor.scheduleTimer(@requestBlocksPeriod, [:check_metadata_piece_manager, torrentData.infoHash], true, false) end end
# File lib/quartz_torrent/peerclient.rb, line 828 def managePeers(infoHash) torrentData = @torrentData[infoHash] if ! torrentData @logger.error "Manage peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end return if torrentData.paused || torrentData.queued trackerclient = torrentData.trackerClient # Update our internal peer list for this torrent from the tracker client getPeersFromTracker(torrentData, infoHash) classifiedPeers = ClassifiedPeers.new torrentData.peers.all manager = torrentData.peerManager if ! manager @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Manage peers: peer manager client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end toConnect = manager.manageConnections(classifiedPeers) toConnect.each do |peer| @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Connecting to peer #{peer}" connect peer.trackerPeer.ip, peer.trackerPeer.port, peer end manageResult = manager.managePeers(classifiedPeers) manageResult.unchoke.each do |peer| @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Unchoking peer #{peer}" withPeersIo(peer, "unchoking peer") do |io| msg = Unchoke.new sendMessageToPeer msg, io, peer peer.peerChoked = false end end manageResult.choke.each do |peer| @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Choking peer #{peer}" withPeersIo(peer, "choking peer") do |io| msg = Choke.new sendMessageToPeer msg, io, peer peer.peerChoked = true end end end
# File lib/quartz_torrent/peerclient.rb, line 749 def processHandshake(msg, peer) torrentData = torrentDataForHandshake(msg, peer) # Are we tracking this torrent? return false if !torrentData if msg.peerId == torrentData.trackerClient.peerId @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We connected to ourself. Closing connection." peer.isUs = true close return end peers = torrentData.peers.findById(msg.peerId) if peers peers.each do |existingPeer| if existingPeer.state == :connected @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection." torrentData.peers.delete existingPeer setPeerDisconnected(peer) close return end end end trackerclient = torrentData.trackerClient updatePeerWithHandshakeInfo(torrentData, msg, peer) if torrentData.info peer.bitfield = Bitfield.new(torrentData.info.pieces.length) else peer.bitfield = EmptyBitfield.new @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield" end # Send extended handshake if the peer supports extensions if (msg.reserved.unpack("C8")[5] & 0x10) != 0 @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Peer supports extensions. Sending extended handshake" extended = Extension.createExtendedHandshake torrentData.info extended.serializeTo currentIo end true end
Queue a torrent
# File lib/quartz_torrent/peerclient.rb, line 1652 def queue(torrentData, mode = :queue) return if torrentData.queued # Queue the torrent if mode == :unshift @torrentQueue.unshift torrentData else @torrentQueue.push torrentData end setFrozen torrentData, true if ! torrentData.paused end
# File lib/quartz_torrent/peerclient.rb, line 877 def requestBlocks(infoHash) torrentData = @torrentData[infoHash] if ! torrentData @logger.error "Request blocks peers: tracker client for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end return if torrentData.paused || torrentData.queued classifiedPeers = ClassifiedPeers.new torrentData.peers.all if ! torrentData.blockState @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: Request blocks peers: no blockstate yet." return end if torrentData.state == :uploading && !torrentData.paused if torrentData.ratio if torrentData.bytesUploadedDataOnly >= torrentData.ratio*torrentData.blockState.totalLength @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Pausing torrent due to upload ratio limit." if torrentData.metainfoPieceState.complete? setPaused(infoHash, true) return end end if torrentData.uploadDuration && torrentData.downloadCompletedTime if Time.new > torrentData.downloadCompletedTime + torrentData.uploadDuration @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Pausing torrent due to upload duration being reached." if torrentData.metainfoPieceState.complete? setPaused(infoHash, true) return end end end # Should we switch to endgame mode? if torrentData.state == :running && !torrentData.isEndgame blocks = torrentData.blockState.completeBlockBitfield set = blocks.countSet if set >= blocks.length - @endgameBlockThreshold && set < blocks.length @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Entering endgame mode: blocks #{set}/#{blocks.length} complete." torrentData.isEndgame = true end elsif torrentData.isEndgame && torrentData.state != :running torrentData.isEndgame = false end # Delete any timed-out requests. classifiedPeers.establishedPeers.each do |peer| toDelete = [] peer.requestedBlocks.each do |blockIndex, requestTime| toDelete.push blockIndex if (Time.new - requestTime) > @requestTimeout end toDelete.each do |blockIndex| @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Block #{blockIndex} request timed out." blockInfo = torrentData.blockState.createBlockinfoByBlockIndex(blockIndex) torrentData.blockState.setBlockRequested blockInfo, false peer.requestedBlocks.delete blockIndex end end # Update the allowed pending requests based on how well the peer did since last time. classifiedPeers.establishedPeers.each do |peer| if peer.requestedBlocksSizeLastPass if peer.requestedBlocksSizeLastPass == peer.maxRequestedBlocks downloaded = peer.requestedBlocksSizeLastPass - peer.requestedBlocks.size if downloaded > peer.maxRequestedBlocks*8/10 peer.maxRequestedBlocks = peer.maxRequestedBlocks * 12 / 10 elsif downloaded == 0 peer.maxRequestedBlocks = peer.maxRequestedBlocks * 8 / 10 end peer.maxRequestedBlocks = 10 if peer.maxRequestedBlocks < 10 end end end # Request blocks blockInfos = torrentData.blockState.findRequestableBlocks(classifiedPeers, 100) blockInfos.each do |blockInfo| peersToRequest = [] if torrentData.isEndgame # Since we are in endgame mode, request blocks from all elegible peers elegiblePeers = blockInfo.peers.find_all{ |p| p.requestedBlocks.length < p.maxRequestedBlocks } peersToRequest.concat elegiblePeers else # Pick one of the peers that has the piece to download it from. Pick one of the # peers with the top 3 upload rates. elegiblePeers = blockInfo.peers.find_all{ |p| p.requestedBlocks.length < p.maxRequestedBlocks }.sort{ |a,b| b.uploadRate.value <=> a.uploadRate.value} random = elegiblePeers[rand(blockInfo.peers.size)] peer = elegiblePeers.first(3).push(random).shuffle.first next if ! peer peersToRequest.push peer end peersToRequest.each do |peer| withPeersIo(peer, "requesting block") do |io| if ! peer.amInterested # Let this peer know that I'm interested if I haven't yet. msg = Interested.new sendMessageToPeer msg, io, peer peer.amInterested = true end @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Requesting block from #{peer}: piece #{blockInfo.pieceIndex} offset #{blockInfo.offset} length #{blockInfo.length}" msg = blockInfo.getRequest sendMessageToPeer msg, io, peer torrentData.blockState.setBlockRequested blockInfo, true peer.requestedBlocks[blockInfo.blockIndex] = Time.new end end end if blockInfos.size == 0 if torrentData.state != :uploading && torrentData.blockState.completePieceBitfield.allSet? @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Download complete." torrentData.state = :uploading torrentData.downloadCompletedTime = Time.new dequeue end end classifiedPeers.establishedPeers.each { |peer| peer.requestedBlocksSizeLastPass = peer.requestedBlocks.length } end
For a torrent where we don’t have the metainfo, request metainfo pieces from peers.
# File lib/quartz_torrent/peerclient.rb, line 1002 def requestMetadataPieces(infoHash) torrentData = @torrentData[infoHash] if ! torrentData @logger.error "Request metadata pices: torrent data for torrent #{QuartzTorrent.bytesToHex(infoHash)} not found." return end return if torrentData.paused || torrentData.queued # We may not have completed the extended handshake with the peer which specifies the torrent size. # In this case torrentData.metainfoPieceState is not yet set. return if ! torrentData.metainfoPieceState @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Obtained all pieces of metainfo." if torrentData.metainfoPieceState.complete? pieces = torrentData.metainfoPieceState.findRequestablePieces classifiedPeers = ClassifiedPeers.new torrentData.peers.all peers = torrentData.metainfoPieceState.findRequestablePeers(classifiedPeers) if peers.size > 0 # For now, just request all pieces from the first peer. pieces.each do |pieceIndex| msg = ExtendedMetaInfo.new msg.msgType = :request msg.piece = pieceIndex withPeersIo(peers.first, "requesting metadata piece") do |io| sendMessageToPeer msg, io, peers.first torrentData.metainfoPieceState.setPieceRequested(pieceIndex, true) @logger.debug "#{QuartzTorrent.bytesToHex(infoHash)}: Requesting metainfo piece from #{peers.first}: piece #{pieceIndex}" end end else @logger.error "#{QuartzTorrent.bytesToHex(infoHash)}: No peers found that have metadata." end end
Run the passed block in the Reactor’s thread. This allows manipulation of torrent data without race conditions. This method works by scheduling a non-recurring, immediate timer in the reactor that on expiry runs the passed block.
# File lib/quartz_torrent/peerclient.rb, line 1713 def runInReactorThread(&block) @reactor.scheduleTimer(0, [:runproc, block], false, true) end
# File lib/quartz_torrent/peerclient.rb, line 1461 def sendBitfield(io, bitfield) if ! bitfield.allClear? @logger.debug "Sending bitfield of size #{bitfield.length}." msg = BitfieldMessage.new msg.bitfield = bitfield msg.serializeTo io end end
# File lib/quartz_torrent/peerclient.rb, line 1470 def sendHaves(torrentData, pieceIndex) @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Have messages to all connected peers for piece #{pieceIndex}" torrentData.peers.all.each do |peer| next if peer.state != :established || peer.isUs withPeersIo(peer, "when sending Have message") do |io| msg = Have.new msg.pieceIndex = pieceIndex sendMessageToPeer msg, io, peer end end end
# File lib/quartz_torrent/peerclient.rb, line 1503 def sendMessageToPeer(msg, io, peer) peer.updateDownloadRate(msg) torrentData = @torrentData[peer.infoHash] torrentData.bytesUploaded += msg.length if torrentData begin peer.peerMsgSerializer.serializeTo(msg, io) rescue @logger.warn "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending message to peer #{peer} failed: #{$!.message}" end end
# File lib/quartz_torrent/peerclient.rb, line 1482 def sendUninterested(torrentData) # If we are no longer interested in peers once this piece has been completed, let them know return if ! torrentData.blockState needed = torrentData.blockState.completePieceBitfield.compliment classifiedPeers = ClassifiedPeers.new torrentData.peers.all classifiedPeers.establishedPeers.each do |peer| # Don't bother sending uninterested message if we are already uninterested. next if ! peer.amInterested || peer.isUs needFromPeer = needed.intersection(peer.bitfield) if needFromPeer.allClear? withPeersIo(peer, "when sending Uninterested message") do |io| msg = Uninterested.new sendMessageToPeer msg, io, peer peer.amInterested = false @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Sending Uninterested message to peer #{peer}" end end end end
Freeze or unfreeze a torrent. If value is true, then we disconnect from all peers for this torrent and forget the peers. If value is false, we start reconnecting to peers. Parameter torrent can be an infoHash or TorrentData
# File lib/quartz_torrent/peerclient.rb, line 1680 def setFrozen(torrent, value) torrentData = torrent if ! torrent.is_a?(TorrentData) torrentData = @torrentData[torrent] if ! torrentData @logger.warn "Asked to freeze a non-existent torrent #{QuartzTorrent.bytesToHex(torrent)}" return end end if value # Disconnect from all peers so we won't reply to any messages. torrentData.peers.all.each do |peer| if peer.state != :disconnected # Close socket withPeersIo(peer, "when removing torrent") do |io| setPeerDisconnected(peer) close(io) end end torrentData.peers.delete peer end else # Get our list of peers and start connecting right away # Non-recurring and immediate timer torrentData.managePeersTimer = @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], false, true) end end
PRIVATE METHODS ################################################
# File lib/quartz_torrent/peerclient.rb, line 729 def setPeerDisconnected(peer) peer.state = :disconnected peer.uploadRate.reset peer.downloadRate.reset peer.uploadRateDataOnly.reset peer.downloadRateDataOnly.reset torrentData = @torrentData[peer.infoHash] # Are we tracking this torrent? if torrentData && torrentData.blockState # For any outstanding requests, mark that we no longer have requested them peer.requestedBlocks.each do |blockIndex, b| blockInfo = torrentData.blockState.createBlockinfoByBlockIndex(blockIndex) torrentData.blockState.setBlockRequested blockInfo, false end peer.requestedBlocks.clear end end
Start checking which pieces we already have downloaded. This method schedules the necessary timers and changes the state to :checking_pieces. When the pieces are finished being checked the actual download will begin. Preconditions: The torrentData object already has it’s info member set.
# File lib/quartz_torrent/peerclient.rb, line 1311 def startCheckingPieces(torrentData) torrentData.pieceManager = QuartzTorrent::PieceManager.new(@baseDirectory, torrentData.info) torrentData.state = :checking_pieces @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Checking pieces of torrent #{QuartzTorrent.bytesToHex(torrentData.infoHash)} asynchronously." id = torrentData.pieceManager.findExistingPieces torrentData.pieceManagerRequestMetadata[id] = PieceManagerRequestMetadata.new(:check_existing, nil) if ! torrentData.metainfoPieceState torrentData.metainfoPieceState = MetainfoPieceState.new(@baseDirectory, torrentData.infoHash, nil, torrentData.info) end # Schedule checking for PieceManager results torrentData.checkPieceManagerTimer = @reactor.scheduleTimer(@requestBlocksPeriod, [:check_piece_manager, torrentData.infoHash], true, false) # Schedule checking for metainfo PieceManager results (including when piece reading completes) if ! torrentData.checkMetadataPieceManagerTimer torrentData.checkMetadataPieceManagerTimer = @reactor.scheduleTimer(@requestBlocksPeriod, [:check_metadata_piece_manager, torrentData.infoHash], true, false) end end
Start the actual torrent download. This method schedules the necessary timers and registers the necessary listeners and changes the state to :running. It is meant to be called after checking for existing pieces or downloading the torrent metadata (if this is a magnet link torrent)
# File lib/quartz_torrent/peerclient.rb, line 1367 def startDownload(torrentData) # Add a listener for when the tracker's peers change. torrentData.peerChangeListener = Proc.new do @logger.debug "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: Managing peers on peer change event" # Non-recurring and immediate timer torrentData.managePeersTimer = @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], false, true) end torrentData.trackerClient.addPeersChangedListener torrentData.peerChangeListener # Schedule peer connection management. Recurring and immediate if ! torrentData.managePeersTimer torrentData.managePeersTimer = @reactor.scheduleTimer(@managePeersPeriod, [:manage_peers, torrentData.infoHash], true, true) end # Schedule requesting blocks from peers. Recurring and not immediate torrentData.requestBlocksTimer = @reactor.scheduleTimer(@requestBlocksPeriod, [:request_blocks, torrentData.infoHash], true, false) torrentData.state = :running end
# File lib/quartz_torrent/peerclient.rb, line 794 def torrentDataForHandshake(msg, peer) torrentData = @torrentData[msg.infoHash] # Are we tracking this torrent? if !torrentData if peer.is_a?(Peer) @logger.info "Peer #{peer} failed handshake: we are not managing torrent #{QuartzTorrent.bytesToHex(msg.infoHash)}" setPeerDisconnected(peer) else @logger.info "Incoming peer #{peer} failed handshake: we are not managing torrent #{QuartzTorrent.bytesToHex(msg.infoHash)}" end close return nil end torrentData end
# File lib/quartz_torrent/peerclient.rb, line 810 def updatePeerWithHandshakeInfo(torrentData, msg, peer) @logger.info "#{QuartzTorrent.bytesToHex(torrentData.infoHash)}: peer #{peer} sent valid handshake for torrent #{QuartzTorrent.bytesToHex(torrentData.infoHash)}" peer.infoHash = msg.infoHash # If this was a peer we got from a tracker that had no id then we only learn the id on handshake. peer.trackerPeer.id = msg.peerId torrentData.peers.idSet peer end
Find the io associated with the peer and yield it to the passed block. If no io is found an error is logged.
# File lib/quartz_torrent/peerclient.rb, line 1450 def withPeersIo(peer, what = nil) io = findIoByMetainfo(peer) if io yield io else s = "" s = "when #{what}" if what @logger.warn "Couldn't find the io for peer #{peer} #{what}" end end