class LogStash::Filters::Rediss

Public Instance Methods

filter(event) click to toggle source
# File lib/logstash/filters/rediss.rb, line 151
# Runs every Redis command configured on this filter against +event+.
#
# Each command setting that is set (@get, @set, @setex, @exists, @del, @hget,
# @hset, @sadd, @sismember, @smembers, @scard, @llen, @rpush, @rpushnx, @rpop,
# @lget) is executed, in that order. A setting names the event field that
# holds the Redis key; for get/set/setex/llen/rpushnx the setting is instead
# expanded with event.sprintf when @cmd_key_is_formatted is set. Commands that
# read store their result in the @target field; commands that write take
# their value from the @source field (and @field for hset).
#
# Any StandardError drops the cached Redis connection and lock manager and
# re-runs the whole command list, at most @max_retries additional times,
# sleeping @reconnect_interval seconds before each retry. Once the retries are
# used up the error is logged and swallowed.
#
# NOTE(review): filter_matched is called even when every attempt failed;
# confirm whether that is intended.
#
# event - object responding to get, set and sprintf (presumably a
#         LogStash::Event).
def filter(event)

    # TODO: Maybe refactor the interface into a more flexible one with two
    #       main configs 'cmd' & 'args'. Then it would be possible to eliminate
    #       all if clauses and replace it through one hashmap call, where
    #       the hashmap would be a mapping from 'cmd' -> <cmd_function_ref>
    #       E.g.: cmds.fetch(event.get(@llen), &method(:cmd_not_found_err))

    # Resolves the Redis key for commands that support formatted keys.
    resolve_key = lambda do |setting|
        @cmd_key_is_formatted ? event.sprintf(setting) : event.get(setting)
    end

    max_retries = @max_retries
    begin
        @redis ||= connect

        if @get
            event.set(@target, @redis.get(resolve_key.call(@get)))
        end

        if @set
            @redis.set(resolve_key.call(@set), event.get(@source))
        end

        if @setex
            @redis.setex(resolve_key.call(@setex), @ttl, event.get(@source))
        end

        if @exists
            event.set(@target, @redis.exists(event.get(@exists)))
        end

        if @del
            @redis.del(event.get(@del))
        end

        if @hget
            event.set(@target, @redis.hget(event.get(@hget), event.get(@source)))
        end

        if @hset
            @redis.hset(event.get(@hset), event.get(@field), event.get(@source))
        end

        if @sadd
            @redis.sadd(event.get(@sadd), event.get(@source))
        end

        if @sismember
            event.set(@target, @redis.sismember(event.get(@sismember), event.get(@source)))
        end

        if @smembers
            event.set(@target, @redis.smembers(event.get(@smembers)))
        end

        if @scard
            event.set(@target, @redis.scard(event.get(@scard)))
        end

        if @llen
            event.set(@target, @redis.llen(resolve_key.call(@llen)))
        end

        if @rpush
            @redis.rpush(event.get(@rpush), event.get(@source))
        end

        if @rpushnx
            key = resolve_key.call(@rpushnx)
            max_lock_retries = @max_lock_retries
            begin
                @lock_manager ||= connect_lockmanager
                @lock_manager.lock!("lock_#{key}", @lock_timeout) do
                    # Newer redis clients return an Integer from #exists, and
                    # 0 is truthy in Ruby, so prefer the boolean #exists? when
                    # the client offers it.
                    key_present = if @redis.respond_to?(:exists?)
                                      @redis.exists?(key)
                                  else
                                      @redis.exists(key)
                                  end
                    @redis.rpush(key, event.get(@source)) unless key_present
                end
            rescue Redlock::LockError => e
                max_lock_retries -= 1
                if max_lock_retries >= 0
                    @logger.warn("Failed to lock section 'rpushnx' for key: #{key}",
                                 :event => event, :exception => e)
                    # Only wait when another attempt will actually follow.
                    sleep @lock_retry_interval
                    retry
                else
                    @logger.error("Max retries reached for trying to lock section 'rpushnx' for key: #{key}",
                                  :event => event, :exception => e)
                end
            end
        end

        if @rpop
            event.set(@target, @redis.rpop(event.get(@rpop)))
        end

        if @lget
            event.set(@target, @redis.lrange(event.get(@lget), 0, -1))
        end

    rescue => e
        # Force fresh connections on the next attempt (or the next event).
        @redis = nil
        @lock_manager = nil
        max_retries -= 1
        if max_retries >= 0
            @logger.warn("Failed to send event to Redis, retrying after #{@reconnect_interval} seconds...", :event => event,
                         :exception => e, :backtrace => e.backtrace)
            # Only wait when another attempt will actually follow.
            sleep @reconnect_interval
            retry
        else
            @logger.error("Max retries reached for trying to execute a command",
                          :event => event, :exception => e, :backtrace => e.backtrace)
        end
    end

    # filter_matched should go in the last line of our successful code
    filter_matched(event)
end
register() click to toggle source
# File lib/logstash/filters/rediss.rb, line 142
# Resets the plugin's connection state before events are processed.
#
# Clears the cached Redis client and lock manager, shuffles the @host list in
# place when @shuffle_hosts is set, and points @host_idx back at the first
# entry.
def register
    @redis = @lock_manager = nil
    @host.shuffle! if @shuffle_hosts
    @host_idx = 0
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/filters/rediss.rb, line 268
# Builds a Redis client for the host at @host_idx and advances @host_idx to
# the next entry, wrapping to 0 after the last one.
#
# A host entry is split on ':' into host and port; when the entry carries no
# port, @port is used. The connection parameters are logged at debug level
# before the password is added to them.
#
# Returns the new Redis client.
def connect
    host_part, port_part = @host[@host_idx].split(':')
    @current_host = host_part
    @current_port = port_part
    @host_idx = (@host_idx + 1) % @host.length

    # Fall back to the configured default port when the entry has none.
    @current_port ||= @port

    params = {
        :host => @current_host,
        :port => @current_port,
        :timeout => @timeout,
        :db => @db,
        :ssl => @ssl
    }

    @logger.debug("Connection params", params)

    params[:password] = @password.value if @password

    Redis.new(params)
end
connect_lockmanager() click to toggle source
# File lib/logstash/filters/rediss.rb, line 293
# Builds a Redlock client over all configured hosts.
#
# Every entry of @host is turned into a URL by prefixing the scheme chosen
# from @ssl ('rediss://' or 'redis://') unless the entry already starts with
# it. The entries in @host are not modified, so connect can keep splitting
# them on ':' and this method can be called again after the lock manager has
# been reset.
#
# Returns the new Redlock::Client.
def connect_lockmanager
    @protocol = @ssl ? 'rediss://' : 'redis://'

    # Build new strings rather than prepending in place: mutating would
    # corrupt @host, and a modifier-unless would yield nil for hosts that
    # already carry the scheme.
    hosts = Array(@host).map do |host|
        host.start_with?(@protocol) ? host : "#{@protocol}#{host}"
    end

    @logger.debug("lock_manager hosts", hosts)

    Redlock::Client.new(hosts)
end