class Pubsubstub::StreamAction
Constants
- HEADERS
Public Class Methods
new(*)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 11 def initialize(*) @subscriptions = Set.new @mutex = Mutex.new end
Public Instance Methods
call(env)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 16 def call(env) spawn_helper_threads last_event_id = env['HTTP_LAST_EVENT_ID'] request = Rack::Request.new(env) channels = (request.params['channels'] || [:default]).map(&Channel.method(:new)) stream = if use_persistent_connections? subscribe_connection(channels, last_event_id) else send_scrollback(channels, last_event_id) end [200, HEADERS.dup, stream] end
Private Instance Methods
register(*args)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 58 def register(*args) new_subscription = Subscription.new(*args) @mutex.synchronize { @subscriptions << new_subscription } new_subscription end
release(subscription)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 64 def release(subscription) @mutex.synchronize { @subscriptions.delete(subscription) } end
send_scrollback(channels, last_event_id)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 32 def send_scrollback(channels, last_event_id) scrollback_events = [] scrollback_events = channels.flat_map { |c| c.scrollback(since: last_event_id) } if last_event_id scrollback_events = [Pubsubstub.heartbeat_event] if scrollback_events.empty? Stream.new do |connection| scrollback_events.each do |event| connection << event.to_message end end end
spawn_helper_threads()
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 68 def spawn_helper_threads return if defined? @helper_threads_initialized @mutex.synchronize do return if defined? @helper_threads_initialized @helper_threads_initialized = true start_subscriber start_heartbeat end end
start_heartbeat()
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 93 def start_heartbeat Thread.start do info { "Starting heartbeat" } Pubsubstub.report_errors do loop do sleep Pubsubstub.heartbeat_frequency event = Pubsubstub.heartbeat_event @subscriptions.each { |subscription| subscription.push(event) } end end end end
start_subscriber()
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 78 def start_subscriber Thread.start do info { "Starting subscriber" } Pubsubstub.report_errors do begin Pubsubstub.subscriber.start rescue Redis::BaseConnectionError => error error { "Can't subscribe to Redis (#{error.class}: #{error.message}). Retrying in 1 second" } sleep 1 retry end end end end
subscribe_connection(channels, last_event_id)
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 47 def subscribe_connection(channels, last_event_id) Stream.new do |connection| subscription = register(channels, connection) begin subscription.stream(last_event_id) ensure release(subscription) end end end
use_persistent_connections?()
click to toggle source
# File lib/pubsubstub/stream_action.rb, line 43 def use_persistent_connections? Pubsubstub.use_persistent_connections end