class GraphQL::Subscriptions::ActionCableSubscriptions

A subscriptions implementation that sends data as ActionCable broadcastings.

Some things to keep in mind:

@example Adding ActionCableSubscriptions to your schema

class MySchema < GraphQL::Schema
  # ...
  use GraphQL::Subscriptions::ActionCableSubscriptions
end

@example Implementing a channel for GraphQL Subscriptions

class GraphqlChannel < ApplicationCable::Channel
  def subscribed
    @subscription_ids = []
  end

  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]
    context = {
      # Re-implement whatever context methods you need
      # in this channel or ApplicationCable::Channel
      # current_user: current_user,
      # Make sure the channel is in the context
      channel: self,
    }

    result = MySchema.execute(
      query: query,
      context: context,
      variables: variables,
      operation_name: operation_name
    )

    payload = {
      result: result.to_h,
      more: result.subscription?,
    }

    # Track the subscription here so we can remove it
    # on unsubscribe.
    if result.context[:subscription_id]
      @subscription_ids << result.context[:subscription_id]
    end

    transmit(payload)
  end

  def unsubscribed
    @subscription_ids.each { |sid|
      MySchema.subscriptions.delete_subscription(sid)
    }
  end

  private

    def ensure_hash(ambiguous_param)
      case ambiguous_param
      when String
        if ambiguous_param.present?
          ensure_hash(JSON.parse(ambiguous_param))
        else
          {}
        end
      when Hash, ActionController::Parameters
        ambiguous_param
      when nil
        {}
      else
        raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
      end
    end
end

Constants

EVENT_PREFIX
SUBSCRIPTION_PREFIX

Public Class Methods

new(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) click to toggle source

@param serializer [<#dump(obj), load(string)] Used for serializing messages before handing them to ‘.broadcast(msg)` @param namespace [string] Used to namespace events and subscriptions (default: ”)

Calls superclass method GraphQL::Subscriptions::new
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 90
def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  @serialize_with_context = case @serializer.method(:load).arity
  when 1
    false
  when 2
    true
  else
    raise ArgumentError, "#{@serializer} must repond to `.load` accepting one or two arguments"
  end
  @transmit_ns = namespace
  super
end

Public Instance Methods

delete_subscription(subscription_id) click to toggle source

The channel was closed, forget about it.

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 217
def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  # In case this came from the server, tell the client to unsubscribe:
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), { more: false })
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  if query
    events = query.context.namespace(:subscriptions)[:events]
    events.each do |event|
      ev_by_fingerprint = @events[event.topic]
      ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
      ev_for_fingerprint.delete(event)
      if ev_for_fingerprint.empty?
        ev_by_fingerprint.delete(event.fingerprint)
      end
    end
  end
end
deliver(subscription_id, result) click to toggle source

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 120
def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload)
end
execute_all(event, object) click to toggle source

An event was triggered; Push the data over ActionCable. Subscribers will re-evaluate locally.

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 112
def execute_all(event, object)
  stream = stream_event_name(event)
  message = @serializer.dump(object)
  @action_cable.server.broadcast(stream, message)
end
load_action_cable_message(message, context) click to toggle source

This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object. @param message [String] n ActionCable-broadcasted string (JSON) @param context [GraphQL::Query::Context] the context of the first event for a given subscription fingerprint

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 190
def load_action_cable_message(message, context)
  if @serialize_with_context
    @serializer.load(message, context)
  else
    @serializer.load(message)
  end
end
read_subscription(subscription_id) click to toggle source

Return the query from “storage” (in memory)

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 199
def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  if query.nil?
    # This can happen when a subscription is triggered from an unsubscribed channel,
    # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
    # (This `nil` is handled by `#execute_update`)
    nil
  else
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: query.context.to_h,
      operation_name: query.operation_name,
    }
  end
end
setup_stream(channel, initial_event) click to toggle source

Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.

But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.

To make sure there’s always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 160
def setup_stream(channel, initial_event)
  topic = initial_event.topic
  channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message|
    events_by_fingerprint = @events[topic]
    object = nil
    events_by_fingerprint.each do |_fingerprint, events|
      if events.any? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        object ||= load_action_cable_message(message, first_event.context)
        result = execute_update(first_subscription_id, first_event, object)
        if !result.nil?
          # Having calculated the result _once_, send the same payload to all subscribers
          events.each do |event|
            subscription_id = event.context.fetch(:subscription_id)
            deliver(subscription_id, result)
          end
        end
      end
    end
    nil
  end
end
write_subscription(query, events) click to toggle source

A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.

# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 129
def write_subscription(query, events)
  unless (channel = query.context[:channel])
    raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\
      "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\
      "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\
      "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\
      "GraphiQL via `graphiql-rails` may not work out of box (#1051)."
  end
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end

Private Instance Methods

stream_event_name(event) click to toggle source
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 242
def stream_event_name(event)
  [EVENT_PREFIX, @transmit_ns, event.topic].join
end
stream_subscription_name(subscription_id) click to toggle source
# File lib/graphql/subscriptions/action_cable_subscriptions.rb, line 238
def stream_subscription_name(subscription_id)
  [SUBSCRIPTION_PREFIX, @transmit_ns, subscription_id].join
end