class TrRMIte::Broker

Public Class Methods

new(registry_uri, queue_uri, quiet = false) click to toggle source
# File lib/TrRMIte/broker.rb, line 4
def initialize registry_uri, queue_uri, quiet = false
  @producer_queue  = Queue.new
  @dispatch_queue  = Queue.new
  @consumer_queues = Hash.new

  @registry_semaphore = Mutex.new

  @registry_uri = registry_uri
  @queue_uri    = queue_uri

  @quiet = quiet
end

Public Instance Methods

checkpoint(name, sequence, tsequence) click to toggle source
# File lib/TrRMIte/broker.rb, line 25
def checkpoint(name, sequence, tsequence)
  unless @consumer_queues.key? name
    # consumer needs to be registered when checkpointing
    raise ArgumentError, 'consumer not registered'
  end
  @consumer_offsets[name] = [sequence, tsequence]
  log_state unless @quiet
end
log_state() click to toggle source
# File lib/TrRMIte/broker.rb, line 229
def log_state
  puts self.state
end
offsets(name, topic) click to toggle source
# File lib/TrRMIte/broker.rb, line 17
def offsets(name, topic)
  if @consumer_queues.key? name
    # offset needs to be requested before registration
    raise ArgumentError, 'consumer already registered'
  end
  @consumer_offsets[name] ||= [@sequence, @tsequences[topic]]
end
publish(exception, name, event) click to toggle source
# File lib/TrRMIte/broker.rb, line 233
def publish(exception, name, event)
  exception_message = "Consumer #{name} has disappeared... unregistering!"
  exception_event = Event.new(
    topic:   'Idiophone', # FIXME: make exception notifier listen to all topics...
    type:    'BrokerExceptionRaised',
    payload: {
      consumer_name:     name,
      exception_type:    exception.class.name,
      exception_message: exception_message,
      backtrace:         exception.backtrace,
      timestamp:         TrRMIte.wall_clock_time,
    },
    causation_id: event.uuid, # the event we were trying to dispatch
  )
  @producer_queue.push(exception_event.to_json)
end
register(name, topic, queue, version = nil) click to toggle source
# File lib/TrRMIte/broker.rb, line 34
def register(name, topic, queue, version = nil)
  unless !version || version == TrRMIte::VERSION
    # ensure bundled TrRMIte matches running TrRMIte
    raise ArgumentError, \
      "bundled v#{version} doesn't match running v#{TrRMIte::VERSION}"
  end
  @registry_semaphore.synchronize do
    if @consumer_queues.key? name
      # ensure all consumer names are unique...
      raise ArgumentError, 'duplicate consumer name'
    end
    dispatch_queue = Queue.new
    thread = Thread.new do
      Thread.current.name = name[0..15]
      last_sequence = 0
      if File.file?(@event_log)
        sequence, _tsequence = @consumer_offsets[name]
        File.open(@event_log, 'r').each_line.with_index do |event_json, line_number|
          next if (event_sequence = line_number + 1) <= sequence
          event = Event.parse(event_json)
          next unless event.topic.eql?(topic)
          last_sequence = event.sequence
          begin
            queue.push(event.freeze)
          rescue DRb::DRbConnError
            publish($!, name, event)
            unregister(name)
          end
        end
      end
      loop do
        event = dispatch_queue.pop
        next if event && event.sequence <= last_sequence
        begin
          queue.push(event)
        rescue DRb::DRbConnError
          publish($!, name, event)
          unregister(name)
        end
      end
    end
    @consumer_queues[name] = [topic, dispatch_queue, thread]
    log_state unless @quiet
  end
end
start(event_log, offset_store) click to toggle source
# File lib/TrRMIte/broker.rb, line 100
def start(event_log, offset_store)
  @offset_store     = offset_store
  @consumer_offsets = if File.file?(@offset_store)
                        JSON.parse(File.read(@offset_store))
                      else
                        Hash.new
                      end

  @incoming = DRb.start_service(@queue_uri, @producer_queue)

  # extract the broker offsets
  broker_offsets = @consumer_offsets.delete('TrRMIte') || {}

  # sequence counter (~ order)
  @sequence = broker_offsets.fetch('sequence', 0)

  # per topic sequence counters
  @tsequences = Hash.new(0).merge(broker_offsets.fetch('tsequences', {}))

  @event_log = event_log
  @log_name = File.basename(@event_log)

  # report status
  log_state unless @quiet

  # broadcast from dispatch queue to consumer queues
  Thread.new do
    Thread.current.name = 'TRMT/dispatcher'
    loop do
      event = @dispatch_queue.pop
      log_state unless @quiet
      @registry_semaphore.synchronize do
        @consumer_queues.each do |_name, (topic, queue, _thread)|
          next unless event.topic.eql?(topic)
          queue.push(event)
        end
      end
    end
  end

  # allow incoming consumer connections
  @registry = DRb.start_service(@registry_uri, self)

  # start processing incoming events
  File.open(@event_log, 'a+') do |event_log|
    begin
      loop do
        event = Event.parse(@producer_queue.pop)
        event.sequence  = (@sequence += 1)
        event.tsequence = (@tsequences[event.topic] += 1)
        event.timestamp = TrRMIte.wall_clock_time
        event_log.syswrite(event.to_json + $RS) # persist the event in the log
        @dispatch_queue.push(event.freeze)      # dispatch the event to consumers
      end
    rescue Interrupt
      # clean exit
    end
  end

  # shut things down: DRb stuff
  @incoming.stop_service
  @registry.stop_service

ensure

  @consumer_offsets.merge!(
    'TrRMIte': {
      sequence:   @sequence,
      tsequences: @tsequences,
    }
  )

  # keep track of things
  File.write(@offset_store, @consumer_offsets.to_json)

end
state() click to toggle source
# File lib/TrRMIte/broker.rb, line 177
def state
  @state ||= {
    name:      TrRMIte::NAME,
    version:   TrRMIte::VERSION,
    event_log: @log_name,
  }

  @state[:wall_clock_time] = TrRMIte.wall_clock_time(true)

  @state[:sequence]   = @sequence
  @state[:tsequences] = @tsequences

  @state[:threads] = Thread.list.map(&:name).compact

  @state[:consumers] =
    @consumer_queues.each_with_object({}) { |(name, _), hash|
      topic    = @consumer_queues[name].first
      offsets  = @consumer_offsets[name]
      progress = if topic
                   # for consumers subscribed to a single topic
                   offsets.last * 100.00 / @tsequences[topic]
                 else
                   # for consumers subscribed to all topics
                   offsets.first * 100.00 / @sequence
                 end

      hash[name] = {
        topic:    topic,
        offsets:  offsets.join('/'),
        progress: format('%0.2f%%', progress),
      }
    }

  @state[:offsets] =
    @consumer_offsets.each_with_object({}) { |(name, offsets), hash|
      next if @consumer_queues.has_key? name
      hash[name] = offsets.join('/')
    }

  dispatched = @sequence
  queued     = @producer_queue.length
  total      = dispatched + queued
  progress   = dispatched * 100.00 / total

  @state[:dispatched] = dispatched.commify
  @state[:queued]     = queued.commify
  @state[:total]      = total.commify
  @state[:progress]   = format('%0.2f%%', progress)

  @state.to_json
end
terminate(name) click to toggle source
# File lib/TrRMIte/broker.rb, line 80
def terminate(name)
  unless @consumer_queues.key? name
    # consumer needs to be registered when trying to terminate
    raise ArgumentError, 'consumer not registered'
  end
  @consumer_queues[name][1] << nil
end
unregister(name) click to toggle source
# File lib/TrRMIte/broker.rb, line 88
def unregister(name)
  @registry_semaphore.synchronize do
    unless @consumer_queues.key? name
      # consumer needs to be registered when unregistering
      raise ArgumentError, 'consumer not registered'
    end
    _topic, _queue, thread = @consumer_queues.delete(name)
    thread.terminate
    log_state unless @quiet
  end
end