class RedisCluster

Constants

RedisClusterDefaultTimeout
RedisClusterHashSlots
RedisClusterRequestTTL

Public Class Methods

new(startup_nodes,connections,opt={})
# File lib/rediscluster.rb, line 32
def initialize(startup_nodes,connections,opt={})
    @startup_nodes = startup_nodes
    @max_connections = connections
    @connections = {}
    @opt = opt
    @refresh_table_asap = false
    initialize_slots_cache
end

Public Instance Methods

close_existing_connection()

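If the number of cached connections has already reached the maximum allowed, close one of them to make room. The connection closed is the first one yielded by the @connections hash, not a randomly chosen one. This should be called every time we cache a new connection in the @connections hash.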

# File lib/rediscluster.rb, line 144
def close_existing_connection
    while @connections.length >= @max_connections
        @connections.each{|n,r|
            @connections.delete(n)
            begin
                r.client.disconnect
            rescue
            end
            break
        }
    end
end
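
The eviction logic can be exercised without a Redis server. The following is a minimal sketch, not the library's code: FakeLink and make_room are hypothetical names, and FakeLink only stands in for a real connection object. It shows that, because Ruby hashes iterate in insertion order, taking the first entry evicts the oldest cached connection.

```ruby
# FakeLink is a hypothetical stand-in for a Redis connection object.
FakeLink = Struct.new(:name, :closed) do
  def disconnect
    self.closed = true
  end
end

# Evict cached links until there is room for one more.
# The empty? guard avoids spinning forever if max_connections is 0.
def make_room(connections, max_connections)
  while connections.length >= max_connections && !connections.empty?
    name, link = connections.first # oldest entry: hashes keep insertion order
    connections.delete(name)
    begin
      link.disconnect
    rescue StandardError
      # Ignore errors while closing; the link is being discarded anyway.
    end
  end
end

links = {}
%w[127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002].each do |name|
  make_room(links, 2)
  links[name] = FakeLink.new(name, false)
end
p links.keys # the first address has been evicted
```

With a limit of 2, caching the third link evicts the first one, so the cache never holds more than two entries.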
flush_slots_cache()

Flush the cache, mostly useful for debugging when we want to force redirection.

# File lib/rediscluster.rb, line 99
def flush_slots_cache
    @slots = {}
end
get_connection_by_slot(slot)

Given a slot, return the link (Redis instance) to the node mapped to it, creating a connection to that node if we don't have one yet.

# File lib/rediscluster.rb, line 198
def get_connection_by_slot(slot)
    node = @slots[slot]
    # If we don't know what the mapping is, return a random node.
    return get_random_connection if !node
    set_node_name!(node)
    if not @connections[node[:name]]
        begin
            close_existing_connection
            @connections[node[:name]] =
                get_redis_link(node[:host],node[:port])
        rescue
            # This will probably never happen with recent redis-rb
            # versions because the connection is established lazily,
            # only when a command is called. However it is wise to
            # handle an instance creation error of some kind.
            return get_random_connection
        end
    end
    @connections[node[:name]]
end
get_key_from_command(argv)

Return the first key in the command arguments.

Currently we just return argv[1], that is, the first argument after the command name.

This is indeed the key for most commands, and when it is not true the cluster redirection will point us to the right node anyway.

For commands we want to explicitly reject because they don't make sense in the context of a cluster, nil is returned.

# File lib/rediscluster.rb, line 129
def get_key_from_command(argv)
    case argv[0].to_s.downcase
    when "info","multi","exec","slaveof","config","shutdown"
        return nil
    else
        # Unknown commands, and all the commands having the key
        # as first argument are handled here:
        # set, get, ...
        return argv[1]
    end
end
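
As an illustration of the rule described above, here is a small standalone sketch. UNROUTABLE and routing_key are hypothetical names, and the rejected-command list is copied from the method above. Note that a command with no arguments also yields nil, because argv[1] does not exist.

```ruby
# Commands that cannot be routed by key (same list as the method above).
UNROUTABLE = %w[info multi exec slaveof config shutdown].freeze

# Return the routing key for a command vector, or nil if it has none.
def routing_key(argv)
  return nil if UNROUTABLE.include?(argv[0].to_s.downcase)
  argv[1]
end

p routing_key([:set, "foo", "bar"]) # key is the first argument
p routing_key(["GET", "foo"])       # command name may be a String
p routing_key([:info])              # explicitly rejected command
p routing_key([:ping])              # no argument, so no key either
```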
get_random_connection()

Return a link to a random node, or raise an error if no node can be contacted. This function is only called when we can't reach the node associated with a given hash slot, or when we don't know the right mapping.

The function will try to get a successful reply to the PING command, otherwise the next node is tried.

# File lib/rediscluster.rb, line 164
def get_random_connection
    e = ""
    @startup_nodes.shuffle.each{|n|
        begin
            set_node_name!(n)
            conn = @connections[n[:name]]

            if !conn
                # Connect the node if it is not connected
                conn = get_redis_link(n[:host],n[:port])
                if conn.ping == "PONG"
                    close_existing_connection
                    @connections[n[:name]] = conn
                    return conn
                else
                    # If the connection is not good close it ASAP in order
                    # to avoid waiting for the GC finalizer. File
                    # descriptors are a rare resource.
                    conn.client.disconnect
                end
            else
                # The node was already connected, test the connection.
                return conn if conn.ping == "PONG"
            end
        rescue => e
            # Just try with the next node.
        end
    }
    raise "Can't reach a single startup node. #{e}"
end
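
The probing strategy (shuffle the candidates, PING each one, fall through to the next on failure) can be sketched without a server. FakeNode and first_reachable are hypothetical names used only for this illustration; a real client would ping actual connections and also manage the connection cache.

```ruby
# FakeNode is a hypothetical stand-in for a connection that may be down.
class FakeNode
  def initialize(up)
    @up = up
  end

  def ping
    raise IOError, "connection refused" unless @up
    "PONG"
  end
end

# Return the name of the first node, in random order, that answers PING.
def first_reachable(nodes)
  last_error = nil
  nodes.keys.shuffle.each do |name|
    begin
      return name if nodes[name].ping == "PONG"
    rescue StandardError => e
      last_error = e # remember the failure and try the next node
    end
  end
  raise "Can't reach a single startup node. #{last_error}"
end

cluster = {
  "127.0.0.1:7000" => FakeNode.new(false),
  "127.0.0.1:7001" => FakeNode.new(true),
  "127.0.0.1:7002" => FakeNode.new(false),
}
p first_reachable(cluster) # only one node is up, so it is always chosen
```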
initialize_slots_cache()

Contact the startup nodes and try to fetch the hash slots -> instances map in order to initialize the @slots hash.

# File lib/rediscluster.rb, line 57
def initialize_slots_cache
    @startup_nodes.each{|n|
        begin
            @slots = {}
            @nodes = []

            r = get_redis_link(n[:host],n[:port])
            r.cluster("slots").each {|range|
                # Each entry is [first_slot, last_slot, [ip, port, ...], ...].
                ip,port = range[2]
                name = "#{ip}:#{port}"
                node = {
                    :host => ip, :port => port,
                    :name => name
                }
                @nodes << node
                (range[0]..range[1]).each{|slot|
                    @slots[slot] = node
                }
            }
            populate_startup_nodes
            @refresh_table_asap = false
        rescue
            # Try with the next node on error.
            next
        end
        # Exit the loop as soon as one node replies
        break
    }
end
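
To make the slot table concrete, the sketch below builds the same kind of map from a hand-written reply. The sample data and the build_slot_map name are assumptions for illustration; the reply is shaped like CLUSTER SLOTS output, where each entry starts with the first slot, the last slot, and the master's address.

```ruby
# Sample reply shaped like CLUSTER SLOTS output; the addresses are made up.
reply = [
  [0,     5460,  ["127.0.0.1", 7000]],
  [5461,  10922, ["127.0.0.1", 7001]],
  [10923, 16383, ["127.0.0.1", 7002]],
]

# Turn the reply into a slot => node hash plus the list of distinct nodes.
def build_slot_map(reply)
  slots = {}
  nodes = []
  reply.each do |first, last, master|
    ip, port = master
    node = { :host => ip, :port => port, :name => "#{ip}:#{port}" }
    nodes << node
    (first..last).each { |slot| slots[slot] = node }
  end
  [slots, nodes]
end

slots, nodes = build_slot_map(reply)
p slots.size          # one entry per hash slot covered by the reply
p slots[5461][:name]  # node serving slot 5461
p nodes.map { |n| n[:name] }
```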
keyslot(key)

Return the hash slot from the key.

# File lib/rediscluster.rb, line 104
def keyslot(key)
    # Only hash what is inside {...} if there is such a pattern in the key.
    # Note that the specification requires hashing the content between
    # the first { and the first } after it. If we find {} with
    # nothing in the middle, the whole key is hashed as usual.
    s = key.index "{"
    if s
        e = key.index "}",s+1
        if e && e != s+1
            key = key[s+1..e-1]
        end
    end
    RedisClusterCRC16.crc16(key) % RedisClusterHashSlots
end
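
The hash tag rule is easiest to see on a few keys. The sketch below is self-contained and rests on stated assumptions: crc16_xmodem is a hypothetical bit-by-bit stand-in for RedisClusterCRC16.crc16 (the Redis Cluster specification names the XMODEM variant of CRC16), HASH_SLOTS = 16384 is assumed to match RedisClusterHashSlots, and hash_tag and hash_slot are illustrative names.

```ruby
HASH_SLOTS = 16384 # assumed value of RedisClusterHashSlots

# CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no reflection.
def crc16_xmodem(data)
  crc = 0
  data.each_byte do |byte|
    crc ^= byte << 8
    8.times do
      crc = (crc & 0x8000).zero? ? (crc << 1) : ((crc << 1) ^ 0x1021)
      crc &= 0xFFFF
    end
  end
  crc
end

# Return the part of the key that gets hashed.
def hash_tag(key)
  start = key.index("{")
  return key unless start
  stop = key.index("}", start + 1)
  return key if stop.nil? || stop == start + 1 # no closing brace, or empty {}
  key[(start + 1)...stop]
end

def hash_slot(key)
  crc16_xmodem(hash_tag(key)) % HASH_SLOTS
end

p hash_tag("{user1000}.following") # → "user1000"
p hash_tag("foo{}{bar}")           # → "foo{}{bar}" (empty tag, whole key)
p hash_tag("foo{{bar}}zap")        # → "{bar"
p hash_slot("foo")                 # → 12182
p hash_slot("{user1000}.following") == hash_slot("{user1000}.followers") # → true
```

Keys that share a hash tag land in the same slot, which is what makes multi-key operations on them possible in a cluster.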
method_missing(*argv)

Currently we handle all the commands using method_missing for simplicity. For a Cluster client it would actually be better to have every single command as a method with the right arity and possibly additional checks (for example: RPOPLPUSH with the same src/dst key, SORT without GET or BY, and so forth).

# File lib/rediscluster.rb, line 274
def method_missing(*argv)
    send_cluster_command(argv)
end
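
To show what the forwarded argv looks like, here is a minimal sketch with a hypothetical CommandRecorder class that records calls instead of sending them to a cluster. The method name arrives as a Symbol in argv[0], followed by the call arguments. The respond_to_missing? override is an addition of this sketch, not something the class above defines.

```ruby
# Hypothetical class that records every unknown method call.
class CommandRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  # argv[0] is the method name as a Symbol, the rest are the call arguments.
  def method_missing(*argv)
    @calls << argv
    argv
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

rec = CommandRecorder.new
p rec.set("foo", "bar") # → [:set, "foo", "bar"]
p rec.get("foo")        # → [:get, "foo"]
```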
populate_startup_nodes()

Use @nodes to populate @startup_nodes, so that we have more nodes to try if a subset of the cluster fails.

# File lib/rediscluster.rb, line 89
def populate_startup_nodes
    # Make sure every node already has a name, so that later the
    # Array uniq! method will work reliably.
    @startup_nodes.each{|n| set_node_name! n}
    @nodes.each{|n| @startup_nodes << n}
    @startup_nodes.uniq!
end
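
The reason for naming every node before calling uniq! can be seen in a short sketch. The node hashes below are made-up examples. Array#uniq compares hashes by content, so a node without a :name entry and the same node with one count as two different elements.

```ruby
unnamed = { :host => "127.0.0.1", :port => 7000 }
named   = { :host => "127.0.0.1", :port => 7000, :name => "127.0.0.1:7000" }

# Different contents, so both are kept.
p [unnamed, named].uniq.length # → 2

# Give the first node its name, as set_node_name! would.
unnamed[:name] ||= "#{unnamed[:host]}:#{unnamed[:port]}"

# Now the two hashes are equal and the duplicate is dropped.
p [unnamed, named].uniq.length # → 1
```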
send_cluster_command(argv)

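Dispatch a command to the node that serves the hash slot of its key, following MOVED and ASK redirections and retrying on a random node after connection errors, for at most RedisClusterRequestTTL attempts.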

# File lib/rediscluster.rb, line 220
def send_cluster_command(argv)
    initialize_slots_cache if @refresh_table_asap
    ttl = RedisClusterRequestTTL # Max number of redirections
    e = ""
    asking = false
    try_random_node = false
    while ttl > 0
        ttl -= 1
        key = get_key_from_command(argv)
        raise "No way to dispatch this command to Redis Cluster." if !key
        slot = keyslot(key)
        if try_random_node
            r = get_random_connection
            try_random_node = false
        else
            r = get_connection_by_slot(slot)
        end
        begin
            # TODO: use pipelining to send asking and save a rtt.
            r.asking if asking
            asking = false
            return r.send(argv[0].to_sym,*argv[1..-1])
        rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES
            try_random_node = true
            sleep(0.1) if ttl < RedisClusterRequestTTL/2
        rescue => e
            errv = e.to_s.split
            if errv[0] == "MOVED" || errv[0] == "ASK"
                if errv[0] == "ASK"
                    asking = true
                else
                    # Server replied with MOVED. It's better for us to
                    # refresh the slots table (CLUSTER SLOTS) the next time.
                    @refresh_table_asap = true
                end
                newslot = errv[1].to_i
                node_ip,node_port = errv[2].split(":")
                if !asking
                    @slots[newslot] = {:host => node_ip,
                                       :port => node_port.to_i}
                end
            else
                raise e
            end
        end
    end
    raise "Too many Cluster redirections? (last error: #{e})"
end
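
The redirection handling relies on the shape of the error text, for example "MOVED 3999 127.0.0.1:6381". The sketch below isolates that parsing step. Redirection and parse_redirection are hypothetical names, and like the method above it splits the address on ":", so it assumes a plain host:port form.

```ruby
Redirection = Struct.new(:kind, :slot, :host, :port)

# Parse a MOVED or ASK error message; return nil for any other error.
def parse_redirection(message)
  kind, slot, address = message.to_s.split
  return nil unless %w[MOVED ASK].include?(kind) && slot && address
  host, port = address.split(":")
  Redirection.new(kind, slot.to_i, host, port.to_i)
end

moved = parse_redirection("MOVED 3999 127.0.0.1:6381")
p moved.kind # → "MOVED"
p moved.slot # → 3999
p moved.port # → 6381
p parse_redirection("ERR unknown command") # → nil
```

A MOVED reply means the slot table should be updated for that slot, while an ASK reply only affects the next request, which must be preceded by ASKING.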
set_node_name!(n)

Given a node (which is just a Ruby hash), give it a name by concatenating the host and port. We use the node name as a key to cache connections to that node.

# File lib/rediscluster.rb, line 49
def set_node_name!(n)
    if !n[:name]
        n[:name] = "#{n[:host]}:#{n[:port]}"
    end
end