class TrRMIte::Consumer

Attributes

name[RW]
topic[RW]

Public Class Methods

default_name() click to toggle source
# File lib/TrRMIte/consumer.rb, line 85
# Derives the default consumer name from the receiver's class name.
#
# The class name is split on '::', every 'Manager' segment is discarded and
# the last remaining segment is handed to TrRMIte.inflect with :upcase.
#
# Returns whatever TrRMIte.inflect returns for that segment.
def self.default_name
  segments = name.split('::')
  meaningful = segments.reject { |segment| segment == 'Manager' }
  TrRMIte.inflect(meaningful.last, :upcase)
end
new(name, topic, registry_uri = nil) click to toggle source
# File lib/TrRMIte/consumer.rb, line 6
# Builds a consumer without contacting the broker.
#
# name         - stored in @name
# topic        - stored in @topic
# registry_uri - stored in @registry_uri (defaults to nil)
#
# Also records TrRMIte::VERSION in @version, clears both sequence counters
# and resets the @stop_processing flag to false.
def initialize(name, topic, registry_uri = nil)
  @name, @topic = name, topic
  @version = TrRMIte::VERSION
  # Offsets are unknown until #run asks the broker for them.
  @tsequence = nil
  @sequence = nil
  @stop_processing = false
  @registry_uri = registry_uri
end
process(io) click to toggle source
# File lib/TrRMIte/consumer.rb, line 73
# Feeds every line of +io+ through a freshly built consumer, without a broker.
#
# A consumer is created with the class's default name and a nil topic. Each
# line is parsed with Event.parse and the resulting event is handed to the
# consumer's #process hook.
#
# The hook is invoked via #send because #process is declared private on the
# base consumer; calling it with an explicit receiver would raise
# NoMethodError for any subclass that keeps the hook private. Public
# overrides continue to work unchanged.
#
# io - any object responding to #each_line.
#
# Returns the result of io.each_line.
def self.process(io)
  consumer = new(default_name, nil)
  io.each_line do |event_json|
    consumer.send(:process, Event.parse(event_json))
  end
end
run(topic, registry_uri = nil) click to toggle source
# File lib/TrRMIte/consumer.rb, line 81
# Convenience entry point: instantiates the consumer under its default name
# for +topic+ (optionally against +registry_uri+) and immediately calls #run
# on it.
#
# Returns whatever the instance-level #run returns.
def self.run(topic, registry_uri = nil)
  consumer = new(default_name, topic, registry_uri)
  consumer.run
end

Public Instance Methods

default_name() click to toggle source
# File lib/TrRMIte/consumer.rb, line 89
# Instance-level shortcut that delegates to the class-level default_name of
# the receiver's own class.
def default_name
  owner = self.class
  owner.default_name
end
fork(thread_name) click to toggle source
# File lib/TrRMIte/consumer.rb, line 62
# Starts #run on a new thread.
#
# thread_name - assigned as the new thread's name before #run is invoked.
#
# Returns the Thread; its value is whatever #run returns.
def fork(thread_name)
  Thread.new(thread_name) do |label|
    Thread.current.name = label
    run
  end
end
run() { |event| ... } click to toggle source
# File lib/TrRMIte/consumer.rb, line 15
# Connects to the broker and consumes events for @topic until the queue
# yields a `nil` sentinel, @stop_processing is set, an out-of-sequence event
# is seen, or the process is interrupted.
#
# Each accepted event is yielded to the caller's block when one is given,
# otherwise it is passed to #process. After handling, the event's sequence
# numbers are stored on the consumer, checkpointed with the broker and
# #persist_state is called.
#
# The &block parameter is never referenced by name; the body relies on
# block_given? / yield.
#
# Raises TrRMIte::WrongTopic (not rescued here) when an event's topic differs
# from @topic. TrRMIte::OutOfSequence and Interrupt are rescued inside this
# method and do not propagate. In every case the `ensure` section attempts to
# unregister from the broker and then calls #finalize.
def run &block
  DRb.start_service # ensure DRb is running for this consumer process
  # Fall back to the default registry when the constructor was given none.
  @registry_uri ||= TrRMIte::DEFAULT_REGISTRY_URI
  @broker = DRbObject.new_with_uri(@registry_uri)
  # Ask the broker for this consumer's last recorded offsets. On a DRb
  # connection error, wait five seconds and re-run the `begin` block.
  # NOTE(review): the retry has no upper bound, so this blocks for as long as
  # the broker stays unreachable.
  @sequence, @tsequence = begin
    @broker.offsets(@name, @topic)
  rescue DRb::DRbConnError
    # The notice is only printed when stderr is attached to a terminal.
    warn "Waiting 5 seconds for TrRMIte to become available" if $stderr.tty?
    sleep 5 # wait for TrRMIte to come online
    retry
  end
  # Hook for stateful consumers; receives the offset returned by the broker.
  restore_state(@sequence)
  # The queue is handed to the broker at registration and drained below.
  # NOTE(review): presumably the broker pushes events into it over DRb --
  # confirm against the broker implementation.
  events = Queue.new
  @broker.register(@name, @topic, events, @version)
  begin
    event = nil # to ensure we can reference it outside of the loop
    loop do
      # The stop flag is only examined here, i.e. before the next pop.
      break if @stop_processing # allow subclasses to terminate processing
      break if (event = events.pop).nil? # exit loop on `nil` sentinel
      # `sequence` may skip ahead (>=), but `tsequence` must be exactly the
      # next value after the last one processed.
      raise TrRMIte::OutOfSequence unless event.sequence >= (@sequence + 1)
      raise TrRMIte::OutOfSequence unless event.tsequence == (@tsequence + 1)
      # Checked after the sequence guards, so a foreign-topic event that also
      # fails them is reported as OutOfSequence instead.
      raise TrRMIte::WrongTopic unless event.topic == @topic
      if block_given?
        yield event
      else
        process(event)
      end
      # "internal" consumer checkpointing
      @sequence = event.sequence
      @tsequence = event.tsequence
      # "external" broker checkpointing
      @broker.checkpoint(@name, @sequence, @tsequence)
      # Runs after the broker checkpoint; receives the whole event.
      persist_state(event)
    end
  rescue TrRMIte::OutOfSequence
    # Report expected vs. actual offsets on stderr (one line, built from
    # several prints) and fall through to `ensure` without re-raising.
    $stderr.print("OutOfSequence exception occurred for consumer '#{@name}'... exiting!")
    $stderr.print(" (expected sequence >= ##{@sequence + 1} but got ##{event.sequence})")
    $stderr.print(" (expected tsequence == ##{@tsequence + 1} but got ##{event.tsequence})")
    $stderr.puts
  rescue Interrupt
    # exit cleanly...
  ensure
    # NOTE(review): this is the `rescue` *modifier*: it swallows any
    # StandardError raised by unregister, and Errno::ECONNREFUSED is merely
    # the fallback value of the expression, not an exception filter.
    @broker.unregister(@name) rescue Errno::ECONNREFUSED
    finalize
  end
end
terminate() click to toggle source
# File lib/TrRMIte/consumer.rb, line 69
# Asks the broker held in @broker to terminate the consumer registered under
# this consumer's name.
#
# @broker is only assigned by #run, so this is meaningful after #run has
# connected.
#
# Returns whatever the broker's terminate call returns.
def terminate
  consumer_name = name
  @broker.terminate(consumer_name)
end

Private Instance Methods

finalize() click to toggle source
# File lib/TrRMIte/consumer.rb, line 107
# Shutdown hook with no default behaviour; consumer subclasses may override
# it. Called from the `ensure` section of #run.
#
# Returns nil.
def finalize
  nil
end
persist_state(event) click to toggle source
# File lib/TrRMIte/consumer.rb, line 103
# State-saving hook with no default behaviour; stateful consumers can/should
# override it. #run calls it with each event after checkpointing.
#
# event - the event that was just handled (ignored here).
#
# Returns nil.
def persist_state(event)
  nil
end
process(event) click to toggle source
# File lib/TrRMIte/consumer.rb, line 95
# Event-handling hook that subclasses are expected to override.
#
# event - the event to handle (unused by this default implementation).
#
# Raises RuntimeError with the message 'SubclassResponsibility'.
def process(event)
  raise RuntimeError, 'SubclassResponsibility'
end
restore_state(sequence) click to toggle source
# File lib/TrRMIte/consumer.rb, line 99
# State-loading hook with no default behaviour; stateful consumers can/should
# override it. #run calls it with the offset obtained from the broker before
# any event is consumed.
#
# sequence - the offset passed in by #run (ignored here).
#
# Returns nil.
def restore_state(sequence)
  nil
end