class ActionCable::Connection::Base
For every WebSocket
connection the Action Cable server accepts, a Connection
object will be instantiated. This instance becomes the parent of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions based on an identifier sent by the Action Cable consumer. The Connection
itself does not deal with any specific application logic beyond authentication and authorization.
Here’s a basic example:
module ApplicationCable class Connection < ActionCable::Connection::Base identified_by :current_user def connect self.current_user = find_verified_user logger.add_tags current_user.name end def disconnect # Any cleanup work needed when the cable connection is cut. end private def find_verified_user User.find_by_identity(cookies.encrypted[:identity_id]) || reject_unauthorized_connection end end end
First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections established for that current_user (and potentially disconnect them). You can declare as many identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
Second, we rely on the fact that the WebSocket
connection is established with the cookies from the domain being sent along. This makes it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket
connection.
Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
Pretty simple, eh?
Attributes
Public Class Methods
Source
# File lib/action_cable/connection/base.rb, line 55 def initialize(server, env, coder: ActiveSupport::JSON) @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @_internal_subscriptions = nil @started_at = Time.now end
Public Instance Methods
Source
# File lib/action_cable/connection/base.rb, line 125 def beat transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end
Source
# File lib/action_cable/connection/base.rb, line 100 def close(reason: nil, reconnect: true) transmit( type: ActionCable::INTERNAL[:message_types][:disconnect], reason: reason, reconnect: reconnect ) websocket.close end
Close the WebSocket
connection.
Source
# File lib/action_cable/connection/base.rb, line 110 def send_async(method, *arguments) worker_pool.async_invoke(self, method, *arguments) end
Invoke a method on the connection asynchronously through the pool of thread workers.
Source
# File lib/action_cable/connection/base.rb, line 116 def statistics { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers, request_id: @env["action_dispatch.request_id"] } end
Return a basic hash of statistics for the connection keyed with identifier
, started_at
, subscriptions
, and request_id
. This can be returned by a health check against the connection.
Private Instance Methods
Source
# File lib/action_cable/connection/base.rb, line 201 def allow_request_origin? return true if server.config.disable_request_forgery_protection proto = Rack::Request.new(env).ssl? ? "https" : "http" if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}" true elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } true else logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") false end end
Source
# File lib/action_cable/connection/base.rb, line 167 def decode(websocket_message) @coder.decode websocket_message end
Source
# File lib/action_cable/connection/base.rb, line 163 def encode(cable_message) @coder.encode cable_message end
Source
# File lib/action_cable/connection/base.rb, line 243 def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_default_s ] end
Source
# File lib/action_cable/connection/base.rb, line 183 def handle_close logger.info finished_request_message server.remove_connection(self) subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end
Source
# File lib/action_cable/connection/base.rb, line 171 def handle_open @protocol = websocket.protocol connect if respond_to?(:connect) subscribe_to_internal_channel send_welcome_message message_buffer.process! server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive? end
Source
# File lib/action_cable/connection/base.rb, line 251 def invalid_request_message "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end
Source
# File lib/action_cable/connection/base.rb, line 229 def new_tagged_logger TaggedLoggerProxy.new server.logger, tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end
Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
Source
# File lib/action_cable/connection/base.rb, line 151 def request # :doc: @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application ActionDispatch::Request.new(environment || env) end end
The request that initiated the WebSocket
connection is available here. This gives access to the environment, cookies, etc.
Source
# File lib/action_cable/connection/base.rb, line 220 def respond_to_invalid_request close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive? logger.error invalid_request_message logger.info finished_request_message [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ] end
Source
# File lib/action_cable/connection/base.rb, line 215 def respond_to_successful_request logger.info successful_request_message websocket.rack_response end
Source
# File lib/action_cable/connection/base.rb, line 194 def send_welcome_message # Send welcome message to the internal connection monitor channel. # This ensures the connection monitor state is reset after a successful # websocket connection. transmit type: ActionCable::INTERNAL[:message_types][:welcome] end
Source
# File lib/action_cable/connection/base.rb, line 234 def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, websocket.possible? ? " [WebSocket]" : "[non-WebSocket]", request.ip, Time.now.to_default_s ] end
Source
# File lib/action_cable/connection/base.rb, line 257 def successful_request_message "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] ] end