class TrRMIte::Broker
Public Class Methods
new(registry_uri, queue_uri, quiet = false)
click to toggle source
# File lib/TrRMIte/broker.rb, line 4
# Prepares an idle broker: remembers the two service URIs and the quiet flag,
# and creates the queues, the consumer registry and the lock guarding it.
# Nothing is started here; #start hands the URIs to DRb.
#
# registry_uri - URI kept in @registry_uri
# queue_uri    - URI kept in @queue_uri
# quiet        - when truthy, the other methods skip their log_state calls
def initialize(registry_uri, queue_uri, quiet = false)
  # connection settings
  @registry_uri = registry_uri
  @queue_uri = queue_uri
  @quiet = quiet

  # event plumbing
  @producer_queue = Queue.new
  @dispatch_queue = Queue.new

  # consumer registry and the mutex that serialises changes to it
  @consumer_queues = {}
  @registry_semaphore = Mutex.new
end
Public Instance Methods
checkpoint(name, sequence, tsequence)
click to toggle source
# File lib/TrRMIte/broker.rb, line 25
# Stores the position a registered consumer reports as [sequence, tsequence]
# under its name, then logs the broker state unless running quietly.
#
# Raises ArgumentError when no consumer is registered under +name+.
def checkpoint(name, sequence, tsequence)
  # consumer needs to be registered when checkpointing
  raise ArgumentError, 'consumer not registered' unless @consumer_queues.key?(name)

  @consumer_offsets.store(name, [sequence, tsequence])
  log_state unless @quiet
end
log_state()
click to toggle source
# File lib/TrRMIte/broker.rb, line 229
# Writes the string returned by #state to standard output.
def log_state
  snapshot = state
  $stdout.puts(snapshot)
end
offsets(name, topic)
click to toggle source
# File lib/TrRMIte/broker.rb, line 17
# Returns the stored [sequence, tsequence] pair for a consumer that has not
# registered yet. When nothing is stored, the broker's current counters for
# +topic+ are stored and returned instead.
#
# Raises ArgumentError when a consumer is already registered under +name+.
def offsets(name, topic)
  # offset needs to be requested before registration
  raise ArgumentError, 'consumer already registered' if @consumer_queues.key?(name)

  @consumer_offsets[name] || (@consumer_offsets[name] = [@sequence, @tsequences[topic]])
end
publish(exception, name, event)
click to toggle source
# File lib/TrRMIte/broker.rb, line 233
# Queues a 'BrokerExceptionRaised' event on the producer queue describing a
# consumer that could not be reached.
#
# exception - the error that was raised; its class name and backtrace are reported
# name      - name of the affected consumer
# event     - the event being delivered; its uuid becomes the causation id
def publish(exception, name, event)
  details = {
    consumer_name: name,
    exception_type: exception.class.name,
    exception_message: "Consumer #{name} has disappeared... unregistering!",
    backtrace: exception.backtrace,
    timestamp: TrRMIte.wall_clock_time,
  }
  notification = Event.new(
    topic: 'Idiophone', # FIXME: make exception notifier listen to all topics...
    type: 'BrokerExceptionRaised',
    payload: details,
    causation_id: event.uuid, # the event we were trying to dispatch
  )
  @producer_queue.push(notification.to_json)
end
register(name, topic, queue, version = nil)
click to toggle source
# File lib/TrRMIte/broker.rb, line 34
# Registers a consumer and starts the thread that feeds its queue.
#
# The thread first replays the event log, if that file exists: lines whose
# 1-based number is at or below the consumer's stored sequence offset are
# skipped, and only events whose topic matches +topic+ are delivered. It then
# forwards whatever arrives on the consumer's private dispatch queue, skipping
# events the replay already delivered. A nil popped from the dispatch queue is
# forwarded as-is.
#
# When a push to +queue+ fails with DRb::DRbConnError the failure is published
# and the consumer is unregistered, which terminates this thread.
#
# name    - unique consumer name; the first 16 characters name the thread
# topic   - topic the consumer receives
# queue   - object the events are pushed onto
# version - optional TrRMIte version bundled with the consumer
#
# Raises ArgumentError when +version+ is given and differs from
# TrRMIte::VERSION, or when +name+ is already registered.
def register(name, topic, queue, version = nil)
  # ensure bundled TrRMIte matches running TrRMIte
  if version && version != TrRMIte::VERSION
    raise ArgumentError,
          "bundled v#{version} doesn't match running v#{TrRMIte::VERSION}"
  end

  @registry_semaphore.synchronize do
    # ensure all consumer names are unique...
    raise ArgumentError, 'duplicate consumer name' if @consumer_queues.key?(name)

    dispatch_queue = Queue.new
    thread = Thread.new do
      Thread.current.name = name[0..15]

      # single delivery path shared by the replay and the live loop
      deliver = lambda do |event|
        begin
          queue.push(event)
        rescue DRb::DRbConnError => error
          publish(error, name, event)
          unregister(name)
        end
      end

      last_sequence = 0
      if File.file?(@event_log)
        sequence, _tsequence = @consumer_offsets[name]
        # block form: the log handle is closed when the replay ends, including
        # when this thread is terminated part-way through
        File.open(@event_log, 'r') do |log|
          log.each_line.with_index(1) do |event_json, event_sequence|
            next if event_sequence <= sequence

            event = Event.parse(event_json)
            next unless event.topic.eql?(topic)

            last_sequence = event.sequence
            deliver.call(event.freeze)
          end
        end
      end

      loop do
        event = dispatch_queue.pop
        # already delivered during the replay
        next if event && event.sequence <= last_sequence

        deliver.call(event)
      end
    end
    @consumer_queues[name] = [topic, dispatch_queue, thread]
    log_state unless @quiet
  end
end
start(event_log, offset_store)
click to toggle source
# File lib/TrRMIte/broker.rb, line 100
# Runs the broker until it is interrupted.
#
# Restores the consumer offsets and the broker's own counters from
# +offset_store+ (when that file exists), serves the producer queue and the
# broker itself over DRb, and starts the dispatcher thread. Each incoming event
# is then stamped with a sequence, a per-topic sequence and a timestamp,
# appended to +event_log+ and pushed to the dispatcher. An Interrupt ends the
# loop; both DRb services are then stopped and the offsets are written back.
#
# The counters are restored before any DRb service is started, and the offset
# store is only rewritten once they have been restored. Otherwise a failure
# while reading the store or binding the producer queue would write nil
# counters over the stored ones.
#
# event_log    - path of the event log, opened in append mode
# offset_store - path of the JSON file holding consumer and broker offsets
def start(event_log, offset_store)
  counters_restored = false

  @offset_store = offset_store
  @consumer_offsets =
    if File.file?(@offset_store)
      JSON.parse(File.read(@offset_store))
    else
      {}
    end
  # extract the broker offsets
  broker_offsets = @consumer_offsets.delete('TrRMIte') || {}
  # sequence counter (~ order)
  @sequence = broker_offsets.fetch('sequence', 0)
  # per topic sequence counters
  @tsequences = Hash.new(0).merge(broker_offsets.fetch('tsequences', {}))
  counters_restored = true

  @event_log = event_log
  @log_name = File.basename(@event_log)

  @incoming = DRb.start_service(@queue_uri, @producer_queue)

  # report status
  log_state unless @quiet

  # broadcast from dispatch queue to consumer queues
  Thread.new do
    Thread.current.name = 'TRMT/dispatcher'
    loop do
      event = @dispatch_queue.pop
      log_state unless @quiet
      @registry_semaphore.synchronize do
        @consumer_queues.each do |_name, (topic, queue, _thread)|
          next unless event.topic.eql?(topic)

          queue.push(event)
        end
      end
    end
  end

  # allow incoming consumer connections
  @registry = DRb.start_service(@registry_uri, self)

  # start processing incoming events
  File.open(@event_log, 'a+') do |log|
    begin
      loop do
        event = Event.parse(@producer_queue.pop)
        event.sequence = (@sequence += 1)
        event.tsequence = (@tsequences[event.topic] += 1)
        event.timestamp = TrRMIte.wall_clock_time
        log.syswrite(event.to_json + $RS) # persist the event in the log
        @dispatch_queue.push(event.freeze) # dispatch the event to consumers
      end
    rescue Interrupt
      # clean exit
    end
  end

  # shut things down: DRb stuff
  @incoming.stop_service
  @registry.stop_service
ensure
  if counters_restored
    # string key, matching the 'TrRMIte' entry removed on load
    @consumer_offsets.merge!(
      'TrRMIte' => {
        'sequence' => @sequence,
        'tsequences' => @tsequences,
      }
    )
    # keep track of things
    File.write(@offset_store, @consumer_offsets.to_json)
  end
end
state()
click to toggle source
# File lib/TrRMIte/broker.rb, line 177
# Builds a JSON snapshot of the broker: identity, counters, thread names,
# per-consumer progress, the stored offsets of consumers that are not currently
# registered, and overall dispatch progress.
#
# Progress is a percentage with two decimals. When there is nothing to measure
# against (no events yet, or none for a consumer's topic) it is reported as
# 100.00% rather than the NaN a division by zero would produce.
#
# Returns the snapshot as a JSON string.
def state
  @state ||= {
    name: TrRMIte::NAME,
    version: TrRMIte::VERSION,
    event_log: @log_name,
  }

  # share of +done+ out of +total+; an empty total means nothing is outstanding
  percentage = lambda do |done, total|
    ratio = total.zero? ? 100.0 : done * 100.00 / total
    format('%0.2f%%', ratio)
  end

  @state[:wall_clock_time] = TrRMIte.wall_clock_time(true)
  @state[:sequence] = @sequence
  @state[:tsequences] = @tsequences
  @state[:threads] = Thread.list.map(&:name).compact
  @state[:consumers] =
    @consumer_queues.each_with_object({}) do |(name, (topic, _queue, _thread)), consumers|
      offsets = @consumer_offsets[name]
      progress =
        if topic
          # for consumers subscribed to a single topic
          percentage.call(offsets.last, @tsequences[topic])
        else
          # for consumers subscribed to all topics
          percentage.call(offsets.first, @sequence)
        end
      consumers[name] = {
        topic: topic,
        offsets: offsets.join('/'),
        progress: progress,
      }
    end
  @state[:offsets] =
    @consumer_offsets.each_with_object({}) do |(name, offsets), stored|
      # registered consumers are already listed under :consumers
      next if @consumer_queues.key?(name)

      stored[name] = offsets.join('/')
    end

  dispatched = @sequence
  queued = @producer_queue.length
  total = dispatched + queued
  @state[:dispatched] = dispatched.commify
  @state[:queued] = queued.commify
  @state[:total] = total.commify
  @state[:progress] = percentage.call(dispatched, total)
  @state.to_json
end
terminate(name)
click to toggle source
# File lib/TrRMIte/broker.rb, line 80
# Appends nil to a registered consumer's dispatch queue. The delivery thread
# started by #register forwards that nil to the consumer's own queue.
#
# Raises ArgumentError when no consumer is registered under +name+.
def terminate(name)
  # consumer needs to be registered when trying to terminate
  raise ArgumentError, 'consumer not registered' unless @consumer_queues.key?(name)

  _topic, dispatch_queue, _thread = @consumer_queues[name]
  dispatch_queue << nil
end
unregister(name)
click to toggle source
# File lib/TrRMIte/broker.rb, line 88
# Removes a consumer from the registry and terminates its delivery thread.
#
# The state is logged before the thread is terminated. The delivery thread
# calls this method on itself when its consumer has disappeared, and
# terminating the calling thread first would stop it before the log line is
# written. The termination sits in an ensure so the thread is stopped even if
# logging raises.
#
# Raises ArgumentError when no consumer is registered under +name+.
def unregister(name)
  @registry_semaphore.synchronize do
    # consumer needs to be registered when unregistering
    raise ArgumentError, 'consumer not registered' unless @consumer_queues.key?(name)

    _topic, _queue, thread = @consumer_queues.delete(name)
    begin
      log_state unless @quiet
    ensure
      thread.terminate
    end
  end
end