class Mongo::Collection::View::ChangeStream

Provides behaviour around a `$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.

@note Only available in server versions 3.6 and higher. @note ChangeStreams do not work properly with JRuby because of the issue documented

here: https://github.com/jruby/jruby/issues/4212
Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread.
So calling #next on the change stream will cause getmores to be called in a loop in the background.

@since 2.5.0

Constants

FULL_DOCUMENT_DEFAULT

@return [ String ] The fullDocument option default value.

@since 2.5.0

Attributes

options[R]

@return [ BSON::Document ] The change stream options.

@since 2.5.0

Public Class Methods

new(view, pipeline, options = {}) click to toggle source

Initialize the change stream for the provided collection view, pipeline and options.

@example Create the new change stream view.

ChangeStream.new(view, pipeline, options)

@param [ Collection::View ] view The collection view. @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications. @param [ Hash ] options The change stream options.

@option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.

When set to ‘updateLookup’, the change notification for partial updates will include both a delta
describing the changes to the document, as well as a copy of the entire document that was changed
from some time after the change occurred.

@option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the

new change stream.

@option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait

on new documents to satisfy a change stream query.

@option options [ Integer ] :batch_size The number of documents to return per batch. @option options [ BSON::Document, Hash ] :collation The collation to use.

@since 2.5.0

# File lib/mongo/collection/view/change_stream.rb, line 68
def initialize(view, pipeline, options = {})
  @view = view
  @change_stream_filters = pipeline && pipeline.dup
  @options = options && options.dup.freeze
  @resume_token = @options[:resume_after]
  read_with_one_retry { create_cursor! }
end

Public Instance Methods

close() click to toggle source

Close the change stream.

@example Close the change stream.

stream.close

@return [ nil ] nil.

@since 2.5.0

# File lib/mongo/collection/view/change_stream.rb, line 114
def close
  unless closed?
    begin; @cursor.send(:kill_cursors); rescue; end
    @cursor = nil
  end
end
closed?() click to toggle source

Is the change stream closed?

@example Determine whether the change stream is closed.

stream.closed?

@return [ true, false ] If the change stream is closed.

@since 2.5.0

# File lib/mongo/collection/view/change_stream.rb, line 129
def closed?
  @cursor.nil?
end
each() { |doc| ... } click to toggle source

Iterate through documents returned by the change stream.

@example Iterate through the stream of documents.

stream.each do |document|
  p document
end

@return [ Enumerator ] The enumerator.

@since 2.5.0

@yieldparam [ BSON::Document ] Each change stream document.

# File lib/mongo/collection/view/change_stream.rb, line 88
def each
  raise StopIteration.new if closed?
  begin
    @cursor.each do |doc|
      cache_resume_token(doc)
      yield doc
    end if block_given?
    @cursor.to_enum
  rescue => e
    close
    if retryable?(e)
      create_cursor!
      retry
    end
    raise
  end
end
inspect() click to toggle source

Get a formatted string for use in inspection.

@example Inspect the change stream object.

stream.inspect

@return [ String ] The change stream inspection.

@since 2.5.0

# File lib/mongo/collection/view/change_stream.rb, line 141
def inspect
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{@resume_token}>"
end

Private Instance Methods

cache_resume_token(doc) click to toggle source
# File lib/mongo/collection/view/change_stream.rb, line 148
def cache_resume_token(doc)
  unless @resume_token = (doc[:_id] && doc[:_id].dup)
    raise Error::MissingResumeToken.new
  end
end
create_cursor!() click to toggle source
# File lib/mongo/collection/view/change_stream.rb, line 154
def create_cursor!
  session = client.send(:get_session, @options)
  server = server_selector.select_server(cluster)
  result = send_initial_query(server, session)
  @cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
end
pipeline() click to toggle source
# File lib/mongo/collection/view/change_stream.rb, line 161
def pipeline
  change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }
  change_doc[:resumeAfter] = @resume_token if @resume_token
  [{ '$changeStream' => change_doc }] + @change_stream_filters
end
send_initial_query(server, session) click to toggle source
# File lib/mongo/collection/view/change_stream.rb, line 167
def send_initial_query(server, session)
  initial_query_op(session).execute(server)
end