Files
web/server/vendor/go.mongodb.org/mongo-driver/v2/mongo/session.go

344 lines
12 KiB
Go

// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongo
import (
"context"
"errors"
"fmt"
"time"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
)
// ErrWrongClient is returned when a user attempts to pass in a session created by a different client than
// the method call is using.
var ErrWrongClient = errors.New("session was not created by this client")
var withTransactionTimeout = 120 * time.Second
// Session is a MongoDB logical session. Sessions can be used to enable causal
// consistency for a group of operations or to execute operations in an ACID
// transaction. A new Session can be created from a Client instance. A Session
// created from a Client must only be used to execute operations using that
// Client or a Database or Collection created from that Client. For more
// information about sessions, and their use cases, see
// https://www.mongodb.com/docs/manual/reference/server-sessions/,
// https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#causal-consistency, and
// https://www.mongodb.com/docs/manual/core/transactions/.
//
// Implementations of Session are not safe for concurrent use by multiple
// goroutines.
type Session struct {
clientSession *session.Client
client *Client
deployment driver.Deployment
didCommitAfterStart bool // true if commit was called after start with no other operations
}
type sessionKey struct{}
// NewSessionContext returns a Context that holds the given Session. If the
// Context already contains a Session, that Session will be replaced with the
// one provided.
//
// The returned Context can be used with Collection methods like
// [Collection.InsertOne] or [Collection.Find] to run operations in a Session.
func NewSessionContext(parent context.Context, sess *Session) context.Context {
return context.WithValue(parent, sessionKey{}, sess)
}
// SessionFromContext extracts the mongo.Session object stored in a Context. This can be used on a SessionContext that
// was created implicitly through one of the callback-based session APIs or explicitly by calling NewSessionContext. If
// there is no Session stored in the provided Context, nil is returned.
func SessionFromContext(ctx context.Context) *Session {
val := ctx.Value(sessionKey{})
if val == nil {
return nil
}
sess, ok := val.(*Session)
if !ok {
return nil
}
return sess
}
// ID returns the current ID document associated with the session. The ID
// document is in the form {"id": <BSON binary value>}.
func (s *Session) ID() bson.Raw {
return bson.Raw(s.clientSession.SessionID)
}
// EndSession aborts any existing transactions and close the session.
func (s *Session) EndSession(ctx context.Context) {
if s.clientSession.TransactionInProgress() {
// ignore all errors aborting during an end session
_ = s.AbortTransaction(ctx)
}
s.clientSession.EndSession()
}
// WithTransaction starts a transaction on this session and runs the fn
// callback. Errors with the TransientTransactionError and
// UnknownTransactionCommitResult labels are retried for up to 120 seconds.
// Inside the callback, the SessionContext must be used as the Context parameter
// for any operations that should be part of the transaction. If the ctx
// parameter already has a Session attached to it, it will be replaced by this
// session. The fn callback may be run multiple times during WithTransaction due
// to retry attempts, so it must be idempotent.
//
// If a command inside the callback fn fails, it may cause the transaction on
// the server to be aborted. This situation is normally handled transparently by
// the driver. However, if the application does not return that error from the
// fn, the driver will not be able to determine whether the transaction was
// aborted or not. The driver will then retry the block indefinitely.
//
// To avoid this situation, the application MUST NOT silently handle errors
// within the callback fn. If the application needs to handle errors within the
// block, it MUST return them after doing so.
//
// Non-retryable operation errors or any operation errors that occur after the
// timeout expires will be returned without retrying. If the callback fails, the
// driver will call AbortTransaction. Because this method must succeed to ensure
// that server-side resources are properly cleaned up, context deadlines and
// cancellations will not be respected during this call. For a usage example,
// see the Client.StartSession method documentation.
func (s *Session) WithTransaction(
ctx context.Context,
fn func(ctx context.Context) (any, error),
opts ...options.Lister[options.TransactionOptions],
) (any, error) {
timeout := time.NewTimer(withTransactionTimeout)
defer timeout.Stop()
var err error
for {
err = s.StartTransaction(opts...)
if err != nil {
return nil, err
}
res, err := fn(NewSessionContext(ctx, s))
if err != nil {
if s.clientSession.TransactionRunning() {
// Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and
// cancellations, but forwards Value requests to the original one.
_ = s.AbortTransaction(newBackgroundContext(ctx))
}
select {
case <-timeout.C:
return nil, err
default:
}
if errorHasLabel(err, driver.TransientTransactionError) {
continue
}
return res, err
}
// Check if callback intentionally aborted and, if so, return immediately
// with no error.
err = s.clientSession.CheckAbortTransaction()
if err != nil {
return res, nil
}
// If context has errored, run AbortTransaction and return, as the CommitLoop
// has no chance of succeeding.
//
// Aborting after a failed CommitTransaction is dangerous. Failed transaction
// commits may unpin the session server-side, and subsequent transaction aborts
// may run on a new mongos which could end up with commit and abort being executed
// simultaneously.
if ctx.Err() != nil {
// Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and
// cancellations, but forwards Value requests to the original one.
_ = s.AbortTransaction(newBackgroundContext(ctx))
return nil, ctx.Err()
}
CommitLoop:
for {
err = s.CommitTransaction(newBackgroundContext(ctx))
// End when error is nil, as transaction has been committed.
if err == nil {
return res, nil
}
select {
case <-timeout.C:
return res, err
default:
}
var cerr CommandError
if errors.As(err, &cerr) {
if cerr.HasErrorLabel(driver.UnknownTransactionCommitResult) && !cerr.IsMaxTimeMSExpiredError() {
continue
}
if cerr.HasErrorLabel(driver.TransientTransactionError) {
break CommitLoop
}
}
return res, err
}
}
}
// StartTransaction starts a new transaction. This method returns an error if
// there is already a transaction in-progress for this session.
func (s *Session) StartTransaction(opts ...options.Lister[options.TransactionOptions]) error {
err := s.clientSession.CheckStartTransaction()
if err != nil {
return err
}
s.didCommitAfterStart = false
args, err := mongoutil.NewOptions[options.TransactionOptions](opts...)
if err != nil {
return fmt.Errorf("failed to construct options from builder: %w", err)
}
coreOpts := &session.TransactionOptions{
ReadConcern: args.ReadConcern,
ReadPreference: args.ReadPreference,
WriteConcern: args.WriteConcern,
}
return s.clientSession.StartTransaction(coreOpts)
}
// AbortTransaction aborts the active transaction for this session. This method
// returns an error if there is no active transaction for this session or if the
// transaction has been committed or aborted.
func (s *Session) AbortTransaction(ctx context.Context) error {
err := s.clientSession.CheckAbortTransaction()
if err != nil {
return err
}
// Do not run the abort command if the transaction is in starting state
if s.clientSession.TransactionStarting() || s.didCommitAfterStart {
return s.clientSession.AbortTransaction()
}
selector := makePinnedSelector(s.clientSession, &serverselector.Write{})
s.clientSession.Aborting = true
_ = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").
Deployment(s.deployment).WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).
Retry(driver.RetryOncePerCommand).CommandMonitor(s.client.monitor).
RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).ServerAPI(s.client.serverAPI).
Authenticator(s.client.authenticator).Logger(s.client.logger).Execute(ctx)
s.clientSession.Aborting = false
_ = s.clientSession.AbortTransaction()
return nil
}
// CommitTransaction commits the active transaction for this session. This
// method returns an error if there is no active transaction for this session or
// if the transaction has been aborted.
func (s *Session) CommitTransaction(ctx context.Context) error {
err := s.clientSession.CheckCommitTransaction()
if err != nil {
return err
}
// Do not run the commit command if the transaction is in started state
if s.clientSession.TransactionStarting() || s.didCommitAfterStart {
s.didCommitAfterStart = true
return s.clientSession.CommitTransaction()
}
if s.clientSession.TransactionCommitted() {
s.clientSession.RetryingCommit = true
}
selector := makePinnedSelector(s.clientSession, &serverselector.Write{})
s.clientSession.Committing = true
op := operation.NewCommitTransaction().
Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").Deployment(s.deployment).
WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).Retry(driver.RetryOncePerCommand).
CommandMonitor(s.client.monitor).RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).
ServerAPI(s.client.serverAPI).Authenticator(s.client.authenticator).Logger(s.client.logger)
err = op.Execute(ctx)
// Return error without updating transaction state if it is a timeout, as the transaction has not
// actually been committed.
if IsTimeout(err) {
return wrapErrors(err)
}
s.clientSession.Committing = false
commitErr := s.clientSession.CommitTransaction()
// We set the write concern to majority for subsequent calls to CommitTransaction.
s.clientSession.UpdateCommitTransactionWriteConcern()
if err != nil {
return wrapErrors(err)
}
return commitErr
}
// ClusterTime returns the current cluster time document associated with the
// session.
func (s *Session) ClusterTime() bson.Raw {
return s.clientSession.ClusterTime
}
// SnapshotTime returns the current snapshot time associated with the session.
func (s *Session) SnapshotTime() bson.Timestamp {
return s.clientSession.SnapshotTime
}
// AdvanceClusterTime advances the cluster time for a session. This method
// returns an error if the session has ended.
func (s *Session) AdvanceClusterTime(d bson.Raw) error {
return s.clientSession.AdvanceClusterTime(d)
}
// OperationTime returns the current operation time document associated with the
// session.
func (s *Session) OperationTime() *bson.Timestamp {
return s.clientSession.OperationTime
}
// AdvanceOperationTime advances the operation time for a session. This method
// returns an error if the session has ended.
func (s *Session) AdvanceOperationTime(ts *bson.Timestamp) error {
return s.clientSession.AdvanceOperationTime(ts)
}
// Client is the Client associated with the session.
func (s *Session) Client() *Client {
return s.client
}
// sessionFromContext checks for a sessionImpl in the argued context and returns the session if it
// exists
func sessionFromContext(ctx context.Context) *session.Client {
if ses := SessionFromContext(ctx); ses != nil {
return ses.clientSession
}
return nil
}