class LogStash::Outputs::Redis
This output will send events to a Redis
queue using RPUSH. The RPUSH command is supported in Redis
v0.0.7+. Using PUBLISH to a channel requires at least v1.3.8+. While you may be able to make these Redis
versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended.
For more information, see redis.io/[the Redis
homepage]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 164 def close if @batch buffer_flush(:final => true) end if @data_type == 'channel' and @redis @redis.quit @redis = nil end end
congestion_check(key)
click to toggle source
# File lib/logstash/outputs/redis.rb, line 135 def congestion_check(key) return if @congestion_threshold == 0 if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold. @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval end @congestion_check_times[key] = Time.now.to_i end end
flush(events, key, close=false)
click to toggle source
called from Stud::Buffer#buffer_flush when there are events to flush
# File lib/logstash/outputs/redis.rb, line 147 def flush(events, key, close=false) @redis ||= connect # we should not block due to congestion on close # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value. congestion_check(key) unless close @redis.rpush(key, events) end
on_flush_error(e)
click to toggle source
called from Stud::Buffer#buffer_flush when an error occurs
# File lib/logstash/outputs/redis.rb, line 155 def on_flush_error(e) @logger.warn("Failed to send backlog of events to Redis", :identity => identity, :exception => e, :backtrace => e.backtrace ) @redis = connect end
receive(event)
click to toggle source
# File lib/logstash/outputs/redis.rb, line 117 def receive(event) # TODO(sissel): We really should not drop an event, but historically # we have dropped events that fail to be converted to json. # TODO(sissel): Find a way to continue passing events through even # if they fail to convert properly. begin @codec.encode(event) rescue LocalJumpError # This LocalJumpError rescue clause is required to test for regressions # for https://github.com/logstash-plugins/logstash-output-redis/issues/26 # see specs. Without it the LocalJumpError is rescued by the StandardError raise rescue StandardError => e @logger.warn("Error encoding event", :exception => e, :event => event) end end
register()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 90 def register require 'redis' if @batch if @data_type != "list" raise RuntimeError.new( "batch is not supported with data_type #{@data_type}" ) end buffer_initialize( :max_items => @batch_events, :max_interval => @batch_timeout, :logger => @logger ) end @redis = nil if @shuffle_hosts @host.shuffle! end @host_idx = 0 @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval } @codec.on_event(&method(:send_to_redis)) end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/outputs/redis.rb, line 175 def connect @current_host, @current_port = @host[@host_idx].split(':') @host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1 if not @current_port @current_port = @port end params = { :host => @current_host, :port => @current_port, :timeout => @timeout, :db => @db } @logger.debug("connection params", params) if @password params[:password] = @password.value end Redis.new(params) end
identity()
click to toggle source
A string used to identify a Redis
instance in log messages
# File lib/logstash/outputs/redis.rb, line 199 def identity "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}" end
send_to_redis(event, payload)
click to toggle source
# File lib/logstash/outputs/redis.rb, line 203 def send_to_redis(event, payload) # How can I do this sort of thing with codecs? key = event.sprintf(@key) if @batch && @data_type == 'list' # Don't use batched method for pubsub. # Stud::Buffer buffer_receive(payload, key) return end begin @redis ||= connect if @data_type == 'list' congestion_check(key) @redis.rpush(key, payload) else @redis.publish(key, payload) end rescue => e @logger.warn("Failed to send event to Redis", :event => event, :identity => identity, :exception => e, :backtrace => e.backtrace) sleep @reconnect_interval @redis = nil retry end end