class Pubsubstub::Subscriber

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/pubsubstub/subscriber.rb, line 6
# Sets up a fresh subscriber: runs the superclass initializer, then starts
# out unsubscribed with an empty channel-key => callbacks registry.
def initialize
  super
  @subscribed, @listeners = false, {}
end

Public Instance Methods

add_event_listener(channel_key, callback) click to toggle source
# File lib/pubsubstub/subscriber.rb, line 16
# Registers +callback+ for +channel_key+ while holding the instance lock.
#
# The per-channel Set is created on first use.
#
# Returns true when the callback was newly added, false when it was
# already registered for that channel.
def add_event_listener(channel_key, callback)
  synchronize do
    callbacks = (@listeners[channel_key] ||= Set.new)
    callbacks.add?(callback) ? true : false
  end
end
remove_event_listener(channel_key, callback) click to toggle source
# File lib/pubsubstub/subscriber.rb, line 23
# Unregisters +callback+ from +channel_key+ while holding the instance lock.
#
# Once the last callback of a channel is removed, the channel's entry is
# dropped from the registry so that empty Sets do not accumulate for every
# channel key ever seen.
#
# Returns true when the callback was removed, false when the channel has
# other callbacks but not this one, and nil when the channel currently has
# no registered callbacks at all.
def remove_event_listener(channel_key, callback)
  synchronize do
    callbacks = @listeners[channel_key]
    return unless callbacks

    removed = !!callbacks.delete?(callback)
    # Prune the now-empty entry; add_event_listener recreates it on demand.
    @listeners.delete(channel_key) if callbacks.empty?
    removed
  end
end
start() click to toggle source
# File lib/pubsubstub/subscriber.rb, line 38
# Subscribes to pubsub_pattern on the Redis client and hands every incoming
# message to process_message. The call stays inside redis.psubscribe for as
# long as that call runs.
#
# @subscribed is set when the subscription is confirmed and cleared when the
# unsubscription is confirmed. It is also cleared on the way out, so that
# subscribed? does not keep reporting true when psubscribe exits through an
# exception without ever firing the punsubscribe callback.
def start
  redis.psubscribe(pubsub_pattern) do |on|
    on.psubscribe do
      info { "Subscribed to #{pubsub_pattern}" }
      @subscribed = true
    end

    on.punsubscribe do
      info { "Unsubscribed from #{pubsub_pattern}" }
      @subscribed = false
    end

    # The matched pattern is not needed; only the concrete key and payload are.
    on.pmessage do |_pattern, pubsub_key, message|
      process_message(pubsub_key, message)
    end
  end
ensure
  # Whatever the exit path, the subscription is over at this point.
  @subscribed = false
  info { "Terminated" }
end
stop() click to toggle source
# File lib/pubsubstub/subscriber.rb, line 30
# Sends a punsubscribe command for pubsub_pattern through the Redis
# client's low-level connection, while holding the instance lock.
def stop
  # Going through redis._client.call bypasses the client mutex.
  # This is safe because we know the only other possible caller is blocked
  # reading the socket.
  synchronize do
    unsubscribe_command = ['punsubscribe', pubsub_pattern]
    redis._client.call(unsubscribe_command)
  end
end
subscribed?() click to toggle source
# File lib/pubsubstub/subscriber.rb, line 12
# Reports the current value of the @subscribed flag.
def subscribed?
  @subscribed
end

Private Instance Methods

dispatch_event(channel_name, event) click to toggle source
# File lib/pubsubstub/subscriber.rb, line 70
# Delivers +event+ to every callback that listeners_for reports for
# +channel_name+, after emitting an info message naming the event id, the
# channel, and the number of callbacks.
#
# Returns the collection of callbacks that was iterated.
def dispatch_event(channel_name, event)
  callbacks = listeners_for(channel_name)
  info { "Dispatching event ##{event.id} from #{channel_name} to #{callbacks.size} listeners" }
  callbacks.each { |callback| callback.call(event) }
end
listeners_for(channel_name) click to toggle source
# File lib/pubsubstub/subscriber.rb, line 78
# Returns the callbacks registered for +channel_name+ as an Array snapshot
# taken while holding the instance lock; an empty Array when none exist.
#
# A copy is returned rather than the live Set so that callers can iterate
# it while listeners are added or removed: adding to a Set that is being
# iterated raises a RuntimeError.
def listeners_for(channel_name)
  synchronize do
    registered = @listeners[channel_name]
    registered ? registered.to_a : []
  end
end
process_message(pubsub_key, message) click to toggle source
# File lib/pubsubstub/subscriber.rb, line 64
# Handles one raw pubsub message: derives the channel name from
# +pubsub_key+ via Channel.name_from_pubsub_key, builds the event from the
# +message+ payload via Event.from_json, and passes both to dispatch_event.
def process_message(pubsub_key, message)
  dispatch_event(
    Channel.name_from_pubsub_key(pubsub_key),
    Event.from_json(message)
  )
end
pubsub_pattern() click to toggle source
# File lib/pubsubstub/subscriber.rb, line 60
# The pattern passed to psubscribe / punsubscribe: every key ending in ".pubsub".
def pubsub_pattern
  '*.pubsub'
end
redis() click to toggle source

The Redis client surrounds all calls with a mutex. As such, it is crucial to use a dedicated Redis client when blocking on a `subscribe` call.

# File lib/pubsubstub/subscriber.rb, line 84
# Returns this subscriber's own Redis client, building it with
# Pubsubstub.new_redis on first use and reusing it afterwards.
def redis
  return @redis if @redis

  @redis = Pubsubstub.new_redis
end