class OmfCommon::Comm::AMQP::FileBroadcaster
Distributes a local file to a set of receivers subscribed to the same topic but may join a various stages.
Constants
- DEF_CHUNK_SIZE
- DEF_IDLE_TIME
Public Class Methods
new(file_path, channel, topic, opts = {}, &block)
click to toggle source
@param topic Name of topic to send file to @param file_path Path to a local file @param opts
:chunk_size Max size of data chunk to send :idle_time Min. time in sec to close down broadcaster after having sent last chunk
Calls superclass method
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 27 def initialize(file_path, channel, topic, opts = {}, &block) super() # init monitor mixin @block = block unless File.readable?(file_path) raise "Can't read file '#{file_path}'" end @mime_type = `file -b --mime-type #{file_path}`.strip unless $?.success? raise "Can't determine file's mime-type (#{$?})" end @file_path = file_path f = File.open(file_path, 'rb') chunk_size = opts[:chunk_size] || DEF_CHUNK_SIZE chunk_count = (f.size / chunk_size) + 1 @outstanding_chunks = Set.new @running = true @semaphore = new_cond() idle_time = opts[:idle_time] || DEF_IDLE_TIME #chunk_count.times.each {|i| @outstanding_chunks << i} exchange = channel.topic(topic, :auto_delete => true) OmfCommon.eventloop.defer do _send(f, chunk_size, chunk_count, exchange, idle_time) end control_topic = "#{topic}_control" control_exchange = channel.topic(control_topic, :auto_delete => true) channel.queue("", :exclusive => false) do |queue| queue.bind(control_exchange) debug "Subscribing to control channel '#{control_topic}'" queue.subscribe do |headers, payload| hdrs = headers.headers debug "Incoming control message '#{hdrs}'" from = hdrs['request_from'] from = 0 if from < 0 to = hdrs['request_to'] to = chunk_count - 1 if !to || to >= chunk_count synchronize do (from .. to).each { |i| @outstanding_chunks << i} @semaphore.signal end end @control_queue = queue end end
Public Instance Methods
_send(f, chunk_size, chunk_count, exchange, idle_time)
click to toggle source
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 75 def _send(f, chunk_size, chunk_count, exchange, idle_time) chunks_to_send = nil @sent_chunk = false _wait_for_closedown(idle_time) loop do synchronize do @semaphore.wait_while { @outstanding_chunks.empty? && @running } return unless @running # done! chunks_to_send = @outstanding_chunks.to_a end chunks_to_send.each do |chunk_id| #sleep 3 synchronize do @outstanding_chunks.delete(chunk_id) @sent_chunk = true end offset = chunk_id * chunk_size f.seek(offset, IO::SEEK_SET) chunk = f.read(chunk_size) payload = Base64.encode64(chunk) headers = {chunk_id: chunk_id, chunk_count: chunk_count, chunk_offset: offset, chunk_size: payload.size, path: f.path, file_size: f.size, mime_type: @mime_type} debug "Sending chunk #{chunk_id}" exchange.publish(payload, {headers: headers}) end end end
_wait_for_closedown(idle_time)
click to toggle source
# File lib/omf_common/comm/amqp/amqp_file_transfer.rb, line 105 def _wait_for_closedown(idle_time) OmfCommon.eventloop.after(idle_time) do done = false synchronize do done = !@sent_chunk && @outstanding_chunks.empty? @sent_chunk = false end if done @control_queue.unsubscribe if @control_queue @block.call({action: :done}) if @block else # there was activity in last interval, wait a bit longer _wait_for_closedown(idle_time) end end end