class OmfCommon::Comm::AMQP::FileReceiver

Receives a file broadcast on ‘topic’ and stores it in a local file. Optionally, it can report on progress through a provided block.

Constants

WAIT_BEFORE_REQUESTING
WAIT_BEFORE_REQUESTING_EVERYTHING

Public Class Methods

new(file_path, channel, topic, opts = {}, &block) click to toggle source

@param file_path Path to a local file
@param topic Name of topic to receive file on
@param opts
@param block Called on progress.

Calls superclass method
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 137
# Starts receiving a file that is broadcast in chunks on +topic+ and writes
# the chunks into the local file at +file_path+.
#
# Two anonymous queues are created on +channel+:
# * one bound to the "<topic>_control" exchange, used to observe chunk
#   requests issued by others (so we do not ask for the same chunks again),
# * one bound to the +topic+ exchange, on which the data chunks arrive.
#
# A periodic timer publishes requests for missing chunks on the control
# exchange whenever no data has been observed since the previous tick.
#
# @param file_path Path to the local file; opened for binary writing ('wb')
# @param channel Object providing +topic+ and +queue+ (presumably an AMQP channel - confirm)
# @param topic Name of topic to receive file on
# @param opts Not used by this method
# @param block Optional. Called with a hash
#   {action: :progress, received:, progress:, total:} after every stored chunk
#   and with {action: :done, size:, path:, mime_type:, received:} once all
#   chunks have been stored.
def initialize(file_path, channel, topic, opts = {}, &block)
  super() # init monitor mixin
  f = File.open(file_path, 'wb')
  @running = false
  @received_chunks = false # true if a data message arrived since the last timer tick
  @outstanding_chunks = Set.new # ids of chunks not yet written to the file
  @all_requested = false # set to true if we encountered a request for ALL (no 'to')
  @requested_chunks = Set.new # ids of chunks someone (us or others) already asked for
  @received_anything = false

  control_topic = "#{topic}_control"
  @control_exchange = channel.topic(control_topic, :auto_delete => true)
  channel.queue("", :exclusive => false) do |queue|
    queue.bind(@control_exchange)
    debug "Subscribing to control topic '#{control_topic}'"
    queue.subscribe do |headers, payload|
      hdrs = headers.headers
      debug "Incoming control message '#{hdrs}'"
      from = hdrs['request_from']
      to = hdrs['request_to']
      synchronize do
        if to
          # Remember the requested span so the timer does not ask for it again
          (from .. to).each { |i| @requested_chunks << i }
        else
          debug "Observed request for everything"
          @all_requested = true
          @nothing_received = -1 * WAIT_BEFORE_REQUESTING # Throttle our own desire to request everything
        end
      end
    end
    @control_queue = queue
  end

  # Start the idle counter close to the "request everything" threshold
  @nothing_received = WAIT_BEFORE_REQUESTING_EVERYTHING - 2 * WAIT_BEFORE_REQUESTING

  data_exchange = channel.topic(topic, :auto_delete => true)
  channel.queue("", :exclusive => false) do |queue|
    queue.bind(data_exchange)
    queue.subscribe do |headers, payload|
      synchronize do
        @received_chunks = true
      end
      hdrs = headers.headers
      chunk_id = hdrs['chunk_id']
      chunk_offset = hdrs['chunk_offset']
      chunk_count = hdrs['chunk_count']
      unless chunk_id && chunk_offset && chunk_count
        debug "Received message with missing 'chunk_id' or 'chunk_offset' header information (#{hdrs})"
        # Skip malformed messages; continuing would call methods on nil
        # (chunk_count.times / f.seek(nil)) and raise inside the callback.
        next
      end
      unless @received_anything
        # First valid chunk tells us how many chunks make up the file
        @outstanding_chunks = chunk_count.times.to_set
        synchronize do
          @running = true
          @received_anything = true
        end
      end
      # Ignore chunks we already stored (or ids outside the expected range)
      next unless @outstanding_chunks.include?(chunk_id)

      debug "Receiving chunk #{chunk_id}"
      f.seek(chunk_offset, IO::SEEK_SET)
      f.write(Base64.decode64(payload))
      @outstanding_chunks.delete(chunk_id)
      received = chunk_count - @outstanding_chunks.size
      if block
        block.call({action: :progress, received: received, progress: 1.0 * received / chunk_count, total: chunk_count})
      end

      if @outstanding_chunks.empty?
        # got everything
        f.close
        queue.unsubscribe
        @control_queue.unsubscribe if @control_queue
        # @timer is assigned at the very end of this method; guard like @control_queue
        @timer.cancel if @timer
        synchronize { @running = false }
        debug "Fully received #{file_path}"
        if block
          block.call({action: :done, size: hdrs['file_size'],
            path: file_path, mime_type: hdrs['mime_type'],
            received: chunk_count})
        end
      end
    end
  end

  # Periodic check (registered via the event loop's `every` with
  # WAIT_BEFORE_REQUESTING): if nothing arrived since the last tick, request
  # either one span of missing chunks or, after being idle long enough, everything.
  @timer = OmfCommon.eventloop.every(WAIT_BEFORE_REQUESTING) do
    from = to = nil
    synchronize do
      #puts "RUNNING: #{@running}"
      #break unless @running
      if @received_chunks
        @received_chunks = false
        @nothing_received = 0
        break # ok there is still action
      else
        # nothing happened, so let's ask for something
        if (@nothing_received += WAIT_BEFORE_REQUESTING) >= WAIT_BEFORE_REQUESTING_EVERYTHING
          # something stuck here, let's re-ask for everything
          from = 0
          @nothing_received = 0
        else
          # ask_for is the set of chunks we are still missing but haven't asked for
          ask_for = @outstanding_chunks - @requested_chunks
          break if ask_for.empty? # ok, someone already asked, so better wait

          # Ask for a single span of consecutive chunks
          aa = ask_for.to_a.sort
          from = to = aa[0]
          aa.each.with_index do |e, i|
            break unless (from + i == e) # stop at the first gap in the sequence
            to = e
            @requested_chunks << e
          end
        end

      end
    end
    # 'from' stays nil when the synchronize block was left early via break
    if from
      headers = {request_from: from}
      headers[:request_to] = to if to  # if nil, ask for everything
      @control_exchange.publish(nil, {headers: headers})
    end
  end

end