class Aggro::EventBus
Public: Publishes events to any subscribed listeners.
Attributes
remote_publishers[R]
Public Class Methods
new()
click to toggle source
# File lib/aggro/event_bus.rb, line 6
# Public: Build an event bus that has no remote publisher connections yet.
#
# The @remote_publishers Hash starts out empty; subscribe_bus_to_publisher
# fills it in on demand, keyed by publisher endpoint.
def initialize
  @remote_publishers = {}
end
Public Instance Methods
publish(topic, event)
click to toggle source
# File lib/aggro/event_bus.rb, line 10
# Public: Broadcast a single event for a topic.
#
# The event is first handed to Aggro.server wrapped in a Message::Events,
# then delivered to every subscription registered on this bus for the topic.
#
# topic - The key under which subscriptions are stored.
# event - The event object passed to each subscription's handle_event.
#
# Returns nil when the topic has no entry in subscriptions, otherwise the
# list of subscriptions that were notified.
def publish(topic, event)
  outgoing = Message::Events.new(topic, [event])
  Aggro.server.publish(outgoing)

  if subscriptions.key?(topic)
    subscriptions[topic].each do |listener|
      # Poll in 10ms steps until the subscription reports it has caught up,
      # so the new event is only handed over after that point.
      sleep(0.01) until listener.caught_up
      listener.handle_event(event)
    end
  end
end
shutdown()
click to toggle source
# File lib/aggro/event_bus.rb, line 39
# Public: Stop every remote publisher connection held by this bus.
#
# Returns the Array of publisher objects that were sent stop.
def shutdown
  publishers = remote_publishers.values
  publishers.each { |publisher| publisher.stop }
end
subscribe(topic, subscriber, event_namespace = nil, filters = {})
click to toggle source
# File lib/aggro/event_bus.rb, line 21
# Public: Register a subscriber for a topic.
#
# A Subscription is created (with 0 as its final constructor argument),
# caught up via catchup_subscriber, appended to the topic's subscription
# list, and finally the bus itself is subscribed to the topic's publisher.
#
# topic           - The key the subscription is stored under.
# subscriber      - The object handed to Subscription.new as the subscriber.
# event_namespace - Passed through to Subscription.new (default: nil).
# filters         - Passed through to Subscription.new (default: {}).
#
# Returns the newly created Subscription.
def subscribe(topic, subscriber, event_namespace = nil, filters = {})
  Subscription.new(topic, subscriber, event_namespace, filters, 0).tap do |subscription|
    # Catch-up runs before the subscription is added to the topic's list.
    catchup_subscriber(topic, subscription)
    (subscriptions[topic] ||= []) << subscription
    subscribe_bus_to_publisher(topic)
  end
end
unsubscribe(topic, subscriber)
click to toggle source
# File lib/aggro/event_bus.rb, line 35
# Public: Remove an entry from a topic's subscription list.
#
# topic      - The key the subscription list is stored under.
# subscriber - The object to delete from that list. It is compared against
#              the stored entries, which subscribe populates with
#              Subscription objects (presumably callers pass the
#              Subscription returned by subscribe - TODO confirm).
#
# Returns the deleted entry, or nil when nothing matched or the topic has
# no subscription list at all.
def unsubscribe(topic, subscriber)
  # A topic that was never subscribed to has no list; treat that as a no-op
  # instead of calling delete on nil (same guard publish uses).
  return unless subscriptions.key? topic

  subscriptions[topic].delete subscriber
end
Private Instance Methods
catchup_local(topic, subscription)
click to toggle source
# File lib/aggro/event_bus.rb, line 45
# Internal: Replay a topic's events from the local store into a subscription.
#
# topic        - The topic to read from Aggro.store.
# subscription - The object that receives each event via handle_event.
#
# Returns the events collection that was iterated.
def catchup_local(topic, subscription)
  # read takes a list of topics; only one is requested, so take the first
  # result.
  stream = Aggro.store.read([topic]).first
  stream.events.each { |past_event| subscription.handle_event(past_event) }
end
catchup_remote(topic, subscription, node)
click to toggle source
# File lib/aggro/event_bus.rb, line 51
# Internal: Replay a topic's events from a remote node into a subscription.
#
# A Message::GetEvents built from the local node id, the topic and 0 is
# posted through the node's client.
#
# topic        - The topic whose events are requested.
# subscription - The object that receives each event via handle_event.
# node         - The node whose client is used to post the request.
#
# Returns the events collection from the response.
# Raises RuntimeError if the response is not a Message::Events.
def catchup_remote(topic, subscription, node)
  request = Message::GetEvents.new(Aggro.local_node.id, topic, 0)
  reply = node.client.post(request)
  fail 'Could not catchup subscriber' unless reply.is_a?(Message::Events)

  reply.events.each do |event|
    subscription.handle_event(event)
  end
end
catchup_subscriber(topic, subscription)
click to toggle source
# File lib/aggro/event_bus.rb, line 62
# Internal: Bring a new subscription up to date, then tell it so.
#
# The topic's primary node (from Locator) decides the source: a LocalNode
# means catchup_local, anything else goes through catchup_remote.
#
# topic        - The topic being subscribed to.
# subscription - The subscription to replay events into.
#
# Returns whatever subscription.notify_subscription_caught_up returns.
def catchup_subscriber(topic, subscription)
  primary = Locator.new(topic).primary_node

  if primary.is_a?(LocalNode)
    catchup_local(topic, subscription)
  else
    catchup_remote(topic, subscription, primary)
  end

  subscription.notify_subscription_caught_up
end
handle_events(topic, events)
click to toggle source
# File lib/aggro/event_bus.rb, line 74
# Internal: Deliver a batch of events to every subscription of a topic.
#
# subscribe_bus_to_publisher passes this method as the callback given to
# Subscriber.new.
#
# topic  - The topic the events belong to.
# events - An enumerable of events; each is passed to handle_event on every
#          subscription of the topic.
#
# Returns the topic's subscription list, or nil when the topic has none.
def handle_events(topic, events)
  # Events can arrive for a topic this bus holds no subscription list for;
  # ignore them rather than calling each on nil (same guard publish uses).
  return unless subscriptions.key? topic

  subscriptions[topic].each do |subscription|
    events.each { |event| subscription.handle_event event }
  end
end
subscribe_bus_to_publisher(topic)
click to toggle source
# File lib/aggro/event_bus.rb, line 80
# Internal: Make sure the bus listens to the remote publisher of a topic.
#
# Nothing happens when the topic's primary node is a LocalNode. Otherwise a
# Subscriber for the node's publisher endpoint is created and bound on first
# use, cached in remote_publishers, and asked to subscribe to the topic.
#
# topic - The topic to subscribe the bus to.
#
# Returns nil for a local topic, otherwise the result of subscribe_to_topic.
def subscribe_bus_to_publisher(topic)
  primary = Locator.new(topic).primary_node
  return if primary.is_a?(LocalNode)

  endpoint = primary.publisher_endpoint
  unless remote_publishers[endpoint]
    # One Subscriber per endpoint; incoming events are routed to
    # handle_events.
    connection = Subscriber.new(endpoint, method(:handle_events))
    connection.bind
    remote_publishers[endpoint] = connection
  end

  remote_publishers[endpoint].subscribe_to_topic(topic)
end
subscriptions()
click to toggle source
# File lib/aggro/event_bus.rb, line 93
# Internal: The topic => subscription list registry for this bus.
#
# The Hash is created on first access and reused afterwards.
#
# Returns the Hash stored in @subscriptions.
def subscriptions
  @subscriptions ||= {}
end