// Copyright (C) MongoDB, Inc. 2017-present. // // Licensed under the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 package mongo import ( "context" "errors" "fmt" "time" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/internal/mongoutil" "go.mongodb.org/mongo-driver/v2/internal/serverselector" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" "go.mongodb.org/mongo-driver/v2/x/mongo/driver" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" ) // ErrWrongClient is returned when a user attempts to pass in a session created by a different client than // the method call is using. var ErrWrongClient = errors.New("session was not created by this client") var withTransactionTimeout = 120 * time.Second // Session is a MongoDB logical session. Sessions can be used to enable causal // consistency for a group of operations or to execute operations in an ACID // transaction. A new Session can be created from a Client instance. A Session // created from a Client must only be used to execute operations using that // Client or a Database or Collection created from that Client. For more // information about sessions, and their use cases, see // https://www.mongodb.com/docs/manual/reference/server-sessions/, // https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#causal-consistency, and // https://www.mongodb.com/docs/manual/core/transactions/. // // Implementations of Session are not safe for concurrent use by multiple // goroutines. type Session struct { clientSession *session.Client client *Client deployment driver.Deployment didCommitAfterStart bool // true if commit was called after start with no other operations } type sessionKey struct{} // NewSessionContext returns a Context that holds the given Session. If the // Context already contains a Session, that Session will be replaced with the // one provided. // // The returned Context can be used with Collection methods like // [Collection.InsertOne] or [Collection.Find] to run operations in a Session. func NewSessionContext(parent context.Context, sess *Session) context.Context { return context.WithValue(parent, sessionKey{}, sess) } // SessionFromContext extracts the mongo.Session object stored in a Context. This can be used on a SessionContext that // was created implicitly through one of the callback-based session APIs or explicitly by calling NewSessionContext. If // there is no Session stored in the provided Context, nil is returned. func SessionFromContext(ctx context.Context) *Session { val := ctx.Value(sessionKey{}) if val == nil { return nil } sess, ok := val.(*Session) if !ok { return nil } return sess } // ID returns the current ID document associated with the session. The ID // document is in the form {"id": }. func (s *Session) ID() bson.Raw { return bson.Raw(s.clientSession.SessionID) } // EndSession aborts any existing transactions and close the session. func (s *Session) EndSession(ctx context.Context) { if s.clientSession.TransactionInProgress() { // ignore all errors aborting during an end session _ = s.AbortTransaction(ctx) } s.clientSession.EndSession() } // WithTransaction starts a transaction on this session and runs the fn // callback. Errors with the TransientTransactionError and // UnknownTransactionCommitResult labels are retried for up to 120 seconds. // Inside the callback, the SessionContext must be used as the Context parameter // for any operations that should be part of the transaction. If the ctx // parameter already has a Session attached to it, it will be replaced by this // session. The fn callback may be run multiple times during WithTransaction due // to retry attempts, so it must be idempotent. // // If a command inside the callback fn fails, it may cause the transaction on // the server to be aborted. This situation is normally handled transparently by // the driver. However, if the application does not return that error from the // fn, the driver will not be able to determine whether the transaction was // aborted or not. The driver will then retry the block indefinitely. // // To avoid this situation, the application MUST NOT silently handle errors // within the callback fn. If the application needs to handle errors within the // block, it MUST return them after doing so. // // Non-retryable operation errors or any operation errors that occur after the // timeout expires will be returned without retrying. If the callback fails, the // driver will call AbortTransaction. Because this method must succeed to ensure // that server-side resources are properly cleaned up, context deadlines and // cancellations will not be respected during this call. For a usage example, // see the Client.StartSession method documentation. func (s *Session) WithTransaction( ctx context.Context, fn func(ctx context.Context) (any, error), opts ...options.Lister[options.TransactionOptions], ) (any, error) { timeout := time.NewTimer(withTransactionTimeout) defer timeout.Stop() var err error for { err = s.StartTransaction(opts...) if err != nil { return nil, err } res, err := fn(NewSessionContext(ctx, s)) if err != nil { if s.clientSession.TransactionRunning() { // Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and // cancellations, but forwards Value requests to the original one. _ = s.AbortTransaction(newBackgroundContext(ctx)) } select { case <-timeout.C: return nil, err default: } if errorHasLabel(err, driver.TransientTransactionError) { continue } return res, err } // Check if callback intentionally aborted and, if so, return immediately // with no error. err = s.clientSession.CheckAbortTransaction() if err != nil { return res, nil } // If context has errored, run AbortTransaction and return, as the CommitLoop // has no chance of succeeding. // // Aborting after a failed CommitTransaction is dangerous. Failed transaction // commits may unpin the session server-side, and subsequent transaction aborts // may run on a new mongos which could end up with commit and abort being executed // simultaneously. if ctx.Err() != nil { // Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and // cancellations, but forwards Value requests to the original one. _ = s.AbortTransaction(newBackgroundContext(ctx)) return nil, ctx.Err() } CommitLoop: for { err = s.CommitTransaction(newBackgroundContext(ctx)) // End when error is nil, as transaction has been committed. if err == nil { return res, nil } select { case <-timeout.C: return res, err default: } var cerr CommandError if errors.As(err, &cerr) { if cerr.HasErrorLabel(driver.UnknownTransactionCommitResult) && !cerr.IsMaxTimeMSExpiredError() { continue } if cerr.HasErrorLabel(driver.TransientTransactionError) { break CommitLoop } } return res, err } } } // StartTransaction starts a new transaction. This method returns an error if // there is already a transaction in-progress for this session. func (s *Session) StartTransaction(opts ...options.Lister[options.TransactionOptions]) error { err := s.clientSession.CheckStartTransaction() if err != nil { return err } s.didCommitAfterStart = false args, err := mongoutil.NewOptions[options.TransactionOptions](opts...) if err != nil { return fmt.Errorf("failed to construct options from builder: %w", err) } coreOpts := &session.TransactionOptions{ ReadConcern: args.ReadConcern, ReadPreference: args.ReadPreference, WriteConcern: args.WriteConcern, } return s.clientSession.StartTransaction(coreOpts) } // AbortTransaction aborts the active transaction for this session. This method // returns an error if there is no active transaction for this session or if the // transaction has been committed or aborted. func (s *Session) AbortTransaction(ctx context.Context) error { err := s.clientSession.CheckAbortTransaction() if err != nil { return err } // Do not run the abort command if the transaction is in starting state if s.clientSession.TransactionStarting() || s.didCommitAfterStart { return s.clientSession.AbortTransaction() } selector := makePinnedSelector(s.clientSession, &serverselector.Write{}) s.clientSession.Aborting = true _ = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin"). Deployment(s.deployment).WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector). Retry(driver.RetryOncePerCommand).CommandMonitor(s.client.monitor). RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).ServerAPI(s.client.serverAPI). Authenticator(s.client.authenticator).Logger(s.client.logger).Execute(ctx) s.clientSession.Aborting = false _ = s.clientSession.AbortTransaction() return nil } // CommitTransaction commits the active transaction for this session. This // method returns an error if there is no active transaction for this session or // if the transaction has been aborted. func (s *Session) CommitTransaction(ctx context.Context) error { err := s.clientSession.CheckCommitTransaction() if err != nil { return err } // Do not run the commit command if the transaction is in started state if s.clientSession.TransactionStarting() || s.didCommitAfterStart { s.didCommitAfterStart = true return s.clientSession.CommitTransaction() } if s.clientSession.TransactionCommitted() { s.clientSession.RetryingCommit = true } selector := makePinnedSelector(s.clientSession, &serverselector.Write{}) s.clientSession.Committing = true op := operation.NewCommitTransaction(). Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").Deployment(s.deployment). WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).Retry(driver.RetryOncePerCommand). CommandMonitor(s.client.monitor).RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)). ServerAPI(s.client.serverAPI).Authenticator(s.client.authenticator).Logger(s.client.logger) err = op.Execute(ctx) // Return error without updating transaction state if it is a timeout, as the transaction has not // actually been committed. if IsTimeout(err) { return wrapErrors(err) } s.clientSession.Committing = false commitErr := s.clientSession.CommitTransaction() // We set the write concern to majority for subsequent calls to CommitTransaction. s.clientSession.UpdateCommitTransactionWriteConcern() if err != nil { return wrapErrors(err) } return commitErr } // ClusterTime returns the current cluster time document associated with the // session. func (s *Session) ClusterTime() bson.Raw { return s.clientSession.ClusterTime } // SnapshotTime returns the current snapshot time associated with the session. func (s *Session) SnapshotTime() bson.Timestamp { return s.clientSession.SnapshotTime } // AdvanceClusterTime advances the cluster time for a session. This method // returns an error if the session has ended. func (s *Session) AdvanceClusterTime(d bson.Raw) error { return s.clientSession.AdvanceClusterTime(d) } // OperationTime returns the current operation time document associated with the // session. func (s *Session) OperationTime() *bson.Timestamp { return s.clientSession.OperationTime } // AdvanceOperationTime advances the operation time for a session. This method // returns an error if the session has ended. func (s *Session) AdvanceOperationTime(ts *bson.Timestamp) error { return s.clientSession.AdvanceOperationTime(ts) } // Client is the Client associated with the session. func (s *Session) Client() *Client { return s.client } // sessionFromContext checks for a sessionImpl in the argued context and returns the session if it // exists func sessionFromContext(ctx context.Context) *session.Client { if ses := SessionFromContext(ctx); ses != nil { return ses.clientSession } return nil }