class Redstream::Consumer
The Redstream::Consumer
class to read messages from a specified redis stream in batches.
@example
Redstream::Consumer.new(name: "user_indexer", stream_name: "users").run do |messages| # ... end
Public Class Methods
Initializes a new consumer instance. Please note that you can have multiple consumers per stream, by specifying different names.
@param name [String] The consumer name. The name is used for locking @param stream_name [String] The name of the redis stream. Please note
that redstream adds a prefix to the redis keys. However, the stream_name param must be specified without any prefixes here. When using Redstream::Model, the stream name is the downcased, pluralized and underscored version of the model name. I.e., the stream name for a 'User' model will be 'users'
@param batch_size [Fixnum] The desired batch size, that is the number
of messages yielded at max. More concretely, the number of messages yielded may be lower the batch_size, but not higher
@param logger [Logger] The logger used for error logging
# File lib/redstream/consumer.rb, line 28 def initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) @name = name @stream_name = stream_name @batch_size = batch_size @logger = logger @redis = Redstream.connection_pool.with(&:dup) @lock = Lock.new(name: "consumer:#{@stream_name}:#{@name}") end
Public Instance Methods
@api private
Commits the specified offset/ID as the maximum ID already read, such that subsequent read calls will use this offset/ID as a starting point.
@param offset [String] The offset/ID to commit
# File lib/redstream/consumer.rb, line 109 def commit(offset) @redis.set Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name), offset end
Returns its maximum committed id, i.e. the consumer's offset.
@return [String, nil] The committed id, or nil
# File lib/redstream/consumer.rb, line 41 def max_committed_id @redis.get Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name) end
Loops and thus blocks forever while reading messages from the specified stream and yielding them in batches.
@example
consumer.run do |messages| # ... end
# File lib/redstream/consumer.rb, line 53 def run(&block) loop { run_once(&block) } end
Reads a single batch from the specified stream and yields it. You usually want to use the run
method instead, which loops/blocks forever.
@example
consumer.run_once do |messages| # ... end
# File lib/redstream/consumer.rb, line 65 def run_once(&block) got_lock = @lock.acquire do offset = @redis.get(Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name)) offset ||= "0-0" stream_key_name = Redstream.stream_key_name(@stream_name) response = begin @redis.xread(stream_key_name, offset, count: @batch_size, block: 5_000) rescue Redis::TimeoutError nil end return if response.nil? || response[stream_key_name].nil? || response[stream_key_name].empty? messages = response[stream_key_name].map do |raw_message| Message.new(raw_message) end block.call(messages) offset = response[stream_key_name].last[0] return unless offset commit offset end sleep(5) unless got_lock rescue StandardError => e @logger.error e sleep 5 retry end