// Copyright (C) MongoDB, Inc. 2017-present. // // Licensed under the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 package mongo import ( "context" "errors" "fmt" "net/http" "sync" "sync/atomic" "time" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/internal/httputil" "go.mongodb.org/mongo-driver/v2/internal/logger" "go.mongodb.org/mongo-driver/v2/internal/mongoutil" "go.mongodb.org/mongo-driver/v2/internal/optionsutil" "go.mongodb.org/mongo-driver/v2/internal/ptrutil" "go.mongodb.org/mongo-driver/v2/internal/serverselector" "go.mongodb.org/mongo-driver/v2/internal/uuid" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/readconcern" "go.mongodb.org/mongo-driver/v2/mongo/readpref" "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" "go.mongodb.org/mongo-driver/v2/x/mongo/driver" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/auth" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/mongocrypt" mcopts "go.mongodb.org/mongo-driver/v2/x/mongo/driver/mongocrypt/options" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" "go.mongodb.org/mongo-driver/v2/x/mongo/driver/topology" ) const ( defaultLocalThreshold = 15 * time.Millisecond defaultMaxPoolSize = 100 ) var ( // keyVaultCollOpts specifies options used to communicate with the key vault collection keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()). SetWriteConcern(writeconcern.Majority()) endSessionsBatchSize = 10000 ) // Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by // multiple goroutines. // // The Client type opens and closes connections automatically and maintains a pool of idle connections. For // connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package. type Client struct { id uuid.UUID deployment driver.Deployment localThreshold time.Duration retryWrites bool retryReads bool clock *session.ClusterClock readPreference *readpref.ReadPref readConcern *readconcern.ReadConcern writeConcern *writeconcern.WriteConcern bsonOpts *options.BSONOptions registry *bson.Registry monitor *event.CommandMonitor serverAPI *driver.ServerAPIOptions serverMonitor *event.ServerMonitor sessionPool *session.Pool timeout *time.Duration httpClient *http.Client logger *logger.Logger currentDriverInfo *atomic.Pointer[options.DriverInfo] seenDriverInfo sync.Map // in-use encryption fields isAutoEncryptionSet bool keyVaultClientFLE *Client keyVaultCollFLE *Collection mongocryptdFLE *mongocryptdClient cryptFLE driver.Crypt metadataClientFLE *Client internalClientFLE *Client encryptedFieldsMap map[string]any authenticator driver.Authenticator } // Connect creates a new Client with the given configuration options. // // Connect returns an error if the configuration options are invalid, but does // not validate that the MongoDB deployment is reachable. To verify that the // deployment is reachable, call [Client.Ping]. // // When creating an [options.ClientOptions], the order the methods are called // matters. Later option setter calls overwrite the values from previous option // setter calls, including the ApplyURI method. This allows callers to // determine the order of precedence for setting options. For instance, if // ApplyURI is called before SetAuth, the Credential from SetAuth will // overwrite the values from the connection string. If ApplyURI is called // after SetAuth, then its values will overwrite those from SetAuth. func Connect(opts ...*options.ClientOptions) (*Client, error) { c, err := newClient(opts...) if err != nil { return nil, err } err = c.connect() if err != nil { return nil, err } return c, nil } // newClient creates a new client to connect to a deployment specified by the uri. // // When creating an options.ClientOptions, the order the methods are called matters. Later Set* // methods will overwrite the values from previous Set* method invocations. This includes the // ApplyURI method. This allows callers to determine the order of precedence for option // application. For instance, if ApplyURI is called before SetAuth, the Credential from // SetAuth will overwrite the values from the connection string. If ApplyURI is called // after SetAuth, then its values will overwrite those from SetAuth. // // The opts parameter is processed using options.MergeClientOptions, which will overwrite entire // option fields of previous options, there is no partial overwriting. For example, if Username is // set in the Auth field for the first option, and Password is set for the second but with no // Username, after the merge the Username field will be empty. func newClient(opts ...*options.ClientOptions) (*Client, error) { clientOpts := options.MergeClientOptions(opts...) id, err := uuid.New() if err != nil { return nil, err } client := &Client{ id: id, currentDriverInfo: &atomic.Pointer[options.DriverInfo]{}, } // ClusterClock client.clock = new(session.ClusterClock) // LocalThreshold client.localThreshold = defaultLocalThreshold if clientOpts.LocalThreshold != nil { client.localThreshold = *clientOpts.LocalThreshold } // Monitor if clientOpts.Monitor != nil { client.monitor = clientOpts.Monitor } // ServerMonitor if clientOpts.ServerMonitor != nil { client.serverMonitor = clientOpts.ServerMonitor } // ReadConcern client.readConcern = &readconcern.ReadConcern{} if clientOpts.ReadConcern != nil { client.readConcern = clientOpts.ReadConcern } // ReadPreference client.readPreference = readpref.Primary() if clientOpts.ReadPreference != nil { client.readPreference = clientOpts.ReadPreference } // BSONOptions if clientOpts.BSONOptions != nil { client.bsonOpts = clientOpts.BSONOptions } // Registry client.registry = defaultRegistry if clientOpts.Registry != nil { client.registry = clientOpts.Registry } // RetryWrites client.retryWrites = true // retry writes on by default if clientOpts.RetryWrites != nil { client.retryWrites = *clientOpts.RetryWrites } client.retryReads = true if clientOpts.RetryReads != nil { client.retryReads = *clientOpts.RetryReads } // Timeout client.timeout = clientOpts.Timeout client.httpClient = clientOpts.HTTPClient // WriteConcern if clientOpts.WriteConcern != nil { client.writeConcern = clientOpts.WriteConcern } // AutoEncryptionOptions if clientOpts.AutoEncryptionOptions != nil { client.isAutoEncryptionSet = true if err := client.configureAutoEncryption(clientOpts); err != nil { return nil, err } } else { client.cryptFLE = clientOpts.Crypt } // Deployment if clientOpts.Deployment != nil { client.deployment = clientOpts.Deployment } // Set default options if clientOpts.MaxPoolSize == nil { defaultMaxPoolSize := uint64(defaultMaxPoolSize) clientOpts.MaxPoolSize = &defaultMaxPoolSize } if clientOpts.Auth != nil { client.authenticator, err = auth.CreateAuthenticator( clientOpts.Auth.AuthMechanism, topology.ConvertCreds(clientOpts.Auth), clientOpts.HTTPClient, ) if err != nil { return nil, fmt.Errorf("error creating authenticator: %w", err) } } if clientOpts.DriverInfo != nil { client.AppendDriverInfo(*clientOpts.DriverInfo) } cfg, err := topology.NewAuthenticatorConfig(client.authenticator, topology.WithAuthConfigClock(client.clock), topology.WithAuthConfigClientOptions(clientOpts), topology.WithAuthConfigDriverInfo(client.currentDriverInfo), ) if err != nil { return nil, err } var connectTimeout time.Duration if clientOpts.ConnectTimeout != nil { connectTimeout = *clientOpts.ConnectTimeout } client.serverAPI = topology.ServerAPIFromServerOptions(connectTimeout, cfg.ServerOpts) if client.deployment == nil { client.deployment, err = topology.New(cfg) if err != nil { return nil, wrapErrors(err) } } // Create a logger for the client. client.logger, err = newLogger(clientOpts.LoggerOptions) if err != nil { return nil, fmt.Errorf("invalid logger options: %w", err) } return client, nil } // Connect initializes the Client by starting background monitoring goroutines. // If the Client was created using the NewClient function, this method must be called before a Client can be used. // // Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main // goroutine. The Client.Ping method can be used to verify that the connection was created successfully. func (c *Client) connect() error { if connector, ok := c.deployment.(driver.Connector); ok { err := connector.Connect() if err != nil { return wrapErrors(err) } } if c.mongocryptdFLE != nil { if err := c.mongocryptdFLE.connect(); err != nil { return err } } if c.internalClientFLE != nil { if err := c.internalClientFLE.connect(); err != nil { return err } } if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c { if err := c.keyVaultClientFLE.connect(); err != nil { return err } } if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c { if err := c.metadataClientFLE.connect(); err != nil { return err } } var updateChan <-chan description.Topology if subscriber, ok := c.deployment.(driver.Subscriber); ok { sub, err := subscriber.Subscribe() if err != nil { return wrapErrors(err) } updateChan = sub.Updates } c.sessionPool = session.NewPool(updateChan) return nil } // AppendDriverInfo appends the provided [options.DriverInfo] to the metadata // (e.g. name, version, platform) that will be sent to the server in handshake // requests when establishing new connections. // // Repeated calls to AppendDriverInfo with equivalent DriverInfo is a no-op. // // Metadata is limited to 512 bytes; any excess will be truncated. func (c *Client) AppendDriverInfo(info options.DriverInfo) { if _, loaded := c.seenDriverInfo.LoadOrStore(info, struct{}{}); loaded { return } if old := c.currentDriverInfo.Load(); old != nil { if old.Name != "" && info.Name != "" && old.Name != info.Name { info.Name = old.Name + "|" + info.Name } else if old.Name != "" { info.Name = old.Name } if old.Version != "" && info.Version != "" && old.Version != info.Version { info.Version = old.Version + "|" + info.Version } else if old.Version != "" { info.Version = old.Version } if old.Platform != "" && info.Platform != "" && old.Platform != info.Platform { info.Platform = old.Platform + "|" + info.Platform } else if old.Platform != "" { info.Platform = old.Platform } } // Copy-on-write so that the info stored in the client is immutable. infoCopy := new(options.DriverInfo) *infoCopy = info c.currentDriverInfo.Store(infoCopy) } // Disconnect closes sockets to the topology referenced by this Client. It will // shut down any monitoring goroutines, close the idle connection pool, and will // wait until all the in use connections have been returned to the connection // pool and closed before returning. If the context expires via cancellation, // deadline, or timeout before the in use connections have returned, the in use // connections will be closed, resulting in the failure of any in flight read // or write operations. If this method returns with no errors, all connections // associated with this Client have been closed. func (c *Client) Disconnect(ctx context.Context) error { if c.logger != nil { defer c.logger.Close() } if ctx == nil { ctx = context.Background() } if c.httpClient == httputil.DefaultHTTPClient { defer httputil.CloseIdleHTTPConnections(c.httpClient) } c.endSessions(ctx) if c.mongocryptdFLE != nil { if err := c.mongocryptdFLE.disconnect(ctx); err != nil { return err } } if c.internalClientFLE != nil { if err := c.internalClientFLE.Disconnect(ctx); err != nil { return err } } if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c { if err := c.keyVaultClientFLE.Disconnect(ctx); err != nil { return err } } if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c { if err := c.metadataClientFLE.Disconnect(ctx); err != nil { return err } } if c.cryptFLE != nil { c.cryptFLE.Close() } if disconnector, ok := c.deployment.(driver.Disconnector); ok { return wrapErrors(disconnector.Disconnect(ctx)) } return nil } // Ping sends a ping command to verify that the client can connect to the deployment. // // The rp parameter is used to determine which server is selected for the operation. // If it is nil, the client's read preference is used. // // If the server is down, Ping will try to select a server until the client's server selection timeout expires. // This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client. // After the timeout expires, a server selection error is returned. // // Using Ping reduces application resilience because applications starting up will error if the server is temporarily // unavailable or is failing over (e.g. during autoscaling due to a load spike). func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { if ctx == nil { ctx = context.Background() } if rp == nil { rp = c.readPreference } db := c.Database("admin") res := db.RunCommand(ctx, bson.D{ {"ping", 1}, }, options.RunCmd().SetReadPreference(rp)) return wrapErrors(res.Err()) } // StartSession starts a new session configured with the given options. // // StartSession does not actually communicate with the server and will not error if the client is // disconnected. // // StartSession is safe to call from multiple goroutines concurrently. However, Sessions returned by StartSession are // not safe for concurrent use by multiple goroutines. // // If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read // concern, write concern, or read preference will be used, respectively. func (c *Client) StartSession(opts ...options.Lister[options.SessionOptions]) (*Session, error) { sessArgs, err := mongoutil.NewOptions(opts...) if err != nil { return nil, err } if sessArgs.CausalConsistency == nil && (sessArgs.Snapshot == nil || !*sessArgs.Snapshot) { sessArgs.CausalConsistency = &options.DefaultCausalConsistency } coreOpts := &session.ClientOptions{ DefaultReadConcern: c.readConcern, DefaultReadPreference: c.readPreference, DefaultWriteConcern: c.writeConcern, } if sessArgs.CausalConsistency != nil { coreOpts.CausalConsistency = sessArgs.CausalConsistency } if bldr := sessArgs.DefaultTransactionOptions; bldr != nil { txnOpts, err := mongoutil.NewOptions[options.TransactionOptions](bldr) if err != nil { return nil, err } if rc := txnOpts.ReadConcern; rc != nil { coreOpts.DefaultReadConcern = rc } if wc := txnOpts.WriteConcern; wc != nil { coreOpts.DefaultWriteConcern = wc } if rp := txnOpts.ReadPreference; rp != nil { coreOpts.DefaultReadPreference = rp } } if sessArgs.Snapshot != nil { coreOpts.Snapshot = sessArgs.Snapshot } if sessArgs.SnapshotTime != nil { coreOpts.SnapshotTime = sessArgs.SnapshotTime } sess, err := session.NewClientSession(c.sessionPool, c.id, coreOpts) if err != nil { return nil, wrapErrors(err) } return &Session{ clientSession: sess, client: c, deployment: c.deployment, }, nil } func (c *Client) endSessions(ctx context.Context) { sessionIDs := c.sessionPool.IDSlice() op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment). ServerSelector(&serverselector.ReadPref{ReadPref: readpref.PrimaryPreferred()}). CommandMonitor(c.monitor).Database("admin").Crypt(c.cryptFLE).ServerAPI(c.serverAPI) totalNumIDs := len(sessionIDs) var currentBatch []bsoncore.Document for i := 0; i < totalNumIDs; i++ { currentBatch = append(currentBatch, sessionIDs[i]) // If we are at the end of a batch or the end of the overall IDs array, execute the operation. if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 { // Ignore all errors when ending sessions. _, marshalVal, err := bson.MarshalValue(currentBatch) if err == nil { _ = op.SessionIDs(marshalVal).Execute(ctx) } currentBatch = currentBatch[:0] } } } func (c *Client) configureAutoEncryption(args *options.ClientOptions) error { c.encryptedFieldsMap = args.AutoEncryptionOptions.EncryptedFieldsMap if err := c.configureKeyVaultClientFLE(args); err != nil { return err } if err := c.configureMetadataClientFLE(args); err != nil { return err } mc, err := c.newMongoCrypt(args.AutoEncryptionOptions) if err != nil { return err } // If the crypt_shared library was not loaded, try to spawn and connect to mongocryptd. if mc.CryptSharedLibVersionString() == "" { mongocryptdFLE, err := newMongocryptdClient(args.AutoEncryptionOptions) if err != nil { return err } c.mongocryptdFLE = mongocryptdFLE } c.configureCryptFLE(mc, args.AutoEncryptionOptions) return nil } func (c *Client) getOrCreateInternalClient(args *options.ClientOptions) (*Client, error) { if c.internalClientFLE != nil { return c.internalClientFLE, nil } argsCopy := *args argsCopy.AutoEncryptionOptions = nil argsCopy.MinPoolSize = ptrutil.Ptr[uint64](0) var err error c.internalClientFLE, err = newClient(&argsCopy) return c.internalClientFLE, err } func (c *Client) configureKeyVaultClientFLE(clientOpts *options.ClientOptions) error { aeOpts := clientOpts.AutoEncryptionOptions var err error switch { case aeOpts.KeyVaultClientOptions != nil: c.keyVaultClientFLE, err = newClient(aeOpts.KeyVaultClientOptions) case clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0: c.keyVaultClientFLE = c default: c.keyVaultClientFLE, err = c.getOrCreateInternalClient(clientOpts) } if err != nil { return err } dbName, collName := splitNamespace(aeOpts.KeyVaultNamespace) c.keyVaultCollFLE = c.keyVaultClientFLE.Database(dbName).Collection(collName, keyVaultCollOpts) return nil } func (c *Client) configureMetadataClientFLE(clientOpts *options.ClientOptions) error { aeOpts := clientOpts.AutoEncryptionOptions if aeOpts.BypassAutoEncryption != nil && *aeOpts.BypassAutoEncryption { // no need for a metadata client. return nil } if clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0 { c.metadataClientFLE = c return nil } var err error c.metadataClientFLE, err = c.getOrCreateInternalClient(clientOpts) return err } func (c *Client) newMongoCrypt(opts *options.AutoEncryptionOptions) (*mongocrypt.MongoCrypt, error) { // convert schemas in SchemaMap to bsoncore documents cryptSchemaMap := make(map[string]bsoncore.Document) for k, v := range opts.SchemaMap { schema, err := marshal(v, c.bsonOpts, c.registry) if err != nil { return nil, err } cryptSchemaMap[k] = schema } // convert schemas in EncryptedFieldsMap to bsoncore documents cryptEncryptedFieldsMap := make(map[string]bsoncore.Document) for k, v := range opts.EncryptedFieldsMap { encryptedFields, err := marshal(v, c.bsonOpts, c.registry) if err != nil { return nil, err } cryptEncryptedFieldsMap[k] = encryptedFields } kmsProviders, err := marshal(opts.KmsProviders, c.bsonOpts, c.registry) if err != nil { return nil, fmt.Errorf("error creating KMS providers document: %w", err) } // Set the crypt_shared library override path from the "cryptSharedLibPath" extra option if one // was set. cryptSharedLibPath := "" if val, ok := opts.ExtraOptions["cryptSharedLibPath"]; ok { str, ok := val.(string) if !ok { return nil, fmt.Errorf( `expected AutoEncryption extra option "cryptSharedLibPath" to be a string, but is a %T`, val) } cryptSharedLibPath = str } // Explicitly disable loading the crypt_shared library if requested. Note that this is ONLY // intended for use from tests; there is no supported public API for explicitly disabling // loading the crypt_shared library. cryptSharedLibDisabled := false if v, ok := opts.ExtraOptions["__cryptSharedLibDisabledForTestOnly"]; ok { cryptSharedLibDisabled = v.(bool) } bypassAutoEncryption := opts.BypassAutoEncryption != nil && *opts.BypassAutoEncryption bypassQueryAnalysis := opts.BypassQueryAnalysis != nil && *opts.BypassQueryAnalysis mc, err := mongocrypt.NewMongoCrypt(&mcopts.MongoCryptOptions{ KmsProviders: kmsProviders, LocalSchemaMap: cryptSchemaMap, BypassQueryAnalysis: bypassQueryAnalysis, EncryptedFieldsMap: cryptEncryptedFieldsMap, CryptSharedLibDisabled: cryptSharedLibDisabled || bypassAutoEncryption, CryptSharedLibOverridePath: cryptSharedLibPath, HTTPClient: opts.HTTPClient, KeyExpiration: opts.KeyExpiration, }) if err != nil { return nil, err } var cryptSharedLibRequired bool if val, ok := opts.ExtraOptions["cryptSharedLibRequired"]; ok { b, ok := val.(bool) if !ok { return nil, fmt.Errorf( `expected AutoEncryption extra option "cryptSharedLibRequired" to be a bool, but is a %T`, val) } cryptSharedLibRequired = b } // If the "cryptSharedLibRequired" extra option is set to true, check the MongoCrypt version // string to confirm that the library was successfully loaded. If the version string is empty, // return an error indicating that we couldn't load the crypt_shared library. if cryptSharedLibRequired && mc.CryptSharedLibVersionString() == "" { return nil, errors.New( `AutoEncryption extra option "cryptSharedLibRequired" is true, but we failed to load the crypt_shared library`) } return mc, nil } //nolint:unused // the unused linter thinks that this function is unreachable because "c.newMongoCrypt" always panics without the "cse" build tag set. func (c *Client) configureCryptFLE(mc *mongocrypt.MongoCrypt, opts *options.AutoEncryptionOptions) { bypass := opts.BypassAutoEncryption != nil && *opts.BypassAutoEncryption kr := keyRetriever{coll: c.keyVaultCollFLE} var cir collInfoRetriever // If bypass is true, c.metadataClientFLE is nil and the collInfoRetriever // will not be used. If bypass is false, to the parent client or the // internal client. if !bypass { cir = collInfoRetriever{client: c.metadataClientFLE} } c.cryptFLE = driver.NewCrypt(&driver.CryptOptions{ MongoCrypt: mc, CollInfoFn: cir.cryptCollInfo, KeyFn: kr.cryptKeys, MarkFn: c.mongocryptdFLE.markCommand, TLSConfig: opts.TLSConfig, BypassAutoEncryption: bypass, }) } // validSession returns an error if the session doesn't belong to the client func (c *Client) validSession(sess *session.Client) error { if sess != nil && sess.ClientID != c.id { return ErrWrongClient } return nil } // Database returns a handle for a database with the given name configured with the given DatabaseOptions. func (c *Client) Database(name string, opts ...options.Lister[options.DatabaseOptions]) *Database { return newDatabase(c, name, opts...) } // ListDatabases executes a listDatabases command and returns the result. // // The filter parameter must be a document containing query operators and can be used to select which // databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include // all databases. // // The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation). // // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listDatabases/. func (c *Client) ListDatabases(ctx context.Context, filter any, opts ...options.Lister[options.ListDatabasesOptions]) (ListDatabasesResult, error) { if ctx == nil { ctx = context.Background() } sess := sessionFromContext(ctx) err := c.validSession(sess) if err != nil { return ListDatabasesResult{}, err } if sess == nil && c.sessionPool != nil { sess = session.NewImplicitClientSession(c.sessionPool, c.id) defer sess.EndSession() } err = c.validSession(sess) if err != nil { return ListDatabasesResult{}, err } filterDoc, err := marshal(filter, c.bsonOpts, c.registry) if err != nil { return ListDatabasesResult{}, err } var selector description.ServerSelector selector = &serverselector.Composite{ Selectors: []description.ServerSelector{ &serverselector.ReadPref{ReadPref: readpref.Primary()}, &serverselector.Latency{Latency: c.localThreshold}, }, } selector = makeReadPrefSelector(sess, selector, c.localThreshold) lda, err := mongoutil.NewOptions(opts...) if err != nil { return ListDatabasesResult{}, err } op := operation.NewListDatabases(filterDoc). Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor). ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.cryptFLE). ServerAPI(c.serverAPI).Timeout(c.timeout).Authenticator(c.authenticator) if lda.NameOnly != nil { op = op.NameOnly(*lda.NameOnly) } if lda.AuthorizedDatabases != nil { op = op.AuthorizedDatabases(*lda.AuthorizedDatabases) } retry := driver.RetryNone if c.retryReads { retry = driver.RetryOncePerCommand } op.Retry(retry) err = op.Execute(ctx) if err != nil { return ListDatabasesResult{}, wrapErrors(err) } return newListDatabasesResultFromOperation(op.Result()), nil } // ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases // on the server. // // The filter parameter must be a document containing query operators and can be used to select which databases // are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all // databases. // // The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions // documentation.) // // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listDatabases/. func (c *Client) ListDatabaseNames(ctx context.Context, filter any, opts ...options.Lister[options.ListDatabasesOptions]) ([]string, error) { opts = append(opts, options.ListDatabases().SetNameOnly(true)) res, err := c.ListDatabases(ctx, filter, opts...) if err != nil { return nil, err } names := make([]string, 0) for _, spec := range res.Databases { names = append(names, spec.Name) } return names, nil } // WithSession creates a new session context from the ctx and sess parameters // and uses it to call the fn callback. // // WithSession is safe to call from multiple goroutines concurrently. However, // the context passed to the WithSession callback function is not safe for // concurrent use by multiple goroutines. // // If the ctx parameter already contains a Session, that Session will be // replaced with the one provided. // // Any error returned by the fn callback will be returned without any // modifications. func WithSession(ctx context.Context, sess *Session, fn func(context.Context) error) error { return fn(NewSessionContext(ctx, sess)) } // UseSession creates a new Session and uses it to create a new session context, // which is used to call the fn callback. After the callback returns, the // created Session is ended, meaning that any in-progress transactions started // by fn will be aborted even if fn returns an error. // // UseSession is safe to call from multiple goroutines concurrently. However, // the context passed to the UseSession callback function is not safe for // concurrent use by multiple goroutines. // // If the ctx parameter already contains a Session, that Session will be // replaced with the newly created one. // // Any error returned by the fn callback will be returned without any // modifications. func (c *Client) UseSession(ctx context.Context, fn func(context.Context) error) error { return c.UseSessionWithOptions(ctx, options.Session(), fn) } // UseSessionWithOptions operates like UseSession but uses the given // SessionOptions to create the Session. // // UseSessionWithOptions is safe to call from multiple goroutines concurrently. // However, the context passed to the UseSessionWithOptions callback function is // not safe for concurrent use by multiple goroutines. func (c *Client) UseSessionWithOptions( ctx context.Context, opts *options.SessionOptionsBuilder, fn func(context.Context) error, ) error { defaultSess, err := c.StartSession(opts) if err != nil { return err } defer defaultSess.EndSession(ctx) return fn(NewSessionContext(ctx, defaultSess)) } // Watch returns a change stream for all changes on the deployment. See // https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams. // // The client must be configured with read concern majority or no read concern for a change stream to be created // successfully. // // The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be // nil or empty. The stage documents must all be non-nil. See https://www.mongodb.com/docs/manual/changeStreams/ for a list // of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{} // type can be used. // // The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions // documentation). func (c *Client) Watch(ctx context.Context, pipeline any, opts ...options.Lister[options.ChangeStreamOptions], ) (*ChangeStream, error) { csConfig := changeStreamConfig{ readConcern: c.readConcern, readPreference: c.readPreference, client: c, bsonOpts: c.bsonOpts, registry: c.registry, streamType: ClientStream, crypt: c.cryptFLE, } return newChangeStream(ctx, csConfig, pipeline, opts...) } // NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been // closed (i.e. EndSession has not been called). func (c *Client) NumberSessionsInProgress() int { // The underlying session pool uses an int64 for checkedOut to allow atomic // access. We convert to an int here to maintain backward compatibility with // older versions of the driver that did not atomically access checkedOut. return int(c.sessionPool.CheckedOut()) } func (c *Client) createBaseCursorOptions() driver.CursorOptions { return driver.CursorOptions{ CommandMonitor: c.monitor, Crypt: c.cryptFLE, ServerAPI: c.serverAPI, } } // ClientBulkWrite is a struct that can be used in a client-level BulkWrite operation. type ClientBulkWrite struct { Database string Collection string Model ClientWriteModel } // BulkWrite performs a client-level bulk write operation. func (c *Client) BulkWrite(ctx context.Context, writes []ClientBulkWrite, opts ...options.Lister[options.ClientBulkWriteOptions], ) (*ClientBulkWriteResult, error) { // TODO(GODRIVER-3403): Remove after support for QE with Client.bulkWrite. if c.isAutoEncryptionSet { return nil, errors.New("bulkWrite does not currently support automatic encryption") } if len(writes) == 0 { return nil, fmt.Errorf("invalid writes: %w", ErrEmptySlice) } bwo, err := mongoutil.NewOptions(opts...) if err != nil { return nil, err } if ctx == nil { ctx = context.Background() } sess := sessionFromContext(ctx) if sess == nil && c.sessionPool != nil { sess = session.NewImplicitClientSession(c.sessionPool, c.id) defer sess.EndSession() } err = c.validSession(sess) if err != nil { return nil, err } transactionRunning := sess.TransactionRunning() wc := c.writeConcern if transactionRunning { wc = nil } if bwo.WriteConcern != nil { if transactionRunning { return nil, errors.New("cannot set write concern after starting a transaction") } wc = bwo.WriteConcern } acknowledged := wc.Acknowledged() if !acknowledged { if bwo.Ordered == nil || *bwo.Ordered { return nil, errors.New("cannot request unacknowledged write concern and ordered writes") } sess = nil } writeSelector := &serverselector.Composite{ Selectors: []description.ServerSelector{ &serverselector.Write{}, &serverselector.Latency{Latency: c.localThreshold}, }, } selector := makePinnedSelector(sess, writeSelector) writePairs := make([]clientBulkWritePair, len(writes)) for i, w := range writes { writePairs[i] = clientBulkWritePair{ namespace: fmt.Sprintf("%s.%s", w.Database, w.Collection), model: w.Model, } } op := clientBulkWrite{ writePairs: writePairs, ordered: bwo.Ordered, bypassDocumentValidation: bwo.BypassDocumentValidation, comment: bwo.Comment, let: bwo.Let, session: sess, client: c, selector: selector, writeConcern: wc, } if rawData, ok := optionsutil.Value(bwo.Internal, "rawData").(bool); ok { op.rawData = &rawData } if additionalCmd, ok := optionsutil.Value(bwo.Internal, "addCommandFields").(bson.D); ok { op.additionalCmd = additionalCmd } if bwo.VerboseResults == nil || !(*bwo.VerboseResults) { op.errorsOnly = true } else if !acknowledged { return nil, errors.New("cannot request unacknowledged write concern and verbose results") } op.result.Acknowledged = acknowledged op.result.HasVerboseResults = !op.errorsOnly err = op.execute(ctx) return &op.result, wrapErrors(err) } // newLogger will use the LoggerOptions to create an internal logger and publish // messages using a LogSink. func newLogger(opts *options.LoggerOptions) (*logger.Logger, error) { // If there are no logger options, then create a default logger. if opts == nil { opts = options.Logger() } // If there are no component-level options and the environment does not // contain component variables, then do nothing. if len(opts.ComponentLevels) == 0 && !logger.EnvHasComponentVariables() { return nil, nil } // Otherwise, collect the component-level options and create a logger. componentLevels := make(map[logger.Component]logger.Level) for component, level := range opts.ComponentLevels { componentLevels[logger.Component(component)] = logger.Level(level) } return logger.New(opts.Sink, opts.MaxDocumentLength, componentLevels) }