class Redstream::Delayer
The Redstream::Delayer
class is responsible for reading messages from special delay streams which are used to fix inconsistencies resulting from network or other issues in between after_save and after_commit callbacks. To be able to fix such issues, delay messages will be added to a delay stream within an after_save callback. The delay messages aren't fetched immediately, but e.g. 5 minutes later, such that we can be sure that the database transaction is committed or has been rolled back, but is no longer running.
@example
Redstream::Delayer.new(stream_name: "users", delay: 5.minutes, logger: Logger.new(STDOUT)).run
Public Class Methods
Initializes the delayer for the specified stream name and delay.
@param stream_name [String] The stream name. Please note, that redstream
adds a prefix to the redis keys. However, the stream_name param must be specified without any prefixes here. When using Redstream::Model, the stream name is the downcased, pluralized and underscored version of the model name. I.e., the stream name for a 'User' model will be 'users'
@param delay [Fixnum, Float, ActiveSupport::Duration] The delay, i.e.
the age a message must have before processing it.
@param logger [Logger] The logger used for logging debug and error
messages to.
# File lib/redstream/delayer.rb, line 28 def initialize(stream_name:, delay:, logger: Logger.new("/dev/null")) @stream_name = stream_name @delay = delay @logger = logger @consumer = Consumer.new(name: "delayer", stream_name: "#{stream_name}.delay", logger: logger) @batch = [] end
Public Instance Methods
Loops and blocks forever processing delay messages read from a delay stream.
# File lib/redstream/delayer.rb, line 40 def run loop { run_once } end
Reads and processes a single batch of delay messages from a delay stream. You usually want to use the run
method instead, which loops/blocks forever.
# File lib/redstream/delayer.rb, line 48 def run_once @consumer.run_once do |messages| messages.each do |message| seconds_to_sleep = message.message_id.to_f / 1_000 + @delay.to_f - Time.now.to_f if seconds_to_sleep > 0 if @batch.size > 0 id = @batch.last.message_id deliver @consumer.commit id end sleep(seconds_to_sleep + 1) end @batch << message end deliver end rescue StandardError => e @logger.error e sleep 5 retry end
Private Instance Methods
# File lib/redstream/delayer.rb, line 80 def deliver return if @batch.size.zero? @logger.debug "Delayed #{@batch.size} messages for #{@delay.to_f} seconds on stream #{@stream_name}" Redstream.connection_pool.with do |redis| redis.pipelined do @batch.each do |message| redis.xadd Redstream.stream_key_name(@stream_name), payload: message.fields["payload"] end end redis.xdel Redstream.stream_key_name("#{@stream_name}.delay"), @batch.map(&:message_id) end @batch = [] end