class Pantry::Communication::FileService::ReceiveFile
Chunked file receiver that implements the file-transfer protocol described here:
http://zguide.zeromq.org/page:all#Transferring-Files
In short, this tool requests chunks in a pipeline flow, writing out the received chunks to the file system at the given path.
Attributes
chunk_size[RW]
pipeline_size[RW]
Public Class Methods
new(service, chunk_size: 250_000, pipeline_size: 10)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 14
# Set up a receiver that talks through +service+.
#
# service       - object later used to deliver outgoing messages
#                 (it must respond to +send_message+).
# chunk_size    - value handed to every ReceivingFile created by
#                 #receive_file (default: 250_000).
# pipeline_size - value handed to every ReceivingFile created by
#                 #receive_file (default: 10).
def initialize(service, chunk_size: 250_000, pipeline_size: 10)
  @service, @chunk_size, @pipeline_size = service, chunk_size, pipeline_size

  # In-flight transfers, looked up by file UUID when messages arrive.
  @receiving = {}
end
Public Instance Methods
receive_file(file_size, checksum)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 23
# Register a new incoming transfer and return its tracking object.
#
# file_size - passed straight through to FileService::ReceivingFile.
# checksum  - passed straight through to FileService::ReceivingFile.
#
# Returns the newly created FileService::ReceivingFile, which is also
# remembered in @receiving under its file_uuid.
def receive_file(file_size, checksum)
  incoming = FileService::ReceivingFile.new(
    file_size, checksum, chunk_size, pipeline_size
  )
  @receiving[incoming.file_uuid] = incoming
  incoming
end
receive_message(from_identity, message)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 31
# Dispatch an incoming protocol message to the transfer it addresses.
#
# from_identity - identity of the sender; recorded on the transfer as
#                 its sender_uuid so replies can be routed back.
# message       - incoming message; +message.to+ selects the transfer
#                 and +message.body[0]+ selects the action.
#
# Messages addressed to an unknown transfer are ignored, as are body
# commands other than "START" and "CHUNK".
def receive_message(from_identity, message)
  transfer = @receiving[message.to]
  return unless transfer

  transfer.sender_uuid = from_identity

  case message.body[0]
  when "START"
    Pantry.logger.debug("[Receive File] Received START message #{message.inspect}")
    fill_the_pipeline(transfer, message)
  when "CHUNK"
    Pantry.logger.debug("[Receive File] Received CHUNK message #{message.metadata}")
    process_chunk(transfer, message)
  end
end
Protected Instance Methods
fill_the_pipeline(current_file, message)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 50
# Send a "FETCH" request for every chunk the transfer yields from
# +chunks_to_fetch+.
#
# current_file - the transfer being filled.
# message      - the message that triggered the refill; not used here.
def fill_the_pipeline(current_file, message)
  current_file.chunks_to_fetch do |chunk_start, chunk_length|
    Pantry.logger.debug(
      "[Receive File] Fetching #{chunk_start} x #{chunk_length} for #{current_file.file_uuid}"
    )
    send_message(current_file, "FETCH", chunk_start, chunk_length)
  end
end
finalize_file(current_file)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 71
# Close out a transfer once all chunks have arrived.
#
# A transfer that reports +valid?+ is acknowledged with "FINISH".
# Otherwise the transfer's +remove+ is called and an "ERROR" message is
# sent back. Either way the transfer is marked finished and dropped
# from @receiving.
#
# current_file - the transfer to finalize.
def finalize_file(current_file)
  upload_ok = current_file.valid?

  if upload_ok
    Pantry.logger.debug("[Receive File] File #{current_file.file_uuid} finished")
    send_message(current_file, "FINISH")
  else
    Pantry.logger.debug("[Receive File] File #{current_file.file_uuid} did not upload successfully")
    current_file.remove
    send_message(current_file, "ERROR", "Checksum did not match the uploaded file")
  end

  current_file.finished!
  @receiving.delete(current_file.file_uuid)
end
process_chunk(current_file, message)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 57
# Write one received chunk, then either finish the transfer or request
# more chunks.
#
# current_file - the transfer the chunk belongs to.
# message      - "CHUNK" message; offset and size are read from
#                +message[:chunk_offset]+ / +message[:chunk_size]+ and
#                the payload from +message.body[1]+.
def process_chunk(current_file, message)
  current_file.write_chunk(
    message[:chunk_offset],
    message[:chunk_size],
    message.body[1]
  )

  return finalize_file(current_file) if current_file.complete?

  fill_the_pipeline(current_file, message)
end
send_message(current_file, *body)
click to toggle source
# File lib/pantry/communication/file_service/receive_file.rb, line 85
# Build a Pantry::Message for the given transfer and hand it to the
# service for delivery to the transfer's sender.
#
# current_file - transfer whose file_uuid becomes the message's +to+
#                and whose sender_uuid is the delivery target.
# body         - zero or more parts appended to the message in order.
def send_message(current_file, *body)
  outgoing = Pantry::Message.new.tap do |msg|
    msg.to = current_file.file_uuid
    body.each do |part|
      msg << part
    end
  end

  @service.send_message(current_file.sender_uuid, outgoing)
end