class Pubsubstub::Subscription

Attributes

channels[R]
connection[R]
id[R]
queue[R]

Public Class Methods

new(channels, connection) click to toggle source
# File lib/pubsubstub/subscription.rb, line 7
# Builds a subscription for the given channels over the given connection.
#
# @param channels [Enumerable] channels this subscription covers
# @param connection [#<<] sink stored for later use when streaming
def initialize(channels, connection)
  @channels = channels
  @connection = connection
  @queue = Queue.new
  # Random identifier in 0...2**64, used to tell clients apart.
  @id = Random.rand(1 << 64)
end

Public Instance Methods

push(event) click to toggle source
# File lib/pubsubstub/subscription.rb, line 27
# Enqueues an event on this subscription's queue.
#
# @param event [Object] the event to enqueue
# @return [Queue] the queue the event was appended to
def push(event)
  queue << event
end
stream(last_event_id) click to toggle source
# File lib/pubsubstub/subscription.rb, line 14
# Subscribes to the channels, queues any scrollback, then forwards queued
# events to the connection until a falsy value (nil/false) is popped.
# The subscription is always removed on the way out, even on error.
#
# @param last_event_id [Object, nil] passed through to fetch_scrollback
def stream(last_event_id)
  info do
    channel_names = channels.map(&:name).join(', ')
    "Connecting client ##{id} (#{channel_names})"
  end
  subscribe
  fetch_scrollback(last_event_id)
  # queue.pop blocks until something is available; a falsy item ends the loop.
  while (event = queue.pop)
    debug { "Sending event ##{event.id} to client ##{id}"}
    connection << event.to_message
  end
ensure
  info { "Disconnecting client ##{id}" }
  unsubscribe
end

Private Instance Methods

callback() click to toggle source

We store the callback so that the object_id stays the same. Otherwise we wouldn’t be able to unsubscribe.

# File lib/pubsubstub/subscription.rb, line 58
# Returns the memoized Method object for #push, so the same object is
# returned on every call.
#
# @return [Method] bound method object for push
def callback
  return @callback if @callback

  @callback = method(:push)
end
fetch_scrollback(last_event_id) click to toggle source

This method is not ideal, as it doesn’t guarantee event order in the case of a multi-channel subscription.

# File lib/pubsubstub/subscription.rb, line 42
# Queues every scrollback event of each channel since last_event_id,
# channel by channel. If nothing was queued (or no last_event_id was given),
# a single heartbeat event is queued instead.
#
# @param last_event_id [Object, nil] scrollback is skipped when nil/false
def fetch_scrollback(last_event_id)
  replayed = 0

  if last_event_id
    channels.each do |channel|
      channel.scrollback(since: last_event_id).each do |event|
        replayed += 1
        queue.push(event)
      end
    end
  end

  # Nothing replayed: queue a heartbeat instead.
  queue.push(Pubsubstub.heartbeat_event) if replayed.zero?
end
subscribe() click to toggle source
# File lib/pubsubstub/subscription.rb, line 33
# Registers the memoized callback as an event listener for each channel name.
def subscribe
  channels.each do |channel|
    listener_registry = Pubsubstub.subscriber
    listener_registry.add_event_listener(channel.name, callback)
  end
end
unsubscribe() click to toggle source
# File lib/pubsubstub/subscription.rb, line 37
# Removes the memoized callback as an event listener for each channel name.
def unsubscribe
  channels.each do |channel|
    listener_registry = Pubsubstub.subscriber
    listener_registry.remove_event_listener(channel.name, callback)
  end
end