class Vines::Cluster

Server instances may be connected to one another in a cluster so they can host a single chat domain, or a set of domains, across many servers, transparently to users. A Redis database holds the session routing table, which maps each JID to the node hosting its session. Nodes communicate with one another over Redis pubsub channels.

Using a shared in-memory cache such as Redis, rather than synchronizing the cache to each node, allows us to add cluster nodes dynamically without updating every other node's config file. It also greatly reduces the amount of memory required by the chat server processes.
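
The routing table idea can be pictured with a small sketch. This is not the Vines implementation: the RoutingTable class and its method names are hypothetical, and a plain Hash stands in for the shared Redis database so the example runs without a server.

```ruby
# Hypothetical in-memory stand-in for the shared session routing table.
# A real cluster keeps this data in Redis; a Hash is used here so the
# sketch runs without any external service.
class RoutingTable
  def initialize
    @sessions = {} # full JID string => ID of the node hosting the session
  end

  # Record which node hosts the session for this JID.
  def save(jid, node)
    @sessions[jid] = node
  end

  # Return the ID of the node hosting this JID, or nil if it has no session.
  def locate(jid)
    @sessions[jid]
  end

  # Forget every session hosted by the given node.
  def delete_all(node)
    @sessions.delete_if { |_jid, owner| owner == node }
  end
end

table = RoutingTable.new
table.save('alice@wonderland.lit/tea', 'node-a')
table.save('hatter@wonderland.lit/party', 'node-b')
table.locate('alice@wonderland.lit/tea')    # => "node-a"
table.delete_all('node-b')
table.locate('hatter@wonderland.lit/party') # => nil
```

Because every node reads and writes the same table, a new node only needs to know where the table lives, not where the other nodes are.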

Attributes

id[R]

Public Class Methods

new(config, &block)
# File lib/vines/cluster.rb, line 29
def initialize(config, &block)
  @config, @id = config, Kit.uuid
  @connection = Connection.new
  @sessions = Sessions.new(self)
  @publisher = Publisher.new(self)
  @subscriber = Subscriber.new(self)
  @pubsub = PubSub.new(self)
  instance_eval(&block)
end

Public Instance Methods

add_pubsub_node(domain, node)

Create a pubsub topic (a.k.a. node), in the given domain, to which messages may be published. The domain argument will be one of the configured pubsub subdomains in conf/config.rb (e.g. games.wonderland.lit, topics.wonderland.lit, etc).

# File lib/vines/cluster.rb, line 144
def add_pubsub_node(domain, node)
  @pubsub.add_node(domain, node)
end

connect()

Create a new Redis connection.

# File lib/vines/cluster.rb, line 113
def connect
  @connection.create
end

connected_resources(jid)

Return the connected streams for this user, without any proxy streams to remote cluster nodes (locally connected streams only).

# File lib/vines/cluster.rb, line 130
def connected_resources(jid)
  @config.router.connected_resources(jid, jid, false)
end

connection()

Return the shared Redis connection for most queries to use.

# File lib/vines/cluster.rb, line 108
def connection
  @connection.connect
end

delete_pubsub_node(domain, node)

Remove a pubsub topic so messages may no longer be broadcast to it.

# File lib/vines/cluster.rb, line 149
def delete_pubsub_node(domain, node)
  @pubsub.delete_node(domain, node)
end

delete_session(jid)

Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user's session is terminated, either by logout or stream disconnect.

# File lib/vines/cluster.rb, line 75
def delete_session(jid)
  @sessions.delete(jid)
end

delete_sessions(node)

Remove all user sessions associated with the given node ID from the routing table. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect the node is offline.

# File lib/vines/cluster.rb, line 83
def delete_sessions(node)
  @sessions.delete_all(node)
end

poke(node, time)

Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members' clocks don't necessarily need to be in sync.

# File lib/vines/cluster.rb, line 90
def poke(node, time)
  @sessions.poke(node, time)
end
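
The liveness tracking described for poke, together with the cleanup of dead nodes mentioned under delete_sessions, might look roughly like the following. This is a sketch under stated assumptions, not the Vines session store: the Heartbeats class, the timeout value, and the expire(now) signature are all hypothetical, and a Hash replaces Redis.

```ruby
# Hypothetical tracker of the last heartbeat time reported by each node.
class Heartbeats
  # timeout: seconds without a heartbeat before a node counts as stale
  # (an assumed parameter, chosen for illustration).
  def initialize(timeout)
    @timeout = timeout
    @last_seen = {} # node ID => time the node reported in its heartbeat
  end

  # Record the time the node itself reported.
  def poke(node, time)
    @last_seen[node] = time
  end

  # Return the IDs of nodes whose last heartbeat is more than `timeout`
  # seconds older than `now`, and forget them.
  def expire(now)
    stale = @last_seen.select { |_node, time| now - time > @timeout }.keys
    stale.each { |node| @last_seen.delete(node) }
    stale
  end
end

beats = Heartbeats.new(5)
beats.poke('node-a', 100)
beats.poke('node-b', 103)
beats.expire(107) # => ["node-a"]
```

A caller would then remove the sessions of each stale node from the routing table, as delete_sessions does for a single node ID.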

pubsub_node?(domain, node)

Return true if the pubsub topic exists and messages may be published to it.

# File lib/vines/cluster.rb, line 172
def pubsub_node?(domain, node)
  @pubsub.node?(domain, node)
end

pubsub_subscribed?(domain, node, jid)

Return true if the JID is a registered subscriber to the pubsub topic and messages published to it should be routed to the JID.

# File lib/vines/cluster.rb, line 178
def pubsub_subscribed?(domain, node, jid)
  @pubsub.subscribed?(domain, node, jid)
end

pubsub_subscribers(domain, node)

Return a list of JIDs subscribed to the pubsub topic.

# File lib/vines/cluster.rb, line 183
def pubsub_subscribers(domain, node)
  @pubsub.subscribers(domain, node)
end

query(name, *args)

Turn an asynchronous Redis query into a blocking call by pausing the fiber in which this code is running. Return the result of the query from this method, rather than passing it to a callback block.

# File lib/vines/cluster.rb, line 120
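def query(name, *args)
  fiber, yielding = Fiber.current, true
  req = connection.send(name, *args)
  # On failure, resume the paused fiber with no result. If the request has
  # already failed, this block runs before the fiber has yielded, resume
  # raises, and the rescue clears the flag so we skip the yield below.
  req.errback  { fiber.resume rescue yielding = false }
  # On success, resume the paused fiber with the query's response.
  req.callback {|response| fiber.resume(response) }
  # Pause here until a callback resumes us; the resumed value is returned.
  Fiber.yield if yielding
end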
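
The fiber technique used by query can be tried without EventMachine or Redis. In the sketch below, FakeRequest, PENDING, async_get, and blocking_get are all hypothetical names invented for illustration: FakeRequest imitates an object with callback and errback hooks, and the last lines play the role of the event loop delivering a response.

```ruby
# Fiber.current needs this require on older Ruby versions.
require 'fiber' unless Fiber.respond_to?(:current)

# Hypothetical stand-in for an asynchronous request object.
class FakeRequest
  def callback(&block)
    @callback = block
  end

  def errback(&block)
    @errback = block
  end

  # Called by the "event loop" when the request completes.
  def succeed(value)
    @callback.call(value) if @callback
  end

  # Called by the "event loop" when the request fails.
  def fail
    @errback.call if @errback
  end
end

PENDING = [] # requests waiting for the "event loop" to answer them

# Start a request and return immediately, like an asynchronous client would.
def async_get(key)
  req = FakeRequest.new
  PENDING << [req, key]
  req
end

# Pause the current fiber until the request completes, then return the
# response (or nil on failure) as an ordinary return value.
def blocking_get(key)
  fiber = Fiber.current
  req = async_get(key)
  req.callback { |response| fiber.resume(response) }
  req.errback  { fiber.resume(nil) }
  Fiber.yield
end

results = []
worker = Fiber.new { results << blocking_get('greeting') }
worker.resume # runs until the Fiber.yield inside blocking_get

# Play the part of the event loop: complete the pending request.
req, key = PENDING.shift
req.succeed("value for #{key}")
results # => ["value for greeting"]
```

The calling code reads like a synchronous call, yet the thread is free to do other work while the fiber is paused. This only works when the caller is itself running inside a fiber other than the one driving the event loop.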

remote_sessions(*jids)

Return any streams hosted at remote nodes for these JIDs. The streams act like normal EM::Connections, but are actually proxies that route stanzas over Redis pubsub channels to remote nodes.

# File lib/vines/cluster.rb, line 59
def remote_sessions(*jids)
  @sessions.find(*jids).map do |session|
    StreamProxy.new(self, session)
  end
end

route(stanza, node)

Send the stanza to the node hosting the user's session. The stanza is published to the channel to which the remote node is listening for messages.

# File lib/vines/cluster.rb, line 97
def route(stanza, node)
  @publisher.route(stanza, node)
end
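
The relationship between the proxy streams returned by remote_sessions and the per-node channel used by route can be sketched as follows. Everything here is an assumption made for illustration: Bus is an in-memory substitute for Redis pubsub, ProxyStream is not the real StreamProxy class, and the channel name format is invented.

```ruby
# Hypothetical in-memory message bus standing in for Redis pubsub channels.
class Bus
  def initialize
    @handlers = Hash.new { |hash, channel| hash[channel] = [] }
  end

  # Register a block to receive every message published to the channel.
  def subscribe(channel, &handler)
    @handlers[channel] << handler
  end

  # Deliver the message to every handler subscribed to the channel.
  def publish(channel, message)
    @handlers[channel].each { |handler| handler.call(message) }
  end
end

# Hypothetical proxy that looks like a local stream to the caller but
# forwards each stanza to the channel of the node hosting the session.
class ProxyStream
  def initialize(bus, node)
    @bus, @node = bus, node
  end

  def write(stanza)
    # The channel naming scheme is an assumption for this sketch.
    @bus.publish("cluster:nodes:#{@node}", stanza)
  end
end

bus = Bus.new
received = []
# The remote node listens on its own channel.
bus.subscribe('cluster:nodes:node-b') { |stanza| received << stanza }

proxy = ProxyStream.new(bus, 'node-b')
proxy.write('<message to="hatter@wonderland.lit"/>')
received.size # => 1
```

Code that writes to a stream does not need to know whether the recipient is connected locally or to another node.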

save_session(jid, attrs)

Persist the user's session to the shared Redis cache so that other cluster nodes can locate the node hosting this user's connection and route messages to them.

# File lib/vines/cluster.rb, line 68
def save_session(jid, attrs)
  @sessions.save(jid, attrs)
end

start()

Join this node to the cluster by broadcasting its state to the other nodes, subscribing to Redis channels, and scheduling periodic heartbeat broadcasts. This method must be called after initialize or this node will not be a cluster member.

# File lib/vines/cluster.rb, line 43
def start
  @connection.connect
  @publisher.broadcast(:online)
  @subscriber.subscribe

  EM.add_periodic_timer(1) { heartbeat }

  at_exit do
    @publisher.broadcast(:offline)
    @sessions.delete_all(@id)
  end
end

storage(domain)

Return the Storage implementation for this domain or nil if the domain is not hosted here.

# File lib/vines/cluster.rb, line 136
def storage(domain)
  @config.storage(domain)
end

subscribe_pubsub(domain, node, jid)

Subscribe the JID to the pubsub topic so it will receive any messages published to it.

# File lib/vines/cluster.rb, line 155
def subscribe_pubsub(domain, node, jid)
  @pubsub.subscribe(domain, node, jid)
end

unsubscribe_all_pubsub(domain, jid)

Unsubscribe the JID from all pubsub topics. This is useful when the JID's session ends by logout or disconnect.

# File lib/vines/cluster.rb, line 167
def unsubscribe_all_pubsub(domain, jid)
  @pubsub.unsubscribe_all(domain, jid)
end

unsubscribe_pubsub(domain, node, jid)

Unsubscribe the JID from the pubsub topic, deregistering its interest in receiving any messages published to it.

# File lib/vines/cluster.rb, line 161
def unsubscribe_pubsub(domain, node, jid)
  @pubsub.unsubscribe(domain, node, jid)
end
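
The pubsub methods on this class all delegate to a shared store of topics and subscribers. As a rough mental model only, that bookkeeping could look like the sketch below. TopicRegistry is a hypothetical class that keeps everything in local memory; the real data lives in Redis, and the behavior for unknown topics shown here is an assumption.

```ruby
require 'set'

# Hypothetical in-memory registry of pubsub topics and their subscribers.
class TopicRegistry
  def initialize
    @topics = {} # [domain, node] => Set of subscribed JID strings
  end

  def add_node(domain, node)
    @topics[[domain, node]] ||= Set.new
  end

  def delete_node(domain, node)
    @topics.delete([domain, node])
  end

  def node?(domain, node)
    @topics.key?([domain, node])
  end

  # Subscribing to a topic that does not exist is ignored in this sketch.
  def subscribe(domain, node, jid)
    @topics[[domain, node]] << jid if node?(domain, node)
  end

  def unsubscribe(domain, node, jid)
    @topics[[domain, node]].delete(jid) if node?(domain, node)
  end

  # Remove the JID from every topic in the given domain.
  def unsubscribe_all(domain, jid)
    @topics.each do |(topic_domain, _node), jids|
      jids.delete(jid) if topic_domain == domain
    end
  end

  def subscribed?(domain, node, jid)
    node?(domain, node) && @topics[[domain, node]].include?(jid)
  end

  def subscribers(domain, node)
    node?(domain, node) ? @topics[[domain, node]].to_a : []
  end
end

registry = TopicRegistry.new
registry.add_node('games.wonderland.lit', 'croquet')
registry.subscribe('games.wonderland.lit', 'croquet', 'alice@wonderland.lit/tea')
registry.subscribers('games.wonderland.lit', 'croquet')
# => ["alice@wonderland.lit/tea"]
registry.unsubscribe_all('games.wonderland.lit', 'alice@wonderland.lit/tea')
registry.subscribers('games.wonderland.lit', 'croquet') # => []
```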

update_user(jid, node)

Notify the remote node that the user's roster has changed and it should reload the user from storage.

# File lib/vines/cluster.rb, line 103
def update_user(jid, node)
  @publisher.update_user(jid, node)
end

Private Instance Methods

heartbeat()

Call this method once per second to broadcast this node's heartbeat and expire stale user sessions. This method must not raise exceptions or the timer will stop.

# File lib/vines/cluster.rb, line 192
def heartbeat
  @publisher.broadcast(:heartbeat)
  @sessions.expire
rescue => e
  log.error("Cluster session cleanup failed: #{e}")
end