class Atr::Reactor

Attributes

routing_key_scope[RW]
subscribers[RW]
websocket[RW]

Public Class Methods

new(websocket, routing_key_scope = nil) click to toggle source
# File lib/atr/reactor.rb, line 13
# Builds a reactor around +websocket+ and immediately starts streaming.
#
# Steps, in order:
# 1. logs "Streaming changes" via +info+;
# 2. stores +websocket+ and +routing_key_scope+ in instance variables;
# 3. asks ::Atr::Registry for the channels matching the scope and calls
#    +async.start_subscriber+ once per channel, keeping whatever each call
#    returns in +@subscribers+;
# 4. calls +async.run+ to start the websocket read loop.
#
# NOTE(review): +async+ is presumably the Celluloid async proxy, so the calls
# in steps 3 and 4 would be queued rather than executed inline - confirm.
#
# websocket::         object the reactor reads from and writes events to
# routing_key_scope:: optional scope passed to the registry (defaults to nil)
def initialize(websocket, routing_key_scope = nil)
  info "Streaming changes"

  @websocket = websocket
  @routing_key_scope = routing_key_scope

  channels = ::Atr::Registry.scoped_channels(routing_key_scope)
  @subscribers = channels.map { |name| async.start_subscriber(name) }

  async.run
end

Public Instance Methods

dispatch_message(message) click to toggle source
# File lib/atr/reactor.rb, line 26
# Handles a websocket frame that is not the "unsubscribe" command.
# Currently it only prints the frame's #inspect form to standard output.
#
# message:: the value read from the websocket
#
# Returns nil.
def dispatch_message(message)
  rendered = message.inspect
  $stdout.puts(rendered)
end
run() click to toggle source
# File lib/atr/reactor.rb, line 30
# Read loop for the client side of the connection.
#
# Keeps calling +@websocket.read+ until it returns nil or false. The literal
# frame "unsubscribe" triggers +unsubscribe_all+; every other frame is passed
# to +dispatch_message+.
#
# Returns nil once the read loop ends.
def run
  while (frame = @websocket.read)
    frame == "unsubscribe" ? unsubscribe_all : dispatch_message(frame)
  end
end
shutdown() click to toggle source
# File lib/atr/reactor.rb, line 73
# Tears the reactor down: unsubscribes the shared Redis connection (no channel
# argument is passed), clears ActiveRecord's active connections, and finally
# calls +terminate+.
#
# NOTE(review): +terminate+ is presumably the Celluloid actor method - confirm.
#
# Returns whatever +terminate+ returns.
def shutdown
  redis = ::Atr::Redis.connection
  redis.unsubscribe

  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
start_subscriber(channel) click to toggle source
# File lib/atr/reactor.rb, line 79
# Subscribes to a single Redis +channel+ and forwards every published event
# to the websocket as JSON.
#
# Connects to Redis first when ::Atr::Redis reports it is not connected.
# Inside the subscription:
# * subscribe/unsubscribe notifications are printed to standard output; an
#   unsubscribe also clears ActiveRecord connections and terminates;
# * the literal message "exit" triggers +shutdown+ and is NOT unmarshalled;
# * any other message is unmarshalled, wrapped in the configured event
#   serializer when one is set, and written to the websocket as JSON.
#
# A Reel::SocketError (raised when writing to a disconnected client) is
# handled the same way +start_subscribers+ handles it: log, clear
# ActiveRecord connections, terminate.
#
# channel:: name of the Redis channel to subscribe to
def start_subscriber(channel)
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  ::Atr::Redis.connection.subscribe(channel) do |on|
    on.subscribe do |subscribed_channel, subscriptions|
      puts "Subscribed to ##{subscribed_channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |unsubscribed_channel, subscriptions|
      puts "Unsubscribed from ##{unsubscribed_channel} (#{subscriptions} subscriptions)"
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |_channel, message|
      # "exit" is a control message, not a marshalled event: stop here so it
      # never reaches Marshal.load (which would raise TypeError on it).
      if message == "exit"
        shutdown
        next
      end

      # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
      # only safe if every publisher on this Redis channel is trusted.
      event = Marshal.load(message)

      # Build the serializer once per message instead of once per debug line.
      serializer = ::Atr.config.event_serializer if ::Atr.config.event_serializer?
      payload = serializer ? serializer.new(event) : event

      websocket << payload.to_json
    end
  end
rescue Reel::SocketError
  info "Client disconnected"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
start_subscribers() click to toggle source

TODO: decide between starting one subscriber per channel (start_subscriber) and subscribing to all channels at once (start_subscribers), then remove the method that is no longer needed.

# File lib/atr/reactor.rb, line 41
# Subscribes to every channel the registry returns for +routing_key_scope+ in
# a single Redis subscribe call, and forwards each published event to the
# websocket as JSON.
#
# Connects to Redis first when ::Atr::Redis reports it is not connected.
# Inside the subscription:
# * subscribe notifications are printed to standard output;
# * an unsubscribe notification clears ActiveRecord connections and
#   terminates;
# * the literal message "exit" triggers +shutdown+ and is NOT unmarshalled;
# * any other message is unmarshalled, wrapped in the configured event
#   serializer when one is set, and written to the websocket as JSON.
#
# A Reel::SocketError is treated as a client disconnect: it is logged,
# ActiveRecord connections are cleared, and +terminate+ is called.
def start_subscribers
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  ::Atr::Redis.connection.subscribe(::Atr::Registry.scoped_channels(routing_key_scope)) do |on|
    on.subscribe do |channel, subscriptions|
      puts "Subscribed to ##{channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |_channel, _subscriptions|
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |_channel, message|
      # "exit" is a control message, not a marshalled event: stop here so it
      # never reaches Marshal.load (which would raise TypeError on it).
      if message == "exit"
        shutdown
        next
      end

      # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
      # only safe if every publisher on these Redis channels is trusted.
      event = Marshal.load(message)

      serializer = ::Atr.config.event_serializer if ::Atr.config.event_serializer?
      payload = serializer ? serializer.new(event) : event

      websocket << payload.to_json
    end
  end
rescue Reel::SocketError
  info "Client disconnected"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
unsubscribe_all() click to toggle source
# File lib/atr/reactor.rb, line 110
# Unsubscribes the Redis connection from every channel the registry returns
# for +routing_key_scope+, then releases ActiveRecord connections and calls
# +terminate+.
#
# The ActiveRecord cleanup matches what the log line announces and what every
# other terminate path in this class does before terminating.
#
# Returns whatever +terminate+ returns.
def unsubscribe_all
  ::Atr::Registry.scoped_channels(routing_key_scope).each do |channel|
    ::Atr::Redis.connection.unsubscribe(channel)
  end

  info "clearing connections"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end