class QuartzTorrent::PieceManager
Spawns a worker thread that performs PieceIO operations asynchronously. This is the class used to read and write blocks of a torrent.
Public Class Methods
Create a new PieceManager that will map to files inside ‘baseDirectory’. Parameter ‘torrinfo’ should be a Metainfo::Info object (the info part of the metainfo). Parameter ‘alertCallback’ should be a Proc. It will be called when an operation is complete. The alerted code can then retrieve the events from the completed queue. This callback will be called from a different thread.
# File lib/quartz_torrent/filemanager.rb, line 289
def initialize(baseDirectory, torrinfo, alertCallback = nil)
  @alertCallback = alertCallback
  @mutex = Mutex.new
  @results = []
  @requests = []
  # The progress of requests as they are being serviced, keyed by request id.
  @requestProgress = {}
  @progressMutex = Mutex.new
  @requestsSemaphore = Semaphore.new
  @resultsSemaphore = Semaphore.new
  @baseDirectory = baseDirectory
  @torrinfo = torrinfo
  @pieceIO = PieceIO.new(baseDirectory, torrinfo)
  @requestId = 0
  @logger = LogManager.getLogger("piecemanager")
  @torrentDataLength = torrinfo.dataLength
  @startedCondition = ConditionVariable.new
  @startedMutex = Mutex.new
  @state = :before_start
  startThread
end
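The callback contract described above (the callback fires on the worker thread, and the alerted code then pulls results itself) can be illustrated with a small stand-in. This is only a sketch: `TinyWorker`, `submit`, and `next_result` are hypothetical names invented for the example and are not part of QuartzTorrent.

```ruby
require 'thread'

# Hypothetical stand-in for PieceManager: it services requests on its own
# thread and invokes a callback after each result has been queued.
class TinyWorker
  def initialize(alert_callback = nil)
    @alert_callback = alert_callback
    @requests = Queue.new
    @results = Queue.new
    @thread = Thread.new do
      while (req = @requests.pop) != :stop
        id, payload = req
        @results.push [id, payload.upcase]
        # Runs on the worker thread, not on the caller's thread.
        @alert_callback.call if @alert_callback
      end
    end
  end

  # Queue a request and hand the id back to the caller.
  def submit(id, payload)
    @requests.push [id, payload]
    id
  end

  # Non-blocking: returns nil when nothing is ready.
  def next_result
    @results.empty? ? nil : @results.pop
  end

  def stop
    @requests.push :stop
    @thread.join
  end
end

# The callback only records that something is ready; the owning thread
# drains the results itself.
alerts = Queue.new
worker = TinyWorker.new(proc { alerts.push :ready })
worker.submit(1, "abc")
alerts.pop                      # block until the worker signals completion
completed = worker.next_result  # [1, "ABC"]
worker.stop
```

Keeping the callback body tiny, and doing the real work on the thread that owns the results, avoids sharing more state across threads than necessary.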
Public Instance Methods
Validate that the hash of the downloaded piece matches the hash from the metainfo. The result’s successful? is true if the hash matches, false otherwise. The data of the result is set to the piece index.
# File lib/quartz_torrent/filemanager.rb, line 360
def checkPieceHash(pieceIndex)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :hash_piece, pieceIndex]
  @requestsSemaphore.signal
  id
end
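The comparison this request performs can be sketched in isolation as follows. The helper name `piece_hash_matches?` and the plain array of digests are assumptions made for the example; in the library the expected hashes come from the metainfo and the piece data is read from disk.

```ruby
require 'digest/sha1'

# Hypothetical helper: 'pieces' is an array of raw 20-byte SHA-1 digests,
# one per piece; 'piece_data' is the piece's bytes, or nil if unavailable.
def piece_hash_matches?(pieces, piece_index, piece_data)
  expected = pieces[piece_index]
  return false if expected.nil? || piece_data.nil?
  Digest::SHA1.digest(piece_data) == expected
end

good = "piece zero data"
pieces = [Digest::SHA1.digest(good)]

ok      = piece_hash_matches?(pieces, 0, good)         # data is intact
bad     = piece_hash_matches?(pieces, 0, "corrupted")  # data differs
missing = piece_hash_matches?(pieces, 0, nil)          # piece not on disk
```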
This is meant to be called when the torrent is first loaded to check what pieces we’ve already downloaded. The data property of the result for this call is set to a Bitfield representing the complete pieces.
# File lib/quartz_torrent/filemanager.rb, line 349
def findExistingPieces
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :find_existing]
  @requestsSemaphore.signal
  id
end
Flush to disk. The result for this operation is always successful.
# File lib/quartz_torrent/filemanager.rb, line 369
def flush()
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :flush]
  @requestsSemaphore.signal
  id
end
Check if there are results ready. This method will return immediately without blocking.
# File lib/quartz_torrent/filemanager.rb, line 409
def hasResults?
  ! @results.empty?
end
Result retrieval. Returns the next result, or nil if none are ready. The results that are returned are PieceIOWorker::Result objects. For readBlock operations the data property of the result object contains the block.
# File lib/quartz_torrent/filemanager.rb, line 381
def nextResult
  result = nil
  @mutex.synchronize do
    result = @results.shift
    @progressMutex.synchronize{ @requestProgress.delete result.requestId } if result
  end
  result
end
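Get the progress of the specified request as an integer percentage. Currently, only the findExistingPieces operation reports intermediate progress; for other operations the worker records just 0 when the request is picked up and 100 when it completes. Returns nil for a request that has not been picked up yet or whose result has already been retrieved with nextResult.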
# File lib/quartz_torrent/filemanager.rb, line 393
def progress(requestId)
  result = nil
  @progressMutex.synchronize{ result = @requestProgress[requestId] }
  result
end
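As an illustration of how an integer percentage of this kind can be derived while scanning pieces, here is a minimal sketch. The helper `percent_done` is hypothetical, and clamping the value to 100 is a choice made for the example, not something the library is known to do.

```ruby
# Hypothetical helper: integer percentage of pieces checked so far.
# Integer division truncates, and the result is clamped to 100.
def percent_done(checked, total)
  return 100 if total.zero?
  [checked * 100 / total, 100].min
end

at_start  = percent_done(0, 8)  # 0
part_way  = percent_done(3, 8)  # 37
at_finish = percent_done(8, 8)  # 100
```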
Read a block from the torrent asynchronously. When the operation is complete the result is stored in the ‘results’ list. This method returns an id that can be used to match the response to the request. The readBlock and writeBlock methods are not threadsafe with respect to callers; they shouldn’t be called by multiple threads concurrently.
# File lib/quartz_torrent/filemanager.rb, line 319
def readBlock(pieceIndex, offset, length)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :read_block, pieceIndex, offset, length]
  @requestsSemaphore.signal
  id
end
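One way a caller might use the returned id to match responses to requests is to keep a table of pending requests keyed by id. The sketch below is self-contained and uses stand-ins: `FakeResult` imitates only the `requestId` and `data` properties mentioned in this page, and the ids 0 and 1 stand for values a real caller would have received from readBlock.

```ruby
# Hypothetical bookkeeping: maps request ids to what was asked for.
pending = {}
pending[0] = { piece: 3, offset: 0,     length: 4 }
pending[1] = { piece: 3, offset: 16384, length: 4 }

# Stand-in for the result objects; not the library's class.
FakeResult = Struct.new(:requestId, :successful, :data)
arrived = [
  FakeResult.new(1, true, "b" * 4),
  FakeResult.new(0, true, "a" * 4)
]

# Look up each result's request by id and pair the block with its offset.
matched = arrived.map do |result|
  request = pending.delete(result.requestId)
  [request[:offset], result.data]
end
```

After the loop, `pending` holds only requests whose results have not yet been seen, which makes unanswered requests easy to spot.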
Read a piece of the torrent asynchronously.
# File lib/quartz_torrent/filemanager.rb, line 337
def readPiece(pieceIndex)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :read_piece, pieceIndex]
  @requestsSemaphore.signal
  id
end
Stop the PieceManager.
# File lib/quartz_torrent/filemanager.rb, line 414
def stop
  waitUntilStarted
  @state = :after_stop
  id = returnAndIncrRequestId
  @requests.push [id, :stop]
  @requestsSemaphore.signal
end
Wait until the next result is ready. If this method is used it must always be called before nextResult. This is mostly useful for testing.
# File lib/quartz_torrent/filemanager.rb, line 401
def wait
  waitUntilStarted
  @resultsSemaphore.wait
end
Write a block to the torrent asynchronously.
# File lib/quartz_torrent/filemanager.rb, line 328
def writeBlock(pieceIndex, offset, block)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :write_block, pieceIndex, offset, block]
  @requestsSemaphore.signal
  id
end
Private Instance Methods
# File lib/quartz_torrent/filemanager.rb, line 495
def findExistingPiecesInternal(requestId)
  completePieceBitfield = Bitfield.new(@torrinfo.pieces.length)
  raise "Base directory #{@baseDirectory} doesn't exist" if ! File.directory?(@baseDirectory)
  raise "Base directory #{@baseDirectory} is not writable" if ! File.writable?(@baseDirectory)
  raise "Base directory #{@baseDirectory} is not readable" if ! File.readable?(@baseDirectory)
  piecesHashes = @torrinfo.pieces
  index = 0
  piecesHashes.each do |hash|
    @logger.debug "Checking piece #{index+1}/#{piecesHashes.length}"
    piece = @pieceIO.readPiece(index)
    if piece
      # Check hash
      calc = Digest::SHA1.digest(piece)
      if calc != hash
        @logger.debug "Piece #{index} calculated hash #{QuartzTorrent.bytesToHex(calc)} doesn't match tracker hash #{QuartzTorrent.bytesToHex(hash)}"
      else
        completePieceBitfield.set(index)
        @logger.debug "Piece #{index+1}/#{piecesHashes.length} is complete."
      end
    else
      @logger.debug "Piece #{index+1}/#{piecesHashes.length} doesn't exist"
    end
    index += 1
    @progressMutex.synchronize{ @requestProgress[requestId] = (index+1)*100/piecesHashes.length }
  end
  completePieceBitfield
end
# File lib/quartz_torrent/filemanager.rb, line 523
def hashPiece(pieceIndex)
  result = false
  piece = @pieceIO.readPiece pieceIndex
  if piece
    # Check hash
    piecesHashes = @torrinfo.pieces
    hash = piecesHashes[pieceIndex]
    calc = Digest::SHA1.digest(piece)
    if calc != hash
      @logger.info "Piece #{pieceIndex} calculated hash #{QuartzTorrent.bytesToHex(calc)} doesn't match tracker hash #{QuartzTorrent.bytesToHex(hash)}"
    else
      @logger.debug "Piece #{pieceIndex+1}/#{piecesHashes.length} hash is correct."
      result = true
    end
  else
    @logger.debug "Piece #{pieceIndex+1}/#{piecesHashes.length} doesn't exist"
  end
  result
end
# File lib/quartz_torrent/filemanager.rb, line 487
def returnAndIncrRequestId
  result = @requestId
  @requestId += 1
  # Wrap?
  @requestId = 0 if @requestId > 0xffffffff
  result
end
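The wrap-around behaviour of the request id can be seen more easily in a standalone sketch. `RequestIdCounter` and `take` are hypothetical names used only for this illustration; the starting value is chosen close to the limit so the wrap is visible after two calls.

```ruby
# Hypothetical standalone version of a wrapping 32-bit request id counter.
class RequestIdCounter
  MAX_ID = 0xffffffff

  def initialize(start = 0)
    @next_id = start
  end

  # Return the current id, then advance, wrapping to 0 past MAX_ID.
  def take
    id = @next_id
    @next_id += 1
    @next_id = 0 if @next_id > MAX_ID
    id
  end
end

counter = RequestIdCounter.new(0xfffffffe)
ids = 3.times.map { counter.take }  # [4294967294, 4294967295, 0]
```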
# File lib/quartz_torrent/filemanager.rb, line 423
def startThread
  @thread = Thread.new do
    @startedMutex.synchronize do
      @state = :running
      @startedCondition.broadcast
    end
    QuartzTorrent.initThread("piecemanager")
    while @state == :running
      begin
        @requestsSemaphore.wait
        if @requests.size > 1000
          @logger.warn "Request queue has grown past 1000 entries; we are io bound"
        end
        result = nil
        req = @requests.shift
        @progressMutex.synchronize{ @requestProgress[req[0]] = 0 }
        begin
          if req[1] == :read_block
            result = @pieceIO.readBlock req[2], req[3], req[4]
          elsif req[1] == :write_block
            @pieceIO.writeBlock req[2], req[3], req[4]
          elsif req[1] == :read_piece
            result = @pieceIO.readPiece req[2]
          elsif req[1] == :find_existing
            result = findExistingPiecesInternal(req[0])
          elsif req[1] == :hash_piece
            result = hashPiece req[2]
            result = Result.new(req[0], result, req[2])
          elsif req[1] == :flush
            @pieceIO.flush
            result = true
          elsif req[1] == :stop
            result = true
          end
          result = Result.new(req[0], true, result) if ! result.is_a?(Result)
        rescue
          @logger.error "Exception when processing request: #{$!}"
          @logger.error "#{$!.backtrace.join("\n")}"
          result = Result.new(req[0], false, nil, $!)
        end
        @progressMutex.synchronize{ @requestProgress[req[0]] = 100 }
        @mutex.synchronize do
          @results.push result
        end
        @resultsSemaphore.signal
        @alertCallback.call() if @alertCallback
      rescue
        @logger.error "Unexpected exception in PieceManager worker thread: #{$!}"
        @logger.error "#{$!.backtrace.join("\n")}"
      end
    end
  end
end
# File lib/quartz_torrent/filemanager.rb, line 481
def waitUntilStarted
  if @state == :before_start
    @startedMutex.synchronize{ @startedCondition.wait(@startedMutex) if @state == :before_start }
  end
end
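The start-up handshake between the worker thread and a caller that waits for it can be sketched with plain Mutex and ConditionVariable objects. This is an illustration rather than the library's code: the local variable names are invented, and the waiter re-checks the state in a `while` loop instead of a single `if`, a variation chosen here to tolerate spurious wakeups.

```ruby
state = :before_start
mutex = Mutex.new
started = ConditionVariable.new

# Worker: flips the state under the lock and wakes every waiter.
worker = Thread.new do
  mutex.synchronize do
    state = :running
    started.broadcast
  end
end

# Caller: waits under the same lock until the state has changed.
# If the worker already ran, the loop body is skipped entirely.
mutex.synchronize do
  started.wait(mutex) while state == :before_start
end

worker.join
```

Checking the state while holding the lock is what prevents a missed wakeup: the worker cannot broadcast between the caller's check and its call to `wait`.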