module Mongo::Operation::SessionsSupported
Shared behavior of operations that support a session.
@since 2.5.2 @api private
Constants
- READ_COMMANDS
- ZERO_TIMESTAMP
Private Instance Methods
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 165 def add_read_preference(sel, connection) Lint.assert_type(connection, Server::Connection) # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#topology-type-single if connection.description.standalone? # Read preference is never sent to standalones. elsif connection.description.mongos? # When server is a mongos: # - $readPreference is never sent when mode is 'primary' # - When mode is 'secondaryPreferred' $readPreference is only sent # when a non-mode field (i.e. tag_sets) is present # - Otherwise $readPreference is sent if read doc = read.to_mongos if doc sel['$readPreference'] = doc end end elsif connection.server.cluster.single? # In Single topology: # - If no read preference is specified by the application, the driver # adds mode: primaryPreferred. # - If a read preference is specified by the application, the driver # replaces the mode with primaryPreferred. read_doc = if read BSON::Document.new(read.to_doc) else BSON::Document.new end if [nil, 'primary'].include?(read_doc['mode']) read_doc['mode'] = 'primaryPreferred' end sel['$readPreference'] = read_doc else # In replica sets, read preference is passed to the server if one # is specified by the application, and there is no default. if read sel['$readPreference'] = read.to_doc end end end
Adds $readPreference field to the command document.
$readPreference is only sent when the server is a mongos, following the rules described in github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#passing-read-preference-to-mongos. The topology does not matter for figuring out whether to send $readPreference since the decision is always made based on server type.
$readPreference is sent to OP_MSG-grokking replica set members.
@param [ Hash ] sel Existing command document which will be mutated. @param [ Server::Connection
] connection The connection that the
operation will be executed on.
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 94 def add_write_concern!(sel) sel[:writeConcern] = write_concern.options if write_concern end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 98 def apply_autocommit!(selector) session.add_autocommit!(selector) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 49 def apply_causal_consistency!(selector, connection) return unless selector[:startTransaction] apply_causal_consistency_if_possible(selector, connection) end
Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.
When operations are performed in a transaction, only the first operation (the one which starts the transaction via startTransaction) is allowed to have a read concern, and with it the causal consistency document, specified.
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 62 def apply_causal_consistency_if_possible(selector, connection) if !connection.description.standalone? cc_doc = session.send(:causal_consistency_doc) if cc_doc rc_doc = (selector[:readConcern] || read_concern || {}).merge(cc_doc) selector[:readConcern] = Options::Mapper.transform_values_to_strings( rc_doc) end end end
Adds causal consistency document to the selector, if one can be constructed.
In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, topology must be replica set or sharded cluster.
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 77 def apply_cluster_time!(selector, connection) if !connection.description.standalone? cluster_time = [ connection.cluster_time, session&.cluster_time, ].compact.max if cluster_time selector['$clusterTime'] = cluster_time end end end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 110 def apply_read_pref!(selector) session.apply_read_pref!(selector) if read_command?(selector) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 207 def apply_session_options(sel, connection) apply_cluster_time!(sel, connection) sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num sel.merge!(lsid: session.session_id) apply_start_transaction!(sel) apply_causal_consistency!(sel, connection) apply_autocommit!(sel) apply_txn_opts!(sel) suppress_read_write_concern!(sel) validate_read_preference!(sel) apply_txn_num!(sel) if session.recovery_token && (sel[:commitTransaction] || sel[:abortTransaction]) then sel[:recoveryToken] = session.recovery_token end end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 102 def apply_start_transaction!(selector) session.add_start_transaction!(selector) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 106 def apply_txn_num!(selector) session.add_txn_num!(selector) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 114 def apply_txn_opts!(selector) session.add_txn_opts!(selector, read_command?(selector)) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 225 def build_message(connection) super.tap do |message| if session # Serialize the message to detect client-side problems, # such as invalid BSON keys. The message will be serialized again # later prior to being sent to the connection. message.serialize(BSON::ByteBuffer.new) session.update_state! end end end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 126 def command(connection) if Lint.enabled? unless connection.is_a?(Server::Connection) raise Error::LintError, "Connection is not a Connection instance: #{connection}" end end sel = selector(connection).dup add_write_concern!(sel) sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name add_read_preference(sel, connection) if connection.features.sessions_enabled? apply_cluster_time!(sel, connection) if session && (acknowledged_write? || session.in_transaction?) apply_session_options(sel, connection) end elsif session && session.explicit? apply_session_options(sel, connection) end sel end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 73 def flags acknowledged_write? ? [] : [:more_to_come] end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 90 def read_command?(sel) READ_COMMANDS.any? { |c| sel[c] } end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 118 def suppress_read_write_concern!(selector) session.suppress_read_write_concern!(selector) end
Source
# File lib/mongo/operation/shared/sessions_supported.rb, line 122 def validate_read_preference!(selector) session.validate_read_preference!(selector) if read_command?(selector) end