class AnyCable::Rack::BroadcastSubscribers::RedisSubscriber

Redis Pub/Sub subscriber

Attributes

channel[R]
redis_conn[R]
thread[R]

Public Class Methods

new(hub:, coder:, channel:, **options) click to toggle source
# File lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb, line 15
# Builds the subscriber: forwards every received argument to the parent
# initializer (bare +super+), remembers the channel name and creates the
# Redis client from the remaining keyword options.
#
# hub, coder:: handed to the parent initializer via +super+
# channel::    stored and exposed through the +channel+ reader
# options::    extra keyword arguments, passed to ::Redis.new as a hash
def initialize(hub:, coder:, channel:, **options)
  super

  @channel = channel
  @redis_conn = ::Redis.new(options)
end

Public Instance Methods

start() click to toggle source
# File lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb, line 21
# Subscribes to the configured channel and writes an info-level log entry
# naming that channel.
def start
  target = channel
  subscribe(target)
  log(:info) { "Subscribed to #{target}" }
end
stop() click to toggle source
# File lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb, line 27
# Terminates the listener thread when one exists; does nothing (and
# returns nil) when no thread has been created yet.
def stop
  worker = thread
  return if worker.nil?

  worker.terminate
end
subscribe(channel) click to toggle source
# File lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb, line 31
# Starts the background thread that listens on the given Redis Pub/Sub
# channel and hands every received payload to +handle_message+.
#
# The thread is memoized in @thread, so repeated calls return the same
# thread instead of opening another subscription.
#
# channel:: channel name passed through to redis_conn.subscribe
#
# Returns the listener Thread.
def subscribe(channel)
  @thread ||= Thread.new do
    # An exception escaping the listener thread should not go unnoticed.
    Thread.current.abort_on_exception = true

    redis_conn.without_reconnect do
      redis_conn.subscribe(channel) do |on|
        on.subscribe do |chan, count|
          log(:debug) { "Redis subscriber connected to #{chan} (#{count})" }
        end

        on.unsubscribe do |chan, count|
          log(:debug) { "Redis subscriber disconnected from #{chan} (#{count})" }
        end

        on.message do |_channel, msg|
          handle_message(msg)
        rescue => e
          # A single bad message must not end the subscription, but the
          # cause is recorded so the failure can be diagnosed.
          log(:error) { "Failed to broadcast message: #{msg} (#{e.class}: #{e.message})" }
        end
      end
    end
  end
end