class GraphQL::Subscriptions::ActionCableSubscriptions
A subscriptions implementation that sends data as ActionCable broadcastings.
Some things to keep in mind:
-
No queueing system; ActiveJob should be added
-
Take care to reload context when re-delivering the subscription. (see {Query#subscription_update?})
-
Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling
trigger
won’t work from background jobs or the Rails console.
@example Adding ActionCableSubscriptions
to your schema
class MySchema < GraphQL::Schema # ... use GraphQL::Subscriptions::ActionCableSubscriptions end
@example Implementing a channel for GraphQL
Subscriptions
class GraphqlChannel < ApplicationCable::Channel def subscribed @subscription_ids = [] end def execute(data) query = data["query"] variables = ensure_hash(data["variables"]) operation_name = data["operationName"] context = { # Re-implement whatever context methods you need # in this channel or ApplicationCable::Channel # current_user: current_user, # Make sure the channel is in the context channel: self, } result = MySchema.execute( query: query, context: context, variables: variables, operation_name: operation_name ) payload = { result: result.to_h, more: result.subscription?, } # Track the subscription here so we can remove it # on unsubscribe. if result.context[:subscription_id] @subscription_ids << result.context[:subscription_id] end transmit(payload) end def unsubscribed @subscription_ids.each { |sid| MySchema.subscriptions.delete_subscription(sid) } end private def ensure_hash(ambiguous_param) case ambiguous_param when String if ambiguous_param.present? ensure_hash(JSON.parse(ambiguous_param)) else {} end when Hash, ActionController::Parameters ambiguous_param when nil {} else raise ArgumentError, "Unexpected parameter: #{ambiguous_param}" end end end
Constants
- EVENT_PREFIX
- SUBSCRIPTION_PREFIX
Public Class Methods
@param serializer [<#dump(obj), load(string)] Used for serializing messages before handing them to ‘.broadcast(msg)` @param namespace [string] Used to namespace events and subscriptions (default: ”)
GraphQL::Subscriptions::new
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 90 def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) # A per-process map of subscriptions to deliver. # This is provided by Rails, so let's use it @subscriptions = Concurrent::Map.new @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } } @action_cable = action_cable @action_cable_coder = action_cable_coder @serializer = serializer @serialize_with_context = case @serializer.method(:load).arity when 1 false when 2 true else raise ArgumentError, "#{@serializer} must repond to `.load` accepting one or two arguments" end @transmit_ns = namespace super end
Public Instance Methods
The channel was closed, forget about it.
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 217 def delete_subscription(subscription_id) query = @subscriptions.delete(subscription_id) # In case this came from the server, tell the client to unsubscribe: @action_cable.server.broadcast(stream_subscription_name(subscription_id), { more: false }) # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478 if query events = query.context.namespace(:subscriptions)[:events] events.each do |event| ev_by_fingerprint = @events[event.topic] ev_for_fingerprint = ev_by_fingerprint[event.fingerprint] ev_for_fingerprint.delete(event) if ev_for_fingerprint.empty? ev_by_fingerprint.delete(event.fingerprint) end end end end
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 120 def deliver(subscription_id, result) payload = { result: result.to_h, more: true } @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload) end
An event was triggered; Push the data over ActionCable. Subscribers will re-evaluate locally.
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 112 def execute_all(event, object) stream = stream_event_name(event) message = @serializer.dump(object) @action_cable.server.broadcast(stream, message) end
This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object. @param message [String] n ActionCable-broadcasted string (JSON) @param context [GraphQL::Query::Context] the context of the first event for a given subscription fingerprint
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 190 def load_action_cable_message(message, context) if @serialize_with_context @serializer.load(message, context) else @serializer.load(message) end end
Return the query from “storage” (in memory)
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 199 def read_subscription(subscription_id) query = @subscriptions[subscription_id] if query.nil? # This can happen when a subscription is triggered from an unsubscribed channel, # see https://github.com/rmosolgo/graphql-ruby/issues/2478. # (This `nil` is handled by `#execute_update`) nil else { query_string: query.query_string, variables: query.provided_variables, context: query.context.to_h, operation_name: query.operation_name, } end end
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 160 def setup_stream(channel, initial_event) topic = initial_event.topic channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message| events_by_fingerprint = @events[topic] object = nil events_by_fingerprint.each do |_fingerprint, events| if events.any? && events.first == initial_event # The fingerprint has told us that this response should be shared by all subscribers, # so just run it once, then deliver the result to every subscriber first_event = events.first first_subscription_id = first_event.context.fetch(:subscription_id) object ||= load_action_cable_message(message, first_event.context) result = execute_update(first_subscription_id, first_event, object) if !result.nil? # Having calculated the result _once_, send the same payload to all subscribers events.each do |event| subscription_id = event.context.fetch(:subscription_id) deliver(subscription_id, result) end end end end nil end end
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 129 def write_subscription(query, events) unless (channel = query.context[:channel]) raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\ "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\ "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\ "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\ "GraphiQL via `graphiql-rails` may not work out of box (#1051)." end subscription_id = query.context[:subscription_id] ||= build_id stream = stream_subscription_name(subscription_id) channel.stream_from(stream) @subscriptions[subscription_id] = query events.each do |event| # Setup a new listener to run all events with this topic in this process setup_stream(channel, event) # Add this event to the list of events to be updated @events[event.topic][event.fingerprint] << event end end
Private Instance Methods
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 242 def stream_event_name(event) [EVENT_PREFIX, @transmit_ns, event.topic].join end
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 238 def stream_subscription_name(subscription_id) [SUBSCRIPTION_PREFIX, @transmit_ns, subscription_id].join end