class Pantry::Communication::FileService::ReceiveFile

Chunked file receiving tool that implements the protocol described here:

http://zguide.zeromq.org/page:all#Transferring-Files

In short, this tool requests chunks in a pipeline flow, writing out the received chunks to the file system at the given path.

Attributes

chunk_size[RW]
pipeline_size[RW]

Public Class Methods

new(service, chunk_size: 250_000, pipeline_size: 10) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 14
def initialize(service, chunk_size: 250_000, pipeline_size: 10)
  @service = service

  @chunk_size    = chunk_size
  @pipeline_size = pipeline_size

  @receiving = {}
end

Public Instance Methods

receive_file(file_size, checksum) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 23
def receive_file(file_size, checksum)
  FileService::ReceivingFile.new(
    file_size, checksum, chunk_size, pipeline_size
  ).tap do |info|
    @receiving[info.file_uuid] = info
  end
end
receive_message(from_identity, message) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 31
def receive_message(from_identity, message)
  if current_file = @receiving[message.to]
    current_file.sender_uuid = from_identity
  else
    return
  end

  case message.body[0]
  when "START"
    Pantry.logger.debug("[Receive File] Received START message #{message.inspect}")
    fill_the_pipeline(current_file, message)
  when "CHUNK"
    Pantry.logger.debug("[Receive File] Received CHUNK message #{message.metadata}")
    process_chunk(current_file, message)
  end
end

Protected Instance Methods

fill_the_pipeline(current_file, message) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 50
def fill_the_pipeline(current_file, message)
  current_file.chunks_to_fetch do |offset, size|
    Pantry.logger.debug("[Receive File] Fetching #{offset} x #{size} for #{current_file.file_uuid}")
    send_message(current_file, "FETCH", offset, size)
  end
end
finalize_file(current_file) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 71
def finalize_file(current_file)
  if current_file.valid?
    Pantry.logger.debug("[Receive File] File #{current_file.file_uuid} finished")
    send_message(current_file, "FINISH")
  else
    Pantry.logger.debug("[Receive File] File #{current_file.file_uuid} did not upload successfully")
    current_file.remove
    send_message(current_file, "ERROR", "Checksum did not match the uploaded file")
  end

  current_file.finished!
  @receiving.delete(current_file.file_uuid)
end
process_chunk(current_file, message) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 57
def process_chunk(current_file, message)
  chunk_offset = message[:chunk_offset]
  chunk_size   = message[:chunk_size]
  chunk_data   = message.body[1]

  current_file.write_chunk(chunk_offset, chunk_size, chunk_data)

  if current_file.complete?
    finalize_file(current_file)
  else
    fill_the_pipeline(current_file, message)
  end
end
send_message(current_file, *body) click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 85
def send_message(current_file, *body)
  message    = Pantry::Message.new
  message.to = current_file.file_uuid

  body.each {|part| message << part }

  @service.send_message(current_file.sender_uuid, message)
end