class Pubsubstub::Subscriber
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/pubsubstub/subscriber.rb, line 6
# Builds a subscriber that is not yet subscribed and has no registered
# listeners.
def initialize
  # Let the superclass / mixed-in modules run their own setup first.
  super
  # Listener registry, keyed by channel key (filled by add_event_listener).
  @listeners = {}
  # Flag reported by #subscribed?; toggled by the callbacks set up in #start.
  @subscribed = false
end
Public Instance Methods
add_event_listener(channel_key, callback)
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 16
# Registers +callback+ for the channel identified by +channel_key+.
#
# The registry is mutated inside +synchronize+. Callbacks for a channel are
# kept in a Set, so registering the same callback twice has no effect.
#
# Returns true when the callback was newly added, false when it was already
# registered for that channel.
def add_event_listener(channel_key, callback)
  synchronize do
    callbacks = (@listeners[channel_key] ||= Set.new)
    # Set#add? returns nil when the element is already present.
    !callbacks.add?(callback).nil?
  end
end
remove_event_listener(channel_key, callback)
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 23
# Unregisters +callback+ from the channel identified by +channel_key+.
#
# The registry is mutated inside +synchronize+. When the last callback of a
# channel is removed, the channel's (now empty) Set is dropped from the
# registry so that it does not grow with every channel key ever seen.
#
# Returns true when the callback was removed, false when it was not
# registered for that channel, and nil when the channel currently has no
# listeners at all.
def remove_event_listener(channel_key, callback)
  synchronize do
    callbacks = @listeners[channel_key]
    return unless callbacks

    # Set#delete? returns nil when the element was not present.
    removed = !callbacks.delete?(callback).nil?
    # Do not keep empty Sets around for channels nobody listens to anymore.
    @listeners.delete(channel_key) if callbacks.empty?
    removed
  end
end
start()
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 38
# Subscribes to +pubsub_pattern+ on the client returned by +redis+ and hands
# every received message to +process_message+.
#
# The subscribed flag is set when the subscription is confirmed and cleared
# when it is cancelled. It is also cleared when this method exits for any
# other reason (for example an exception raised by the client), so that
# #subscribed? never keeps reporting true once the loop has ended.
#
# Returns whatever +redis.psubscribe+ returns; exceptions propagate.
def start
  redis.psubscribe(pubsub_pattern) do |on|
    on.psubscribe do
      info { "Subscribed to #{pubsub_pattern}" }
      @subscribed = true
    end

    on.punsubscribe do
      info { "Unsubscribed from #{pubsub_pattern}" }
      @subscribed = false
    end

    on.pmessage do |_pattern, pubsub_key, message|
      process_message(pubsub_key, message)
    end
  end
ensure
  # Reset the flag before logging so it is cleared even if logging fails.
  @subscribed = false
  info { "Terminated" }
end
stop()
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 30
# Sends a +punsubscribe+ command for +pubsub_pattern+ so that the
# subscription set up by #start is cancelled.
#
# Returns the result of the low-level client call.
def stop
  command = ['punsubscribe', pubsub_pattern]
  # Calling the low-level client directly allows us to bypass the Redis
  # client mutex. Since we know that the only other possible caller is
  # blocking on reading the socket, this is safe.
  synchronize { redis._client.call(command) }
end
subscribed?()
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 12
# Reports the current value of the subscribed flag, which is set and cleared
# by the subscription callbacks registered in #start.
def subscribed?
  @subscribed
end
Private Instance Methods
dispatch_event(channel_name, event)
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 70
# Calls every listener registered for +channel_name+ with +event+, after
# logging how many listeners will receive it.
#
# Returns the collection of listeners that was iterated.
def dispatch_event(channel_name, event)
  recipients = listeners_for(channel_name)
  info { "Dispatching event ##{event.id} from #{channel_name} to #{recipients.size} listeners" }
  recipients.each { |recipient| recipient.call(event) }
end
listeners_for(channel_name)
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 78
# Returns the callbacks currently registered for +channel_name+ as a new
# Array (empty when the channel has no listeners).
#
# The registry is read inside +synchronize+, like the methods that mutate it,
# and a copy is returned instead of the live Set. Iterating the live Set while
# a listener is being added to it raises a RuntimeError, which can happen
# whenever a listener is registered during a dispatch.
def listeners_for(channel_name)
  synchronize do
    registered = @listeners[channel_name]
    # Set#to_a builds a fresh Array, so callers iterate a stable snapshot.
    registered ? registered.to_a : []
  end
end
process_message(pubsub_key, message)
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 64
# Turns a raw pub/sub message into a dispatch: the channel name is derived
# from +pubsub_key+ via Channel.name_from_pubsub_key, the event is built from
# the +message+ payload via Event.from_json, and both are handed to
# +dispatch_event+.
def process_message(pubsub_key, message)
  dispatch_event(
    Channel.name_from_pubsub_key(pubsub_key),
    Event.from_json(message)
  )
end
pubsub_pattern()
click to toggle source
# File lib/pubsubstub/subscriber.rb, line 60
# Glob pattern passed to PSUBSCRIBE / PUNSUBSCRIBE by #start and #stop.
def pubsub_pattern
  '*.pubsub'
end
redis()
click to toggle source
The Redis client surrounds all calls with a mutex. As such, it is crucial to use a dedicated Redis client when blocking on a `subscribe` call.
# File lib/pubsubstub/subscriber.rb, line 84
# Returns this subscriber's own Redis client, created through
# Pubsubstub.new_redis on first use and memoized afterwards.
def redis
  @redis ||= Pubsubstub.new_redis
end