class TrRMIte::Consumer
Attributes
name[RW]
topic[RW]
Public Class Methods
default_name()
click to toggle source
# File lib/TrRMIte/consumer.rb, line 85
# Derives the default consumer name from the class name: the class name is
# split on '::', every 'Manager' segment is dropped, and the last remaining
# segment is handed to TrRMIte.inflect with the :upcase mode.
#
# Returns whatever TrRMIte.inflect returns for that segment.
def self.default_name
  segments = name.split('::').reject { |segment| segment == 'Manager' }
  TrRMIte.inflect(segments.last, :upcase)
end
new(name, topic, registry_uri = nil)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 6
# Builds a consumer that is not yet connected to a broker.
#
# name         - identifier this consumer passes to the broker
# topic        - topic whose events this consumer expects
# registry_uri - optional broker URI; when nil, #run falls back to
#                TrRMIte::DEFAULT_REGISTRY_URI
def initialize(name, topic, registry_uri = nil)
  # caller-supplied identity and connection settings
  @name = name
  @topic = topic
  @registry_uri = registry_uri

  # sent to the broker on registration
  @version = TrRMIte::VERSION

  # offsets stay unknown until #run fetches them from the broker
  @sequence = nil
  @tsequence = nil

  # #run leaves its event loop once this is truthy
  @stop_processing = false
end
process(io)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 73
# Replays events from +io+ through a fresh consumer without going through
# the broker-driven #run loop.
#
# io - any object responding to #each_line that yields one serialized
#      event per line; each line is parsed with Event.parse
#
# The consumer is created with the class's default name and a nil topic.
# Returns the result of io.each_line.
def self.process(io)
  consumer = self.new(self.default_name, nil)
  io.each_line do |event_json|
    event = Event.parse(event_json)
    # #process is a private instance method, so calling it with an explicit
    # receiver raises NoMethodError unless a subclass happens to redefine it
    # as public. `send` dispatches correctly for either visibility.
    consumer.send(:process, event)
  end
end
run(topic, registry_uri = nil)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 81
# Convenience entry point: instantiates the class with its default name
# for +topic+ (and the optional +registry_uri+) and runs it.
#
# Returns the result of the instance's #run.
def self.run(topic, registry_uri = nil)
  consumer = new(default_name, topic, registry_uri)
  consumer.run
end
Public Instance Methods
default_name()
click to toggle source
# File lib/TrRMIte/consumer.rb, line 89
# Instance-level shortcut that returns the default name computed by the
# receiver's class.
def default_name
  klass = self.class
  klass.default_name
end
fork(thread_name)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 62
# Runs this consumer on a new thread.
#
# thread_name - name assigned to the new thread before #run starts
#
# Returns the Thread, whose value is whatever #run returns.
# Note: on consumer instances this method takes precedence over Kernel#fork.
def fork(thread_name)
  Thread.new(thread_name) do |label|
    Thread.current.name = label
    run
  end
end
run() { |event| ... }
click to toggle source
# File lib/TrRMIte/consumer.rb, line 15
# Connects to the broker, registers this consumer and handles incoming
# events until told to stop.
#
# Flow:
# 1. Start DRb and connect to the broker at @registry_uri (defaulting to
#    TrRMIte::DEFAULT_REGISTRY_URI).
# 2. Fetch this consumer's offsets, retrying every 5 seconds for as long as
#    the broker is unreachable (DRb::DRbConnError).
# 3. Call restore_state with the fetched sequence, then register a Queue
#    with the broker to receive events.
# 4. For each event popped from the queue: validate it, hand it to the
#    given block (or to #process when no block is given), update the local
#    offsets, checkpoint them with the broker, and call persist_state.
#
# The loop ends when @stop_processing is truthy, when a nil sentinel is
# popped, on Interrupt (silently), or on TrRMIte::OutOfSequence (reported
# on $stderr). TrRMIte::WrongTopic and any other error propagate to the
# caller. In every case the consumer is unregistered (best effort) and
# finalize is called.
#
# Yields each event when a block is given.
def run
  DRb.start_service # ensure DRb is running for this consumer process
  @registry_uri ||= TrRMIte::DEFAULT_REGISTRY_URI
  @broker = DRbObject.new_with_uri(@registry_uri)

  @sequence, @tsequence = begin
    @broker.offsets(@name, @topic)
  rescue DRb::DRbConnError
    warn "Waiting 5 seconds for TrRMIte to become available" if $stderr.tty?
    sleep 5 # wait for TrRMIte to come online
    retry
  end

  restore_state(@sequence)

  events = Queue.new
  @broker.register(@name, @topic, events, @version)

  begin
    event = nil # declared here so the rescue clause below can reference it
    loop do
      break if @stop_processing # allow subclasses to terminate processing
      break if (event = events.pop).nil? # exit loop on `nil` sentinel

      # the global sequence may skip ahead; the per-topic one must be contiguous
      raise TrRMIte::OutOfSequence unless event.sequence >= (@sequence + 1)
      raise TrRMIte::OutOfSequence unless event.tsequence == (@tsequence + 1)
      raise TrRMIte::WrongTopic unless event.topic == @topic

      if block_given?
        yield event
      else
        process(event)
      end

      # "internal" consumer checkpointing
      @sequence = event.sequence
      @tsequence = event.tsequence
      # "external" broker checkpointing
      @broker.checkpoint(@name, @sequence, @tsequence)
      persist_state(event)
    end
  rescue TrRMIte::OutOfSequence
    $stderr.print("OutOfSequence exception occurred for consumer '#{@name}'... exiting!")
    $stderr.print(" (expected sequence >= ##{@sequence + 1} but got ##{event.sequence})")
    $stderr.print(" (expected tsequence == ##{@tsequence + 1} but got ##{event.tsequence})")
    $stderr.puts
  rescue Interrupt
    # exit cleanly...
  ensure
    begin
      @broker.unregister(@name)
    rescue StandardError
      # Best effort: the broker may already be unreachable. Every
      # StandardError is ignored on purpose so that a failed unregister
      # never prevents finalize from running.
    end
    finalize
  end
end
terminate()
click to toggle source
# File lib/TrRMIte/consumer.rb, line 69
# Asks the broker to terminate this consumer, identified by its name.
#
# @broker is only assigned inside #run, so this is meaningful once #run has
# connected. Returns whatever the broker's #terminate returns.
def terminate
  consumer_name = name
  @broker.terminate(consumer_name)
end
Private Instance Methods
finalize()
click to toggle source
# File lib/TrRMIte/consumer.rb, line 107
# Shutdown hook. #run calls it last, after attempting to unregister from
# the broker. Does nothing by default; subclasses may override it.
def finalize; end
persist_state(event)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 103
# State-saving hook. #run calls it with each event once that event has been
# handled and checkpointed with the broker. Does nothing by default;
# stateful consumers can/should override it.
def persist_state(event); end
process(event)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 95
# Event handler that subclasses must provide. #run calls it for every event
# when no block is given. The base implementation always raises a
# RuntimeError with the message 'SubclassResponsibility'.
def process(event)
  raise RuntimeError, 'SubclassResponsibility'
end
restore_state(sequence)
click to toggle source
# File lib/TrRMIte/consumer.rb, line 99
# State-loading hook. #run calls it with the sequence obtained from the
# broker's offsets, before registering for events. Does nothing by default;
# stateful consumers can/should override it.
def restore_state(sequence); end