class Consumer
Attributes
event_handlers[R]
Public Class Methods
handle(*event_types, &handler)
click to toggle source
# File lib/kafka_lite/consumer.rb, line 3
#
# Registers +handler+ (the given block) for every one of +event_types+ on the
# receiving class.
#
# The handler table is created on first use and kept in the class-level
# instance variable @event_handlers. It is a plain Hash extended with a
# singleton #for(event) lookup that keys on event.type.underscore.to_sym.
#
# Event types are normalized with #to_sym whenever they support it, so a type
# registered as a String ends up under the same Symbol key that #for looks up
# (a String key could never be matched by that lookup).
#
# NOTE(review): #underscore is not a core String method - presumably provided
# by ActiveSupport or by the event's type object; confirm.
#
# @param event_types [Array<Symbol, String>] underscored event type names
# @param handler [Proc] block stored for each of the event types
# @return [Array] the event types as passed in
def self.handle(*event_types, &handler)
  @event_handlers ||= begin
    handlers = {}
    def handlers.for(event)
      self[event.type.underscore.to_sym]
    end
    handlers
  end

  event_types.each do |event_type|
    key = event_type.respond_to?(:to_sym) ? event_type.to_sym : event_type
    @event_handlers[key] = handler
  end
end
new(name, topic = nil)
click to toggle source
# File lib/kafka_lite/consumer.rb, line 21
#
# Sets up a consumer called +name+, optionally bound to +topic+.
#
# In order: the last known offsets for (name, topic) are read from the
# consumer registry, #restore_state is invoked with the restored sequence, and
# a fresh Queue is created and registered with the registry under this
# consumer's name and topic. #run later pops its events from that queue.
#
# @param name [Object] consumer name handed to the registry
# @param topic [Object, nil] topic handed to the registry; nil by default
def initialize(name, topic = nil)
  @name = name
  @topic = topic

  registry = Kafka::Lite::CONSUMER_REGISTRY

  # resume from whatever offsets the registry has on record for this consumer
  @sequence, @tsequence = registry.offsets(name, topic)
  restore_state(@sequence)

  # inbox for this consumer; the registry is given a reference to it
  @events = Queue.new
  registry.register(@name, @topic, @events)
end
Public Instance Methods
run() { |event| ... }
click to toggle source
# File lib/kafka_lite/consumer.rb, line 29
#
# Consumes events from @events until the consumer is interrupted or an
# out-of-sequence event shows up.
#
# Every popped item is parsed with ConsumerEvent.parse and validated:
# * its sequence must be at least @sequence + 1,
# * for topic-bound consumers its tsequence must be exactly @tsequence + 1,
# * for topic-bound consumers its topic must equal @topic.
# The event is then yielded to the block if one was given, otherwise passed to
# #process. Afterwards the offsets are advanced, checkpointed with the
# consumer registry and handed to #persist_state.
#
# TrRMIte::OutOfSequence is reported on $stderr and ends the loop; Interrupt
# ends the loop silently. Kafka::Lite::WrongTopic is not rescued here and
# propagates to the caller. On the way out the consumer is unregistered
# (best-effort) and #finalize is called.
#
# NOTE(review): OutOfSequence lives under TrRMIte while WrongTopic lives under
# Kafka::Lite - confirm the mixed namespaces are intentional.
#
# @yield [event] the parsed event, instead of dispatching to #process
# @return [nil] after an OutOfSequence or Interrupt ended the loop
def run(&block)
  event = nil # declared outside the loop so the rescue clause can report on it

  loop do
    event = ConsumerEvent.parse(@events.pop)

    raise TrRMIte::OutOfSequence unless event.sequence >= (@sequence + 1)
    raise TrRMIte::OutOfSequence unless @topic.nil? || event.tsequence == (@tsequence + 1)
    raise Kafka::Lite::WrongTopic unless @topic.nil? || event.topic == @topic

    if block_given?
      yield event
    else
      process(event)
    end

    # "internal" consumer checkpointing
    @sequence = event.sequence
    @tsequence = event.tsequence

    # "external" Kafka checkpointing
    Kafka::Lite::CONSUMER_REGISTRY.checkpoint(@name, @sequence, @tsequence)
    persist_state(@sequence)
  end
rescue TrRMIte::OutOfSequence
  $stderr.print("OutOfSequence exception occurred for consumer '#{@name}'... exiting!")
  $stderr.print(" (expected sequence >= ##{@sequence + 1} but got ##{event.sequence})")
  # The topic sequence is only enforced for topic-bound consumers, so only
  # report it for them; this also keeps the handler from evaluating
  # `@tsequence + 1` for a topic-less consumer, where nothing above guarantees
  # that @tsequence is a number.
  unless @topic.nil?
    $stderr.print(" (expected tsequence == ##{@tsequence + 1} but got ##{event.tsequence})")
  end
  $stderr.puts
rescue Interrupt
  # exit cleanly...
ensure
  begin
    Kafka::Lite::CONSUMER_REGISTRY.unregister(@name)
  rescue StandardError
    # Best-effort: the registry may be unreachable while shutting down. The
    # previous `... rescue Errno::ECONNREFUSED` modifier rescued every
    # StandardError as well (a rescue modifier cannot filter by class), so this
    # keeps the same behavior while being explicit about it.
  end
  finalize
end
Private Instance Methods
finalize()
click to toggle source
# File lib/kafka_lite/consumer.rb, line 82
#
# Shutdown hook: #run calls this last, right after attempting to unregister
# the consumer. The default implementation does nothing and returns nil.
def finalize
  # NOOP (consumer subclasses _may_ choose to implement this)
end
persist_state(sequence)
click to toggle source
# File lib/kafka_lite/consumer.rb, line 78
#
# Persistence hook: #run calls this with the consumer's new sequence after
# each event has been handled and checkpointed. The default implementation
# ignores +sequence+ and returns nil.
#
# @param sequence [Object] sequence of the event that was just handled
def persist_state(sequence)
  # NOOP (stateful consumers can/should implement this callback)
end
process(event)
click to toggle source
# File lib/kafka_lite/consumer.rb, line 64
#
# Default event dispatch, used by #run when no block is given.
#
# Looks up the handler registered for +event+ in the class-level handler
# table and, if there is one, runs it in the context of this consumer
# instance with the event as its argument.
#
# A class that never registered a handler has no handler table at all
# (event_handlers is nil); that case is treated like "no handler for this
# event" instead of failing with a NoMethodError on nil.
#
# @param event [Object] the event to dispatch
# @return [Object, nil] the handler's result, or nil when nothing handles it
def process(event)
  handlers = self.class.event_handlers
  handler = handlers && handlers.for(event)

  # subclasses that don't register explicit, type-specific event handlers
  # can always process all topic events by overriding `process(event)`...
  # raise 'SubclassResponsibility'
  return unless handler

  instance_exec(event, &handler)
end
restore_state(sequence)
click to toggle source
# File lib/kafka_lite/consumer.rb, line 74
#
# Restore hook: the constructor calls this with the sequence it read back
# from the consumer registry. The default implementation ignores +sequence+
# and returns nil.
#
# @param sequence [Object] sequence the consumer is resuming from
def restore_state(sequence)
  # NOOP (stateful consumers can/should implement this callback)
end