class Mongo::Collection::View::ChangeStream
Provides behaviour around a `$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.
@note Only available in server versions 3.6 and higher. @note ChangeStreams do not work properly with JRuby because of the issue documented
here: https://github.com/jruby/jruby/issues/4212 Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread. So calling #next on the change stream will cause getmores to be called in a loop in the background.
@since 2.5.0
Constants
- FULL_DOCUMENT_DEFAULT
@return [ String ] The fullDocument option default value.
@since 2.5.0
Attributes
@return [ BSON::Document ] The change stream options.
@since 2.5.0
Public Class Methods
Initialize the change stream for the provided collection view, pipeline and options.
@example Create the new change stream view.
ChangeStream.new(view, pipeline, options)
@param [ Collection::View ] view The collection view. @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications. @param [ Hash ] options The change stream options.
@option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
@option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
new change stream.
@option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
on new documents to satisfy a change stream query.
@option options [ Integer ] :batch_size The number of documents to return per batch. @option options [ BSON::Document, Hash ] :collation The collation to use.
@since 2.5.0
# File lib/mongo/collection/view/change_stream.rb, line 68 def initialize(view, pipeline, options = {}) @view = view @change_stream_filters = pipeline && pipeline.dup @options = options && options.dup.freeze @resume_token = @options[:resume_after] read_with_one_retry { create_cursor! } end
Public Instance Methods
Close the change stream.
@example Close the change stream.
stream.close
@return [ nil ] nil.
@since 2.5.0
# File lib/mongo/collection/view/change_stream.rb, line 114 def close unless closed? begin; @cursor.send(:kill_cursors); rescue; end @cursor = nil end end
Is the change stream closed?
@example Determine whether the change stream is closed.
stream.closed?
@return [ true, false ] If the change stream is closed.
@since 2.5.0
# File lib/mongo/collection/view/change_stream.rb, line 129 def closed? @cursor.nil? end
Iterate through documents returned by the change stream.
@example Iterate through the stream of documents.
stream.each do |document| p document end
@return [ Enumerator ] The enumerator.
@since 2.5.0
@yieldparam [ BSON::Document ] Each change stream document.
# File lib/mongo/collection/view/change_stream.rb, line 88 def each raise StopIteration.new if closed? begin @cursor.each do |doc| cache_resume_token(doc) yield doc end if block_given? @cursor.to_enum rescue => e close if retryable?(e) create_cursor! retry end raise end end
Get a formatted string for use in inspection.
@example Inspect the change stream object.
stream.inspect
@return [ String ] The change stream inspection.
@since 2.5.0
# File lib/mongo/collection/view/change_stream.rb, line 141 def inspect "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "options=#{@options} resume_token=#{@resume_token}>" end
Private Instance Methods
# File lib/mongo/collection/view/change_stream.rb, line 148 def cache_resume_token(doc) unless @resume_token = (doc[:_id] && doc[:_id].dup) raise Error::MissingResumeToken.new end end
# File lib/mongo/collection/view/change_stream.rb, line 154 def create_cursor! session = client.send(:get_session, @options) server = server_selector.select_server(cluster) result = send_initial_query(server, session) @cursor = Cursor.new(view, result, server, disable_retry: true, session: session) end
# File lib/mongo/collection/view/change_stream.rb, line 161 def pipeline change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) } change_doc[:resumeAfter] = @resume_token if @resume_token [{ '$changeStream' => change_doc }] + @change_stream_filters end
# File lib/mongo/collection/view/change_stream.rb, line 167 def send_initial_query(server, session) initial_query_op(session).execute(server) end