class Atr::Reactor
Attributes
routing_key_scope[RW]
subscribers[RW]
websocket[RW]
Public Class Methods
new(websocket, routing_key_scope = nil)
click to toggle source
# File lib/atr/reactor.rb, line 13
# Builds a reactor for one websocket and immediately starts streaming.
#
# Emits "Streaming changes" through +info+, stores the websocket and the
# routing key scope, asynchronously starts one subscriber per channel returned
# by ::Atr::Registry.scoped_channels for the scope, and finally schedules #run
# asynchronously.
#
# websocket         - connection object; #run reads from it and the
#                     subscribers push JSON to it with <<.
# routing_key_scope - scope handed to the registry lookup (defaults to nil).
def initialize(websocket, routing_key_scope = nil)
  info "Streaming changes"

  @websocket = websocket
  @routing_key_scope = routing_key_scope

  scoped = ::Atr::Registry.scoped_channels(routing_key_scope)
  @subscribers = scoped.map { |channel| async.start_subscriber(channel) }

  async.run
end
Public Instance Methods
dispatch_message(message)
click to toggle source
# File lib/atr/reactor.rb, line 26
# Prints the inspected form of a message to standard output.
#
# message - the value to print; #run passes whatever it read from the
#           websocket.
def dispatch_message(message)
  inspected = message.inspect
  puts inspected
end
run()
click to toggle source
# File lib/atr/reactor.rb, line 30
# Read loop for the websocket.
#
# Keeps reading until +@websocket.read+ returns a falsy value. The literal
# message "unsubscribe" triggers #unsubscribe_all; every other message is
# handed to #dispatch_message.
def run
  # Parenthesised to make the assignment-in-condition explicit.
  while (incoming = @websocket.read)
    incoming == "unsubscribe" ? unsubscribe_all : dispatch_message(incoming)
  end
end
shutdown()
click to toggle source
# File lib/atr/reactor.rb, line 73
# Tears the reactor down.
#
# Calls +unsubscribe+ (with no channel argument) on the Redis connection,
# clears ActiveRecord's active connections, and then calls +terminate+.
def shutdown
  redis = ::Atr::Redis.connection
  redis.unsubscribe

  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
start_subscriber(channel)
click to toggle source
# File lib/atr/reactor.rb, line 79
# Subscribes to a single Redis channel and forwards each published event to
# the websocket as JSON.
#
# Connects to Redis first if no connection is established. Three callbacks are
# registered on the subscription:
#
# * subscribe   - prints a confirmation line.
# * unsubscribe - prints a confirmation line, clears ActiveRecord's active
#                 connections and calls +terminate+.
# * message     - the literal message "exit" triggers #shutdown; any other
#                 message is unmarshalled and pushed to the websocket as JSON,
#                 using ::Atr.config.event_serializer when one is configured.
#
# A Reel::SocketError raised while streaming is handled the same way as in
# #start_subscribers: it is reported through +info+, ActiveRecord's active
# connections are cleared and +terminate+ is called.
#
# channel - the Redis channel to subscribe to.
def start_subscriber(channel)
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  ::Atr::Redis.connection.subscribe(channel) do |on|
    # Block parameters are named distinctly so they do not shadow +channel+.
    on.subscribe do |subscribed_channel, subscriptions|
      puts "Subscribed to ##{subscribed_channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |unsubscribed_channel, subscriptions|
      puts "Unsubscribed from ##{unsubscribed_channel} (#{subscriptions} subscriptions)"
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |_message_channel, message|
      if message == "exit"
        shutdown
        # "exit" is a control message, not a marshalled event: if #shutdown
        # returns, it must not fall through to Marshal.load.
        next
      end

      # NOTE(review): Marshal.load is only safe when every publisher on this
      # channel is trusted - confirm, or switch to a safer wire format.
      event = Marshal.load(message)

      # Serialize once, then push the result to the client.
      payload =
        if ::Atr.config.event_serializer?
          ::Atr.config.event_serializer.new(event).to_json
        else
          event.to_json
        end

      websocket << payload
    end
  end
rescue Reel::SocketError
  info "Client disconnected"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
start_subscribers()
click to toggle source
TODO: decide between starting subscribers individually and subscribing to all channels at once, then remove whichever method is no longer needed.
# File lib/atr/reactor.rb, line 41
# Subscribes to every channel in the routing key scope with one subscription
# and forwards each published event to the websocket as JSON.
#
# Connects to Redis first if no connection is established. The channel list
# comes from ::Atr::Registry.scoped_channels(routing_key_scope). Three
# callbacks are registered on the subscription:
#
# * subscribe   - prints a confirmation line.
# * unsubscribe - clears ActiveRecord's active connections and calls
#                 +terminate+.
# * message     - the literal message "exit" triggers #shutdown; any other
#                 message is unmarshalled and pushed to the websocket as JSON,
#                 using ::Atr.config.event_serializer when one is configured.
#
# A Reel::SocketError raised while streaming is reported through +info+, after
# which ActiveRecord's active connections are cleared and +terminate+ is
# called.
def start_subscribers
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  channels = ::Atr::Registry.scoped_channels(routing_key_scope)

  ::Atr::Redis.connection.subscribe(channels) do |on|
    on.subscribe do |channel, subscriptions|
      puts "Subscribed to ##{channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |_channel, _subscriptions|
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |_channel, message|
      if message == "exit"
        shutdown
        # "exit" is a control message, not a marshalled event: if #shutdown
        # returns, it must not fall through to Marshal.load.
        next
      end

      # NOTE(review): Marshal.load is only safe when every publisher on these
      # channels is trusted - confirm, or switch to a safer wire format.
      event = Marshal.load(message)

      if ::Atr.config.event_serializer?
        websocket << ::Atr.config.event_serializer.new(event).to_json
      else
        websocket << event.to_json
      end
    end
  end
rescue Reel::SocketError
  info "Client disconnected"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end
unsubscribe_all()
click to toggle source
# File lib/atr/reactor.rb, line 110
# Unsubscribes from every channel in the routing key scope.
#
# Looks the channels up through ::Atr::Registry.scoped_channels, calls
# +unsubscribe+ on the Redis connection once per channel, emits
# "clearing connections" through +info+ and then calls +terminate+.
def unsubscribe_all
  scoped = ::Atr::Registry.scoped_channels(routing_key_scope)
  scoped.each { |channel| ::Atr::Redis.connection.unsubscribe(channel) }

  info "clearing connections"
  terminate
end