class Nagare::ListenerPool
ListenerPool acts both as a registry of all listeners in the application and as the polling mechanism that retrieves messages from Redis using consumer groups and delivers them to registered listeners one at a time.
Public Class Methods
listener_pool()
click to toggle source
A registry of listeners in the format { stream: [listeners…]}
@return [Hash] listeners
# File lib/nagare/listener_pool.rb, line 14

# Builds the registry of listeners grouped by the stream they consume,
# in the format { stream => [listeners...] }.
#
# The first time a stream is encountered it is logged and passed to
# create_and_subscribe_to_stream before its listener list is started.
# The registry is rebuilt from `listeners` on every call.
#
# @return [Hash] stream name => Array of listener classes
def listener_pool
  listeners.each_with_object({}) do |listener, registry|
    # Read the stream name once and reuse it for the lookup and the insert.
    stream = listener.stream_name

    unless registry.key?(stream)
      logger.debug "Assigned stream #{stream} - listener #{listener.name}"
      create_and_subscribe_to_stream(stream)
      registry[stream] = []
    end

    # each_with_object returns the memo itself, so the block's value is unused.
    registry[stream] << listener
  end
end
listeners()
click to toggle source
# File lib/nagare/listener_pool.rb, line 28

# Collects every loaded class that is a strict subclass of
# Nagare::Listener by scanning ObjectSpace.
#
# @return [Array<Class>] subclasses of Nagare::Listener (the base class
#   itself is excluded)
def listeners
  every_class = ObjectSpace.each_object(Class)
  # `<` is true only for proper descendants; it is nil or false otherwise.
  every_class.select { |candidate| candidate < Nagare::Listener }
end
poll()
click to toggle source
Polls redis for new messages on all registered streams and delivers messages to the registered listeners. If the listener does not raise any errors, automatically ACKs the message to the redis consumer group.
# File lib/nagare/listener_pool.rb, line 53

# Runs one polling pass: for every stream in the listener registry,
# hands the stream and its listeners to poll_stream.
#
# @return [Hash] the registry that was iterated
def poll
  listener_pool.each_pair do |stream, subscribers|
    poll_stream(stream, subscribers)
  end
end
start_listening()
click to toggle source
Initiates polling of redis and distribution of messages to listeners in a thread
@return [Thread] the listening thread
# File lib/nagare/listener_pool.rb, line 39

# Starts a background thread that calls poll, sleeps one second, and
# repeats indefinitely.
#
# A StandardError raised by a polling pass is logged and the loop
# continues, so a single failing pass does not terminate the thread
# and stop message delivery.
#
# @return [Thread] the listening thread
def start_listening
  logger.info 'Starting Nagare thread'

  Thread.new do
    loop do
      begin
        poll
      rescue StandardError => e
        # Keep the thread alive; the next pass retries after the sleep.
        logger.error "Nagare polling failed: #{e.class}: #{e.message}"
      end
      sleep 1
    end
  end
end
Private Class Methods
claim_pending_messages(stream)
click to toggle source
# File lib/nagare/listener_pool.rb, line 76

# Checks whether the consumer group exists for the stream and returns nil.
#
# NOTE(review): as written this method never claims anything; it returns
# nil whether or not the group exists. It looks like a placeholder.
#
# @param stream [String] stream name
# @return [nil]
def claim_pending_messages(stream)
  Nagare::RedisStreams.group_exists?(stream, group)
  nil
end
create_and_subscribe_to_stream(stream)
click to toggle source
# File lib/nagare/listener_pool.rb, line 110

# Creates the consumer group for the stream when it does not exist yet.
#
# @param stream [String] stream name
# @return [Boolean] true when the group was created by this call,
#   false when it already existed
def create_and_subscribe_to_stream(stream)
  return false if Nagare::RedisStreams.group_exists?(stream, group)

  logger.info("Creating listener group #{group} for stream #{stream}")
  Nagare::RedisStreams.create_group(stream, group)
  true
end
deliver_message(stream, message, listeners)
click to toggle source
# File lib/nagare/listener_pool.rb, line 80

# Passes one message to every listener of the stream via invoke_listener.
#
# A StandardError raised for a listener is handed to
# Nagare::Config.error_handler along with the message, and the remaining
# listeners are still invoked. The message is marked as processed only
# when no listener failed.
#
# @param stream [String] stream name
# @param message [Array] the entry; message[0] is passed to mark_processed
# @param listeners [Array<Class>] listener classes for the stream
# @return [Object, nil] result of mark_processed, or nil if any listener failed
def deliver_message(stream, message, listeners)
  failures = 0

  listeners.each do |listener|
    begin
      invoke_listener(stream, message, listener)
    rescue StandardError => error
      failures += 1
      Nagare::Config.error_handler.call(message, error)
    end
  end

  # Leave the message unacknowledged when any listener failed.
  Nagare::RedisStreams.mark_processed(stream, group, message[0]) if failures.zero?
end
group()
click to toggle source
# File lib/nagare/listener_pool.rb, line 106

# Consumer group name, read from Nagare::Config on every call.
#
# @return [Object] whatever Nagare::Config.group_name returns
def group
  Nagare::Config.group_name
end
invoke_listener(stream, message, listener)
click to toggle source
# File lib/nagare/listener_pool.rb, line 95

# Logs the delivery, instantiates the listener class, and calls its
# handle_event with message[1].
#
# @param stream [String] stream name (used only in the log line)
# @param message [Array] the entry; message[1] is passed to the listener
# @param listener [Class] listener class to instantiate
# @return [Object] whatever handle_event returns
def invoke_listener(stream, message, listener)
  # TODO: Transactions
  log_line = "Invoking listener #{listener.name} for stream #{stream} with message #{message}"
  logger.info log_line

  handler = listener.new
  handler.handle_event(message[1])
end
logger()
click to toggle source
# File lib/nagare/listener_pool.rb, line 102

# Logger used by the pool, read from Nagare.logger on every call.
#
# @return [Object] whatever Nagare.logger returns
def logger
  Nagare.logger
end
poll_stream(stream, listeners)
click to toggle source
# File lib/nagare/listener_pool.rb, line 61

# Fetches the next batch of messages for one stream and delivers each of
# them to the stream's listeners.
#
# Does nothing when the consumer group does not exist for the stream.
# Otherwise it first asks for a stuck message to claim, and only when
# that yields nil or an empty collection does it read the next messages.
# A nil or empty result from either source ends the pass without error.
#
# @param stream [String] stream name
# @param listeners [Array<Class>] listener classes for the stream
# @return [Object, nil] the delivered messages collection, or nil when
#   there was nothing to deliver
def poll_stream(stream, listeners)
  return unless Nagare::RedisStreams.group_exists?(stream, group)

  messages = Nagare::RedisStreams.claim_next_stuck_message(stream, group)
  if messages.nil? || messages.empty?
    messages = Nagare::RedisStreams.read_next_messages(stream, group)
  end
  # Nil-safe: the fallback read gets the same nil tolerance as the claim.
  return unless messages&.any?

  messages.each { |message| deliver_message(stream, message, listeners) }
end