class Redstream::Producer

A Redstream::Producer is responsible for writing the actual messages to redis. This includes the delay messages as well as the messages for immediate retrieval. Usually, you don't have to use a producer directly, because Redstream::Model handles all producer-related interaction. However, Redstream::Model cannot recognize model updates made via e.g. update_all, delete_all, etc., i.e. updates which bypass model callbacks. Thus, calls to e.g. update_all must be wrapped with `find_in_batches` and Redstream::Producer#bulk (see example) to write these updates to the redis streams as well.

@example

producer = Redstream::Producer.new

User.where(confirmed: true).find_in_batches do |users|
  producer.bulk users do
    User.where(id: users.map(&:id)).update_all(send_mailing: true)
  end
end

Public Class Methods

new(wait: false)

Initializes a new producer. If you're using a distributed redis setup, you can pass the wait param to make the producer issue redis WAIT, which improves real-world data safety.

@param wait [Boolean, Integer] Defaults to false. Specify an integer to enable using redis WAIT for writing delay messages. Check out the redis docs for more info regarding WAIT.

Calls superclass method
# File lib/redstream/producer.rb, line 32
def initialize(wait: false)
  @wait = wait
  @stream_name_cache = {}

  super()
end
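
To make the effect of the wait param concrete, here is a minimal sketch of how a falsy versus integer value gates the WAIT call. `FakeRedis` and `delay_sketch` are hypothetical stand-ins invented for this illustration and are not part of Redstream or redis-rb; only the `wait(numreplicas, timeout)` argument order and the meaning of a timeout of 0 (block until the replicas acknowledge) follow the redis WAIT command.

```ruby
# Hypothetical stand-in that only records the calls it receives.
class FakeRedis
  attr_reader :calls

  def initialize
    @calls = []
  end

  def xadd(key, entry)
    @calls << [:xadd, key]
    "1-0" # made-up message id
  end

  def wait(numreplicas, timeout)
    @calls << [:wait, numreplicas, timeout]
    numreplicas
  end
end

# Mirrors the shape of the delay write: add the message, then WAIT only
# if an integer replica count was configured.
def delay_sketch(redis, wait)
  id = redis.xadd("users.delay", { "payload" => "{}" })
  redis.wait(wait, 0) if wait
  id
end

without_wait = FakeRedis.new
delay_sketch(without_wait, false)

with_wait = FakeRedis.new
delay_sketch(with_wait, 2)

p without_wait.calls # => [[:xadd, "users.delay"]]
p with_wait.calls    # => [[:xadd, "users.delay"], [:wait, 2, 0]]
```

With `wait: false` no WAIT is sent at all, so the write costs a single round trip. With an integer, the write blocks until that many replicas have acknowledged it.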

Public Instance Methods

bulk(records) { || ... }

Use this method to wrap calls to update_all, delete_all, etc., i.e. methods which bypass model lifecycle callbacks (after_save, etc.), as Redstream::Model can't recognize these updates and write them to redis streams automatically. You need to pass the records to be updated to the bulk method. The bulk method writes delay messages for the records to redis, then yields, and then writes the messages for immediate retrieval. The method must ensure that the same set of records is used for the delay messages and the instant messages. Thus, you should ideally pass an array of records to it. If you pass an ActiveRecord::Relation, the method converts it to an array, i.e. it loads all matching records into memory.

@param records [#to_a] The object/objects that will be updated or deleted

# File lib/redstream/producer.rb, line 52
def bulk(records)
  records_array = Array(records)

  delay_message_ids = bulk_delay(records_array)

  yield

  bulk_queue(records_array, delay_message_ids: delay_message_ids)
end
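
The order of operations described above can be sketched with an in-memory stand-in. `SketchProducer` and its `log` are hypothetical names for this illustration only; the real method writes to redis streams instead of an array.

```ruby
# Hypothetical in-memory stand-in; not part of Redstream.
class SketchProducer
  attr_reader :log

  def initialize
    @log = []
  end

  # Mirrors the order of operations of Producer#bulk.
  def bulk(records)
    records_array = Array(records) # fix the record set before anything changes

    records_array.each { |record| @log << [:delay, record] } # delay messages first
    yield                                                    # caller performs update_all etc.
    records_array.each { |record| @log << [:queue, record] } # then instant messages
    true
  end
end

producer = SketchProducer.new
producer.bulk([1, 2]) { producer.log << [:update_all] }

p producer.log
# => [[:delay, 1], [:delay, 2], [:update_all], [:queue, 1], [:queue, 2]]
```

Because the record set is converted to an array once, before the block runs, the delay messages and the instant messages cover the same records even if the block changes which rows would match the original query.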
bulk_delay(records)

@api private

Writes delay messages to a delay stream in redis.

@param records [#to_a] The object/objects that will be updated or deleted

@return The redis message ids

# File lib/redstream/producer.rb, line 70
def bulk_delay(records)
  res = records.each_slice(250).flat_map do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do
        slice.each do |object|
          redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), payload: JSON.dump(object.redstream_payload))
        end
      end
    end
  end

  Redstream.connection_pool.with do |redis|
    redis.wait(@wait, 0) if @wait
  end

  res
end
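
As a rough illustration of the batching in the listing above, the following sketch shows how `each_slice(250).flat_map` yields one flat list of ids in record order. The `fake_pipelined` lambda and its id format are assumptions made for this example; they only imitate a pipelined call returning one id per xadd.

```ruby
# Hypothetical stand-in for a pipelined redis call: returns one made-up id
# per record in the slice, in order.
next_id = 0
fake_pipelined = lambda do |slice|
  slice.map do
    next_id += 1
    "#{next_id}-0"
  end
end

records = (1..600).to_a

# Same shape as bulk_delay: one pipelined round trip per slice of 250,
# flattened into a single array of ids.
ids = records.each_slice(250).flat_map { |slice| fake_pipelined.call(slice) }

p records.each_slice(250).map(&:size) # => [250, 250, 100]
p ids.size                            # => 600
p ids.first                           # => "1-0"
p ids.last                            # => "600-0"
```

The id at position i belongs to the record at position i, which is what lets the delay messages be looked up by index later.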
bulk_queue(records, delay_message_ids: nil)

@api private

Writes messages to a stream in redis for immediate retrieval.

@param records [#to_a] The object/objects that will be updated or deleted

@param delay_message_ids [#to_a] The delay message ids to delete

# File lib/redstream/producer.rb, line 95
def bulk_queue(records, delay_message_ids: nil)
  records.each_with_index.each_slice(250) do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do
        slice.each do |object, index|
          redis.xadd(Redstream.stream_key_name(stream_name(object)), payload: JSON.dump(object.redstream_payload))
          redis.xdel(Redstream.stream_key_name("#{stream_name(object)}.delay"), delay_message_ids[index]) if delay_message_ids
        end
      end
    end
  end

  true
end
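
The listing above relies on `each_with_index.each_slice` keeping each record's global index, so that the matching delay message id can be found even after slicing. A small sketch with made-up records and ids (both hypothetical) shows the pairing:

```ruby
records = %w[a b c d e]
delay_ids = %w[1-0 2-0 3-0 4-0 5-0] # made-up ids, one per record

pairs = []

# A slice size of 2 is used here only to force several slices.
records.each_with_index.each_slice(2) do |slice|
  slice.each do |object, index|
    # index is the position in the full array, not within the slice
    pairs << [object, delay_ids[index]]
  end
end

p pairs
# => [["a", "1-0"], ["b", "2-0"], ["c", "3-0"], ["d", "4-0"], ["e", "5-0"]]
```

Calling `each_slice` first and `each_with_index` inside each slice would restart the index at 0 per slice and pair records with the wrong delay ids.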
delay(object)

@api private

Writes a single delay message to a delay stream in redis.

@param object The object that will be updated, deleted, etc.

@return The redis message id

# File lib/redstream/producer.rb, line 118
def delay(object)
  Redstream.connection_pool.with do |redis|
    res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), payload: JSON.dump(object.redstream_payload))
    redis.wait(@wait, 0) if @wait
    res
  end
end
queue(object, delay_message_id: nil)

@api private

Writes a single message to a stream in redis for immediate retrieval.

@param object The object that will be updated, deleted, etc.

@param delay_message_id The delay message id to delete

# File lib/redstream/producer.rb, line 133
def queue(object, delay_message_id: nil)
  Redstream.connection_pool.with do |redis|
    redis.pipelined do
      redis.xadd(Redstream.stream_key_name(stream_name(object)), payload: JSON.dump(object.redstream_payload))
      redis.xdel(Redstream.stream_key_name("#{stream_name(object)}.delay"), delay_message_id) if delay_message_id
    end
  end

  true
end

Private Instance Methods

stream_name(object)
# File lib/redstream/producer.rb, line 146
def stream_name(object)
  synchronize do
    @stream_name_cache[object.class] ||= object.class.redstream_name
  end
end
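
The `synchronize` call here, together with the `super()` call in the constructor, suggests that the class mixes in Ruby's MonitorMixin, although the listing does not show the include, so treat that as an assumption. A minimal sketch of the same thread-safe memoization pattern, using a hypothetical `NameCache` class and the class name as a placeholder for `redstream_name`:

```ruby
require "monitor"

# Hypothetical class illustrating the pattern; not part of Redstream.
class NameCache
  include MonitorMixin

  def initialize
    @cache = {}
    super() # MonitorMixin needs this to set up its lock
  end

  # Computes the name once per class and reuses it afterwards.
  def name_for(object)
    synchronize do
      @cache[object.class] ||= object.class.name.downcase
    end
  end
end

cache = NameCache.new
threads = 4.times.map { Thread.new { cache.name_for("x") } }

p threads.map(&:value).uniq # => ["string"]
p cache.name_for(1)         # => "integer"
```

The lock keeps concurrent callers from racing on the shared hash, and keying the cache by class means the name lookup runs only once per model class.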