class Redis::TimeSeries

The Redis::TimeSeries class is an interface for working with time-series data in Redis, using the {oss.redislabs.com/redistimeseries RedisTimeSeries} module.

You can't use this gem with vanilla Redis, the time series module must be compiled and loaded. The easiest way to do this is by running the provided Docker container. Refer to the {oss.redislabs.com/redistimeseries/#setup setup guide} for more info.

+docker run -p 6379:6379 -it –rm redislabs/redistimeseries+

Once you're up and running, you can create a new time series and start recording data. Many commands are documented below, but you should refer to the {oss.redislabs.com/redistimeseries/commands command documentation} for the most authoritative and up-to-date reference.

@example

ts = Redis::TimeSeries.create('time_series_example')
ts.add(12345)
ts.get
#=> #<Redis::TimeSeries::Sample:0x00007ff00d942e60 @time=2020-07-19 16:52:48 -0700, @value=0.12345e5>

Constants

Info

The Info struct wraps the result of the TS.INFO command with method access. It also applies some limited parsing to the result values, mainly snakifying the property keys, and instantiating Rule objects if necessary.

All properties of the struct are also available on a TimeSeries object itself via delegation.

@!attribute [r] chunk_count

@return [Integer] number of memory chunks used for the time-series

@!attribute [r] chunk_size

@return [Integer] amount of allocated memory in bytes

@!attribute [r] chunk_type

@return [String] whether the chunk is "compressed" or "uncompressed"

@!attribute [r] first_timestamp

@return [Integer] first timestamp present in the time-series (milliseconds since epoch)

@!attribute [r] labels

@return [Hash] a hash of label-value pairs that represent metadata labels of the time-series

@!attribute [r] last_timestamp

@return [Integer] last timestamp present in the time-series (milliseconds since epoch)

@!attribute [r] max_samples_per_chunk

@return [Integer] maximum number of samples per memory chunk

@!attribute [r] memory_usage

@return [Integer] total number of bytes allocated for the time-series

@!attribute [r] retention_time

@return [Integer] retention time, in milliseconds, for the time-series.
  A zero value means unlimited retention.

@!attribute [r] rules

@return [Array<Rule>] an array of configured compaction {Rule}s

@!attribute [r] series

@return [TimeSeries] the series this info is from

@!attribute [r] source_key

@return [String, nil] the key of the source series, if this series is the destination
  of a compaction rule

@!attribute [r] total_samples

@return [Integer] the total number of samples in the series

@see TimeSeries#info @see oss.redislabs.com/redistimeseries/commands/#tsinfo

VERSION

Attributes

key[R]

@return [String] the Redis key this time series is stored in

Public Class Methods

create(key, **options) click to toggle source

Create a new time series.

@param key [String] the Redis key to store time series data in @option options [Hash] :labels

A hash of label-value pairs to apply to this series.

@option options [Redis] :redis (self.class.redis) a different Redis client to use @option options [Integer] :retention

Maximum age for samples compared to last event time (in milliseconds).
With no value, the series will not be trimmed.

@option options [Boolean] :uncompressed

When true, series data will be stored in an uncompressed format.

@option options [String, Symbol] :duplicate_policy

A duplication policy to resolve conflicts when adding values to the series.
Valid values are in Redis::TimeSeries::DuplicatePolicy::VALID_POLICIES

@option options [Integer] :chunk_size

Amount of memory, in bytes, to allocate for each chunk of data

@return [Redis::TimeSeries] the created time series @see oss.redislabs.com/redistimeseries/commands/#tscreate

# File lib/redis/time_series.rb, line 48
def create(key, **options)
  new(key, redis: options.fetch(:redis, redis)).create(**options)
end
create_rule(source:, dest:, aggregation:) click to toggle source

Create a compaction rule for a series. Note that both source and destination series must exist before the rule can be created.

@param source [String, TimeSeries] the source series (or key) to apply the rule to @param dest [String, TimeSeries] the destination series (or key) to aggregate the data @param aggregation [Array(<String, Symbol>, Integer), Aggregation]

The aggregation to apply. Can be an {Aggregation} object, or an array of
aggregation_type and duration +[:avg, 120000]+

@return [String] the string “OK” @raise [Redis::TimeSeries::AggregationError] if the given aggregation params are invalid @raise [Redis::CommandError] if the compaction rule cannot be applied to either series

@see TimeSeries#create_rule @see oss.redislabs.com/redistimeseries/commands/#tscreaterule

# File lib/redis/time_series.rb, line 67
def create_rule(source:, dest:, aggregation:)
  cmd 'TS.CREATERULE', key_for(source), key_for(dest), Aggregation.parse(aggregation).to_a
end
delete_rule(source:, dest:) click to toggle source

Delete an existing compaction rule.

@param source [String, TimeSeries] the source series (or key) to remove the rule from @param dest [String, TimeSeries] the destination series (or key) the rule applies to

@return [String] the string “OK” @raise [Redis::CommandError] if the compaction rule does not exist

# File lib/redis/time_series.rb, line 78
def delete_rule(source:, dest:)
  cmd 'TS.DELETERULE', key_for(source), key_for(dest)
end
destroy(key) click to toggle source

Delete all data and remove a time series from Redis.

@param key [String] the key to remove @return [1] if the series existed @return [0] if the series did not exist

# File lib/redis/time_series.rb, line 87
def destroy(key)
  redis.del key
end
madd(data) click to toggle source
# File lib/redis/time_series.rb, line 91
def madd(data)
  data.reduce([]) do |memo, (key, value)|
    memo << parse_madd_values(key, value)
    memo
  end.then do |args|
    cmd('TS.MADD', args).each_with_index.map do |result, idx|
      result.is_a?(Redis::CommandError) ? result : Sample.new(result, args[idx][2])
    end
  end
end
new(key, redis: self.class.redis) click to toggle source

@param key [String] the Redis key to store the time series in @param redis [Redis] an optional Redis client

# File lib/redis/time_series.rb, line 152
def initialize(key, redis: self.class.redis)
  @key = key
  @redis = redis
end
query_index(filter_value) click to toggle source

Search for a time series matching the provided filters. Refer to the {Filters} documentation for more details on how to filter.

@example Using a filter string

Redis::TimeSeries.query_index('foo=bar')
#=> [#<Redis::TimeSeries:0x00007ff00e222788 @key="ts3", @redis=#<Redis...>>]

@example Using the .where alias with hash DSL

Redis::TimeSeries.where(foo: 'bar')
#=> [#<Redis::TimeSeries:0x00007ff00e2a1d30 @key="ts3", @redis=#<Redis...>>]

@param filter_value [Hash, String] a set of filters to query with @return [Array<TimeSeries>] an array of series that matched the given filters

@see Filters @see oss.redislabs.com/redistimeseries/commands/#tsqueryindex @see oss.redislabs.com/redistimeseries/commands/#filtering

# File lib/redis/time_series.rb, line 118
def query_index(filter_value)
  filters = Filters.new(filter_value)
  filters.validate!
  cmd('TS.QUERYINDEX', filters.to_a).map { |key| new(key) }
end
Also aliased as: where
where(filter_value)
Alias for: query_index

Private Class Methods

key_for(series_or_string) click to toggle source
# File lib/redis/time_series.rb, line 127
def key_for(series_or_string)
  series_or_string.is_a?(self) ? series_or_string.key : series_or_string.to_s
end
parse_madd_values(key, raw) click to toggle source
# File lib/redis/time_series.rb, line 131
def parse_madd_values(key, raw)
  if raw.is_a?(Hash) || (raw.is_a?(Array) && raw.first.is_a?(Array))
    # multiple timestamp => value pairs
    raw.map do |timestamp, value|
      [key, timestamp, value]
    end
  elsif raw.is_a? Array
    # single [timestamp, value]
    [key, raw.first, raw.last]
  else
    # single value, no timestamp
    [key, '*', raw]
  end
end

Public Instance Methods

==(other) click to toggle source

Compare series based on Redis key and configured client. @return [Boolean] whether the two TimeSeries objects refer to the same series

# File lib/redis/time_series.rb, line 368
def ==(other)
  return false unless other.is_a?(self.class)
  key == other.key && redis == other.redis
end
add(value, timestamp = '*', uncompressed: nil, on_duplicate: nil, chunk_size: nil) click to toggle source

Add a value to the series.

@param value [Numeric] the value to add @param timestamp [Time, Numeric] the Time, or integer timestamp in milliseconds, to add the value @param uncompressed [Boolean] if true, stores data in an uncompressed format @param on_duplicate [String, Symbol] a duplication policy for conflict resolution @param chunk_size [Integer] set default chunk size, in bytes, for the time series

@return [Sample] the value that was added @raise [Redis::CommandError] if the value being added is older than the latest timestamp in the series

@see TimeSeries::DuplicatePolicy

# File lib/redis/time_series.rb, line 169
def add(value, timestamp = '*', uncompressed: nil, on_duplicate: nil, chunk_size: nil)
  ts = cmd 'TS.ADD',
           key,
           timestamp,
           value,
           ('UNCOMPRESSED' if uncompressed),
           (['CHUNK_SIZE', chunk_size] if chunk_size),
           (DuplicatePolicy.new(on_duplicate).to_a('ON_DUPLICATE') if on_duplicate)
  Sample.new(ts, value)
end
create(retention: nil, uncompressed: nil, labels: nil, duplicate_policy: nil, chunk_size: nil) click to toggle source

Issues a TS.CREATE command for the current series. You should use class method {Redis::TimeSeries.create} instead. @api private

# File lib/redis/time_series.rb, line 183
def create(retention: nil, uncompressed: nil, labels: nil, duplicate_policy: nil, chunk_size: nil)
  cmd 'TS.CREATE', key,
      (['RETENTION', retention] if retention),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size),
      (DuplicatePolicy.new(duplicate_policy).to_a if duplicate_policy),
      (['LABELS', labels.to_a] if labels&.any?)
  self
end
create_rule(dest:, aggregation:) click to toggle source

Create a compaction rule for this series.

@param dest [String, TimeSeries] the destination series (or key) to aggregate the data @param aggregation [Array(<String, Symbol>, Integer), Aggregation]

The aggregation to apply. Can be an {Aggregation} object, or an array of
aggregation_type and duration +[:avg, 120000]+

@return [String] the string “OK” @raise [Redis::TimeSeries::AggregationError] if the given aggregation params are invalid @raise [Redis::CommandError] if the compaction rule cannot be applied to either series

@see TimeSeries.create_rule

# File lib/redis/time_series.rb, line 205
def create_rule(dest:, aggregation:)
  self.class.create_rule(source: self, dest: dest, aggregation: aggregation)
end
decrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil) click to toggle source

Decrement the current value of the series.

@param value [Integer] the amount to decrement by @param timestamp [Time, Integer] the Time or integer millisecond timestamp to save the new value at @param uncompressed [Boolean] if true, stores data in an uncompressed format @param chunk_size [Integer] set default chunk size, in bytes, for the time series

@return [Integer] the timestamp the value was stored at @see oss.redislabs.com/redistimeseries/commands/#tsincrbytsdecrby

# File lib/redis/time_series.rb, line 230
def decrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
  cmd 'TS.DECRBY',
      key,
      value,
      (timestamp if timestamp),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size)
end
Also aliased as: decrement
decrement(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
Alias for: decrby
delete_rule(dest:) click to toggle source

Delete an existing compaction rule.

@param dest [String, TimeSeries] the destination series (or key) the rule applies to

@return [String] the string “OK” @raise [Redis::CommandError] if the compaction rule does not exist

@see TimeSeries.delete_rule

# File lib/redis/time_series.rb, line 217
def delete_rule(dest:)
  self.class.delete_rule(source: self, dest: dest)
end
destroy() click to toggle source

Delete all data and remove this time series from Redis.

@return [1] if the series existed @return [0] if the series did not exist

# File lib/redis/time_series.rb, line 245
def destroy
  redis.del key
end
get() click to toggle source

Get the most recent sample for this series.

@return [Sample] the most recent sample for this series @return [nil] if there are no samples in the series

@see oss.redislabs.com/redistimeseries/commands/#tsget

# File lib/redis/time_series.rb, line 255
def get
  cmd('TS.GET', key).then do |timestamp, value|
    return unless value
    Sample.new(timestamp, value)
  end
end
incrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil) click to toggle source

Increment the current value of the series.

@param value [Integer] the amount to increment by @param timestamp [Time, Integer] the Time or integer millisecond timestamp to save the new value at @param uncompressed [Boolean] if true, stores data in an uncompressed format @param chunk_size [Integer] set default chunk size, in bytes, for the time series

@return [Integer] the timestamp the value was stored at @see oss.redislabs.com/redistimeseries/commands/#tsincrbytsdecrby

# File lib/redis/time_series.rb, line 271
def incrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
  cmd 'TS.INCRBY',
      key,
      value,
      (timestamp if timestamp),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size)
end
Also aliased as: increment
increment(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
Alias for: incrby
info() click to toggle source

Get information about the series. Note that all properties of {Info} are also available on the series itself via delegation.

@return [Info] an info object about the current series

@see Info @see oss.redislabs.com/redistimeseries/commands/#tsinfo

# File lib/redis/time_series.rb, line 289
def info
  Info.parse series: self, data: cmd('TS.INFO', key)
end
labels=(val) click to toggle source

Assign labels to the series using TS.ALTER

@param val [Hash] a hash of label-value pairs @return [Hash] the assigned labels

@see oss.redislabs.com/redistimeseries/commands/#tsalter

# File lib/redis/time_series.rb, line 300
def labels=(val)
  cmd 'TS.ALTER', key, 'LABELS', val.to_a
end
madd(*values) click to toggle source
# File lib/redis/time_series.rb, line 304
def madd(*values)
  if values.one? && values.first.is_a?(Hash)
    # Hash of timestamp => value pairs
    args = values.first.map do |ts, val|
      [key, ts, val]
    end.flatten
  elsif values.one? && values.first.is_a?(Array)
    # Array of values, no timestamps
    initial_ts = Time.now.ts_msec
    args = values.first.each_with_index.map do |val, idx|
      [key, initial_ts + idx, val]
    end.flatten
  else
    # Values as individual arguments, no timestamps
    initial_ts = Time.now.ts_msec
    args = values.each_with_index.map do |val, idx|
      [key, initial_ts + idx, val]
    end.flatten
  end
  # TODO: return Sample objects here
  cmd 'TS.MADD', args
end
range(range, count: nil, aggregation: nil) click to toggle source

Get a range of values from the series

@param range [Hash, Range] a time range, or hash of from and to values @param count [Integer] the maximum number of results to return @param aggregation [Array(<String, Symbol>, Integer), Aggregation]

The aggregation to apply. Can be an {Aggregation} object, or an array of
aggregation_type and duration +[:avg, 120000]+

@return [Array<Sample>] an array of samples matching the range query

@see oss.redislabs.com/redistimeseries/commands/#tsrangetsrevrange

# File lib/redis/time_series.rb, line 338
def range(range, count: nil, aggregation: nil)
  if range.is_a?(Hash)
    # This is to support from: and to: passed in as hash keys
    # `range` will swallow all parameters if they're all hash syntax
    count = range.delete(:count)
    aggregation = range.delete(:aggregation)
    range = range.fetch(:from)..range[:to]
  end
  cmd('TS.RANGE',
      key,
      (range.begin || '-'),
      (range.end || '+'),
      (['COUNT', count] if count),
      Aggregation.parse(aggregation)&.to_a
     ).map { |ts, val| Sample.new(ts, val) }
end
retention=(val) click to toggle source

Set data retention time for the series using TS.ALTER

@param val [Integer] the number of milliseconds data should be retained. 0 means retain forever. @return [Integer] the retention value of the series

@see oss.redislabs.com/redistimeseries/commands/#tsalter

# File lib/redis/time_series.rb, line 361
def retention=(val)
  # TODO: this should also accept an ActiveSupport::Duration
  cmd 'TS.ALTER', key, 'RETENTION', val.to_i
end
source() click to toggle source

If this series is the destination of a compaction rule, returns the source series of the data. @return [TimeSeries, nil] the series referred to by {source_key}

# File lib/redis/time_series/info.rb, line 105
def source
  return unless source_key
  @source ||= TimeSeries.new(source_key, redis: series.redis)
end