class LogStash::Outputs::RedisList

This output will send events to a Redis queue using RPUSH. The RPUSH command is supported in Redis v0.0.7+. Using PUBLISH to a channel requires at least v1.3.8+. While you may be able to make these Redis versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended.

For more information, see redis.io/[the Redis homepage]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 199
def close
  if @batch
    buffer_flush(:final => true)
  end
  if @data_type == 'channel' and @redis
    @redis.quit
    @redis = nil
  end
end
congestion_check(key) click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 170
def congestion_check(key)
  return if @congestion_threshold == 0
  if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check.
    while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold.
      @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
      sleep @congestion_interval
    end
    @congestion_check_times[key] = Time.now.to_i
  end
end
flush(events, key, close=false) click to toggle source

called from Stud::Buffer#buffer_flush when there are events to flush

# File lib/logstash/outputs/redis_list.rb, line 182
def flush(events, key, close=false)
  @redis ||= connect
  # we should not block due to congestion on close
  # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value.
  congestion_check(key) unless close
  @redis.rpush(key, events)
end
on_flush_error(e) click to toggle source

called from Stud::Buffer#buffer_flush when an error occurs

# File lib/logstash/outputs/redis_list.rb, line 190
def on_flush_error(e)
  @logger.warn("Failed to send backlog of events to Redis",
    :identity => identity,
    :exception => e,
    :backtrace => e.backtrace
  )
  @redis = connect
end
receive(event) click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 152
def receive(event)
  # TODO(sissel): We really should not drop an event, but historically
  # we have dropped events that fail to be converted to json.
  # TODO(sissel): Find a way to continue passing events through even
  # if they fail to convert properly.
  begin
    @codec.encode(event)
  rescue LocalJumpError
    # This LocalJumpError rescue clause is required to test for regressions
    # for https://github.com/logstash-plugins/logstash-output-redis/issues/26
    # see specs. Without it the LocalJumpError is rescued by the StandardError
    raise
  rescue StandardError => e
    @logger.warn("Error encoding event", :exception => e,
                 :event => event)
  end
end
register() click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 106
def register
  require 'redis'

  # TODO remove after setting key and data_type to true
  if @queue
    if @key or @data_type
      raise RuntimeError.new(
        "Cannot specify queue parameter and key or data_type"
      )
    end
    @key = @queue
    @data_type = 'list'
  end

  if not @key or not @data_type
    raise RuntimeError.new(
      "Must define queue, or key and data_type parameters"
    )
  end
  # end TODO


  if @batch
    if @data_type != "list"
      raise RuntimeError.new(
        "batch is not supported with data_type #{@data_type}"
      )
    end
    buffer_initialize(
      :max_items => @batch_events,
      :max_interval => @batch_timeout,
      :logger => @logger
    )
  end

  @redis = nil
  if @shuffle_hosts
      @host.shuffle!
  end
  @host_idx = 0

  @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }

  @codec.on_event(&method(:send_to_redis))
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 210
def connect
  @current_host, @current_port = @host[@host_idx].split(':')
  @host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1

  if not @current_port
    @current_port = @port
  end

  params = {
    :host => @current_host,
    :port => @current_port,
    :timeout => @timeout,
    :db => @db
  }
  @logger.debug(params)

  if @password
    params[:password] = @password.value
  end

  Redis.new(params)
end
identity() click to toggle source

A string used to identify a Redis instance in log messages

# File lib/logstash/outputs/redis_list.rb, line 234
def identity
  @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
end
send_to_redis(event, payload) click to toggle source
# File lib/logstash/outputs/redis_list.rb, line 238
def send_to_redis(event, payload)
  # How can I do this sort of thing with codecs?
  key = event.sprintf(@key)
  value = event.sprintf(@value)

  if @batch && @data_type == 'list' # Don't use batched method for pubsub.
    # Stud::Buffer
    buffer_receive(payload, key)
    return
  end

  begin
    @redis ||= connect
    if @data_type == 'list'
      congestion_check(key)
      @redis.lpush(key, value)
      @redis.ltrim(key, 0, @length)
      @redis.expire(key, @expire)
    else
      @redis.publish(key, payload)
    end
  rescue => e
    @logger.warn("Failed to send event to Redis", :event => event,
                 :identity => identity, :exception => e,
                 :backtrace => e.backtrace)
    sleep @reconnect_interval
    @redis = nil
    retry
  end
end