class Redstream::Trimmer

The Redstream::Trimmer class is neccessary to clean up messsages after all consumers have successfully processed and committed them. Otherwise they would fill up redis and finally bring redis down due to out of memory issues. The Trimmer will sleep for the specified interval in case there is nothing to trim. Please note that you must pass an array containing all consumer names reading from the stream which is about to be trimmed. Otherwise the Trimmer could trim messages from the stream before all consumers received the respective messages.

@example

trimmer = Redstream::Trimmer.new(
  interval: 30,
  stream_name: "users",
  consumer_names: ["indexer", "cacher"]
)

trimmer.run

Public Class Methods

new(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null")) click to toggle source

Initializes a new trimmer. Accepts an interval to sleep for in case there is nothing to trim, the actual stream name, the consumer names as well as a logger for debug log messages.

@param interval [Fixnum, Float] Specifies a time to sleep in case there is

nothing to trim.

@param stream_name [String] The name of the stream that should be trimmed.

Please note, that redstream adds a prefix to the redis keys. However,
the stream_name param must be specified without any prefixes here. When
using Redstream::Model, the stream name is the downcased, pluralized
and underscored version of the model name. I.e., the stream name for a
'User' model will be 'users'

@params consumer_names [Array] The list of all consumers reading from the

specified stream

@param logger [Logger] A logger used for debug messages

# File lib/redstream/trimmer.rb, line 37
def initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null"))
  @interval = interval
  @stream_name = stream_name
  @consumer_names = consumer_names
  @logger = logger
  @lock = Lock.new(name: "trimmer:#{stream_name}")
end

Public Instance Methods

run() click to toggle source

Loops and blocks forever trimming messages from the specified redis stream.

# File lib/redstream/trimmer.rb, line 48
def run
  loop { run_once }
end
run_once() click to toggle source

Runs the trimming a single time. You usually want to use the run method instead, which loops/blocks forever.

# File lib/redstream/trimmer.rb, line 55
def run_once
  got_lock = @lock.acquire do
    min_committed_id = Redstream.connection_pool.with do |redis|
      offset_key_names = @consumer_names.map do |consumer_name|
        Redstream.offset_key_name(stream_name: @stream_name, consumer_name: consumer_name)
      end

      redis.mget(offset_key_names).map(&:to_s).reject(&:empty?).min
    end

    return sleep(@interval) unless min_committed_id

    loop do
      messages = Redstream.connection_pool.with do |redis|
        redis.xrange(Redstream.stream_key_name(@stream_name), "-", min_committed_id, count: 1_000)
      end

      return sleep(@interval) if messages.nil? || messages.empty?

      Redstream.connection_pool.with { |redis| redis.xdel Redstream.stream_key_name(@stream_name), messages.map(&:first) }

      @logger.debug "Trimmed #{messages.size} messages from #{@stream_name}"
    end
  end

  sleep(5) unless got_lock
rescue StandardError => e
  @logger.error e

  sleep 5

  retry
end