module GraphQL::Streaming::ActionCableChannel
TODO: This contains MONKEYPATCHES to support stopping certain streams
Public Instance Methods
send_graphql_payload(payload)
Works around the fact that `transmit` is private, so that other objects can send payloads through this channel.
    # File lib/graphql/streaming/action_cable_channel.rb, line 24
    def send_graphql_payload(payload)
      transmit(payload)
    end
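The wrapper pattern can be illustrated without Action Cable. The following is a minimal sketch, assuming a hypothetical `FakeChannel` class that stands in for a real channel; only the visibility of `transmit` mirrors the situation described above.

```ruby
# Hypothetical stand-in for a channel class (not part of Action Cable).
class FakeChannel
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Public wrapper: other objects may call this.
  def send_graphql_payload(payload)
    transmit(payload)
  end

  private

  # Private, so calling it from outside the object raises NoMethodError.
  def transmit(payload)
    @sent << payload
  end
end

fake_channel = FakeChannel.new
fake_channel.send_graphql_payload({ "query_id" => "q1" })

# Calling the private method directly from outside fails.
private_call_failed =
  begin
    fake_channel.transmit({})
    false
  rescue NoMethodError
    true
  end
```

A public delegating method keeps `transmit` itself private while giving collaborators one narrow entry point.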
stop_specific_streams(streams_to_stop)
Stops the given streams, which were captured from the return value of stream_from
    # File lib/graphql/streaming/action_cable_channel.rb, line 13
    def stop_specific_streams(streams_to_stop)
      @_streams ||= []
      @_streams -= streams_to_stop
      streams_to_stop.each do |broadcasting, callback|
        pubsub.unsubscribe broadcasting, callback
        logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
      end
      nil
    end
stream_from(*args, &block)
MONKEY PATCH: Returns the newly created stream so you can stop it later
Calls superclass method
    # File lib/graphql/streaming/action_cable_channel.rb, line 7
    def stream_from(*args, &block)
      super
      streams.last
    end
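How the patched stream_from and stop_specific_streams fit together can be sketched without Action Cable. Everything below is an assumption made for illustration: `FakePubsub`, `FakeBaseChannel`, `StoppableStreams`, and `DemoChannel` are hypothetical names, and the base class only imitates the idea of recording `[broadcasting, callback]` pairs. Logging is omitted.

```ruby
# Hypothetical stand-in for the pubsub adapter; it only records unsubscribes.
class FakePubsub
  attr_reader :unsubscribed

  def initialize
    @unsubscribed = []
  end

  def unsubscribe(broadcasting, _callback)
    @unsubscribed << broadcasting
  end
end

# Hypothetical base channel that stores [broadcasting, callback] pairs.
class FakeBaseChannel
  attr_reader :pubsub

  def initialize
    @pubsub = FakePubsub.new
  end

  def streams
    @_streams ||= []
  end

  # Records the stream; its own return value is not the stream pair.
  def stream_from(broadcasting, &callback)
    streams << [broadcasting, callback || ->(_message) {}]
    nil
  end
end

# Same shape as the two documented methods, minus logging.
module StoppableStreams
  def stream_from(*args, &block)
    super
    streams.last
  end

  def stop_specific_streams(streams_to_stop)
    @_streams ||= []
    @_streams -= streams_to_stop
    streams_to_stop.each do |broadcasting, callback|
      pubsub.unsubscribe(broadcasting, callback)
    end
    nil
  end
end

class DemoChannel < FakeBaseChannel
  include StoppableStreams
end

demo_channel = DemoChannel.new
posts_stream = demo_channel.stream_from("posts")
demo_channel.stream_from("comments")

# Stop only the "posts" stream; "comments" keeps streaming.
demo_channel.stop_specific_streams([posts_stream])
remaining = demo_channel.streams.map(&:first)
stopped = demo_channel.pubsub.unsubscribed
```

Because the override returns the stored pair itself, the caller can hand that pair back later to remove exactly that stream and leave the others running.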
Private Instance Methods
clear_graphql_query(query_id)
Remove any subscriptions or collectors for this query
    # File lib/graphql/streaming/action_cable_channel.rb, line 60
    def clear_graphql_query(query_id)
      graphql_queries[query_id].map(&:close)
      graphql_queries[query_id].clear
    end
graphql_queries()
A registry of queries for this channel. Keys are query_ids; each value is an array of the subscribers and collectors for that query
    # File lib/graphql/streaming/action_cable_channel.rb, line 67
    def graphql_queries
      @graphql_queries ||= Hash.new { |h, k| h[k] = [] }
    end
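The registry relies on a `Hash` with a default block, so the first access to an unknown query_id creates and stores an empty array. A small sketch of that behavior, together with the close-then-clear steps of clear_graphql_query, follows; `FakeCloseable` is a hypothetical stand-in for a collector or subscriber.

```ruby
# Hypothetical closable object standing in for a collector or subscriber.
class FakeCloseable
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Same construction as graphql_queries: missing keys get a stored empty array.
query_registry = Hash.new { |hash, key| hash[key] = [] }

first_entry = FakeCloseable.new
second_entry = FakeCloseable.new
query_registry["query-1"] << first_entry
query_registry["query-1"] << second_entry

# The same two steps as clear_graphql_query: close every entry, then empty the list.
query_registry["query-1"].map(&:close)
query_registry["query-1"].clear

# Merely reading an unknown key also stores an empty array under it.
query_registry["query-2"]
registered_ids = query_registry.keys
```

Note that clearing leaves the key in place with an empty array; the sketch does not delete the key, and neither does the method above.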
stream_graphql_query(query_id:, &query_exec)
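Sets up a collector and a subscriber, registers both under query_id, and yields them to the block in a context hash. When a trigger re-evaluates the query, the block receives only the collector. If the first call leaves no ongoing subscriptions, the query is cleared immediately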
    # File lib/graphql/streaming/action_cable_channel.rb, line 31
    def stream_graphql_query(query_id:, &query_exec)
      # This object emits patches
      collector = GraphQL::Streaming::ActionCableCollector.new(self, query_id)

      # This re-evals the query in response to triggers
      subscriber = GraphQL::Streaming::ActionCableSubscriber.new(self, query_id) do
        # No subscriber so we don't re-subscribe
        reeval_stream_ctx = { collector: collector }
        query_exec.call(reeval_stream_ctx)
      end

      graphql_queries[query_id] << collector
      graphql_queries[query_id] << subscriber

      stream_ctx = {
        collector: collector,
        subscriber: subscriber,
      }

      # make the first GraphQL call
      query_exec.call(stream_ctx)

      if !subscriber.subscribed?
        # If there are no ongoing subscriptions,
        # tell the client to stop listening for patches
        clear_graphql_query(query_id)
      end
    end
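The control flow of stream_graphql_query can be traced with stand-in objects. This is a sketch under stated assumptions, not the library's implementation: `SketchCollector`, `SketchSubscriber`, `stream_query_sketch`, and the `subscribe!` and `trigger` methods are hypothetical and do not reflect the real collector or subscriber classes.

```ruby
# Hypothetical stand-in for the collector; it can only be closed.
class SketchCollector
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical stand-in for the subscriber.
class SketchSubscriber
  attr_reader :closed

  def initialize(&reeval)
    @reeval = reeval
    @subscribed = false
    @closed = false
  end

  # Assumed API: the query block calls this to register a subscription.
  def subscribe!
    @subscribed = true
  end

  def subscribed?
    @subscribed
  end

  # Simulates a trigger arriving: re-run the query.
  def trigger
    @reeval.call
  end

  def close
    @closed = true
  end
end

# Mirrors the order of steps in stream_graphql_query.
def stream_query_sketch(registry, query_id, &query_exec)
  collector = SketchCollector.new
  subscriber = SketchSubscriber.new do
    # Re-evaluation context omits the subscriber, so nothing re-subscribes.
    query_exec.call({ collector: collector })
  end
  registry[query_id] << collector << subscriber

  # First call sees both objects.
  query_exec.call({ collector: collector, subscriber: subscriber })

  unless subscriber.subscribed?
    registry[query_id].map(&:close)
    registry[query_id].clear
  end
  subscriber
end

sketch_registry = Hash.new { |h, k| h[k] = [] }
seen_contexts = []

# A query that subscribes stays registered and can be re-evaluated.
live_subscriber = stream_query_sketch(sketch_registry, "live") do |ctx|
  seen_contexts << ctx.keys
  ctx[:subscriber]&.subscribe!
end
live_subscriber.trigger

# A query that never subscribes is closed and cleared right away.
one_shot_subscriber = stream_query_sketch(sketch_registry, "one-shot") do |_ctx|
end
```

The block runs once with both keys and once more, on the simulated trigger, with only `:collector`, which is why a re-evaluation cannot create a second subscription.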