class LogStash::Filters::Rediss
Public Instance Methods
filter(event)
click to toggle source
# File lib/logstash/filters/rediss.rb, line 151
#
# Runs every configured Redis command against the given event.
#
# Each command is enabled by its own setting (@get, @set, @setex, @exists,
# @del, @hget, @hset, @sadd, @sismember, @smembers, @scard, @llen, @rpush,
# @rpushnx, @rpop, @lget). Read-style commands store their reply in the
# event field named by @target; write-style commands take their value from
# the event field named by @source.
#
# Any StandardError raised while talking to Redis drops the cached
# connections and re-runs the whole command sequence, up to @max_retries
# additional times. Once the retries are exhausted the error is logged and
# swallowed, so filter_matched(event) is still called.
#
# @param event the event to read fields from and write results to
def filter(event)
  # TODO: Maybe refactor the interface into a more flexible one with two
  #       main configs 'cmd' & 'args'. Then it would be possible to eliminate
  #       all if clauses and replace them with one hashmap call, where
  #       the hashmap would be a mapping from 'cmd' -> <cmd_function_ref>
  #       E.g.: cmds.fetch(event.get(@llen), &method(:cmd_not_found_err))
  max_retries = @max_retries

  # get/set/setex/llen/rpushnx either interpolate their setting via
  # event.sprintf or treat it as the name of the field holding the key.
  resolve_key = lambda do |setting|
    @cmd_key_is_formatted ? event.sprintf(setting) : event.get(setting)
  end

  begin
    @redis ||= connect

    event.set(@target, @redis.get(resolve_key.call(@get))) if @get

    @redis.set(resolve_key.call(@set), event.get(@source)) if @set

    @redis.setex(resolve_key.call(@setex), @ttl, event.get(@source)) if @setex

    event.set(@target, @redis.exists(event.get(@exists))) if @exists

    @redis.del(event.get(@del)) if @del

    event.set(@target, @redis.hget(event.get(@hget), event.get(@source))) if @hget

    @redis.hset(event.get(@hset), event.get(@field), event.get(@source)) if @hset

    @redis.sadd(event.get(@sadd), event.get(@source)) if @sadd

    event.set(@target, @redis.sismember(event.get(@sismember), event.get(@source))) if @sismember

    event.set(@target, @redis.smembers(event.get(@smembers))) if @smembers

    event.set(@target, @redis.scard(event.get(@scard))) if @scard

    event.set(@target, @redis.llen(resolve_key.call(@llen))) if @llen

    @redis.rpush(event.get(@rpush), event.get(@source)) if @rpush

    if @rpushnx
      key = resolve_key.call(@rpushnx)
      max_lock_retries = @max_lock_retries

      begin
        @lock_manager ||= connect_lockmanager

        # The exists-check and the push run under a lock named after the key.
        @lock_manager.lock!("lock_#{key}", @lock_timeout) do
          # `exists` may answer with a Boolean or with an Integer count,
          # depending on the redis client version. 0 is truthy in Ruby, so
          # an Integer reply has to be compared explicitly.
          reply = @redis.exists(key)
          key_present = reply.is_a?(Integer) ? reply > 0 : reply
          @redis.rpush(key, event.get(@source)) unless key_present
        end
      rescue Redlock::LockError => e
        @logger.warn("Failed to lock section 'rpushnx' for key: #{key}",
                     :event => event, :exception => e)
        max_lock_retries -= 1
        if max_lock_retries >= 0
          # Only wait when another attempt actually follows.
          sleep @lock_retry_interval
          retry
        else
          @logger.error("Max retries reached for trying to lock section 'rpushnx' for key: #{key}",
                        :event => event, :exception => e)
        end
      end
    end

    event.set(@target, @redis.rpop(event.get(@rpop))) if @rpop

    event.set(@target, @redis.lrange(event.get(@lget), 0, -1)) if @lget
  rescue => e
    @logger.warn("Failed to send event to Redis, retrying after #{@reconnect_interval} seconds...",
                 :event => event, :exception => e, :backtrace => e.backtrace)
    # Drop both clients so the next attempt (or the next event) reconnects.
    @redis = nil
    @lock_manager = nil
    max_retries -= 1
    if max_retries >= 0
      # Only wait when another attempt actually follows.
      sleep @reconnect_interval
      retry
    else
      @logger.error("Max retries reached for trying to execute a command",
                    :event => event, :exception => e)
    end
  end

  # filter_matched should go in the last line of our successful code
  filter_matched(event)
end
register()
click to toggle source
# File lib/logstash/filters/rediss.rb, line 142
#
# Resets the plugin's connection state: clears the cached Redis client and
# lock manager, optionally shuffles the host list in place, and points the
# host cursor back at the first entry.
def register
  @redis = @lock_manager = nil
  @host.shuffle! if @shuffle_hosts
  @host_idx = 0
end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/filters/rediss.rb, line 268
#
# Opens a Redis client for the host the cursor currently points at and
# advances the cursor to the next host, wrapping to the first entry after
# the last one. A host entry without a ":port" suffix uses @port.
#
# @return the new Redis client
def connect
  host_name, host_port = @host[@host_idx].split(':')
  @current_host = host_name
  @current_port = host_port

  following_idx = @host_idx + 1
  if following_idx >= @host.length
    @host_idx = 0
  else
    @host_idx = following_idx
  end

  @current_port ||= @port

  params = {
    host: @current_host,
    port: @current_port,
    timeout: @timeout,
    db: @db,
    ssl: @ssl
  }

  # Logged before the password is added to the hash.
  @logger.debug("Connection params", params)

  params[:password] = @password.value if @password

  Redis.new(params)
end
connect_lockmanager()
click to toggle source
# File lib/logstash/filters/rediss.rb, line 293
#
# Builds a Redlock client over all configured hosts.
#
# Every host is turned into a URL by prefixing the scheme ('rediss://' when
# @ssl is set, 'redis://' otherwise) unless it already starts with it. The
# URLs are new strings: the entries of @host are left untouched, because
# #connect splits those same entries on ':' to get host and port.
#
# @return the new Redlock::Client
def connect_lockmanager
  @protocol = @ssl ? 'rediss://' : 'redis://'

  hosts = Array(@host).map do |host|
    # Always yield a string; a host that already has the scheme is kept as is.
    host.start_with?(@protocol) ? host : "#{@protocol}#{host}"
  end

  @logger.debug("lock_manager hosts", hosts)
  Redlock::Client.new(hosts)
end