class Redis::TimeSeries
The Redis::TimeSeries class is an interface for working with time-series data in Redis, using the {oss.redislabs.com/redistimeseries RedisTimeSeries} module.
You can't use this gem with vanilla Redis; the time series module must be compiled and loaded. The easiest way to do this is by running the provided Docker container. Refer to the {oss.redislabs.com/redistimeseries/#setup setup guide} for more info.
+docker run -p 6379:6379 -it --rm redislabs/redistimeseries+
Once you're up and running, you can create a new time series and start recording data. Many commands are documented below, but you should refer to the {oss.redislabs.com/redistimeseries/commands command documentation} for the most authoritative and up-to-date reference.
@example
  ts = Redis::TimeSeries.create('time_series_example')
  ts.add(12345)
  ts.get
  #=> #<Redis::TimeSeries::Sample:0x00007ff00d942e60 @time=2020-07-19 16:52:48 -0700, @value=0.12345e5>
Constants
- Info
The Info struct wraps the result of the TS.INFO command with method access. It also applies some limited parsing to the result values, mainly snakifying the property keys, and instantiating Rule objects if necessary.
All properties of the struct are also available on a TimeSeries object itself via delegation.
@!attribute [r] chunk_count
@return [Integer] number of memory chunks used for the time-series
@!attribute [r] chunk_size
@return [Integer] amount of allocated memory in bytes
@!attribute [r] chunk_type
@return [String] whether the chunk is "compressed" or "uncompressed"
@!attribute [r] first_timestamp
@return [Integer] first timestamp present in the time-series (milliseconds since epoch)
@!attribute [r] labels
@return [Hash] a hash of label-value pairs that represent metadata labels of the time-series
@!attribute [r] last_timestamp
@return [Integer] last timestamp present in the time-series (milliseconds since epoch)
@!attribute [r] max_samples_per_chunk
@return [Integer] maximum number of samples per memory chunk
@!attribute [r] memory_usage
@return [Integer] total number of bytes allocated for the time-series
@!attribute [r] retention_time
@return [Integer] retention time, in milliseconds, for the time-series. A zero value means unlimited retention.
@!attribute [r] rules
@return [Array<Rule>] an array of configured compaction {Rule}s
@!attribute [r] series
@return [TimeSeries] the series this info is from
@!attribute [r] source_key
@return [String, nil] the key of the source series, if this series is the destination of a compaction rule
@!attribute [r] total_samples
@return [Integer] the total number of samples in the series
@see TimeSeries#info
@see oss.redislabs.com/redistimeseries/commands/#tsinfo
- VERSION
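The "snakifying" that Info applies to property keys can be pictured with a small standalone sketch. It assumes the TS.INFO reply is a flat array of alternating camelCase keys and values. The helper names `snakify` and `parse_info` and the sample reply are hypothetical, not part of the gem.

```ruby
# Hypothetical helper: convert a camelCase key such as "totalSamples"
# into a snake_case symbol such as :total_samples.
def snakify(key)
  key.to_s.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase.to_sym
end

# Hypothetical helper: turn a flat [key, value, key, value, ...] reply
# into a hash keyed by snake_case symbols.
def parse_info(reply)
  reply.each_slice(2).map { |key, value| [snakify(key), value] }.to_h
end

# Sample reply, made up for illustration only.
reply = ['totalSamples', 2, 'memoryUsage', 4184, 'retentionTime', 0]
info = parse_info(reply)
p info
```

The real struct also builds Rule objects and exposes the values through method access, which this sketch leaves out.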
Attributes
@return [String] the Redis key this time series is stored in
Public Class Methods
Create a new time series.
@param key [String] the Redis key to store time series data in
@option options [Hash] :labels
A hash of label-value pairs to apply to this series.
@option options [Redis] :redis (self.class.redis) a different Redis client to use
@option options [Integer] :retention
Maximum age for samples compared to last event time (in milliseconds). With no value, the series will not be trimmed.
@option options [Boolean] :uncompressed
When true, series data will be stored in an uncompressed format.
@option options [String, Symbol] :duplicate_policy
A duplication policy to resolve conflicts when adding values to the series. Valid values are in Redis::TimeSeries::DuplicatePolicy::VALID_POLICIES
@option options [Integer] :chunk_size
Amount of memory, in bytes, to allocate for each chunk of data
@return [Redis::TimeSeries] the created time series
@see oss.redislabs.com/redistimeseries/commands/#tscreate
# File lib/redis/time_series.rb, line 48
def create(key, **options)
  new(key, redis: options.fetch(:redis, redis)).create(**options)
end
Create a compaction rule for a series. Note that both source and destination series must exist before the rule can be created.
@param source [String, TimeSeries] the source series (or key) to apply the rule to
@param dest [String, TimeSeries] the destination series (or key) to aggregate the data
@param aggregation [Array(<String, Symbol>, Integer), Aggregation]
The aggregation to apply. Can be an {Aggregation} object, or an array of aggregation_type and duration +[:avg, 120000]+
@return [String] the string "OK"
@raise [Redis::TimeSeries::AggregationError] if the given aggregation params are invalid
@raise [Redis::CommandError] if the compaction rule cannot be applied to either series
@see TimeSeries#create_rule
@see oss.redislabs.com/redistimeseries/commands/#tscreaterule
# File lib/redis/time_series.rb, line 67
def create_rule(source:, dest:, aggregation:)
  cmd 'TS.CREATERULE', key_for(source), key_for(dest), Aggregation.parse(aggregation).to_a
end
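To illustrate the array form of the aggregation parameter, here is a minimal sketch of how +[:avg, 120000]+ might be validated and turned into command arguments. The `aggregation_args` helper and the `AGGREGATION_TYPES` list are assumptions made for this example. The gem's own {Aggregation} class is not shown in this section and raises AggregationError rather than ArgumentError.

```ruby
# Aggregation types assumed for this sketch; check the command
# documentation for the list your module version supports.
AGGREGATION_TYPES = %w[avg count first last max min range sum std.p std.s var.p var.s].freeze

# Hypothetical helper, not the gem's Aggregation class.
# Accepts [type, duration_in_milliseconds] and returns command arguments.
def aggregation_args(aggregation)
  type, duration = aggregation
  type = type.to_s.downcase
  raise ArgumentError, "unknown aggregation type: #{type}" unless AGGREGATION_TYPES.include?(type)
  raise ArgumentError, 'duration must be a positive integer' unless duration.is_a?(Integer) && duration.positive?

  ['AGGREGATION', type, duration]
end

p aggregation_args([:avg, 120_000])
```

Validating before the command is sent lets invalid parameters fail locally instead of surfacing as a Redis::CommandError.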
Delete an existing compaction rule.
@param source [String, TimeSeries] the source series (or key) to remove the rule from
@param dest [String, TimeSeries] the destination series (or key) the rule applies to
@return [String] the string "OK"
@raise [Redis::CommandError] if the compaction rule does not exist
# File lib/redis/time_series.rb, line 78
def delete_rule(source:, dest:)
  cmd 'TS.DELETERULE', key_for(source), key_for(dest)
end
Delete all data and remove a time series from Redis.
@param key [String] the key to remove
@return [1] if the series existed
@return [0] if the series did not exist
# File lib/redis/time_series.rb, line 87
def destroy(key)
  redis.del key
end
# File lib/redis/time_series.rb, line 91
def madd(data)
  data.reduce([]) do |memo, (key, value)|
    memo << parse_madd_values(key, value)
    memo
  end.then do |args|
    cmd('TS.MADD', args).each_with_index.map do |result, idx|
      result.is_a?(Redis::CommandError) ? result : Sample.new(result, args[idx][2])
    end
  end
end
Search for a time series matching the provided filters. Refer to the {Filters} documentation for more details on how to filter.
@example Using a filter string
Redis::TimeSeries.query_index('foo=bar') #=> [#<Redis::TimeSeries:0x00007ff00e222788 @key="ts3", @redis=#<Redis...>>]
@example Using the .where alias with hash DSL
Redis::TimeSeries.where(foo: 'bar') #=> [#<Redis::TimeSeries:0x00007ff00e2a1d30 @key="ts3", @redis=#<Redis...>>]
@param filter_value [Hash, String] a set of filters to query with
@return [Array<TimeSeries>] an array of series that matched the given filters
@see Filters
@see oss.redislabs.com/redistimeseries/commands/#tsqueryindex
@see oss.redislabs.com/redistimeseries/commands/#filtering
# File lib/redis/time_series.rb, line 118
def query_index(filter_value)
  filters = Filters.new(filter_value)
  filters.validate!
  cmd('TS.QUERYINDEX', filters.to_a).map { |key| new(key) }
end
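The two examples above suggest that a hash such as +foo: 'bar'+ and the string +'foo=bar'+ end up as the same filter. The following sketch shows one way that equivalence could work for plain equality filters only. The `equality_filters` helper is hypothetical. The gem's {Filters} class is not shown in this section and handles more cases than this.

```ruby
# Hypothetical, simplified conversion covering only label=value filters.
def equality_filters(filter_value)
  # A string is assumed to already be in filter syntax.
  return [filter_value] if filter_value.is_a?(String)

  # Each hash pair becomes one "label=value" filter string.
  filter_value.map { |label, value| "#{label}=#{value}" }
end

p equality_filters('foo=bar')
p equality_filters({ foo: 'bar', region: 'eu' })
```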
Private Class Methods
# File lib/redis/time_series.rb, line 127
def key_for(series_or_string)
  series_or_string.is_a?(self) ? series_or_string.key : series_or_string.to_s
end
# File lib/redis/time_series.rb, line 131
def parse_madd_values(key, raw)
  if raw.is_a?(Hash) || (raw.is_a?(Array) && raw.first.is_a?(Array))
    # multiple timestamp => value pairs
    raw.map do |timestamp, value|
      [key, timestamp, value]
    end
  elsif raw.is_a? Array
    # single [timestamp, value]
    [key, raw.first, raw.last]
  else
    # single value, no timestamp
    [key, '*', raw]
  end
end
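The three input shapes accepted by parse_madd_values are easier to compare side by side. The sketch below copies the method body from the listing above into a standalone function, so it can be run without Redis. The keys and sample values are made up for illustration.

```ruby
# Standalone copy of the private class method shown above.
def parse_madd_values(key, raw)
  if raw.is_a?(Hash) || (raw.is_a?(Array) && raw.first.is_a?(Array))
    # multiple timestamp => value pairs
    raw.map { |timestamp, value| [key, timestamp, value] }
  elsif raw.is_a?(Array)
    # single [timestamp, value]
    [key, raw.first, raw.last]
  else
    # single value, no timestamp ('*' asks the server to assign one)
    [key, '*', raw]
  end
end

single   = parse_madd_values('ts1', 42)
pair     = parse_madd_values('ts1', [1_595_000_000_000, 42])
multiple = parse_madd_values('ts1', { 1000 => 1, 2000 => 2 })

p single
p pair
p multiple
```

Note that the multiple-pair form returns a nested array (one triple per sample), while the other two forms return a single flat triple.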
Public Instance Methods
Compare series based on Redis key and configured client.
@return [Boolean] whether the two TimeSeries objects refer to the same series
# File lib/redis/time_series.rb, line 368
def ==(other)
  return false unless other.is_a?(self.class)
  key == other.key && redis == other.redis
end
Add a value to the series.
@param value [Numeric] the value to add
@param timestamp [Time, Numeric] the Time, or integer timestamp in milliseconds, at which to add the value
@param uncompressed [Boolean] if true, stores data in an uncompressed format
@param on_duplicate [String, Symbol] a duplication policy for conflict resolution
@param chunk_size [Integer] set default chunk size, in bytes, for the time series
@return [Sample] the value that was added
@raise [Redis::CommandError] if the value being added is older than the latest timestamp in the series
@see TimeSeries::DuplicatePolicy
# File lib/redis/time_series.rb, line 169
def add(value, timestamp = '*', uncompressed: nil, on_duplicate: nil, chunk_size: nil)
  ts = cmd 'TS.ADD',
           key,
           timestamp,
           value,
           ('UNCOMPRESSED' if uncompressed),
           (['CHUNK_SIZE', chunk_size] if chunk_size),
           (DuplicatePolicy.new(on_duplicate).to_a('ON_DUPLICATE') if on_duplicate)
  Sample.new(ts, value)
end
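Since timestamps are expressed in milliseconds since the epoch, a Time has to be converted before it reaches Redis. How the gem performs that conversion is not shown in this section, so the following is only a sketch of one possible approach. The `normalize_timestamp` helper is hypothetical.

```ruby
# Hypothetical helper: map the accepted timestamp forms to a command argument.
def normalize_timestamp(timestamp)
  case timestamp
  when Time
    # Rational arithmetic avoids floating-point rounding of sub-second parts.
    (timestamp.to_r * 1000).to_i
  when Numeric
    # Assumed to already be in milliseconds.
    timestamp.to_i
  else
    # '*' asks the server to use its own clock.
    '*'
  end
end

p normalize_timestamp(Time.at(1.5))
p normalize_timestamp(12_345)
p normalize_timestamp('*')
```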
Issues a TS.CREATE command for the current series. You should use class method {Redis::TimeSeries.create} instead.
@api private
# File lib/redis/time_series.rb, line 183
def create(retention: nil, uncompressed: nil, labels: nil, duplicate_policy: nil, chunk_size: nil)
  cmd 'TS.CREATE', key,
      (['RETENTION', retention] if retention),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size),
      (DuplicatePolicy.new(duplicate_policy).to_a if duplicate_policy),
      (['LABELS', labels.to_a] if labels&.any?)
  self
end
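The listing above passes each optional argument as either a value or nil. The sketch below shows how such a list could collapse into a flat command. It assumes the private `cmd` helper, which is not shown in this section, flattens nested arrays and drops nil entries. Both `build_command` and `create_args` are hypothetical stand-ins, and the duplicate policy option is left out.

```ruby
# Hypothetical stand-in for the gem's private `cmd` helper. It is assumed
# here to flatten nested arrays and drop nil entries before sending.
def build_command(name, *args)
  [name, *args].flatten.compact
end

# Mirrors the optional-argument pattern of the instance method above.
def create_args(key, retention: nil, uncompressed: nil, labels: nil, chunk_size: nil)
  build_command(
    'TS.CREATE', key,
    (['RETENTION', retention] if retention),
    ('UNCOMPRESSED' if uncompressed),
    (['CHUNK_SIZE', chunk_size] if chunk_size),
    (['LABELS', labels.to_a] if labels&.any?)
  )
end

args = create_args('temperature', retention: 60_000, labels: { 'room' => 'kitchen' })
p args
```

Options that are nil or false simply vanish from the command, which is why every keyword can default to nil.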
Create a compaction rule for this series.
@param dest [String, TimeSeries] the destination series (or key) to aggregate the data
@param aggregation [Array(<String, Symbol>, Integer), Aggregation]
The aggregation to apply. Can be an {Aggregation} object, or an array of aggregation_type and duration +[:avg, 120000]+
@return [String] the string "OK"
@raise [Redis::TimeSeries::AggregationError] if the given aggregation params are invalid
@raise [Redis::CommandError] if the compaction rule cannot be applied to either series
# File lib/redis/time_series.rb, line 205
def create_rule(dest:, aggregation:)
  self.class.create_rule(source: self, dest: dest, aggregation: aggregation)
end
Decrement the current value of the series.
@param value [Integer] the amount to decrement by
@param timestamp [Time, Integer] the Time or integer millisecond timestamp to save the new value at
@param uncompressed [Boolean] if true, stores data in an uncompressed format
@param chunk_size [Integer] set default chunk size, in bytes, for the time series
@return [Integer] the timestamp the value was stored at
@see oss.redislabs.com/redistimeseries/commands/#tsincrbytsdecrby
# File lib/redis/time_series.rb, line 230
def decrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
  cmd 'TS.DECRBY',
      key,
      value,
      (timestamp if timestamp),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size)
end
Delete an existing compaction rule.
@param dest [String, TimeSeries] the destination series (or key) the rule applies to
@return [String] the string "OK"
@raise [Redis::CommandError] if the compaction rule does not exist
# File lib/redis/time_series.rb, line 217
def delete_rule(dest:)
  self.class.delete_rule(source: self, dest: dest)
end
Delete all data and remove this time series from Redis.
@return [1] if the series existed
@return [0] if the series did not exist
# File lib/redis/time_series.rb, line 245
def destroy
  redis.del key
end
Get the most recent sample for this series.
@return [Sample] the most recent sample for this series
@return [nil] if there are no samples in the series
@see oss.redislabs.com/redistimeseries/commands/#tsget
# File lib/redis/time_series.rb, line 255
def get
  cmd('TS.GET', key).then do |timestamp, value|
    return unless value
    Sample.new(timestamp, value)
  end
end
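The nil return for an empty series follows from destructuring the reply. The sketch below imitates that logic without Redis, assuming TS.GET replies with an empty array for an empty series and with a millisecond timestamp plus a string value otherwise. `SketchSample` and `parse_get_reply` are hypothetical stand-ins, not the gem's Sample class.

```ruby
# Hypothetical stand-in for the gem's Sample class.
SketchSample = Struct.new(:time, :value)

# Destructure a TS.GET-style reply. An empty reply yields nil.
def parse_get_reply(reply)
  timestamp, value = reply
  return unless value

  # Timestamps are milliseconds since epoch. Values are assumed to be strings.
  SketchSample.new(Time.at(timestamp / 1000.0), value.to_f)
end

p parse_get_reply([])
p parse_get_reply([1000, '42.5']).value
```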
Increment the current value of the series.
@param value [Integer] the amount to increment by
@param timestamp [Time, Integer] the Time or integer millisecond timestamp to save the new value at
@param uncompressed [Boolean] if true, stores data in an uncompressed format
@param chunk_size [Integer] set default chunk size, in bytes, for the time series
@return [Integer] the timestamp the value was stored at
@see oss.redislabs.com/redistimeseries/commands/#tsincrbytsdecrby
# File lib/redis/time_series.rb, line 271
def incrby(value = 1, timestamp = nil, uncompressed: nil, chunk_size: nil)
  cmd 'TS.INCRBY',
      key,
      value,
      (timestamp if timestamp),
      ('UNCOMPRESSED' if uncompressed),
      (['CHUNK_SIZE', chunk_size] if chunk_size)
end
Get information about the series. Note that all properties of {Info} are also available on the series itself via delegation.
@return [Info] an info object about the current series
@see Info
@see oss.redislabs.com/redistimeseries/commands/#tsinfo
# File lib/redis/time_series.rb, line 289
def info
  Info.parse series: self, data: cmd('TS.INFO', key)
end
Assign labels to the series using TS.ALTER.
@param val [Hash] a hash of label-value pairs
@return [Hash] the assigned labels
@see oss.redislabs.com/redistimeseries/commands/#tsalter
# File lib/redis/time_series.rb, line 300
def labels=(val)
  cmd 'TS.ALTER', key, 'LABELS', val.to_a
end
# File lib/redis/time_series.rb, line 304
def madd(*values)
  if values.one? && values.first.is_a?(Hash)
    # Hash of timestamp => value pairs
    args = values.first.map do |ts, val|
      [key, ts, val]
    end.flatten
  elsif values.one? && values.first.is_a?(Array)
    # Array of values, no timestamps
    initial_ts = Time.now.ts_msec
    args = values.first.each_with_index.map do |val, idx|
      [key, initial_ts + idx, val]
    end.flatten
  else
    # Values as individual arguments, no timestamps
    initial_ts = Time.now.ts_msec
    args = values.each_with_index.map do |val, idx|
      [key, initial_ts + idx, val]
    end.flatten
  end
  # TODO: return Sample objects here
  cmd 'TS.MADD', args
end
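When values arrive without timestamps, the listing above gives each one the starting timestamp plus its index, presumably so that samples for the same key do not collide on one millisecond. The following sketch isolates that step. The `madd_args` helper is hypothetical, and the starting timestamp is passed in explicitly instead of being read from the clock.

```ruby
# Hypothetical helper: assign consecutive millisecond timestamps to values
# and flatten the result into [key, timestamp, value, key, timestamp, value, ...].
def madd_args(key, values, initial_ts)
  values.each_with_index.flat_map { |val, idx| [key, initial_ts + idx, val] }
end

p madd_args('ts1', [10, 20, 30], 1_000)
# => ["ts1", 1000, 10, "ts1", 1001, 20, "ts1", 1002, 30]
```

One consequence is that the stored timestamps of later values drift slightly ahead of the actual time of the call.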
Get a range of values from the series.
@param range [Hash, Range] a time range, or hash of from and to values
@param count [Integer] the maximum number of results to return
@param aggregation [Array(<String, Symbol>, Integer), Aggregation]
The aggregation to apply. Can be an {Aggregation} object, or an array of aggregation_type and duration +[:avg, 120000]+
@return [Array<Sample>] an array of samples matching the range query
@see oss.redislabs.com/redistimeseries/commands/#tsrangetsrevrange
# File lib/redis/time_series.rb, line 338
def range(range, count: nil, aggregation: nil)
  if range.is_a?(Hash)
    # This is to support from: and to: passed in as hash keys
    # `range` will swallow all parameters if they're all hash syntax
    count = range.delete(:count)
    aggregation = range.delete(:aggregation)
    range = range.fetch(:from)..range[:to]
  end
  cmd('TS.RANGE',
      key,
      (range.begin || '-'),
      (range.end || '+'),
      (['COUNT', count] if count),
      Aggregation.parse(aggregation)&.to_a
     ).map { |ts, val| Sample.new(ts, val) }
end
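The way open-ended ranges map onto the +-+ and +++ placeholders can be shown with a standalone sketch. It reuses the begin/end fallback from the listing above and assumes the arguments are flattened with nil entries removed before sending. The `range_args` helper is hypothetical, and aggregation is left out.

```ruby
# Hypothetical helper: build TS.RANGE arguments from a Range or a from/to hash.
def range_args(key, range, count: nil)
  # A hash such as { from: 1000, to: 2000 } becomes a Range. A missing :to
  # produces an endless range.
  range = range.fetch(:from)..range[:to] if range.is_a?(Hash)

  [
    'TS.RANGE', key,
    range.begin || '-', # '-' stands for the earliest sample
    range.end || '+',   # '+' stands for the latest sample
    (['COUNT', count] if count)
  ].flatten.compact
end

p range_args('ts1', 1000..2000)
p range_args('ts1', (1000..), count: 5)
p range_args('ts1', { from: 1000 })
```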
Set data retention time for the series using TS.ALTER.
@param val [Integer] the number of milliseconds data should be retained. 0 means retain forever.
@return [Integer] the retention value of the series
@see oss.redislabs.com/redistimeseries/commands/#tsalter
# File lib/redis/time_series.rb, line 361
def retention=(val)
  # TODO: this should also accept an ActiveSupport::Duration
  cmd 'TS.ALTER', key, 'RETENTION', val.to_i
end
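Because the setter expects milliseconds, a small conversion step is useful when retention is naturally expressed in larger units. The helper below is a hypothetical convenience for this example, not something the gem provides.

```ruby
# Hypothetical helper: express a retention period in milliseconds.
def retention_msec(seconds: 0, minutes: 0, hours: 0, days: 0)
  total_seconds = seconds + minutes * 60 + hours * 3600 + days * 86_400
  total_seconds * 1000
end

p retention_msec(days: 7)
# => 604800000
p retention_msec(hours: 1, minutes: 30)
# => 5400000
```

The result could then be assigned with +ts.retention = retention_msec(days: 7)+.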
If this series is the destination of a compaction rule, returns the source series of the data.
@return [TimeSeries, nil] the series referred to by {source_key}
# File lib/redis/time_series/info.rb, line 105
def source
  return unless source_key
  @source ||= TimeSeries.new(source_key, redis: series.redis)
end