class Consumer

Attributes

event_handlers[R]

Public Class Methods

handle(*event_types, &handler)
# File lib/kafka_lite/consumer.rb, line 3
# Registers +handler+ as the callback for every one of +event_types+.
#
# The registry is created on first use as a plain Hash stored in the
# class-level @event_handlers, extended with a singleton +for(event)+ lookup
# that resolves an event to its handler via the event's underscored type name.
#
# Returns the array of event types that was passed in.
def self.handle(*event_types, &handler)
  unless @event_handlers
    registry = {}
    # Look up the handler registered under the symbolized, underscored type.
    def registry.for(event)
      key = event.type.underscore.to_sym
      self[key]
    end
    @event_handlers = registry
  end

  event_types.each { |type_key| @event_handlers[type_key] = handler }
end
new(name, topic = nil)
# File lib/kafka_lite/consumer.rb, line 21
# Builds a consumer identified by +name+, optionally bound to +topic+.
#
# The last known offsets are fetched from the consumer registry,
# #restore_state is invoked with the fetched sequence, and only then is a
# fresh event queue created and registered under this name/topic.
def initialize(name, topic = nil)
  @name = name
  @topic = topic

  registry = Kafka::Lite::CONSUMER_REGISTRY
  @sequence, @tsequence = registry.offsets(@name, @topic)
  restore_state(@sequence)

  @events = Queue.new
  registry.register(@name, @topic, @events)
end

Public Instance Methods

run() { |event| ... }
# File lib/kafka_lite/consumer.rb, line 29
# Consumes events from this consumer's queue until interrupted or until an
# out-of-sequence event shows up.
#
# Every popped payload is parsed with ConsumerEvent.parse and validated
# against the current offsets. It is then yielded to the given block, or
# handed to #process when no block is given. After each event the offsets
# are updated in memory, checkpointed with the registry, and passed to
# #persist_state.
#
# TrRMIte::OutOfSequence is reported on $stderr and ends the loop; Interrupt
# ends the loop silently. Kafka::Lite::WrongTopic is not rescued here and
# propagates to the caller.
#
# On the way out the consumer is always unregistered (best effort) and
# #finalize is always called.
#
# NOTE(review): OutOfSequence is referenced under TrRMIte while WrongTopic is
# under Kafka::Lite -- confirm both constants resolve in this project.
def run
  event = nil # declared up front so the rescue clause below can report on it
  loop do
    event = ConsumerEvent.parse(@events.pop)
    raise TrRMIte::OutOfSequence unless event.sequence >= (@sequence + 1)
    raise TrRMIte::OutOfSequence unless @topic.nil? || event.tsequence == (@tsequence + 1)
    raise Kafka::Lite::WrongTopic unless @topic.nil? || event.topic == @topic
    if block_given?
      yield event
    else
      process(event)
    end
    # "internal" consumer checkpointing
    @sequence = event.sequence
    @tsequence = event.tsequence
    # "external" Kafka checkpointing
    Kafka::Lite::CONSUMER_REGISTRY.checkpoint(@name, @sequence, @tsequence)
    persist_state(@sequence)
  end
rescue TrRMIte::OutOfSequence
  $stderr.print("OutOfSequence exception occurred for consumer '#{@name}'... exiting!")
  $stderr.print(" (expected sequence >= ##{@sequence + 1} but got ##{event.sequence})")
  $stderr.print(" (expected tsequence == ##{@tsequence + 1} but got ##{event.tsequence})")
  $stderr.puts
rescue Interrupt
  # exit cleanly...
ensure
  begin
    Kafka::Lite::CONSUMER_REGISTRY.unregister(@name)
  rescue Errno::ECONNREFUSED
    # The registry is unreachable; there is nothing left to unregister from.
  rescue StandardError => e
    # Unregistering stays best effort, but unexpected failures are no longer
    # swallowed silently (a modifier `rescue` cannot filter by class).
    $stderr.puts("Failed to unregister consumer '#{@name}': #{e.class}: #{e.message}")
  end
  finalize
end

Private Instance Methods

finalize() click to toggle source
# File lib/kafka_lite/consumer.rb, line 82
# Shutdown hook, invoked from the ensure clause of #run.
# Does nothing by default; consumer subclasses _may_ override it.
def finalize
  nil
end
persist_state(sequence) click to toggle source
# File lib/kafka_lite/consumer.rb, line 78
# State-persistence callback, invoked by #run with the sequence of the event
# that was just checkpointed.
# Does nothing by default; stateful consumers can/should override it.
def persist_state(sequence)
  nil
end
process(event)
# File lib/kafka_lite/consumer.rb, line 64
# Dispatches +event+ to the handler registered for it on this consumer's
# class, running the handler in the context of this instance.
#
# Returns the handler's result, or nil when no handler matches. A class that
# never registered any handler has a nil registry; that case is treated as
# "no handler" rather than raising NoMethodError.
#
# Subclasses that don't register explicit, type-specific event handlers can
# always process all topic events by overriding `process(event)`.
def process(event)
  registry = self.class.event_handlers
  handler = registry && registry.for(event)
  instance_exec(event, &handler) if handler
end
restore_state(sequence) click to toggle source
# File lib/kafka_lite/consumer.rb, line 74
# State-restoration callback, invoked from the constructor with the sequence
# fetched from the consumer registry.
# Does nothing by default; stateful consumers can/should override it.
def restore_state(sequence)
  nil
end