class LogStash::Inputs::Redis

Constants

BATCH_EMPTY_SLEEP

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/redis.rb, line 65
# Prepares the input before it runs: builds the connection label used for
# logging and resolves, once, which runner / stopper / list-fetch methods
# will be used for the configured data_type and batch size.
def register
  @redis_url =
    if @path.nil?
      "redis://#{@password}@#{@host}:#{@port}/#{@db}"
    else
      "#{@password}@#{@path}/#{@db}"
    end

  # Resolve the data_type dispatch a single time, not on every iteration.
  # An unrecognised data_type leaves @run_method / @stop_method untouched.
  case @data_type
  when 'list', 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  when 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  when 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  end

  # Batched and single-item list reads share list_runner; only the fetch differs.
  @list_method = method(batched? ? :list_batch_listener : :list_single_listener)

  @identity = "#{@redis_url} #{@data_type}:#{@key}"
  @logger.info("Registering Redis", :identity => @identity)
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 86
# Runs the input by invoking the runner chosen in #register.
#
# @param output_queue the queue handed on to the selected runner
def run(output_queue)
  begin
    @run_method.call(output_queue)
  rescue LogStash::ShutdownSignal
    # shutdown requested: swallow the signal and return quietly
  end
end
stop() click to toggle source
# File lib/logstash/inputs/redis.rb, line 92
# Stops the input by invoking the stop routine chosen in #register.
def stop
  @stop_method.()
end

Private Instance Methods

batched?() click to toggle source

private methods

# File lib/logstash/inputs/redis.rb, line 99
# @return [Boolean] true when more than one item is requested per fetch
def batched?
  1 < @batch_count
end
channel_listener(output_queue) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 310
# Subscribes to the channel named by @key and wires up the three callbacks:
# subscribe / unsubscribe notifications are logged, and every message is
# passed to queue_event together with the channel it arrived on.
#
# @param output_queue the queue that receives the decoded events
def channel_listener(output_queue)
  @redis.subscribe(@key) do |subscription|
    subscription.subscribe { |name, total| @logger.info("Subscribed", :channel => name, :count => total) }

    subscription.message { |name, payload| queue_event(payload, output_queue, name) }

    subscription.unsubscribe { |name, total| @logger.info("Unsubscribed", :channel => name, :count => total) }
  end
end
channel_runner(output_queue) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 303
# Runs the channel listener inside redis_runner, which supplies the
# connection and the error handling around it.
#
# @param output_queue the queue passed through to channel_listener
def channel_runner(output_queue)
  redis_runner { channel_listener(output_queue) }
end
connect() click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 133
# Creates a new Redis client, registers the configured command renames on
# it and, for batched list inputs, loads the batch script.
#
# @return the newly created client
def connect
  new_redis_instance.tap do |client|
    # register any renamed Redis commands on this client
    @command_map.each_pair do |command, alias_name|
      client._client.command_map[command.to_sym] = alias_name.to_sym
    end

    load_batch_script(client) if batched? && is_list_type?
  end
end
is_list_type?() click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 104
# @return [Boolean] true only when the configured data_type is exactly 'list'
def is_list_type?
  @data_type.eql?('list')
end
list_batch_listener(redis, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 195
# Fetches one batch of list items through the preloaded script and queues
# each of them. When the batch comes back empty it sleeps for
# BATCH_EMPTY_SLEEP before returning.
#
# Historical note: a 'batch fetch' built from pipelined LPOP calls was tried
# and observed to give the same event throughput as EVALSHA. EVALSHA was kept
# because it needs one call to Redis instead of N (N == @batch_count). The
# script has since moved to lrange + ltrim, which should make it more
# efficient still.
#
# @param redis the client used for this fetch
# @param output_queue the queue that receives the decoded events
# @raise [::Redis::CommandError] any command error other than NOSCRIPT
def list_batch_listener(redis, output_queue)
  fetched = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
  fetched.each { |entry| queue_event(entry, output_queue) }

  sleep BATCH_EMPTY_SLEEP if fetched.size.zero?
rescue ::Redis::CommandError => e
  # anything other than a missing script is not ours to handle
  raise e unless e.to_s =~ /NOSCRIPT/

  @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e)
  load_batch_script(redis)
  retry
end
list_runner(output_queue) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 183
# Main loop for list inputs: until a stop is requested, makes sure a
# connection exists and calls the list-fetch method selected in #register.
# Errors are logged, and the failed iteration is retried only when
# reset_for_error_retry allows it.
#
# @param output_queue the queue passed through to the list-fetch method
def list_runner(output_queue)
  until stop?
    begin
      @redis ||= connect
      @list_method.(@redis, output_queue)
    rescue => error
      log_error(error)
      retry if reset_for_error_retry(error)
    end
  end
end
list_single_listener(redis, output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 233
# Pops a single item from the list named by @key, waiting at most one second,
# and queues it as an event.
#
# The timeout is passed only through the options hash. redis-rb treats every
# positional argument before that hash as a list key, so an extra positional
# value (such as a literal 0) would make the call also pop from a list with
# that name.
#
# @param redis the client used for the blocking pop
# @param output_queue the queue that receives the decoded event
# @return [nil] when the pop yields nothing (timeout or other conditions)
def list_single_listener(redis, output_queue)
  item = redis.blpop(@key, :timeout => 1)
  return unless item # from timeout or other conditions

  # blpop returns the key that was read from as well as the item itself;
  # only the item (the last element) is of interest.
  queue_event(item.last, output_queue)
end
list_stop() click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 172
# Stop routine for list inputs: quits the current connection, if there is a
# connected one, and clears @redis.
def list_stop
  current = @redis # snapshot: @redis might change during method invocation
  return if current.nil?
  return unless current.connected?

  begin
    current.quit # does client.disconnect internally
  rescue StandardError
    nil # best effort: a failing quit must not prevent the stop
  end

  # if the input retried while we were quitting, stop the replacement too
  list_stop unless current.equal?(@redis)
  @redis = nil
end
load_batch_script(redis) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 147
  def load_batch_script(redis)
    #A Redis Lua EVAL script to fetch a count of keys
    redis_script = <<EOF
      local batchsize = tonumber(ARGV[1])
      local result = redis.call(\'#{@command_map.fetch('lrange', 'lrange')}\', KEYS[1], 0, batchsize)
      redis.call(\'#{@command_map.fetch('ltrim', 'ltrim')}\', KEYS[1], batchsize + 1, -1)
      return result
EOF
    @redis_script_sha = redis.script(:load, redis_script)
  end
log_error(e) click to toggle source
# File lib/logstash/inputs/redis.rb, line 271
# Logs an error at a level that depends on its class: Redis timeouts at
# debug, connection / protocol errors at warn, other Redis errors at error,
# shutdown signals at debug, and anything else at error with a backtrace.
#
# @param e [Exception] the error to report
def log_error(e)
  details = { message: e.message, exception: e.class }
  details[:backtrace] = e.backtrace if @logger.debug?

  if e.is_a?(::Redis::TimeoutError)
    # expected for channels in case no data is available
    @logger.debug("Redis timeout, retrying", details)
  elsif e.is_a?(::Redis::BaseConnectionError) || e.is_a?(::Redis::ProtocolError)
    @logger.warn("Redis connection error", details)
  elsif e.is_a?(::Redis::BaseError)
    @logger.error("Redis error", details)
  elsif e.is_a?(::LogStash::ShutdownSignal)
    @logger.debug("Received shutdown signal")
  else
    # unexpected errors always carry a backtrace, whatever the log level
    details[:backtrace] ||= e.backtrace
    @logger.error("Unexpected error", details)
  end
end
new_redis_instance() click to toggle source
# File lib/logstash/inputs/redis.rb, line 128
# Builds a Redis client from the options assembled by redis_params.
#
# @return [::Redis] a new client instance
def new_redis_instance
  connection_options = redis_params
  ::Redis.new(connection_options)
end
pattern_channel_listener(output_queue) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 333
# Pattern-subscribes using @key and wires up the three callbacks:
# psubscribe / punsubscribe notifications are logged, and every message is
# passed to queue_event together with the channel it arrived on (the
# matched pattern itself is not forwarded).
#
# @param output_queue the queue that receives the decoded events
def pattern_channel_listener(output_queue)
  @redis.psubscribe(@key) do |subscription|
    subscription.psubscribe { |name, total| @logger.info("Subscribed", :channel => name, :count => total) }

    subscription.pmessage { |_pattern, name, payload| queue_event(payload, output_queue, name) }

    subscription.punsubscribe { |name, total| @logger.info("Unsubscribed", :channel => name, :count => total) }
  end
end
pattern_channel_runner(output_queue) click to toggle source
# File lib/logstash/inputs/redis.rb, line 326
# Runs the pattern-channel listener inside redis_runner, which supplies the
# connection and the error handling around it.
#
# @param output_queue the queue passed through to pattern_channel_listener
def pattern_channel_runner(output_queue)
  redis_runner { pattern_channel_listener(output_queue) }
end
queue_event(msg, output_queue, channel=nil) click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 159
# Decodes a raw message with the configured codec and pushes every
# resulting event onto the output queue. Events are decorated first and,
# when a channel is given, tagged with it under [@metadata][redis_channel].
# Decoding or event-creation failures are logged and not re-raised.
#
# @param msg the raw payload read from Redis
# @param output_queue the queue that receives the events
# @param channel the originating channel, or nil when there is none
def queue_event(msg, output_queue, channel=nil)
  @codec.decode(msg) do |decoded|
    decorate(decoded)
    decoded.set("[@metadata][redis_channel]", channel) unless channel.nil?
    output_queue << decoded
  end
rescue => error # parse or event creation error
  @logger.error("Failed to create event", :message => msg, :exception => error, :backtrace => error.backtrace)
end
redis_params() click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 109
# Assembles the option hash handed to the Redis client constructor.
# When @path is set it is used in place of host/port, and a warning is
# logged to say that host and port are ignored.
#
# @return [Hash] the connection options
def redis_params
  secret = @password.value unless @password.nil?

  common = {
    :timeout => @timeout,
    :db => @db,
    :password => secret,
    :ssl => @ssl
  }

  return common.merge(:host => @host, :port => @port) if @path.nil?

  @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
  common.merge(:path => @path)
end
redis_runner() { || ... } click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 261
# Makes sure a connection exists, then yields to the caller's block.
# Errors are logged, and the whole attempt is retried only when
# reset_for_error_retry allows it.
#
# @yield the work to perform once @redis is available
def redis_runner
  @redis ||= connect
  yield
rescue => error
  log_error(error)
  retry if reset_for_error_retry(error)
end
reset_for_error_retry(e) click to toggle source

@return [true] if operation is fine to retry

# File lib/logstash/inputs/redis.rb, line 292
# Decides whether a failed operation should be retried. Except for shutdown
# signals, it drops the current connection and waits (interruptibly) for
# one second before answering.
#
# @param e [Exception] the error that interrupted the operation
# @return [true] if operation is fine to retry
# @return [false] if a stop was requested in the meantime
# @return [nil] if the error is a shutdown signal
def reset_for_error_retry(e)
  # a shutdown signal is never retried, and leaves @redis as it is
  return nil if e.kind_of?(::LogStash::ShutdownSignal)

  @redis = nil # reset the redis variable to trigger a reconnect

  Stud.stoppable_sleep(1) { stop? }

  # retry only if we are not stopping
  stop? ? false : true
end
subscribe_stop() click to toggle source

private

# File lib/logstash/inputs/redis.rb, line 243
# Stop routine for channel inputs: unsubscribes (pattern or plain, by
# data_type) if the connection is subscribed, closes the connection and
# clears @redis.
def subscribe_stop
  current = @redis # snapshot: @redis might change during method invocation
  return if current.nil?
  return unless current.connected?

  if current.subscribed?
    @data_type == 'pattern_channel' ? current.punsubscribe : current.unsubscribe
  end

  begin
    current.close # does client.disconnect
  rescue StandardError
    nil # best effort: a failing close must not prevent the stop
  end

  # if the input retried while we were closing, stop the replacement too
  subscribe_stop unless current.equal?(@redis)
  @redis = nil
end