module Redstream

Constants

VERSION

Public Class Methods

base_key_name()

@api private

Returns the full namespace prefix for redis keys.

# File lib/redstream.rb, line 140
def self.base_key_name
  [namespace, "redstream"].compact.join(":")
end
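As a rough illustration of how the prefix is composed, the following standalone sketch mirrors the `compact.join` logic shown above without loading Redstream. The `KeyPrefixSketch` module is hypothetical and exists only for this example.

```ruby
# Hypothetical stand-in that mirrors the logic of Redstream.base_key_name.
module KeyPrefixSketch
  class << self
    attr_accessor :namespace

    # A nil namespace is dropped by compact, so no leading ":" appears.
    def base_key_name
      [namespace, "redstream"].compact.join(":")
    end
  end
end

KeyPrefixSketch.namespace = nil
KeyPrefixSketch.base_key_name # => "redstream"

KeyPrefixSketch.namespace = "my_app"
KeyPrefixSketch.base_key_name # => "my_app:redstream"
```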
connection_pool()

Returns the connection pool instance. If no pool has been set yet, a new one with default settings is created and stored.

@return [ConnectionPool] The connection pool

# File lib/redstream.rb, line 40
def self.connection_pool
  @connection_pool ||= ConnectionPool.new { Redis.new }
end
connection_pool=(connection_pool)

Redstream uses the connection_pool gem to pool redis connections. If you have a distributed redis setup (sentinel/cluster) or the default pool size doesn't match your requirements, you must specify the connection pool yourself. A connection pool is necessary because Redstream uses blocking commands. Since redis connections are relatively cheap, prefer a pool size that is large enough over running into bottlenecks.

@example

Redstream.connection_pool = ConnectionPool.new(size: 50) do
  Redis.new("...")
end

# File lib/redstream.rb, line 31
def self.connection_pool=(connection_pool)
  @connection_pool = connection_pool
end
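To illustrate why pool size matters when blocking commands are in play, here is a minimal sketch of checkout semantics using only Ruby's standard library. `TinyPool` is a hypothetical stand-in, not the connection_pool gem's implementation, and plain objects stand in for redis connections. Each `with` call takes a connection out of the pool for the duration of the block, so a connection held by a blocking call is unavailable to other threads until the block returns.

```ruby
# Hypothetical, simplified pool. The real connection_pool gem adds features
# such as checkout timeouts, which are left out here.
class TinyPool
  def initialize(size:)
    @queue = Queue.new
    size.times { @queue << yield }
  end

  # Checks a connection out, yields it, and always returns it afterwards.
  def with
    connection = @queue.pop # blocks while all connections are checked out
    yield connection
  ensure
    @queue << connection if connection
  end

  # Number of connections currently not checked out.
  def available
    @queue.size
  end
end

pool = TinyPool.new(size: 2) { Object.new } # Object.new stands in for Redis.new

pool.with do |_connection|
  # While this block runs (e.g. during a blocking read), one connection is left.
  pool.available # => 1
end

pool.available # => 2
```

With a pool of size one, a single long-running blocking call would leave no connection for any other caller, which is the bottleneck the description above warns about.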
lock_key_name(name)

@api private

Generates the redis key name used for locking.

@param name A high level name for the lock

@return [String] A redis key name used for locking

# File lib/redstream.rb, line 132
def self.lock_key_name(name)
  "#{base_key_name}:lock:#{name}"
end
max_consumer_id(stream_name:, consumer_name:)

Returns the max committed id, i.e. the consumer's offset, for the specified consumer name.

@param stream_name [String] the stream name
@param consumer_name [String] the consumer name

@return [String, nil] The max committed offset, or nil

# File lib/redstream.rb, line 96
def self.max_consumer_id(stream_name:, consumer_name:)
  connection_pool.with do |redis|
    redis.get offset_key_name(stream_name: stream_name, consumer_name: consumer_name)
  end
end
max_stream_id(stream_name)

Returns the max id of the specified stream, i.e. the id of the last/newest message added. Returns nil for empty streams.

@param stream_name [String] The stream name

@return [String, nil] The id of the stream's newest message, or nil

# File lib/redstream.rb, line 78
def self.max_stream_id(stream_name)
  connection_pool.with do |redis|
    message = redis.xrevrange(stream_key_name(stream_name), "+", "-", count: 1).first

    return nil unless message

    message[0]
  end
end
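Redis stream ids are strings of the form `<milliseconds>-<sequence>`, so comparing the results of max_stream_id and max_consumer_id as plain strings can give wrong answers (for example "999-0" versus "1000-0"). The following is a minimal sketch of a numeric comparison, assuming the committed offset is stored as such a stream id. The helpers `parse_stream_id` and `caught_up?`, as well as the stream name "products" and consumer name "indexer", are hypothetical and not part of Redstream.

```ruby
# Hypothetical helpers, not part of Redstream.

# Splits a stream id such as "1526919030474-55" into [milliseconds, sequence].
def parse_stream_id(id)
  id.split("-", 2).map { |part| Integer(part, 10) }
end

# True when the consumer's committed offset has reached the newest message.
# Both arguments may be nil, mirroring the documented return values.
def caught_up?(stream_id, consumer_id)
  return true if stream_id.nil?    # empty stream: nothing to process
  return false if consumer_id.nil? # nothing committed yet

  (parse_stream_id(consumer_id) <=> parse_stream_id(stream_id)) >= 0
end

# In an application, the arguments would come from calls such as:
#   Redstream.max_stream_id("products")
#   Redstream.max_consumer_id(stream_name: "products", consumer_name: "indexer")
caught_up?("1000-0", "999-0")  # => false (a plain string comparison would say true)
caught_up?("1000-1", "1000-1") # => true
caught_up?(nil, nil)           # => true
```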
namespace()

Returns the previously set namespace for redis keys to be used by Redstream.

# File lib/redstream.rb, line 57
def self.namespace
  @namespace
end
namespace=(namespace)

You can specify a namespace to use for redis keys. This is useful if you are using a shared redis instance.

@example

Redstream.namespace = 'my_app'

# File lib/redstream.rb, line 50
def self.namespace=(namespace)
  @namespace = namespace
end
offset_key_name(stream_name:, consumer_name:)

@api private

Generates the redis key name used for storing a consumer's current offset, i.e. the maximum id successfully processed.

@param stream_name A high level stream name
@param consumer_name A high level consumer name

@return [String] A redis key name for storing a consumer's current offset

# File lib/redstream.rb, line 121
def self.offset_key_name(stream_name:, consumer_name:)
  "#{base_key_name}:offset:#{stream_name}:#{consumer_name}"
end
stream_key_name(stream_name)

@api private

Generates the low level redis stream key name.

@param stream_name A high level stream name

@return [String] A low level redis stream key name

# File lib/redstream.rb, line 109
def self.stream_key_name(stream_name)
  "#{base_key_name}:stream:#{stream_name}"
end
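When inspecting a redis instance by hand, it can help to know what the resulting keys look like. The sketch below rebuilds the three key layouts (stream, offset, lock) from the interpolations shown in the source listings. `KeyLayoutSketch` and the names "products" and "indexer" are hypothetical, and the prefix is passed in rather than read from Redstream.

```ruby
# Hypothetical stand-in mirroring the key layout of the private helpers.
module KeyLayoutSketch
  module_function

  # base is the prefix that Redstream.base_key_name would return.
  def keys_for(stream_name:, consumer_name:, lock_name:, base: "redstream")
    {
      stream: "#{base}:stream:#{stream_name}",
      offset: "#{base}:offset:#{stream_name}:#{consumer_name}",
      lock: "#{base}:lock:#{lock_name}"
    }
  end
end

keys = KeyLayoutSketch.keys_for(
  base: "my_app:redstream",
  stream_name: "products",
  consumer_name: "indexer",
  lock_name: "indexer"
)

keys[:stream] # => "my_app:redstream:stream:products"
keys[:offset] # => "my_app:redstream:offset:products:indexer"
keys[:lock]   # => "my_app:redstream:lock:indexer"
```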
stream_size(stream_name)

Returns the length of the specified stream.

@param stream_name [String] The stream name

@return [Integer] The length of the stream

# File lib/redstream.rb, line 66
def self.stream_size(stream_name)
  connection_pool.with do |redis|
    redis.xlen(stream_key_name(stream_name))
  end
end