class Vines::Cluster
Server instances may be connected to one another in a cluster so they can host a single chat domain, or set of domains, across many servers, transparently to users. A redis database is used for the session routing table, mapping JIDs to their node's location. Redis pubsub channels are used to communicate amongst nodes.
Using a shared in-memory cache, like redis, rather than synchronizing the cache to each node, allows us to add cluster nodes dynamically, without updating all other nodes' config files. It also greatly reduces the amount of memory required by the chat server processes.
Attributes
Public Class Methods
# File lib/vines/cluster.rb, line 29
def initialize(config, &block)
  @config, @id = config, Kit.uuid
  @connection = Connection.new
  @sessions = Sessions.new(self)
  @publisher = Publisher.new(self)
  @subscriber = Subscriber.new(self)
  @pubsub = PubSub.new(self)
  instance_eval(&block)
end
Public Instance Methods
Create a pubsub topic (a.k.a. node), in the given domain, to which messages may be published. The domain argument will be one of the configured pubsub subdomains in conf/config.rb (e.g. games.wonderland.lit, topics.wonderland.lit, etc).
# File lib/vines/cluster.rb, line 144
def add_pubsub_node(domain, node)
  @pubsub.add_node(domain, node)
end
Create a new redis connection.
# File lib/vines/cluster.rb, line 113
def connect
  @connection.create
end
Return the connected streams for this user, without any proxy streams to remote cluster nodes (locally connected streams only).
# File lib/vines/cluster.rb, line 130
def connected_resources(jid)
  @config.router.connected_resources(jid, jid, false)
end
Return the shared redis connection for most queries to use.
# File lib/vines/cluster.rb, line 108
def connection
  @connection.connect
end
Remove a pubsub topic so messages may no longer be broadcast to it.
# File lib/vines/cluster.rb, line 149
def delete_pubsub_node(domain, node)
  @pubsub.delete_node(domain, node)
end
Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user's session is terminated, either by logout or stream disconnect.
# File lib/vines/cluster.rb, line 75
def delete_session(jid)
  @sessions.delete(jid)
end
Remove all user sessions from the routing table associated with the given node ID. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect the node is offline.
# File lib/vines/cluster.rb, line 83
def delete_sessions(node)
  @sessions.delete_all(node)
end
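The session methods above (save, delete, delete all for a node) can be pictured with a small in-memory table. This is only a sketch: `RoutingTable`, `node_for`, and the `:node` attribute key are hypothetical names chosen for illustration, and a plain Hash stands in for the shared redis database.

```ruby
# Hypothetical in-memory stand-in for the redis-backed routing table.
class RoutingTable
  def initialize
    @sessions = {} # full JID string => attributes, including the hosting node
  end

  # Record which node hosts the session for this JID.
  def save(jid, attrs)
    @sessions[jid] = attrs
  end

  # Forget a single session, e.g. on logout or stream disconnect.
  def delete(jid)
    @sessions.delete(jid)
  end

  # Forget every session hosted by the given node, e.g. when it goes offline.
  def delete_all(node)
    @sessions.delete_if { |_jid, attrs| attrs[:node] == node }
  end

  # Look up the node hosting a JID, or nil if the user is not connected.
  def node_for(jid)
    session = @sessions[jid]
    session && session[:node]
  end
end

table = RoutingTable.new
table.save('alice@wonderland.lit/tea', node: 'node-a')
table.save('hatter@wonderland.lit/party', node: 'node-b')
table.delete_all('node-b') # node-b went offline
table.node_for('alice@wonderland.lit/tea')    # => "node-a"
table.node_for('hatter@wonderland.lit/party') # => nil
```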
Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members' clocks don't necessarily need to be in sync.
# File lib/vines/cluster.rb, line 90
def poke(node, time)
  @sessions.poke(node, time)
end
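One way a node-reported timestamp can tolerate unsynchronized clocks is to remember the offset between the local clock and the node's clock, then judge staleness in the node's own time frame. The sketch below illustrates that idea only; `NodeTracker`, its offset bookkeeping, and the five-second timeout are assumptions, not a description of how the Sessions class actually works.

```ruby
# Hypothetical liveness tracker; times are plain integers (seconds).
class NodeTracker
  def initialize(timeout)
    @timeout = timeout
    @offsets = {} # node ID => local clock minus node clock, in seconds
    @last = {}    # node ID => latest time reported by the node itself
  end

  # time is the node's own clock; local_now is this process's clock.
  def poke(node, time, local_now)
    @offsets[node] ||= local_now - time # learned on first contact
    @last[node] = time
  end

  # Return IDs of nodes whose last report is older than the timeout,
  # after translating the local clock into each node's time frame.
  def expired(local_now)
    @last.select do |node, time|
      (local_now - @offsets[node]) - time > @timeout
    end.keys
  end
end

tracker = NodeTracker.new(5)
tracker.poke('node-a', 900, 1000)  # node-a's clock runs 100 seconds behind
tracker.poke('node-b', 1000, 1000) # node-b's clock matches ours
tracker.poke('node-a', 903, 1003)  # node-a keeps reporting, node-b goes quiet
tracker.expired(1007) # => ["node-b"]
```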
Return true if the pubsub topic exists and messages may be published to it.
# File lib/vines/cluster.rb, line 172
def pubsub_node?(domain, node)
  @pubsub.node?(domain, node)
end
Return a list of JIDs subscribed to the pubsub topic.
# File lib/vines/cluster.rb, line 183
def pubsub_subscribers(domain, node)
  @pubsub.subscribers(domain, node)
end
Turn an asynchronous redis query into a blocking call by pausing the fiber in which this code is running. Return the result of the query from this method, rather than passing it to a callback block.
# File lib/vines/cluster.rb, line 120
def query(name, *args)
  fiber, yielding = Fiber.current, true
  req = connection.send(name, *args)
  req.errback { fiber.resume rescue yielding = false }
  req.callback {|response| fiber.resume(response) }
  Fiber.yield if yielding
end
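The fiber technique used by query can be tried without EventMachine or redis. In this minimal sketch, `FakeRequest`, `blocking_query`, and the `pending` queue are hypothetical stand-ins for the redis request object and the event loop; only `Fiber.current`, `Fiber.yield`, and `Fiber#resume` are real Ruby APIs.

```ruby
require 'fiber' # provides Fiber.current on older Rubies

# Hypothetical asynchronous request: the caller registers a callback and
# an event loop later delivers the response by calling succeed.
class FakeRequest
  def callback(&block)
    @callback = block
  end

  def succeed(response)
    @callback.call(response)
  end
end

pending = [] # requests waiting for the simulated event loop

# Looks blocking to the caller, but only the current fiber is paused.
blocking_query = lambda do |name|
  fiber = Fiber.current
  req = FakeRequest.new
  req.callback { |response| fiber.resume(response) }
  pending << [req, "result of #{name}"]
  Fiber.yield # suspend here; the value passed to resume is returned
end

results = []
worker = Fiber.new do
  results << blocking_query.call(:get)
  results << blocking_query.call(:hgetall)
end

worker.resume        # runs until the first Fiber.yield
until pending.empty? # simulated event loop
  req, response = pending.shift
  req.succeed(response) # resumes the worker fiber with the response
end

results # => ["result of get", "result of hgetall"]
```

The calling code reads as sequential statements, while the thread running the event loop stays free between the request and its response.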
Return any streams hosted at remote nodes for these JIDs. The streams act like normal EM::Connections, but are actually proxies that route stanzas over redis pubsub channels to remote nodes.
# File lib/vines/cluster.rb, line 59
def remote_sessions(*jids)
  @sessions.find(*jids).map do |session|
    StreamProxy.new(self, session)
  end
end
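The proxy idea can be sketched as an object that offers a stream-like `write` method but forwards each stanza to the node that hosts the real connection. `ProxyStream`, `RecordingPublisher`, and the session hash keys are hypothetical; the real StreamProxy interface is not shown on this page.

```ruby
# Hypothetical publisher that records what would be sent over redis pubsub.
class RecordingPublisher
  attr_reader :sent

  def initialize
    @sent = []
  end

  def route(stanza, node)
    @sent << [node, stanza]
  end
end

# Hypothetical proxy: looks like a local stream to callers, but hands
# every stanza to the publisher, addressed to the remote node.
class ProxyStream
  attr_reader :jid, :node

  def initialize(publisher, session)
    @publisher = publisher
    @jid = session[:jid]
    @node = session[:node]
  end

  def write(stanza)
    @publisher.route(stanza, @node)
  end
end

publisher = RecordingPublisher.new
sessions = [{ jid: 'alice@wonderland.lit/tea', node: 'node-b' }]
proxies = sessions.map { |session| ProxyStream.new(publisher, session) }
proxies.each { |stream| stream.write('<message/>') }
publisher.sent # => [["node-b", "<message/>"]]
```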
Send the stanza to the node hosting the user's session. The stanza is published to the channel to which the remote node is listening for messages.
# File lib/vines/cluster.rb, line 97
def route(stanza, node)
  @publisher.route(stanza, node)
end
Persist the user's session to the shared redis cache so that other cluster nodes can locate the node hosting this user's connection and route messages to them.
# File lib/vines/cluster.rb, line 68
def save_session(jid, attrs)
  @sessions.save(jid, attrs)
end
Join this node to the cluster by broadcasting its state to the other nodes, subscribing to redis channels, and scheduling periodic heartbeat broadcasts. This method must be called after initialize or this node will not be a cluster member.
# File lib/vines/cluster.rb, line 43
def start
  @connection.connect
  @publisher.broadcast(:online)
  @subscriber.subscribe

  EM.add_periodic_timer(1) { heartbeat }

  at_exit do
    @publisher.broadcast(:offline)
    @sessions.delete_all(@id)
  end
end
Return the Storage implementation for this domain or nil if the domain is not hosted here.
# File lib/vines/cluster.rb, line 136
def storage(domain)
  @config.storage(domain)
end
Subscribe the JID to the pubsub topic so it will receive any messages published to it.
# File lib/vines/cluster.rb, line 155
def subscribe_pubsub(domain, node, jid)
  @pubsub.subscribe(domain, node, jid)
end
Unsubscribe the JID from all pubsub topics. This is useful when the JID's session ends by logout or disconnect.
# File lib/vines/cluster.rb, line 167
def unsubscribe_all_pubsub(domain, jid)
  @pubsub.unsubscribe_all(domain, jid)
end
Unsubscribe the JID from the pubsub topic, deregistering its interest in receiving any messages published to it.
# File lib/vines/cluster.rb, line 161
def unsubscribe_pubsub(domain, node, jid)
  @pubsub.unsubscribe(domain, node, jid)
end
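The pubsub methods on this class (add, delete, exists, subscribe, unsubscribe, list subscribers) map naturally onto a table of topics keyed by domain and node. The following is a minimal in-memory sketch; `TopicRegistry` is a hypothetical name, the storage layout is an assumption, and ignoring a subscription to a missing topic is a choice made here, not documented behavior.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the redis-backed pubsub store.
class TopicRegistry
  def initialize
    @nodes = {} # [domain, node] => Set of subscriber JIDs
  end

  def add_node(domain, node)
    @nodes[[domain, node]] ||= Set.new
  end

  def delete_node(domain, node)
    @nodes.delete([domain, node])
  end

  def node?(domain, node)
    @nodes.key?([domain, node])
  end

  # Assumption: subscribing to a topic that does not exist is ignored.
  def subscribe(domain, node, jid)
    return unless node?(domain, node)
    @nodes[[domain, node]] << jid
  end

  def unsubscribe(domain, node, jid)
    @nodes[[domain, node]]&.delete(jid)
  end

  # Remove the JID from every topic in the domain.
  def unsubscribe_all(domain, jid)
    @nodes.each { |(d, _node), jids| jids.delete(jid) if d == domain }
  end

  def subscribers(domain, node)
    (@nodes[[domain, node]] || []).to_a
  end
end

topics = TopicRegistry.new
topics.add_node('games.wonderland.lit', 'croquet')
topics.subscribe('games.wonderland.lit', 'croquet', 'alice@wonderland.lit/tea')
topics.subscribe('games.wonderland.lit', 'croquet', 'queen@wonderland.lit/court')
topics.unsubscribe_all('games.wonderland.lit', 'alice@wonderland.lit/tea')
topics.subscribers('games.wonderland.lit', 'croquet') # => ["queen@wonderland.lit/court"]
```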
Notify the remote node that the user's roster has changed and it should reload the user from storage.
# File lib/vines/cluster.rb, line 103
def update_user(jid, node)
  @publisher.update_user(jid, node)
end
Private Instance Methods
Call this method once per second to broadcast this node's heartbeat and expire stale user sessions. This method must not raise exceptions or the timer will stop.
# File lib/vines/cluster.rb, line 192
def heartbeat
  @publisher.broadcast(:heartbeat)
  @sessions.expire
rescue => e
  log.error("Cluster session cleanup failed: #{e}")
end
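The rescue-and-log pattern in heartbeat can be exercised in isolation. In this sketch a lambda stands in for the timer callback, the second tick simulates a failure, and an array stands in for the logger; all of these names are illustrative assumptions.

```ruby
errors = [] # stands in for the log
ticks = 0

# Hypothetical timer callback that swallows and records its own failures
# so the caller never sees an exception.
heartbeat = lambda do
  begin
    ticks += 1
    raise 'redis unavailable' if ticks == 2 # simulated failure
  rescue => e
    errors << "Cluster session cleanup failed: #{e}"
  end
end

3.times { heartbeat.call } # the third tick still runs after the failure
ticks  # => 3
errors # => ["Cluster session cleanup failed: redis unavailable"]
```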