class OmfCommon::Comm::AMQP::FileReceiver
Receives a file broadcast on ‘topic’ and stores it in a local file. Optionally, it can report on progress through a provided block.
Constants
- WAIT_BEFORE_REQUESTING
- WAIT_BEFORE_REQUESTING_EVERYTHING
Public Class Methods
new(file_path, channel, topic, opts = {}, &block)
click to toggle source
@param topic Name of topic to receive file on @param file_path Path to a local file @param opts @param block Called on progress.
Calls superclass method
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 137 def initialize(file_path, channel, topic, opts = {}, &block) super() # init monitor mixin f = File.open(file_path, 'wb') @running = false @received_chunks = false @outstanding_chunks = Set.new @all_requested = false # set to true if we encountered a request for ALL (no 'to') @requested_chunks = Set.new @received_anything = false control_topic = "#{topic}_control" @control_exchange = channel.topic(control_topic, :auto_delete => true) channel.queue("", :exclusive => false) do |queue| queue.bind(@control_exchange) debug "Subscribing to control topic '#{control_topic}'" queue.subscribe do |headers, payload| hdrs = headers.headers debug "Incoming control message '#{hdrs}'" from = hdrs['request_from'] to = hdrs['request_to'] synchronize do if to (from .. to).each { |i| @requested_chunks << i} else debug "Observed request for everything" @all_requested = true @nothing_received = -1 * WAIT_BEFORE_REQUESTING # Throttle our own desire to request everything end end end @control_queue = queue end @nothing_received = WAIT_BEFORE_REQUESTING_EVERYTHING - 2 * WAIT_BEFORE_REQUESTING data_exchange = channel.topic(topic, :auto_delete => true) channel.queue("", :exclusive => false) do |queue| queue.bind(data_exchange) queue.subscribe do |headers, payload| synchronize do @received_chunks = true end hdrs = headers.headers chunk_id = hdrs['chunk_id'] chunk_offset = hdrs['chunk_offset'] chunk_count = hdrs['chunk_count'] unless chunk_id && chunk_offset && chunk_count debug "Received message with missing 'chunk_id' or 'chunk_offset' header information (#{hdrs})" end unless @received_anything @outstanding_chunks = chunk_count.times.to_set synchronize do @running = true @received_anything = true end end next unless @outstanding_chunks.include?(chunk_id) debug "Receiving chunk #{chunk_id}" f.seek(chunk_offset, IO::SEEK_SET) f.write(Base64.decode64(payload)) @outstanding_chunks.delete(chunk_id) received = chunk_count - @outstanding_chunks.size if block block.call({action: :progress, received: received, progress: 1.0 * received / chunk_count, total: chunk_count}) end if @outstanding_chunks.empty? # got everything f.close queue.unsubscribe @control_queue.unsubscribe if @control_queue @timer.cancel synchronize { @running = false } debug "Fully received #{file_path}" if block block.call({action: :done, size: hdrs['file_size'], path: file_path, mime_type: hdrs['mime_type'], received: chunk_count}) end end end end @timer = OmfCommon.eventloop.every(WAIT_BEFORE_REQUESTING) do from = to = nil synchronize do #puts "RUNNING: #{@running}" #break unless @running if @received_chunks @received_chunks = false @nothing_received = 0 break # ok there is still action else # nothing happened, so let's ask for something if (@nothing_received += WAIT_BEFORE_REQUESTING) >= WAIT_BEFORE_REQUESTING_EVERYTHING # something stuck here, let's re-ask for everything from = 0 @nothing_received = 0 else # ask_for is the set of chunks we are still missing but haven't asked for ask_for = @outstanding_chunks - @requested_chunks break if ask_for.empty? # ok, someone already asked, so better wait # Ask for a single span of consecutive chunks aa = ask_for.to_a.sort from = to = aa[0] aa.each.with_index do |e, i| break unless (from + i == e) to = e @requested_chunks << e end end end end if from headers = {request_from: from} headers[:request_to] = to if to # if nil, ask for everything @control_exchange.publish(nil, {headers: headers}) end end end