class Nagare::RedisStreams
Abstraction layer for dealing with the basic RedisStreams X… commands for interacting with streams, groups and consumers.
This module may be mocked during testing if necessary, or replaced with an implementation using other technology, like kafka, AciveMQ or others.
Important: Groups are always assumed to be named `<stream>-<group>`.
Consumers are always created using the hostname + thread id
rubocop:disable Metrics/ClassLength
Public Class Methods
Claums the next message of the consumer group that is stuck (pending and past min_idle_time since being picked up)
@param stream_prefix [String] name of the stream @param group [String] name of the consumer group
@return [Array] array containing the 1 message or empty rubocop:disable Metrics/MethodLength, Metrics/AbcSize
# File lib/nagare/redis_streams.rb, line 92 def claim_next_stuck_message(stream_prefix, group) stream = stream_name(stream_prefix) result = connection.xautoclaim(stream, "#{stream}-#{group}", "#{hostname}-#{thread_id}", Nagare::Config.min_idle_time, '0-0', count: 1) # Move message to DLQ if retried too much and get next one if result['entries'].any? message_id = result['entries'].first.first if retry_count(stream_prefix, group, message_id) > Nagare::Config.max_retries move_to_dlq(stream_prefix, group, result['entries'].first) return claim_next_stuck_message(stream_prefix, group) end end result['entries'] || [] end
Returns a connection to redis. Currently not pooled
@return [Redis] a connection to redis from the redis-rb gem
# File lib/nagare/redis_streams.rb, line 22 def connection # FIXME: Connection pool should come in handy @connection ||= Redis.new(url: Nagare::Config.redis_url) end
Creates a group in redis for the stream using xgroup
@param stream [String] name of the stream @param group [String] name of the group
@return [String] OK
# File lib/nagare/redis_streams.rb, line 52 def create_group(stream, group) stream = stream_name(stream) connection.xgroup(:create, stream, "#{stream}-#{group}", '$', mkstream: true) end
Deletes a group in redis for the stream using xgroup
@param stream [String] name of the stream @param group [String] name of the group
@return [String] OK
# File lib/nagare/redis_streams.rb, line 65 def delete_group(stream, group) stream = stream_name(stream) connection.xgroup(:destroy, stream, "#{stream}-#{group}") end
Determines wether a group already exists in redis or not using xinfo
@param stream [String] name of the stream @param group [String] name of the group
@return [Boolean] true if the group exists, otherwise false
# File lib/nagare/redis_streams.rb, line 34 def group_exists?(stream, group) stream = stream_name(stream) info = connection.xinfo(:groups, stream.to_s) info.any? { |line| line['name'] == "#{stream}-#{group}" } rescue Redis::CommandError => e logger.info "Seems the group doesn't exist" logger.info e.message logger.info e.backtrace.join("\n") false end
Acknowledges a message as processed in the consumer group
@param stream [String] name of the stream @param group [String] name of the group @param message_id [String] the id of the message
@return [Integer] number of messages processed
# File lib/nagare/redis_streams.rb, line 158 def mark_processed(stream, group, message_id) stream = stream_name(stream) group = "#{stream}-#{group}" count = connection.xack(stream, group, message_id) return if count == 1 raise "Message could not be ACKed in Redis: #{stream} #{group} "\ "#{message_id}. Return value: #{count}" end
Moves a message to the dead letter queue stream
# File lib/nagare/redis_streams.rb, line 132 def move_to_dlq(stream, group, message) Nagare.logger.warn "Moving message to DLQ #{message} \ from stream #{stream}" publish(Nagare::Config.dlq_stream, stream, message.to_json) mark_processed(stream, group, message.first) end
Query pending messages for a consumer group
@return [Hash] {
"size"=>0,
"min_entry_id"=>nil,
"max_entry_id"=>nil,
"consumers"=>{}
}
# File lib/nagare/redis_streams.rb, line 208 def pending(stream, group) stream = stream_name(stream) group = "#{stream}-#{group}" connection.xpending(stream, group) end
Publishes an eevent to the specified stream
@param stream [String] name of the stream @param event_name [String] key of the event @param data [String] data for the event, usually in JSON format.
@return [String] message id
# File lib/nagare/redis_streams.rb, line 78 def publish(stream, event_name, data) stream = stream_name(stream) connection.xadd(stream, { "#{event_name}": data }) end
Reads the next messages from the consumer group in redis.
@returns [Array] Array of tuples with [message-id, data_as_hash]
# File lib/nagare/redis_streams.rb, line 143 def read_next_messages(stream, group) stream = stream_name(stream) result = connection.xreadgroup("#{stream}-#{group}", "#{hostname}-#{thread_id}", stream, '>') result[stream] || [] end
Reads the last message on the stream without using a consumer group
@param stream [String] name of the stream
@return [Array] tuple of [message-id, event]
# File lib/nagare/redis_streams.rb, line 175 def read_one(stream) stream = stream_name(stream) result = connection.xread(stream, [0], count: 1) result[stream]&.first end
Uses XPENDING to verify the number of times the message was delivered
# File lib/nagare/redis_streams.rb, line 118 def retry_count(stream, group, message_id) stream = stream_name(stream) result = connection.xpending(stream, "#{stream}-#{group}", message_id, message_id, 1) return 0 unless result.any? result.first['count'] end
# File lib/nagare/redis_streams.rb, line 190 def stream_name(stream) suffix = Nagare::Config.suffix if suffix.nil? stream else "#{stream}-#{suffix}" end end
Empties a stream for all readers, not only the consumer group
@return [Integer] the number of entries actually deleted
# File lib/nagare/redis_streams.rb, line 185 def truncate(stream) stream = stream_name(stream) connection.xtrim(stream, 0) end
Private Class Methods
# File lib/nagare/redis_streams.rb, line 220 def hostname Socket.gethostname end
# File lib/nagare/redis_streams.rb, line 216 def logger Nagare.logger end
# File lib/nagare/redis_streams.rb, line 224 def thread_id Thread.current.object_id end