class QuartzTorrent::PieceManager

A class that spawns a thread for performing PieceIO operations asynchronously. This class is what is used to read and write blocks of a torrent.

Attributes

torrentDataLength[R]

Public Class Methods

new(baseDirectory, torrinfo, alertCallback = nil)

Create a new PieceManager that will map to files inside ‘baseDirectory’. Parameter ‘torrinfo’ should be a Metainfo::Info object (the info part of the metainfo). Parameter ‘alertCallback’ should be a Proc. It will be called when an operation is complete. The alerted code can then retrieve the events from the completed queue. This callback will be called from a different thread.

# File lib/quartz_torrent/filemanager.rb, line 289
def initialize(baseDirectory, torrinfo, alertCallback = nil)
  @alertCallback = alertCallback
  @mutex = Mutex.new
  @results = []
  @requests = []
  # The progress of requests as they are being serviced, keyed by request id.
  @requestProgress = {}
  @progressMutex = Mutex.new
  @requestsSemaphore = Semaphore.new
  @resultsSemaphore = Semaphore.new
  @baseDirectory = baseDirectory
  @torrinfo = torrinfo
  @pieceIO = PieceIO.new(baseDirectory, torrinfo)
  @requestId = 0
  @logger = LogManager.getLogger("piecemanager")
  @torrentDataLength = torrinfo.dataLength
  @startedCondition = ConditionVariable.new
  @startedMutex = Mutex.new
  @state = :before_start
  startThread
end
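
The request/result flow described for the constructor can be illustrated with a small stand-in. This is a minimal sketch, not the QuartzTorrent implementation: `TinyAsyncWorker`, `submit`, `next_result`, and `demo_async_worker` are hypothetical names, and the "work" is just upcasing a string. It shows a request returning an id immediately, the callback firing on the worker thread, and the caller then collecting the result.

```ruby
require "thread"

# Hypothetical stand-in for the pattern: one worker thread services
# requests and pushes [id, data] pairs onto a results queue.
class TinyAsyncWorker
  def initialize(alert_callback = nil)
    @alert_callback = alert_callback
    @requests = Queue.new
    @results = Queue.new
    @next_id = 0
    @thread = Thread.new { run }
  end

  # Queue a request and return its id immediately, without waiting.
  def submit(payload)
    id = @next_id
    @next_id += 1
    @requests << [id, payload]
    id
  end

  # Non-blocking retrieval: returns [id, data], or nil if nothing is ready.
  def next_result
    @results.pop(true)
  rescue ThreadError
    nil
  end

  def stop
    @requests << :stop
    @thread.join
  end

  private

  def run
    while (req = @requests.pop) != :stop
      id, payload = req
      @results << [id, payload.upcase]
      # Runs on the worker thread, not on the caller's thread.
      @alert_callback.call if @alert_callback
    end
  end
end

def demo_async_worker
  alerts = Queue.new
  worker = TinyAsyncWorker.new(proc { alerts << :done })
  id = worker.submit("block")
  alerts.pop # block until the callback has fired
  result = worker.next_result
  worker.stop
  [id, result]
end
```

Because the callback runs on the worker thread, it is usually safest to have it do very little, for example wake up the caller's own event loop, and to fetch results from the caller's thread.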

Public Instance Methods

checkPieceHash(pieceIndex)

Validate that the hash of the downloaded piece matches the hash from the metainfo. The result's successful? is true if the hash matches and false otherwise. The data of the result is set to the piece index.

# File lib/quartz_torrent/filemanager.rb, line 360
def checkPieceHash(pieceIndex)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :hash_piece, pieceIndex]
  @requestsSemaphore.signal
  id
end
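
The comparison that a hash check performs can be sketched on its own. The helper name `piece_hash_matches?` is hypothetical; it assumes the expected hash is the 20-byte binary SHA-1 digest, as the private hashPiece method listed on this page does.

```ruby
require "digest/sha1"

# Returns true when the SHA-1 digest of piece_data equals expected_hash,
# where expected_hash is a 20-byte binary string.
def piece_hash_matches?(piece_data, expected_hash)
  Digest::SHA1.digest(piece_data) == expected_hash
end
```

For example, `piece_hash_matches?("hello", Digest::SHA1.digest("hello"))` is true, while changing a single byte of the data makes it false.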

findExistingPieces()

This is meant to be called when the torrent is first loaded to check what pieces we’ve already downloaded. The data property of the result for this call is set to a Bitfield representing the complete pieces.

# File lib/quartz_torrent/filemanager.rb, line 349
def findExistingPieces
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :find_existing]
  @requestsSemaphore.signal
  id
end

flush()

Flush to disk. The result for this operation is always successful.

# File lib/quartz_torrent/filemanager.rb, line 369
def flush()
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :flush]
  @requestsSemaphore.signal
  id
end

hasResults?()

Check if there are results ready. This method will return immediately without blocking.

# File lib/quartz_torrent/filemanager.rb, line 409
def hasResults?
  ! @results.empty?
end

nextResult()

Result retrieval. Returns the next result, or nil if none are ready. The results that are returned are PieceIOWorker::Result objects. For readBlock operations the data property of the result object contains the block.

# File lib/quartz_torrent/filemanager.rb, line 381
def nextResult
  result = nil
  @mutex.synchronize do
    result = @results.shift
    @progressMutex.synchronize{ @requestProgress.delete result.requestId } if result
  end
  result
end

progress(requestId)

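Get the progress of the specified request as an integer percentage. Currently, only the findExistingPieces operation reports intermediate progress; for other operations the value is 0 while the request is being serviced and 100 once it completes. Returns nil if the request has not started yet or if its result has already been retrieved with nextResult.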

# File lib/quartz_torrent/filemanager.rb, line 393
def progress(requestId)
  result = nil
  @progressMutex.synchronize{ result = @requestProgress[requestId] }
  result
end
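
As an illustration of the integer percentage that progress reports, here is a minimal sketch. The helper `percent_complete` is hypothetical and not part of QuartzTorrent; it uses integer division, so the value rounds down, and it clamps to the 0..100 range.

```ruby
# Integer percentage of `done` out of `total`, clamped to 0..100.
# An empty job (total <= 0) is treated as complete.
def percent_complete(done, total)
  return 100 if total <= 0
  [[done * 100 / total, 0].max, 100].min
end
```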

readBlock(pieceIndex, offset, length)

Read a block from the torrent asynchronously. When the operation is complete the result is stored in the ‘results’ list. This method returns an id that can be used to match the response to the request. The readBlock and writeBlock methods are not threadsafe with respect to callers; they shouldn’t be called by multiple threads concurrently.

# File lib/quartz_torrent/filemanager.rb, line 319
def readBlock(pieceIndex, offset, length)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :read_block, pieceIndex, offset, length]
  @requestsSemaphore.signal
  id
end

readPiece(pieceIndex)

Read an entire piece of the torrent asynchronously. This method returns an id that can be used to match the result to the request.

# File lib/quartz_torrent/filemanager.rb, line 337
def readPiece(pieceIndex)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :read_piece, pieceIndex]
  @requestsSemaphore.signal
  id
end

stop()

Stop the PieceManager.

# File lib/quartz_torrent/filemanager.rb, line 414
def stop
  waitUntilStarted
  @state = :after_stop
  id = returnAndIncrRequestId
  @requests.push [id, :stop]
  @requestsSemaphore.signal
end

wait()

Wait until the next result is ready. If this method is used it must always be called before nextResult. This is mostly useful for testing.

# File lib/quartz_torrent/filemanager.rb, line 401
def wait
  waitUntilStarted

  @resultsSemaphore.wait
end

writeBlock(pieceIndex, offset, block)

Write a block to the torrent asynchronously.

# File lib/quartz_torrent/filemanager.rb, line 328
def writeBlock(pieceIndex, offset, block)
  id = returnAndIncrRequestId
  return id if @state == :after_stop
  @requests.push [id, :write_block, pieceIndex, offset, block]
  @requestsSemaphore.signal
  id
end

Private Instance Methods

findExistingPiecesInternal(requestId)
# File lib/quartz_torrent/filemanager.rb, line 495
def findExistingPiecesInternal(requestId)
  completePieceBitfield = Bitfield.new(@torrinfo.pieces.length)
  raise "Base directory #{@baseDirectory} doesn't exist" if ! File.directory?(@baseDirectory)
  raise "Base directory #{@baseDirectory} is not writable" if ! File.writable?(@baseDirectory)
  raise "Base directory #{@baseDirectory} is not readable" if ! File.readable?(@baseDirectory)
  piecesHashes = @torrinfo.pieces
  index = 0
  piecesHashes.each do |hash|
    @logger.debug "Checking piece #{index+1}/#{piecesHashes.length}"
    piece = @pieceIO.readPiece(index)
    if piece
      # Check hash
      calc = Digest::SHA1.digest(piece)
      if calc != hash
        @logger.debug "Piece #{index} calculated hash #{QuartzTorrent.bytesToHex(calc)} doesn't match tracker hash #{QuartzTorrent.bytesToHex(hash)}"
      else
        completePieceBitfield.set(index)
        @logger.debug "Piece #{index+1}/#{piecesHashes.length} is complete."
      end
    else
      @logger.debug "Piece #{index+1}/#{piecesHashes.length} doesn't exist"
    end
    index += 1
    # index has already been incremented, so it equals the number of pieces checked so far.
    @progressMutex.synchronize{ @requestProgress[requestId] = index*100/piecesHashes.length }
  end
  completePieceBitfield
end

hashPiece(pieceIndex)
# File lib/quartz_torrent/filemanager.rb, line 523
def hashPiece(pieceIndex)
  result = false
  # Look up the hashes before branching so the else branch can log the piece count.
  piecesHashes = @torrinfo.pieces
  piece = @pieceIO.readPiece pieceIndex
  if piece
    # Check hash
    hash = piecesHashes[pieceIndex]
    calc = Digest::SHA1.digest(piece)
    if calc != hash
      @logger.info "Piece #{pieceIndex} calculated hash #{QuartzTorrent.bytesToHex(calc)} doesn't match tracker hash #{QuartzTorrent.bytesToHex(hash)}"
    else
      @logger.debug "Piece #{pieceIndex+1}/#{piecesHashes.length} hash is correct."
      result = true
    end
  else
    @logger.debug "Piece #{pieceIndex+1}/#{piecesHashes.length} doesn't exist"
  end
  result
end

returnAndIncrRequestId()
# File lib/quartz_torrent/filemanager.rb, line 487
def returnAndIncrRequestId
  result = @requestId
  @requestId += 1
  # Wrap?
  @requestId = 0 if @requestId > 0xffffffff
  result
end
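
The wrap-around behaviour of the request id counter can be shown in isolation. This is a sketch with a hypothetical pure helper, `next_request_id`, which takes the current counter and returns both the id to hand out and the counter's next value. It assumes the same 32-bit limit (0xffffffff) as the method above.

```ruby
# Returns [id_to_hand_out, next_counter_value].
# The counter wraps back to 0 after 0xffffffff.
def next_request_id(counter)
  following = counter + 1
  following = 0 if following > 0xffffffff
  [counter, following]
end
```

One consequence of wrapping is that ids are only unique among requests that are outstanding at the same time, which is the only guarantee needed to match a result to its request.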

startThread()
# File lib/quartz_torrent/filemanager.rb, line 423
def startThread
  @thread = Thread.new do
    @startedMutex.synchronize do 
      @state = :running
      @startedCondition.broadcast
    end
    QuartzTorrent.initThread("piecemanager")
    while @state == :running
      begin
        @requestsSemaphore.wait

        if @requests.size > 1000
          @logger.warn "Request queue has grown past 1000 entries; we are io bound"
        end

        result = nil
        req = @requests.shift
        @progressMutex.synchronize{ @requestProgress[req[0]] = 0 }
        begin
          if req[1] == :read_block
            result = @pieceIO.readBlock req[2], req[3], req[4]
          elsif req[1] == :write_block
            @pieceIO.writeBlock req[2], req[3], req[4]
          elsif req[1] == :read_piece
            result = @pieceIO.readPiece req[2]
          elsif req[1] == :find_existing
            result = findExistingPiecesInternal(req[0])
          elsif req[1] == :hash_piece
            result = hashPiece req[2]
            result = Result.new(req[0], result, req[2])
          elsif req[1] == :flush
            @pieceIO.flush
            result = true
          elsif req[1] == :stop
            result = true
          end
          result = Result.new(req[0], true, result) if ! result.is_a?(Result)
        rescue
          @logger.error "Exception when processing request: #{$!}"
          @logger.error "#{$!.backtrace.join("\n")}"
          result = Result.new(req[0], false, nil, $!)
        end
        @progressMutex.synchronize{ @requestProgress[req[0]] = 100 }

        @mutex.synchronize do
          @results.push result
        end
        @resultsSemaphore.signal

        @alertCallback.call() if @alertCallback
      rescue
        @logger.error "Unexpected exception in PieceManager worker thread: #{$!}"
        @logger.error "#{$!.backtrace.join("\n")}"
      end
    end
  end
end

waitUntilStarted()
# File lib/quartz_torrent/filemanager.rb, line 481
def waitUntilStarted
  if @state == :before_start
    @startedMutex.synchronize{ @startedCondition.wait(@startedMutex) if @state == :before_start }
  end
end
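
The start-up handshake between startThread and waitUntilStarted follows the usual mutex plus condition variable pattern. Below is a minimal, self-contained sketch of that pattern; `start_and_wait` is a hypothetical helper, not QuartzTorrent code. Unlike the method above, it re-checks the state in a `while` loop, which is the conventional guard against spurious wakeups.

```ruby
# Start a thread and block until it has signalled that it is running.
# Returns the final state observed by the caller.
def start_and_wait
  mutex = Mutex.new
  started = ConditionVariable.new
  state = :before_start

  thread = Thread.new do
    mutex.synchronize do
      state = :running
      started.broadcast
    end
  end

  mutex.synchronize do
    # Checking under the mutex avoids missing a broadcast that
    # happened before this thread started waiting.
    started.wait(mutex) while state == :before_start
  end

  thread.join
  state
end
```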