class Sensu::Transport::Redis
Constants
- REDIS_KEYSPACE
The
Redis
keyspace to use for the transport.
Public Class Methods
Sensu::Transport::Base::new
# File lib/sensu/transport/redis.rb, line 11 def initialize @options = {} @connections = {} super end
Public Instance Methods
Close ALL Redis
connections.
# File lib/sensu/transport/redis.rb, line 59 def close @connections.each_value do |connection| connection.close end end
Redis
transport connection setup. This method sets `@options`, creates a named Redis
connection “redis”, and sets the deferred status to `:succeeded` via `succeed()`.
@param options [Hash, String]
# File lib/sensu/transport/redis.rb, line 22 def connect(options={}) @options = options || {} redis_connection("redis") do |connection| connection.callback do succeed end end end
Indicates if ALL Redis
connections are connected.
@return [TrueClass, FalseClass]
# File lib/sensu/transport/redis.rb, line 52 def connected? !@connections.empty? && @connections.values.all? do |connection| connection.connected? end end
Publish a message to the Redis
transport. The transport pipe type determines the method of sending messages to consumers using Redis
, either using PubSub or a list. The appropriate publish method is call for the pipe type given. The Redis
transport ignores publish options.
@param type [Symbol] the transport pipe type, possible values
are: :direct and :fanout.
@param pipe [String] the transport pipe name. @param message [String] the message to be published to the transport. @param options [Hash] IGNORED by this transport. @yield [info] passes publish info to an optional callback/block. @yieldparam info [Hash] contains publish information, which
may contain an error object.
# File lib/sensu/transport/redis.rb, line 79 def publish(type, pipe, message, options={}, &callback) case type.to_sym when :fanout pubsub_publish(pipe, message, &callback) when :direct list_publish(pipe, message, &callback) end end
Reconnect to the Redis
transport. The Redis
connections used by the transport have auto-reconnect disabled; if a single connection is unhealthy, all connections are closed, the transport is reset, and new connections are made. If the transport is not already reconnecting to Redis
, the `@before_reconnect` transport callback is called.
@param force [Boolean] the reconnect.
# File lib/sensu/transport/redis.rb, line 39 def reconnect(force=false) @before_reconnect.call unless @reconnecting unless @reconnecting && !force @reconnecting = true close reset connect(@options) end end
Redis
transport pipe/funnel stats, such as message and consumer counts. This method is currently unable to determine the consumer count for a Redis
list.
@param funnel [String] the transport funnel to get stats for. @param options [Hash] IGNORED by this transport. @yield [info] passes list stats to the callback/block. @yieldparam info [Hash] contains list stats.
# File lib/sensu/transport/redis.rb, line 140 def stats(funnel, options={}) redis_connection("redis") do |connection| connection.llen(funnel) do |messages| info = { :messages => messages, :consumers => 0 } yield(info) end end end
Subscribe to a Redis
transport pipe. The transport pipe type determines the method of consuming messages from Redis
, either using PubSub or a list. The appropriate subscribe method is call for the pipe type given. The Redis
transport ignores subscribe options and the funnel name.
@param type [Symbol] the transport pipe type, possible values
are: :direct and :fanout.
@param pipe [String] the transport pipe name. @param funnel [String] IGNORED by this transport. @param options [Hash] IGNORED by this transport. @yield [info, message] passes message info and content to
the consumer callback/block.
@yieldparam info [Hash] contains message information. @yieldparam message [String] message.
# File lib/sensu/transport/redis.rb, line 103 def subscribe(type, pipe, funnel=nil, options={}, &callback) case type.to_sym when :fanout pubsub_subscribe(pipe, &callback) when :direct list_subscribe(pipe, &callback) end end
Unsubscribe from all transport pipes. This method iterates through the current named Redis
connections, unsubscribing the “pubsub” connection from Redis
channels, and closing/deleting BLPOP connections.
@yield [info] passes info to an optional callback/block. @yieldparam info [Hash] empty hash.
Sensu::Transport::Base#unsubscribe
# File lib/sensu/transport/redis.rb, line 119 def unsubscribe @connections.each do |name, connection| case name when "pubsub" connection.unsubscribe when /^#{REDIS_KEYSPACE}/ connection.close @connections.delete(name) end end super end
Private Instance Methods
Shift a message off of a Redis
list and schedule another shift on the next tick of the event loop (reactor). Redis
BLPOP is a connection blocking Redis
command, this method creates a named Redis
connection for each list. Multiple Redis
connections for BLPOP commands is far more efficient than timer or next tick polling with LPOP.
@param list [String] @yield [info, message] passes message info and content to
the consumer/method callback/block.
@yieldparam info [Hash] an empty hash. @yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 307 def list_blpop(list, &callback) redis_connection(list) do |connection| connection.blpop(list, 0) do |_, message| EM::next_tick { list_blpop(list, &callback) } callback.call({}, message) end end end
Push (publish) a message onto a Redis
list. The `redis_key()` method is used to create a Redis
list key, using the transport pipe name. The publish callback info includes the current list size (queued).
@param pipe [String] the transport pipe name. @param message [String] the message to be published to the transport. @yield [info] passes publish info to an optional callback/block. @yieldparam info [Hash] contains publish information. @yieldparam queued [String] current list size.
# File lib/sensu/transport/redis.rb, line 285 def list_publish(pipe, message) list = redis_key("list", pipe) redis_connection("redis") do |connection| connection.rpush(list, message) do |queued| info = {:queued => queued} yield(info) if block_given? end end end
Subscribe to a Redis
list, shifting message off as they become available. The `redis_key()` method is used to create a Redis
list key, using the transport pipe name. The `list_blpop()` method is used to do the actual work.
@param pipe [String] the transport pipe name. @yield [info, message] passes message info and content to
the consumer/method callback/block.
@yieldparam info [Hash] an empty hash. @yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 326 def list_subscribe(pipe, &callback) list = redis_key("list", pipe) list_blpop(list, &callback) end
Monitor current Redis
connections, the connection “pool”. A timer is used to check on the connections, every `3` seconds. If one or more connections is not connected, a forced `reconnect()` is triggered. If all connections are connected after reconnecting, the transport `@after_reconnect` callback is called. If a connection monitor (timer) already exists, it is canceled.
# File lib/sensu/transport/redis.rb, line 166 def monitor_connections @connection_monitor.cancel if @connection_monitor @connection_monitor = EM::PeriodicTimer.new(3) do if !connected? reconnect(true) elsif @reconnecting @after_reconnect.call @reconnecting = false end end end
Publish a message to a Redis
channel (PubSub). The `redis_key()` method is used to create a Redis
channel key, using the transport pipe name. The publish callback info includes the current subscriber count for the Redis
channel.
@param pipe [String] the transport pipe name. @param message [String] the message to be published to the transport. @yield [info] passes publish info to an optional callback/block. @yieldparam info [Hash] contains publish information. @yieldparam subscribers [String] current subscriber count.
# File lib/sensu/transport/redis.rb, line 231 def pubsub_publish(pipe, message) channel = redis_key("channel", pipe) redis_connection("redis") do |connection| connection.publish(channel, message) do |subscribers| info = {:subscribers => subscribers} yield(info) if block_given? end end end
Subscribe to a Redis
channel (PubSub). The `redis_key()` method is used to create a Redis
channel key, using the transport pipe name. The named Redis
connection “pubsub” is used for the Redis
SUBSCRIBE command set, as the Redis
context is limited and enforced for the connection. The subscribe callback is called whenever a message is published to the Redis
channel. Channel messages with the type “subscribe” and “unsubscribe” are ignored, only messages with type “message” are passsed to the provided consumer/method callback/block.
@param pipe [String] the transport pipe name. @yield [info, message] passes message info and content to
the consumer/method callback/block.
@yieldparam info [Hash] contains the channel name. @yieldparam message [String] message content.
# File lib/sensu/transport/redis.rb, line 258 def pubsub_subscribe(pipe) channel = redis_key("channel", pipe) redis_connection("pubsub") do |connection| connection.subscribe(channel) do |type, channel, message| case type when "subscribe" @logger.debug("subscribed to redis channel: #{channel}") if @logger when "unsubscribe" @logger.debug("unsubscribed from redis channel: #{channel}") if @logger when "message" info = {:channel => channel} yield(info, message) end end end end
Return or setup a named Redis
connection. This method creates a Redis
connection object using the provided Redis
transport options. Redis
auto-reconnect is disabled as the connection “pool” is monitored as a whole. The transport `@on_error` callback is called when Redis
errors are encountered. This method creates/replaces the connection monitor after setting up the connection and before adding it to the pool.
@param name [String] the Redis
connection name. @yield [Object] passes the named connection object to the
callback/block.
# File lib/sensu/transport/redis.rb, line 189 def redis_connection(name) if @connections[name] yield(@connections[name]) else Sensu::Redis.connect(@options) do |connection| connection.auto_reconnect = false connection.reconnect_on_error = false connection.on_error do |error| @on_error.call(error) end monitor_connections @connections[name] = connection yield(connection) end end end
Create a Redis
key within the defined Redis
keyspace. This method is used to create keys that are unlikely to collide. The Redis
connection database number is included in the Redis
key as pubsub is not scoped to the selected database.
@param type [String] @param name [String] @return [String]
# File lib/sensu/transport/redis.rb, line 214 def redis_key(type, name) db = @options.is_a?(Hash) ? (@options[:db] || 0) : 0 [REDIS_KEYSPACE, db, type, name].join(":") end
Reset instance variables, called when reconnecting.
# File lib/sensu/transport/redis.rb, line 155 def reset @connections = {} end