class LogStash::Inputs::Redis
Constants
- BATCH_EMPTY_SLEEP
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 65
#
# Prepares the input before it is run: builds a descriptive URL string,
# binds the runner/stop method pair that matches @data_type, chooses the
# list listener (batched or single) and logs the resulting identity.
def register
  @redis_url =
    if @path.nil?
      "redis://#{@password}@#{@host}:#{@port}/#{@db}"
    else
      "#{@password}@#{@path}/#{@db}"
    end

  # just switch on data_type once
  case @data_type
  when 'list', 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  when 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  when 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  end

  @list_method =
    if batched?
      method(:list_batch_listener)
    else
      method(:list_single_listener)
    end

  @identity = "#{@redis_url} #{@data_type}:#{@key}"
  @logger.info("Registering Redis", :identity => @identity)
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 86
#
# Invokes the runner stored in @run_method with the given queue.
# A LogStash::ShutdownSignal raised by the runner is swallowed so the
# method simply returns.
def run(output_queue)
  begin
    @run_method.call(output_queue)
  rescue LogStash::ShutdownSignal
    # ignore and quit
  end
end
stop()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 92
#
# Invokes the callable stored in @stop_method and returns its result.
def stop
  @stop_method.call
end
Private Instance Methods
batched?()
click to toggle source
private methods —————————–
# File lib/logstash/inputs/redis.rb, line 99
#
# @return [Boolean] true when @batch_count is greater than one
def batched?
  @batch_count > 1
end
channel_listener(output_queue)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 310
#
# Subscribes @redis to the channel named by @key and registers three
# callbacks: subscribe/unsubscribe are logged at info level, and every
# message is handed to queue_event together with its channel.
def channel_listener(output_queue)
  @redis.subscribe(@key) do |subscription|
    subscription.subscribe do |name, total|
      @logger.info("Subscribed", :channel => name, :count => total)
    end

    subscription.message do |name, payload|
      queue_event(payload, output_queue, name)
    end

    subscription.unsubscribe do |name, total|
      @logger.info("Unsubscribed", :channel => name, :count => total)
    end
  end
end
channel_runner(output_queue)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 303
#
# Runs channel_listener for the given queue inside redis_runner.
def channel_runner(output_queue)
  redis_runner { channel_listener(output_queue) }
end
connect()
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 133
#
# Creates a client via new_redis_instance, copies every @command_map entry
# into the client's command map (names and replacements converted to
# symbols) and, for batched list inputs, loads the batch script.
#
# @return the newly created client
def connect
  client = new_redis_instance

  # register any renamed Redis commands
  @command_map.each_pair do |original, replacement|
    client._client.command_map[original.to_sym] = replacement.to_sym
  end

  needs_batch_script = batched? && is_list_type?
  load_batch_script(client) if needs_batch_script

  client
end
is_list_type?()
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 104
#
# @return [Boolean] true when @data_type is exactly 'list'
def is_list_type?
  @data_type == 'list'
end
list_batch_listener(redis, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 195
#
# Fetches a batch of items with one EVALSHA call (script sha taken from
# @redis_script_sha, key @key, argument @batch_count - 1) and queues each
# returned item. Sleeps for BATCH_EMPTY_SLEEP when nothing came back.
#
# A Redis::CommandError whose text contains NOSCRIPT triggers a reload of
# the batch script followed by a retry; any other command error is re-raised.
def list_batch_listener(redis, output_queue)
  batch = redis.evalsha(@redis_script_sha, [@key], [@batch_count - 1])
  batch.each { |entry| queue_event(entry, output_queue) }

  sleep BATCH_EMPTY_SLEEP if batch.empty?

  # Below is a commented-out implementation of 'batch fetch'
  # using pipelined LPOP calls. This in practice has been observed to
  # perform exactly the same in terms of event throughput as
  # the evalsha method. Given that the EVALSHA implementation uses
  # one call to Redis instead of N (where N == @batch_count) calls,
  # I decided to go with the 'evalsha' method of fetching N items
  # from Redis in bulk.
  #redis.pipelined do
    #error, item = redis.lpop(@key)
    #(@batch_count-1).times { redis.lpop(@key) }
  #end.each do |item|
    #queue_event(item, output_queue) if item
  #end
  # --- End commented out implementation of 'batch fetch'
  # further to the above, the LUA script now uses lrange and trim
  # which should further improve the efficiency of the script
rescue ::Redis::CommandError => e
  raise unless e.to_s.include?('NOSCRIPT')

  @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e)
  load_batch_script(redis)
  retry
end
list_runner(output_queue)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 183
#
# Keeps calling @list_method with the (lazily created) client and the
# given queue until stop? is true. Any StandardError is logged via
# log_error, and the attempt is retried when reset_for_error_retry
# says so.
def list_runner(output_queue)
  until stop?
    begin
      @redis ||= connect
      @list_method.call(@redis, output_queue)
    rescue => e
      log_error(e)
      retry if reset_for_error_retry(e)
    end
  end
end
list_single_listener(redis, output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 233
#
# Pops a single item from the list named by @key, waiting at most one
# second, and queues it.
#
# The previous call was `blpop(@key, 0, :timeout => 1)`. redis-rb takes the
# trailing options hash as the timeout and treats every remaining argument
# as a list key, so the stray `0` made the command also wait on a list
# literally named "0". Only @key is passed now.
#
# @param redis the client to pop from
# @param output_queue the queue handed on to queue_event
def list_single_listener(redis, output_queue)
  item = redis.blpop(@key, :timeout => 1)
  return unless item # from timeout or other conditions

  # blpop returns the 'key' read from as well as the item result
  # we only care about the result (2nd item in the list).
  queue_event(item.last, output_queue)
end
list_stop()
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 172
#
# Quits the current client, if there is one and it is connected, then
# clears @redis. Errors raised by quit are deliberately ignored. If @redis
# was replaced while this method was running, the method calls itself
# again for the new client first.
def list_stop
  client = @redis # might change during method invocation
  return if client.nil? || !client.connected?

  begin
    client.quit # does client.disconnect internally
  rescue StandardError
    nil
  end

  # check if input retried while executing
  list_stop unless client.equal?(@redis)
  @redis = nil
end
load_batch_script(redis)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 147
#
# Registers the batch-fetch Lua script with the given client via
# `script(:load, ...)` and stores the returned value in @redis_script_sha.
# The lrange/ltrim command names are looked up in @command_map, falling
# back to the plain names when no entry exists.
def load_batch_script(redis)
  lrange_cmd = @command_map.fetch('lrange', 'lrange')
  ltrim_cmd = @command_map.fetch('ltrim', 'ltrim')

  #A Redis Lua EVAL script to fetch a count of keys
  lua_source = <<LUA
      local batchsize = tonumber(ARGV[1])
      local result = redis.call('#{lrange_cmd}', KEYS[1], 0, batchsize)
      redis.call('#{ltrim_cmd}', KEYS[1], batchsize + 1, -1)
      return result
LUA
  @redis_script_sha = redis.script(:load, lua_source)
end
log_error(e)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 271
#
# Logs the given exception at a level chosen by its class. The logged
# details carry the message and exception class; the backtrace is added
# when debug logging is enabled, and always for unexpected errors.
def log_error(e)
  details = { message: e.message, exception: e.class }
  details[:backtrace] = e.backtrace if @logger.debug?

  case e
  when ::Redis::TimeoutError
    # expected for channels in case no data is available
    @logger.debug("Redis timeout, retrying", details)
  when ::Redis::BaseConnectionError, ::Redis::ProtocolError
    @logger.warn("Redis connection error", details)
  when ::Redis::BaseError
    @logger.error("Redis error", details)
  when ::LogStash::ShutdownSignal
    @logger.debug("Received shutdown signal")
  else
    details[:backtrace] ||= e.backtrace
    @logger.error("Unexpected error", details)
  end
end
new_redis_instance()
click to toggle source
# File lib/logstash/inputs/redis.rb, line 128
#
# Builds a new ::Redis client from the options returned by redis_params.
def new_redis_instance
  options = redis_params
  ::Redis.new(options)
end
pattern_channel_listener(output_queue)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 333
#
# Pattern-subscribes @redis using @key and registers three callbacks:
# psubscribe/punsubscribe are logged at info level, and every pmessage is
# handed to queue_event together with the channel it arrived on.
def pattern_channel_listener(output_queue)
  @redis.psubscribe(@key) do |subscription|
    subscription.psubscribe do |name, total|
      @logger.info("Subscribed", :channel => name, :count => total)
    end

    subscription.pmessage do |_pattern, name, payload|
      queue_event(payload, output_queue, name)
    end

    subscription.punsubscribe do |name, total|
      @logger.info("Unsubscribed", :channel => name, :count => total)
    end
  end
end
pattern_channel_runner(output_queue)
click to toggle source
# File lib/logstash/inputs/redis.rb, line 326
#
# Runs pattern_channel_listener for the given queue inside redis_runner.
def pattern_channel_runner(output_queue)
  redis_runner { pattern_channel_listener(output_queue) }
end
queue_event(msg, output_queue, channel=nil)
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 159
#
# Decodes msg with @codec and, for each event produced, decorates it,
# records the channel under [@metadata][redis_channel] when one was given,
# and pushes it onto output_queue. Any StandardError raised along the way
# is logged instead of propagated.
def queue_event(msg, output_queue, channel=nil)
  @codec.decode(msg) do |event|
    decorate(event)
    event.set("[@metadata][redis_channel]", channel) unless channel.nil?
    output_queue << event
  end
rescue => e # parse or event creation error
  @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace)
end
redis_params()
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 109
#
# Assembles the options hash for the Redis client: timeout, db, password
# (unwrapped via #value when set) and ssl, plus either host/port or, when
# @path is set, the path alone (a warning is logged in that case).
#
# @return [Hash] connection options
def redis_params
  secret = @password.nil? ? nil : @password.value

  options = {
    :timeout => @timeout,
    :db => @db,
    :password => secret,
    :ssl => @ssl
  }

  unless @path.nil?
    @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
    options[:path] = @path
    return options
  end

  options[:host] = @host
  options[:port] = @port
  options
end
redis_runner() { || ... }
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 261
#
# Ensures a client exists in @redis (connecting on demand) and yields to
# the caller's block. Any StandardError is logged via log_error, and the
# whole attempt is retried when reset_for_error_retry says so.
def redis_runner
  @redis ||= connect
  yield
rescue => e
  log_error(e)
  retry if reset_for_error_retry(e)
end
reset_for_error_retry(e)
click to toggle source
@return [true] if operation is fine to retry
# File lib/logstash/inputs/redis.rb, line 292
#
# Decides whether a failed operation should be retried. A shutdown signal
# is never retried; otherwise @redis is cleared, the method sleeps for up
# to one second (interruptible once stop? is true) and reports whether the
# input is still running.
#
# @return [true] if operation is fine to retry
def reset_for_error_retry(e)
  shutdown_requested = e.is_a?(::LogStash::ShutdownSignal)
  return if shutdown_requested

  # Reset the redis variable to trigger reconnect
  @redis = nil

  Stud.stoppable_sleep(1) { stop? }
  !stop? # retry if not stop-ing
end
subscribe_stop()
click to toggle source
private
# File lib/logstash/inputs/redis.rb, line 243
#
# Unsubscribes (pattern or plain, depending on @data_type) and closes the
# current client, if there is one and it is connected, then clears @redis.
# Errors raised by close are deliberately ignored. If @redis was replaced
# while this method was running, the method calls itself again for the new
# client first.
def subscribe_stop
  client = @redis # might change during method invocation
  return if client.nil? || !client.connected?

  if client.subscribed?
    @data_type == 'pattern_channel' ? client.punsubscribe : client.unsubscribe
  end

  begin
    client.close # does client.disconnect
  rescue StandardError
    nil
  end

  # check if input retried while executing
  subscribe_stop unless client.equal?(@redis)
  @redis = nil
end