class AnyCable::Rack::Hub
Constants
- INTERNAL_STREAM
Attributes
sockets[R]
streams[R]
Public Class Methods
new()
click to toggle source
# File lib/anycable/rack/hub.rb, line 13 def initialize @streams = Hash.new do |streams, stream_id| streams[stream_id] = Hash.new { |channels, channel_id| channels[channel_id] = Set.new } end @sockets = Hash.new { |h, k| h[k] = Set.new } @sync = Mutex.new end
Public Instance Methods
add_socket(socket, identifier)
click to toggle source
# File lib/anycable/rack/hub.rb, line 21 def add_socket(socket, identifier) @sync.synchronize do @streams[INTERNAL_STREAM][identifier] << socket end end
add_subscriber(stream, socket, channel)
click to toggle source
# File lib/anycable/rack/hub.rb, line 27 def add_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel] << socket @sockets[socket] << [channel, stream] end end
broadcast(stream, message, coder)
click to toggle source
# File lib/anycable/rack/hub.rb, line 66 def broadcast(stream, message, coder) list = @sync.synchronize do return unless @streams.key?(stream) @streams[stream].to_a end list.each do |(channel_id, sockets)| decoded = JSON.parse(message) cmessage = channel_message(channel_id, decoded, coder) sockets.each { |socket| socket.transmit(cmessage) } end end
broadcast_all(message)
click to toggle source
# File lib/anycable/rack/hub.rb, line 80 def broadcast_all(message) sockets.each_key { |socket| socket.transmit(message) } end
close_all()
click to toggle source
# File lib/anycable/rack/hub.rb, line 99 def close_all hub.sockets.dup.each do |socket| hub.remove_socket(socket) socket.close end end
disconnect(identifier, reconnect, coder)
click to toggle source
# File lib/anycable/rack/hub.rb, line 84 def disconnect(identifier, reconnect, coder) sockets = @sync.synchronize do return unless @streams[INTERNAL_STREAM].key?(identifier) @streams[INTERNAL_STREAM][identifier].to_a end msg = disconnect_message("remote", reconnect, coder) sockets.each do |socket| socket.transmit(msg) socket.close end end
remove_channel(socket, channel)
click to toggle source
# File lib/anycable/rack/hub.rb, line 42 def remove_channel(socket, channel) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel) if channel == channel_id end end
remove_socket(socket)
click to toggle source
# File lib/anycable/rack/hub.rb, line 54 def remove_socket(socket) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel_id) end end
remove_subscriber(stream, socket, channel)
click to toggle source
# File lib/anycable/rack/hub.rb, line 34 def remove_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel].delete(socket) @sockets[socket].delete([channel, stream]) cleanup stream, socket, channel end end
Private Instance Methods
channel_message(channel_id, message, coder)
click to toggle source
# File lib/anycable/rack/hub.rb, line 114 def channel_message(channel_id, message, coder) coder.encode(identifier: channel_id, message: message) end
cleanup(stream, socket, channel)
click to toggle source
# File lib/anycable/rack/hub.rb, line 108 def cleanup(stream, socket, channel) @streams[stream].delete(channel) if @streams[stream][channel].empty? @streams.delete(stream) if @streams[stream].empty? @sockets.delete(socket) if @sockets[socket].empty? end
disconnect_message(reason, reconnect, coder)
click to toggle source
# File lib/anycable/rack/hub.rb, line 118 def disconnect_message(reason, reconnect, coder) coder.encode({type: :disconnect, reason: reason, reconnect: reconnect}) end