class Nagare::ListenerPool

ListenerPool acts both as a registry of all listeners in the application and as the polling mechanism that retrieves messages from Redis using consumer groups and delivers them to registered listeners one at a time.

Public Class Methods

listener_pool() click to toggle source

A registry of listeners in the format { stream: [listeners…]}

@return [Hash] listeners

# File lib/nagare/listener_pool.rb, line 14
# Builds the registry of listeners grouped by the stream they consume.
#
# Every class returned by +listeners+ is filed under its +stream_name+.
# The first time a stream is encountered, a debug line is logged and
# create_and_subscribe_to_stream is called for it.
#
# @return [Hash] stream name => Array of listener classes
def listener_pool
  listeners.each_with_object({}) do |listener, registry|
    # Read the stream name once and reuse it for lookup, logging and storage.
    stream = listener.stream_name

    unless registry.key?(stream)
      logger.debug "Assigned stream #{stream} - listener #{listener.name}"
      create_and_subscribe_to_stream(stream)
      registry[stream] = []
    end
    # each_with_object ignores the block's value, so nothing is returned here.
    registry[stream] << listener
  end
end
listeners() click to toggle source
# File lib/nagare/listener_pool.rb, line 28
# Scans every Class object known to ObjectSpace and keeps the ones that
# are strict descendants of Nagare::Listener (the base class itself is
# excluded because +<+ is false for identical classes).
#
# @return [Array<Class>] listener classes
def listeners
  base = Nagare::Listener
  ObjectSpace.each_object(Class).select { |candidate| candidate < base }
end
poll() click to toggle source

Polls redis for new messages on all registered streams and delivers messages to the registered listeners. If the listener does not raise any errors, automatically ACKs the message to the redis consumer group.

# File lib/nagare/listener_pool.rb, line 53
# Runs one polling pass: walks the registry produced by listener_pool and
# hands each stream, together with its listeners, to poll_stream.
#
# @return [Hash] the registry that was iterated
def poll
  listener_pool.each { |stream, stream_listeners| poll_stream(stream, stream_listeners) }
end
start_listening() click to toggle source

Initiates polling of redis and distribution of messages to listeners in a thread

@return [Thread] the listening thread

# File lib/nagare/listener_pool.rb, line 39
# Logs a startup line and spawns the background thread that drives
# polling. Inside the thread, poll is called, followed by a one-second
# sleep, in an endless loop.
#
# @return [Thread] the thread running the polling loop
def start_listening
  logger.info('Starting Nagare thread')

  Thread.new do
    loop do
      poll
      sleep(1)
    end
  end
end

Private Class Methods

claim_pending_messages(stream) click to toggle source
# File lib/nagare/listener_pool.rb, line 76
# Asks Nagare::RedisStreams whether the consumer group exists for
# +stream+ and then returns nil regardless of the answer.
# NOTE(review): no claiming happens here; this looks like an unfinished
# placeholder — confirm before relying on it.
#
# @param stream the stream to check
# @return [nil]
def claim_pending_messages(stream)
  Nagare::RedisStreams.group_exists?(stream, group)
  nil
end
create_and_subscribe_to_stream(stream) click to toggle source
# File lib/nagare/listener_pool.rb, line 110
# Ensures the consumer group exists for +stream+.
#
# When Nagare::RedisStreams reports that the group is already present,
# nothing is done. Otherwise the creation is logged and the group is
# created through Nagare::RedisStreams.create_group.
#
# @param stream the stream the group should be attached to
# @return [Boolean] true when the group was created by this call,
#   false when it already existed
def create_and_subscribe_to_stream(stream)
  return false if Nagare::RedisStreams.group_exists?(stream, group)

  logger.info("Creating listener group #{group} for stream #{stream}")
  Nagare::RedisStreams.create_group(stream, group)
  true
end
deliver_message(stream, message, listeners) click to toggle source
# File lib/nagare/listener_pool.rb, line 80
# Hands +message+ to every listener of +stream+ and acknowledges it only
# when all of them completed without raising.
#
# A StandardError raised for one listener is reported to
# Nagare::Config.error_handler and does not stop the remaining listeners
# from being invoked; it does, however, prevent the acknowledgement.
#
# @param stream the stream the message was read from
# @param message the message; message[0] is what gets passed to
#   mark_processed (presumably the message id — confirm against RedisStreams)
# @param listeners [Array] listener classes to invoke
# @return the result of Nagare::RedisStreams.mark_processed, or nil when
#   any listener failed
def deliver_message(stream, message, listeners)
  all_delivered = true

  listeners.each do |listener|
    invoke_listener(stream, message, listener)
  rescue StandardError => error
    all_delivered = false
    Nagare::Config.error_handler.call(message, error)
  end

  Nagare::RedisStreams.mark_processed(stream, group, message[0]) if all_delivered
end
group() click to toggle source
# File lib/nagare/listener_pool.rb, line 106
# Returns Nagare::Config.group_name. The other methods in this file pass
# this value as the group argument to Nagare::RedisStreams calls.
def group
  Nagare::Config.group_name
end
invoke_listener(stream, message, listener) click to toggle source
# File lib/nagare/listener_pool.rb, line 95
# Logs the delivery, builds a fresh instance of +listener+ and passes
# message[1] to its handle_event method.
#
# @param stream the stream the message came from (used for logging only)
# @param message the message; message[1] is what the listener receives
# @param listener [Class] listener class to instantiate
# @return whatever handle_event returns
def invoke_listener(stream, message, listener)
  # TODO: Transactions
  logger.info(
    "Invoking listener #{listener.name} for stream #{stream} with message #{message}"
  )

  handler = listener.new
  handler.handle_event(message[1])
end
logger() click to toggle source
# File lib/nagare/listener_pool.rb, line 102
# Convenience accessor that returns Nagare.logger.
def logger
  Nagare.logger
end
poll_stream(stream, listeners) click to toggle source
# File lib/nagare/listener_pool.rb, line 61
# Fetches the next batch of messages for +stream+ and delivers each one to
# +listeners+ via deliver_message.
#
# Nothing happens unless the consumer group exists for the stream. Stuck
# messages (claim_next_stuck_message) are tried first; only when that
# yields nil or an empty collection are new messages read with
# read_next_messages.
#
# @param stream the stream to poll
# @param listeners [Array] listener classes registered for the stream
# @return the delivered messages, or nil when there was nothing to deliver
def poll_stream(stream, listeners)
  return unless Nagare::RedisStreams.group_exists?(stream, group)

  messages = Nagare::RedisStreams.claim_next_stuck_message(stream, group)
  if messages.nil? || messages.empty?
    messages = Nagare::RedisStreams.read_next_messages(stream, group)
  end

  # A nil reply from read_next_messages gets the same tolerance as the
  # claim result above instead of raising NoMethodError on nil.any?.
  return unless messages&.any?

  messages.each { |message| deliver_message(stream, message, listeners) }
end