class OmfCommon::Comm::AMQP::FileBroadcaster

Distributes a local file to a set of receivers subscribed to the same topic but may join a various stages.

Constants

DEF_CHUNK_SIZE
DEF_IDLE_TIME

Public Class Methods

new(file_path, channel, topic, opts = {}, &block) click to toggle source

@param topic Name of topic to send file to @param file_path Path to a local file @param opts

:chunk_size Max size of data chunk to send
:idle_time Min. time in sec to close down broadcaster after having sent last chunk
Calls superclass method
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 27
def initialize(file_path, channel, topic, opts = {}, &block)
  super() # init monitor mixin
  @block = block
  unless File.readable?(file_path)
    raise "Can't read file '#{file_path}'"
  end
  @mime_type = `file -b --mime-type #{file_path}`.strip
  unless $?.success?
    raise "Can't determine file's mime-type (#{$?})" 
  end
  @file_path = file_path 
  f = File.open(file_path, 'rb')
  chunk_size = opts[:chunk_size] || DEF_CHUNK_SIZE
  chunk_count = (f.size / chunk_size) + 1
  
  @outstanding_chunks = Set.new
  @running = true
  @semaphore = new_cond()
  idle_time = opts[:idle_time] || DEF_IDLE_TIME
  
  #chunk_count.times.each {|i| @outstanding_chunks << i}

  exchange = channel.topic(topic, :auto_delete => true)
  OmfCommon.eventloop.defer do
    _send(f, chunk_size, chunk_count, exchange, idle_time)
  end
  
  control_topic = "#{topic}_control"
  control_exchange = channel.topic(control_topic, :auto_delete => true)
  channel.queue("", :exclusive => false) do |queue|
    queue.bind(control_exchange)
    debug "Subscribing to control channel '#{control_topic}'"
    queue.subscribe do |headers, payload|
      hdrs = headers.headers
      debug "Incoming control message '#{hdrs}'"
      from = hdrs['request_from']
      from = 0 if from < 0
      to = hdrs['request_to']
      to = chunk_count - 1 if !to || to >= chunk_count
      synchronize do
        (from .. to).each { |i| @outstanding_chunks << i}
        @semaphore.signal
      end
    end
    @control_queue = queue
  end
end

Public Instance Methods

_send(f, chunk_size, chunk_count, exchange, idle_time) click to toggle source
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 75
def _send(f, chunk_size, chunk_count, exchange, idle_time)
  chunks_to_send = nil
  @sent_chunk = false
  _wait_for_closedown(idle_time)
  loop do
    synchronize do
      @semaphore.wait_while { @outstanding_chunks.empty? && @running }
      return unless @running # done!
      chunks_to_send = @outstanding_chunks.to_a
    end
    
    chunks_to_send.each do |chunk_id|
      #sleep 3
      synchronize do
        @outstanding_chunks.delete(chunk_id)
        @sent_chunk = true
      end
      offset = chunk_id * chunk_size
      f.seek(offset, IO::SEEK_SET)
      chunk = f.read(chunk_size)
      payload = Base64.encode64(chunk)
      headers = {chunk_id: chunk_id, chunk_count: chunk_count, chunk_offset: offset, 
                  chunk_size: payload.size, 
                  path: f.path, file_size: f.size, mime_type: @mime_type}
      debug "Sending chunk #{chunk_id}"
      exchange.publish(payload, {headers: headers})
    end
  end
end
_wait_for_closedown(idle_time) click to toggle source
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 105
def _wait_for_closedown(idle_time)
  OmfCommon.eventloop.after(idle_time) do
    done = false
    synchronize do
      done = !@sent_chunk && @outstanding_chunks.empty?
      @sent_chunk = false
    end
    if done
      @control_queue.unsubscribe if @control_queue
      @block.call({action: :done}) if @block
    else
      # there was activity in last interval, wait a bit longer
      _wait_for_closedown(idle_time)
    end
  end
end