class AnyCable::Rack::Hub

From github.com/rails/rails/blob/v5.0.1/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb

Constants

INTERNAL_STREAM

Attributes

sockets[R]
streams[R]

Public Class Methods

new() click to toggle source
# File lib/anycable/rack/hub.rb, line 13
def initialize
  @streams = Hash.new do |streams, stream_id|
    streams[stream_id] = Hash.new { |channels, channel_id| channels[channel_id] = Set.new }
  end
  @sockets = Hash.new { |h, k| h[k] = Set.new }
  @sync = Mutex.new
end

Public Instance Methods

add_socket(socket, identifier) click to toggle source
# File lib/anycable/rack/hub.rb, line 21
def add_socket(socket, identifier)
  @sync.synchronize do
    @streams[INTERNAL_STREAM][identifier] << socket
  end
end
add_subscriber(stream, socket, channel) click to toggle source
# File lib/anycable/rack/hub.rb, line 27
def add_subscriber(stream, socket, channel)
  @sync.synchronize do
    @streams[stream][channel] << socket
    @sockets[socket] << [channel, stream]
  end
end
broadcast(stream, message, coder) click to toggle source
# File lib/anycable/rack/hub.rb, line 66
def broadcast(stream, message, coder)
  list = @sync.synchronize do
    return unless @streams.key?(stream)

    @streams[stream].to_a
  end

  list.each do |(channel_id, sockets)|
    decoded = JSON.parse(message)
    cmessage = channel_message(channel_id, decoded, coder)
    sockets.each { |socket| socket.transmit(cmessage) }
  end
end
broadcast_all(message) click to toggle source
# File lib/anycable/rack/hub.rb, line 80
def broadcast_all(message)
  sockets.each_key { |socket| socket.transmit(message) }
end
close_all() click to toggle source
# File lib/anycable/rack/hub.rb, line 99
def close_all
  hub.sockets.dup.each do |socket|
    hub.remove_socket(socket)
    socket.close
  end
end
disconnect(identifier, reconnect, coder) click to toggle source
# File lib/anycable/rack/hub.rb, line 84
def disconnect(identifier, reconnect, coder)
  sockets = @sync.synchronize do
    return unless @streams[INTERNAL_STREAM].key?(identifier)

    @streams[INTERNAL_STREAM][identifier].to_a
  end

  msg = disconnect_message("remote", reconnect, coder)

  sockets.each do |socket|
    socket.transmit(msg)
    socket.close
  end
end
remove_channel(socket, channel) click to toggle source
# File lib/anycable/rack/hub.rb, line 42
def remove_channel(socket, channel)
  list = @sync.synchronize do
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  list.each do |(channel_id, stream)|
    remove_subscriber(stream, socket, channel) if channel == channel_id
  end
end
remove_socket(socket) click to toggle source
# File lib/anycable/rack/hub.rb, line 54
def remove_socket(socket)
  list = @sync.synchronize do
    return unless @sockets.key?(socket)

    @sockets[socket].dup
  end

  list.each do |(channel_id, stream)|
    remove_subscriber(stream, socket, channel_id)
  end
end
remove_subscriber(stream, socket, channel) click to toggle source
# File lib/anycable/rack/hub.rb, line 34
def remove_subscriber(stream, socket, channel)
  @sync.synchronize do
    @streams[stream][channel].delete(socket)
    @sockets[socket].delete([channel, stream])
    cleanup stream, socket, channel
  end
end

Private Instance Methods

channel_message(channel_id, message, coder) click to toggle source
# File lib/anycable/rack/hub.rb, line 114
def channel_message(channel_id, message, coder)
  coder.encode(identifier: channel_id, message: message)
end
cleanup(stream, socket, channel) click to toggle source
# File lib/anycable/rack/hub.rb, line 108
def cleanup(stream, socket, channel)
  @streams[stream].delete(channel) if @streams[stream][channel].empty?
  @streams.delete(stream) if @streams[stream].empty?
  @sockets.delete(socket) if @sockets[socket].empty?
end
disconnect_message(reason, reconnect, coder) click to toggle source
# File lib/anycable/rack/hub.rb, line 118
def disconnect_message(reason, reconnect, coder)
  coder.encode({type: :disconnect, reason: reason, reconnect: reconnect})
end