module GraphQL::Streaming::ActionCableChannel

TODO: This module contains monkey patches that make it possible to stop individual streams.

Public Instance Methods

send_graphql_payload(payload)

Public wrapper around `transmit`, which is private on the channel, so that other objects can send a payload through this channel.

# File lib/graphql/streaming/action_cable_channel.rb, line 24
def send_graphql_payload(payload)
  transmit(payload)
end
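
The visibility workaround can be illustrated without Rails. This is a minimal sketch: `FakeChannel` and `FakeGraphQLChannel` are hypothetical stand-ins for the real channel classes, and the payload shape is an assumption made only for the example.

```ruby
# Hypothetical stand-in for a channel base class; only the
# visibility of `transmit` matters for this sketch.
class FakeChannel
  attr_reader :sent

  def initialize
    @sent = []
  end

  private

  # Private, like `transmit` on the real channel
  def transmit(payload)
    @sent << payload
  end
end

class FakeGraphQLChannel < FakeChannel
  # Public wrapper with the same shape as send_graphql_payload
  def send_graphql_payload(payload)
    transmit(payload)
  end
end

channel = FakeGraphQLChannel.new
channel.send_graphql_payload({ query_id: "q1", value: 1 })

# Calling the private method with an explicit receiver fails
begin
  channel.transmit({})
rescue NoMethodError
  direct_call_failed = true
end
```

A collaborator that holds a reference to the channel can therefore call `send_graphql_payload`, while a direct call to `transmit` raises `NoMethodError`.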

stop_specific_streams(streams_to_stop)

Stops the given streams. Each entry is a [broadcasting, callback] pair, as returned by the patched stream_from.

# File lib/graphql/streaming/action_cable_channel.rb, line 13
def stop_specific_streams(streams_to_stop)
  @_streams ||= []
  @_streams -= streams_to_stop
  streams_to_stop.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
  nil
end
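
The following sketch shows how a stream captured from `stream_from` might later be passed to `stop_specific_streams`. It runs without Rails: `FakePubSub` and `FakeChannel` are hypothetical stand-ins, and the simplified `stream_from` here only records the pair instead of subscribing to a real pubsub adapter.

```ruby
require "logger"

# Hypothetical pubsub that just records what was unsubscribed
class FakePubSub
  attr_reader :unsubscribed

  def initialize
    @unsubscribed = []
  end

  def unsubscribe(broadcasting, _callback)
    @unsubscribed << broadcasting
  end
end

# Hypothetical channel exposing only what the method body needs
class FakeChannel
  attr_reader :pubsub, :logger

  def initialize
    @pubsub = FakePubSub.new
    @logger = Logger.new(nil) # discard log output
  end

  def streams
    @_streams ||= []
  end

  # Simplified: store the pair and return it, like the patched stream_from
  def stream_from(broadcasting, &callback)
    streams << [broadcasting, callback]
    streams.last
  end

  # Same body as the listing above
  def stop_specific_streams(streams_to_stop)
    @_streams ||= []
    @_streams -= streams_to_stop
    streams_to_stop.each do |broadcasting, callback|
      pubsub.unsubscribe broadcasting, callback
      logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
    end
    nil
  end
end

channel = FakeChannel.new
posts = channel.stream_from("posts") { |msg| msg }
channel.stream_from("comments") { |msg| msg }

channel.stop_specific_streams([posts])
remaining = channel.streams.map(&:first)
```

Only the "posts" stream is unsubscribed; the "comments" stream stays registered on the channel.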

stream_from(*args, &block)

MONKEY PATCH: Returns the newly created stream so that it can be stopped later.

Calls superclass method
# File lib/graphql/streaming/action_cable_channel.rb, line 7
def stream_from(*args, &block)
  super
  streams.last
end

Private Instance Methods

clear_graphql_query(query_id)

Remove any subscriptions or collectors for this query

# File lib/graphql/streaming/action_cable_channel.rb, line 60
def clear_graphql_query(query_id)
  graphql_queries[query_id].each(&:close)
  graphql_queries[query_id].clear
end

graphql_queries()

A registry of queries for this channel. Keys are query_ids; values are arrays of the subscribers and collectors for that query.

# File lib/graphql/streaming/action_cable_channel.rb, line 67
def graphql_queries
  @graphql_queries ||= Hash.new { |h, k| h[k] = [] }
end
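
The default block passed to `Hash.new` is what lets callers append to a query's list without initializing it first. A small sketch of that behavior, using symbols as placeholder values in place of real subscribers and collectors:

```ruby
# Same construction as graphql_queries: a missing key is
# assigned a fresh array on first access.
registry = Hash.new { |h, k| h[k] = [] }

registry["q1"] << :collector
registry["q1"] << :subscriber

# Merely reading an unknown key also creates an (empty) entry
unknown = registry["q2"]

keys = registry.keys
```

One consequence is that looking up a query_id that was never registered returns an empty array rather than `nil`, so closing and clearing an unknown query is a harmless no-op.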

stream_graphql_query(query_id:, &query_exec)

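Sets up a subscriber and a collector and passes them to the block in a context hash. The block is called once immediately and again, with the collector only, each time the subscriber is triggered.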

# File lib/graphql/streaming/action_cable_channel.rb, line 31
def stream_graphql_query(query_id:, &query_exec)
  # This object emits patches
  collector = GraphQL::Streaming::ActionCableCollector.new(self, query_id)

  # This re-evals the query in response to triggers
  subscriber = GraphQL::Streaming::ActionCableSubscriber.new(self, query_id) do
    # No subscriber so we don't re-subscribe
    reeval_stream_ctx = { collector: collector }
    query_exec.call(reeval_stream_ctx)
  end

  graphql_queries[query_id] << collector
  graphql_queries[query_id] << subscriber

  stream_ctx = {
    collector: collector,
    subscriber: subscriber,
  }
  # make the first GraphQL call
  query_exec.call(stream_ctx)

  unless subscriber.subscribed?
    # If there are no ongoing subscriptions,
    # tell the client to stop listening for patches
    clear_graphql_query(query_id)
  end
end
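
The control flow above can be traced with stubs. This is only a sketch under stated assumptions: `FakeCollector`, `FakeSubscriber`, `subscribe!`, `trigger`, and `stream_query` are hypothetical names invented for the illustration and are not part of the library; only `close` and `subscribed?` mirror calls that appear in the listing.

```ruby
# Hypothetical stand-in for the collector
class FakeCollector
  attr_reader :closed

  def close
    @closed = true
  end
end

# Hypothetical stand-in for the subscriber
class FakeSubscriber
  def initialize(&reeval)
    @reeval = reeval
    @subscribed = false
  end

  def subscribe!      # hypothetical: the query registered a subscription
    @subscribed = true
  end

  def subscribed?
    @subscribed
  end

  def trigger         # hypothetical: simulate an incoming trigger
    @reeval.call
  end

  def close
    @subscribed = false
  end
end

# Mirrors the shape of stream_graphql_query with the stubs above
def stream_query(registry, query_id, &query_exec)
  collector = FakeCollector.new
  # Re-evaluation gets the collector only, so it does not re-subscribe
  subscriber = FakeSubscriber.new { query_exec.call({ collector: collector }) }
  registry[query_id] << collector << subscriber

  # First call gets both objects
  query_exec.call({ collector: collector, subscriber: subscriber })

  unless subscriber.subscribed?
    registry[query_id].each(&:close)
    registry[query_id].clear
  end
end

registry = Hash.new { |h, k| h[k] = [] }
contexts = []

# A query that subscribes on its first run stays registered
stream_query(registry, "live") do |ctx|
  contexts << ctx.keys
  ctx[:subscriber]&.subscribe!
end
registry["live"].last.trigger

# A query that never subscribes is cleaned up right away
stream_query(registry, "one-shot") { |ctx| contexts << ctx.keys }
```

In this trace the "live" query keeps its collector and subscriber in the registry and is re-evaluated with a collector-only context, while the "one-shot" query is closed and cleared as soon as its first execution finishes.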