class Redisse::Server

Internal: Goliath::API class that defines the server.

See {Redisse#run}.

Constants

HEARTBEAT_PERIOD

Public: The period between heartbeats in seconds.

LONG_POLLING_DELAY

Public: Delay between receiving a message and closing the connection.

Closing the connection is necessary when using long polling, because the client is not able to read the data before the connection is closed. But instead of closing immediately, we delay a bit closing the connection to give a chance for several messages to be sent in a row.

Attributes

redisse[R]

Public Class Methods

new(redisse) click to toggle source
Calls superclass method
# File lib/redisse/server.rb, line 61
def initialize(redisse)
  @redisse = redisse
  super()
end

Public Instance Methods

on_close(env) click to toggle source
# File lib/redisse/server.rb, line 80
def on_close(env)
  env.status[:stats][:connected] -= 1
  env.status[:stats][:served]    += 1
  unsubscribe(env)
  stop_heartbeat(env)
end
options_parser(opts, options) click to toggle source
# File lib/redisse/server.rb, line 195
def options_parser(opts, options)
  opts.on '--redis REDIS_URL', 'URL of the Redis connection' do |url|
    redisse.redis_server = url
  end
  default_port = redisse.default_port
  return unless default_port
  options[:port] = default_port
end
response(env) click to toggle source
# File lib/redisse/server.rb, line 66
def response(env)
  return not_acceptable unless acceptable?(env)
  channels = Array(redisse.channels(env))
  return not_found if channels.empty?
  subscribe(env, channels) or return service_unavailable
  send_history_events(env, channels)
  heartbeat(env)
  streaming_response(200, {
    'Content-Type' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'X-Accel-Buffering' => 'no',
  })
end

Private Instance Methods

acceptable?(env) click to toggle source
# File lib/redisse/server.rb, line 188
def acceptable?(env)
  accept_media_types = Rack::AcceptMediaTypes.new(env['HTTP_ACCEPT'])
  accept_media_types.include?('text/event-stream')
end
events_for_channel(channel, last_event_id) click to toggle source
# File lib/redisse/server.rb, line 180
def events_for_channel(channel, last_event_id)
  df = redis.zrangebyscore(channel, last_event_id, '+inf', 'withscores')
  events_scores = EM::Synchrony.sync(df)
  events_scores.each_slice(2).map do |event, score|
    [event, score.to_i]
  end
end
events_for_channels(channels, last_event_id) click to toggle source
# File lib/redisse/server.rb, line 160
def events_for_channels(channels, last_event_id)
  events_with_ids = channels.each_with_object([]) { |channel, events|
    channel_events = events_for_channel(channel, last_event_id)
    events.concat(channel_events)
  }.sort_by!(&:last)
  handle_missing_events(events_with_ids, last_event_id)
  events_with_ids.map(&:first)
end
handle_missing_events(events_with_ids, last_event_id) click to toggle source
# File lib/redisse/server.rb, line 169
def handle_missing_events(events_with_ids, last_event_id)
  first_event, first_event_id = events_with_ids.first
  return unless first_event
  if first_event_id == last_event_id
    events_with_ids.shift
  else
    event = ServerSentEvents.server_sent_event(nil, type: :missedevents)
    events_with_ids.unshift([event])
  end
end
heartbeat(env) click to toggle source
# File lib/redisse/server.rb, line 103
def heartbeat(env)
  env['redisse.heartbeat_timer'.freeze] = EM.add_periodic_timer(HEARTBEAT_PERIOD) do
    env.logger.debug "Sending heartbeat".freeze
    env.stream_send(": hb\n".freeze)
  end
end
last_event_id(env) click to toggle source
# File lib/redisse/server.rb, line 153
def last_event_id(env)
  last_event_id = env['HTTP_LAST_EVENT_ID'] ||
    Rack::Request.new(env).GET['lastEventId']
  last_event_id = last_event_id.to_i
  last_event_id.nonzero? && last_event_id
end
long_polling?(env) click to toggle source
# File lib/redisse/server.rb, line 133
def long_polling?(env)
  key = "redisse.long_polling".freeze
  env.fetch(key) do
    env[key] = Rack::Request.new(env).GET.keys.include?('polling')
  end
end
send_event(env, event) click to toggle source
# File lib/redisse/server.rb, line 123
def send_event(env, event)
  env.status[:stats][:events] += 1
  env.logger.debug { "Sending:\n#{event.chomp.chomp}" }
  env.stream_send(event)
  return unless long_polling?(env)
  env["redisse.long_polling_timer".freeze] ||= EM.add_timer(LONG_POLLING_DELAY) do
    env.stream_close
  end
end
send_history_events(env, channels) click to toggle source
# File lib/redisse/server.rb, line 140
def send_history_events(env, channels)
  last_event_id = last_event_id(env)
  return unless last_event_id
  EM::Synchrony.next_tick do
    events = events_for_channels(channels, last_event_id)
    env.logger.debug { "Sending #{events.size} history events" }
    if (first = events.first) && first.start_with?('type: missedevents')
      env.status[:stats][:missing] += 1
    end
    events.each { |event| send_event(env, event) }
  end
end
stop_heartbeat(env) click to toggle source
# File lib/redisse/server.rb, line 110
def stop_heartbeat(env)
  return unless timer = env['redisse.heartbeat_timer'.freeze]
  env.logger.debug "Stopping heartbeat".freeze
  timer.cancel
end
subscribe(env, channels) click to toggle source
# File lib/redisse/server.rb, line 91
def subscribe(env, channels)
  return unless pubsub { env.stream_close }
  env.status[:stats][:connected] += 1
  env.logger.debug { "Subscribing to #{channels}" }
  env_sender = -> event { send_event(env, event) }
  pubsub_subcribe(channels, env_sender)
  env['redisse.unsubscribe'.freeze] = -> do
    pubsub_unsubscribe_proc(channels, env_sender)
  end
  true
end
unsubscribe(env) click to toggle source
# File lib/redisse/server.rb, line 116
def unsubscribe(env)
  return unless unsubscribe = env['redisse.unsubscribe'.freeze]
  env['redisse.unsubscribe'.freeze] = nil
  env.logger.debug "Unsubscribing".freeze
  unsubscribe.call
end