Files
web/server/vendor/go.mongodb.org/mongo-driver/v2/mongo/database.go

989 lines
32 KiB
Go

// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongo
import (
"context"
"errors"
"fmt"
"time"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/internal/csfle"
"go.mongodb.org/mongo-driver/v2/internal/csot"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
"go.mongodb.org/mongo-driver/v2/internal/optionsutil"
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
)
var defaultRunCmdOpts = []options.Lister[options.RunCmdOptions]{options.RunCmd().SetReadPreference(readpref.Primary())}
// Database is a handle to a MongoDB database. It is safe for concurrent use by multiple goroutines.
type Database struct {
client *Client
name string
readConcern *readconcern.ReadConcern
writeConcern *writeconcern.WriteConcern
readPreference *readpref.ReadPref
readSelector description.ServerSelector
writeSelector description.ServerSelector
bsonOpts *options.BSONOptions
registry *bson.Registry
}
func newDatabase(client *Client, name string, opts ...options.Lister[options.DatabaseOptions]) *Database {
args, _ := mongoutil.NewOptions[options.DatabaseOptions](opts...)
rc := client.readConcern
if args.ReadConcern != nil {
rc = args.ReadConcern
}
rp := client.readPreference
if args.ReadPreference != nil {
rp = args.ReadPreference
}
wc := client.writeConcern
if args.WriteConcern != nil {
wc = args.WriteConcern
}
bsonOpts := client.bsonOpts
if args.BSONOptions != nil {
bsonOpts = args.BSONOptions
}
reg := client.registry
if args.Registry != nil {
reg = args.Registry
}
db := &Database{
client: client,
name: name,
readPreference: rp,
readConcern: rc,
writeConcern: wc,
bsonOpts: bsonOpts,
registry: reg,
}
db.readSelector = &serverselector.Composite{
Selectors: []description.ServerSelector{
&serverselector.ReadPref{ReadPref: db.readPreference},
&serverselector.Latency{Latency: db.client.localThreshold},
},
}
db.writeSelector = &serverselector.Composite{
Selectors: []description.ServerSelector{
&serverselector.Write{},
&serverselector.Latency{Latency: db.client.localThreshold},
},
}
return db
}
// Client returns the Client the Database was created from.
func (db *Database) Client() *Client {
return db.client
}
// Name returns the name of the database.
func (db *Database) Name() string {
return db.name
}
// Collection returns a handle for a collection with the given name and options.
//
// If the collection does not exist on the server, it will be created when a
// write operation is performed.
func (db *Database) Collection(name string, opts ...options.Lister[options.CollectionOptions]) *Collection {
return newCollection(db, name, opts...)
}
// Aggregate executes an aggregate command the database.
//
// The pipeline parameter must be a slice of documents, each representing an aggregation stage. The pipeline
// cannot be nil but can be empty. The stage documents must all be non-nil. For a pipeline of bson.D documents, the
// mongo.Pipeline type can be used. See
// https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#db-aggregate-stages for a list of valid
// stages in database-level aggregations.
//
// The opts parameter can be used to specify options for this operation (see the options.AggregateOptions documentation).
//
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/aggregate/.
func (db *Database) Aggregate(
ctx context.Context,
pipeline any,
opts ...options.Lister[options.AggregateOptions],
) (*Cursor, error) {
a := aggregateParams{
ctx: ctx,
pipeline: pipeline,
client: db.client,
registry: db.registry,
readConcern: db.readConcern,
writeConcern: db.writeConcern,
retryRead: db.client.retryReads,
db: db.name,
readSelector: db.readSelector,
writeSelector: db.writeSelector,
readPreference: db.readPreference,
}
return aggregate(a, opts...)
}
func (db *Database) processRunCommand(
ctx context.Context,
cmd any,
cursorCommand bool,
opts ...options.Lister[options.RunCmdOptions],
) (*operation.Command, *session.Client, error) {
args, err := mongoutil.NewOptions[options.RunCmdOptions](append(defaultRunCmdOpts, opts...)...)
if err != nil {
return nil, nil, fmt.Errorf("failed to construct options from builder: %w", err)
}
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
}
if err := db.client.validSession(sess); err != nil {
return nil, sess, err
}
if sess != nil && sess.TransactionRunning() && args.ReadPreference != nil && args.ReadPreference.Mode() != readpref.PrimaryMode {
return nil, sess, errors.New("read preference in a transaction must be primary")
}
if isUnorderedMap(cmd) {
return nil, sess, ErrMapForOrderedArgument{"cmd"}
}
runCmdDoc, err := marshal(cmd, db.bsonOpts, db.registry)
if err != nil {
return nil, sess, err
}
var readSelect description.ServerSelector
readSelect = &serverselector.Composite{
Selectors: []description.ServerSelector{
&serverselector.ReadPref{ReadPref: args.ReadPreference},
&serverselector.Latency{Latency: db.client.localThreshold},
},
}
if sess != nil && sess.PinnedServerAddr != nil {
readSelect = makePinnedSelector(sess, readSelect)
}
var op *operation.Command
switch cursorCommand {
case true:
cursorOpts := db.client.createBaseCursorOptions()
cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
op = operation.NewCursorCommand(runCmdDoc, cursorOpts)
default:
op = operation.NewCommand(runCmdDoc)
}
return op.Session(sess).CommandMonitor(db.client.monitor).
ServerSelector(readSelect).ClusterClock(db.client.clock).
Database(db.name).Deployment(db.client.deployment).
Crypt(db.client.cryptFLE).ReadPreference(args.ReadPreference).ServerAPI(db.client.serverAPI).
Timeout(db.client.timeout).Logger(db.client.logger).Authenticator(db.client.authenticator), sess, nil
}
// RunCommand executes the given command against the database.
//
// This function does not obey the Database's readPreference. To specify a read
// preference, the RunCmdOptions.ReadPreference option must be used.
//
// This function does not obey the Database's readConcern or writeConcern. A
// user must supply these values manually in the user-provided runCommand
// parameter.
//
// The runCommand parameter must be a document for the command to be executed. It cannot be nil.
// This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
//
// The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
//
// The behavior of RunCommand is undefined if the command document contains any of the following:
// - A session ID or any transaction-specific fields
// - API versioning options when an API version is already declared on the Client
// - maxTimeMS when Timeout is set on the Client
func (db *Database) RunCommand(
ctx context.Context,
runCommand any,
opts ...options.Lister[options.RunCmdOptions],
) *SingleResult {
if ctx == nil {
ctx = context.Background()
}
op, sess, err := db.processRunCommand(ctx, runCommand, false, opts...)
defer closeImplicitSession(sess)
if err != nil {
return &SingleResult{err: err}
}
err = op.Execute(ctx)
// RunCommand can be used to run a write, thus execute may return a write error
rr, convErr := processWriteError(err)
return &SingleResult{
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
Acknowledged: rr.isAcknowledged(),
}
}
// RunCommandCursor executes the given command against the database and parses the response as a cursor. If the command
// being executed does not return a cursor (e.g. insert), the command will be executed on the server and an error will
// be returned because the server response cannot be parsed as a cursor. This function does not obey the Database's read
// preference. To specify a read preference, the RunCmdOptions.ReadPreference option must be used.
//
// The runCommand parameter must be a document for the command to be executed. It cannot be nil.
// This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
//
// The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
//
// The behavior of RunCommandCursor is undefined if the command document contains any of the following:
// - A session ID or any transaction-specific fields
// - API versioning options when an API version is already declared on the Client
// - maxTimeMS when Timeout is set on the Client
func (db *Database) RunCommandCursor(
ctx context.Context,
runCommand any,
opts ...options.Lister[options.RunCmdOptions],
) (*Cursor, error) {
if ctx == nil {
ctx = context.Background()
}
op, sess, err := db.processRunCommand(ctx, runCommand, true, opts...)
if err != nil {
closeImplicitSession(sess)
return nil, wrapErrors(err)
}
if err = op.Execute(ctx); err != nil {
closeImplicitSession(sess)
if errors.Is(err, driver.ErrNoCursor) {
return nil, errors.New(
"database response does not contain a cursor; try using RunCommand instead")
}
return nil, wrapErrors(err)
}
bc, err := op.ResultCursor()
if err != nil {
closeImplicitSession(sess)
return nil, wrapErrors(err)
}
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess,
withCursorOptionClientTimeout(db.client.timeout))
return cursor, wrapErrors(err)
}
// Drop drops the database on the server. This method ignores "namespace not found" errors so it is safe to drop
// a database that does not exist on the server.
func (db *Database) Drop(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
defer sess.EndSession()
}
err := db.client.validSession(sess)
if err != nil {
return err
}
wc := db.writeConcern
if sess.TransactionRunning() {
wc = nil
}
if !wc.Acknowledged() {
sess = nil
}
selector := makePinnedSelector(sess, db.writeSelector)
op := operation.NewDropDatabase().
Session(sess).WriteConcern(wc).CommandMonitor(db.client.monitor).
ServerSelector(selector).ClusterClock(db.client.clock).
Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
ServerAPI(db.client.serverAPI).Authenticator(db.client.authenticator)
err = op.Execute(ctx)
var driverErr driver.Error
if err != nil && (!errors.As(err, &driverErr) || !driverErr.NamespaceNotFound()) {
return wrapErrors(err)
}
return nil
}
// ListCollectionSpecifications executes a listCollections command and returns a slice of CollectionSpecification
// instances representing the collections in the database.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
func (db *Database) ListCollectionSpecifications(
ctx context.Context,
filter any,
opts ...options.Lister[options.ListCollectionsOptions],
) ([]CollectionSpecification, error) {
cursor, err := db.ListCollections(ctx, filter, opts...)
if err != nil {
return nil, err
}
var resp []struct {
Name string `bson:"name"`
Type string `bson:"type"`
Info *struct {
ReadOnly bool `bson:"readOnly"`
UUID *bson.Binary `bson:"uuid"`
} `bson:"info"`
Options bson.Raw `bson:"options"`
IDIndex indexListSpecificationResponse `bson:"idIndex"`
}
err = cursor.All(ctx, &resp)
if err != nil {
return nil, err
}
specs := make([]CollectionSpecification, len(resp))
for idx, spec := range resp {
specs[idx] = CollectionSpecification{
Name: spec.Name,
Type: spec.Type,
Options: spec.Options,
IDIndex: IndexSpecification(spec.IDIndex),
}
if spec.Info != nil {
specs[idx].ReadOnly = spec.Info.ReadOnly
specs[idx].UUID = spec.Info.UUID
}
// Pre-4.4 servers report a namespace in their responses, so we only set Namespace manually if it was not in
// the response.
if specs[idx].IDIndex.Namespace == "" {
specs[idx].IDIndex.Namespace = db.name + "." + specs[idx].Name
}
}
return specs, nil
}
// ListCollections executes a listCollections command and returns a cursor over the collections in the database.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
//
// BUG(benjirewis): ListCollections prevents listing more than 100 collections per database when running against
// MongoDB version 2.6.
func (db *Database) ListCollections(
ctx context.Context,
filter any,
opts ...options.Lister[options.ListCollectionsOptions],
) (*Cursor, error) {
if ctx == nil {
ctx = context.Background()
}
args, err := mongoutil.NewOptions[options.ListCollectionsOptions](opts...)
if err != nil {
return nil, fmt.Errorf("failed to construct options from builder: %w", err)
}
filterDoc, err := marshal(filter, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
}
err = db.client.validSession(sess)
if err != nil {
closeImplicitSession(sess)
return nil, err
}
var selector description.ServerSelector
selector = &serverselector.Composite{
Selectors: []description.ServerSelector{
&serverselector.ReadPref{ReadPref: readpref.Primary()},
&serverselector.Latency{Latency: db.client.localThreshold},
},
}
selector = makeReadPrefSelector(sess, selector, db.client.localThreshold)
op := operation.NewListCollections(filterDoc).
Session(sess).ReadPreference(db.readPreference).CommandMonitor(db.client.monitor).
ServerSelector(selector).ClusterClock(db.client.clock).
Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
ServerAPI(db.client.serverAPI).Timeout(db.client.timeout).Authenticator(db.client.authenticator)
cursorOpts := db.client.createBaseCursorOptions()
cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
if args.NameOnly != nil {
op = op.NameOnly(*args.NameOnly)
}
if args.BatchSize != nil {
cursorOpts.BatchSize = *args.BatchSize
op = op.BatchSize(*args.BatchSize)
}
if args.AuthorizedCollections != nil {
op = op.AuthorizedCollections(*args.AuthorizedCollections)
}
if rawData, ok := optionsutil.Value(args.Internal, "rawData").(bool); ok {
op = op.RawData(rawData)
}
retry := driver.RetryNone
if db.client.retryReads {
retry = driver.RetryOncePerCommand
}
op = op.Retry(retry)
err = op.Execute(ctx)
if err != nil {
closeImplicitSession(sess)
return nil, wrapErrors(err)
}
bc, err := op.Result(cursorOpts)
if err != nil {
closeImplicitSession(sess)
return nil, wrapErrors(err)
}
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess,
withCursorOptionClientTimeout(db.client.timeout))
return cursor, wrapErrors(err)
}
// ListCollectionNames executes a listCollections command and returns a slice containing the names of the collections
// in the database. This method requires driver version >= 1.1.0.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
//
// BUG(benjirewis): ListCollectionNames prevents listing more than 100 collections per database when running against
// MongoDB version 2.6.
func (db *Database) ListCollectionNames(
ctx context.Context,
filter any,
opts ...options.Lister[options.ListCollectionsOptions],
) ([]string, error) {
opts = append(opts, options.ListCollections().SetNameOnly(true))
res, err := db.ListCollections(ctx, filter, opts...)
if err != nil {
return nil, err
}
defer res.Close(ctx)
names := make([]string, 0)
for res.Next(ctx) {
elem, err := res.Current.LookupErr("name")
if err != nil {
return nil, err
}
if elem.Type != bson.TypeString {
return nil, fmt.Errorf("incorrect type for 'name'. got %v. want %v", elem.Type, bson.TypeString)
}
elemName := elem.StringValue()
names = append(names, elemName)
}
res.Close(ctx)
return names, nil
}
// Watch returns a change stream for all changes to the corresponding database. See
// https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams.
//
// The Database must be configured with read concern majority or no read concern for a change stream to be created
// successfully.
//
// The pipeline parameter must be a slice of documents, each representing a pipeline stage. The pipeline cannot be
// nil but can be empty. The stage documents must all be non-nil. See https://www.mongodb.com/docs/manual/changeStreams/ for
// a list of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the
// mongo.Pipeline{} type can be used.
//
// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
// documentation).
func (db *Database) Watch(ctx context.Context, pipeline any,
opts ...options.Lister[options.ChangeStreamOptions],
) (*ChangeStream, error) {
csConfig := changeStreamConfig{
readConcern: db.readConcern,
readPreference: db.readPreference,
client: db.client,
bsonOpts: db.bsonOpts,
registry: db.registry,
streamType: DatabaseStream,
databaseName: db.Name(),
crypt: db.client.cryptFLE,
}
return newChangeStream(ctx, csConfig, pipeline, opts...)
}
// CreateCollection creates a new collection on the server with the specified
// name and options.
//
// MongoDB versions < 7.0 will return an error if the collection already exists.
// MongoDB versions >= 7.0 will not return an error if an existing collection
// created with the same name and options already exists.
//
// For more information about the command, see
// https://www.mongodb.com/docs/manual/reference/command/create/.
func (db *Database) CreateCollection(ctx context.Context, name string, opts ...options.Lister[options.CreateCollectionOptions]) error {
args, err := mongoutil.NewOptions(opts...)
if err != nil {
return fmt.Errorf("failed to construct options from builder: %w", err)
}
// Follow In-Use Encryption specification to check for encryptedFields.
// Check for encryptedFields from create options.
ef := args.EncryptedFields
// Check for encryptedFields from the client EncryptedFieldsMap.
if ef == nil {
ef = db.getEncryptedFieldsFromMap(name)
}
if ef != nil {
return db.createCollectionWithEncryptedFields(ctx, name, ef, opts...)
}
return db.createCollection(ctx, name, opts...)
}
// getEncryptedFieldsFromServer tries to get an "encryptedFields" document associated with collectionName by running the "listCollections" command.
// Returns nil and no error if the listCollections command succeeds, but "encryptedFields" is not present.
func (db *Database) getEncryptedFieldsFromServer(ctx context.Context, collectionName string) (any, error) {
// Check if collection has an EncryptedFields configured server-side.
collSpecs, err := db.ListCollectionSpecifications(ctx, bson.D{{"name", collectionName}})
if err != nil {
return nil, err
}
if len(collSpecs) == 0 {
return nil, nil
}
if len(collSpecs) > 1 {
return nil, fmt.Errorf("expected 1 or 0 results from listCollections, got %v", len(collSpecs))
}
collSpec := collSpecs[0]
rawValue, err := collSpec.Options.LookupErr("encryptedFields")
if errors.Is(err, bsoncore.ErrElementNotFound) {
return nil, nil
} else if err != nil {
return nil, err
}
encryptedFields, ok := rawValue.DocumentOK()
if !ok {
return nil, fmt.Errorf("expected encryptedFields of %v to be document, got %v", collectionName, rawValue.Type)
}
return encryptedFields, nil
}
// getEncryptedFieldsFromMap tries to get an "encryptedFields" document associated with collectionName by checking the client EncryptedFieldsMap.
// Returns nil and no error if an EncryptedFieldsMap is not configured, or does not contain an entry for collectionName.
func (db *Database) getEncryptedFieldsFromMap(collectionName string) any {
// Check the EncryptedFieldsMap
efMap := db.client.encryptedFieldsMap
if efMap == nil {
return nil
}
namespace := db.name + "." + collectionName
ef, ok := efMap[namespace]
if ok {
return ef
}
return nil
}
// createCollectionWithEncryptedFields creates a collection with an EncryptedFields.
func (db *Database) createCollectionWithEncryptedFields(
ctx context.Context,
name string,
ef any,
opts ...options.Lister[options.CreateCollectionOptions],
) error {
efBSON, err := marshal(ef, db.bsonOpts, db.registry)
if err != nil {
return fmt.Errorf("error transforming document: %w", err)
}
// Check the wire version to ensure server is 7.0.0 or newer.
// After the wire version check, and before creating the collections, it is possible the server state changes.
// That is OK. This wire version check is a best effort to inform users earlier if using a QEv2 driver with a QEv1 server.
{
const QEv2WireVersion = 21
ctx, cancel := csot.WithServerSelectionTimeout(ctx, db.client.deployment.GetServerSelectionTimeout())
defer cancel()
server, err := db.client.deployment.SelectServer(ctx, &serverselector.Write{})
if err != nil {
return fmt.Errorf("error selecting server to check maxWireVersion: %w", err)
}
conn, err := server.Connection(ctx)
if err != nil {
return fmt.Errorf("error getting connection to check maxWireVersion: %w", err)
}
defer conn.Close()
wireVersionRange := conn.Description().WireVersion
if wireVersionRange.Max < QEv2WireVersion {
return fmt.Errorf("driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption. Got maxWireVersion %v but need maxWireVersion >= %v", wireVersionRange.Max, QEv2WireVersion)
}
}
// Create the two encryption-related, associated collections: `escCollection` and `ecocCollection`.
stateCollectionOpts := options.CreateCollection().
SetClusteredIndex(bson.D{{"key", bson.D{{"_id", 1}}}, {"unique", true}})
// Create ESCCollection.
escCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedStateCollection)
if err != nil {
return err
}
if err := db.createCollection(ctx, escCollection, stateCollectionOpts); err != nil {
return err
}
// Create ECOCCollection.
ecocCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedCompactionCollection)
if err != nil {
return err
}
if err := db.createCollection(ctx, ecocCollection, stateCollectionOpts); err != nil {
return err
}
// Create a data collection with the 'encryptedFields' option.
op, err := db.createCollectionOperation(name, opts...)
if err != nil {
return err
}
op.EncryptedFields(efBSON)
if err := db.executeCreateOperation(ctx, op); err != nil {
return err
}
// Create an index on the __safeContent__ field in the collection @collectionName.
if _, err := db.Collection(name).Indexes().CreateOne(ctx, IndexModel{Keys: bson.D{{"__safeContent__", 1}}}); err != nil {
return fmt.Errorf("error creating safeContent index: %w", err)
}
return nil
}
// createCollection creates a collection without EncryptedFields.
func (db *Database) createCollection(
ctx context.Context,
name string,
opts ...options.Lister[options.CreateCollectionOptions],
) error {
op, err := db.createCollectionOperation(name, opts...)
if err != nil {
return err
}
return db.executeCreateOperation(ctx, op)
}
func (db *Database) createCollectionOperation(
name string,
opts ...options.Lister[options.CreateCollectionOptions],
) (*operation.Create, error) {
args, err := mongoutil.NewOptions[options.CreateCollectionOptions](opts...)
if err != nil {
return nil, fmt.Errorf("failed to construct options from builder: %w", err)
}
op := operation.NewCreate(name).ServerAPI(db.client.serverAPI).Authenticator(db.client.authenticator)
if args.Capped != nil {
op.Capped(*args.Capped)
}
if args.Collation != nil {
op.Collation(bsoncore.Document(toDocument(args.Collation)))
}
if args.ChangeStreamPreAndPostImages != nil {
csppi, err := marshal(args.ChangeStreamPreAndPostImages, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
op.ChangeStreamPreAndPostImages(csppi)
}
if args.DefaultIndexOptions != nil {
defaultIndexArgs, err := mongoutil.NewOptions[options.DefaultIndexOptions](args.DefaultIndexOptions)
if err != nil {
return nil, fmt.Errorf("failed to construct DefaultIndexArgs from options: %w", err)
}
idx, doc := bsoncore.AppendDocumentStart(nil)
if defaultIndexArgs.StorageEngine != nil {
storageEngine, err := marshal(defaultIndexArgs.StorageEngine, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
doc = bsoncore.AppendDocumentElement(doc, "storageEngine", storageEngine)
}
doc, err = bsoncore.AppendDocumentEnd(doc, idx)
if err != nil {
return nil, err
}
op.IndexOptionDefaults(doc)
}
if args.MaxDocuments != nil {
op.Max(*args.MaxDocuments)
}
if args.SizeInBytes != nil {
op.Size(*args.SizeInBytes)
}
if args.StorageEngine != nil {
storageEngine, err := marshal(args.StorageEngine, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
op.StorageEngine(storageEngine)
}
if args.ValidationAction != nil {
op.ValidationAction(*args.ValidationAction)
}
if args.ValidationLevel != nil {
op.ValidationLevel(*args.ValidationLevel)
}
if args.Validator != nil {
validator, err := marshal(args.Validator, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
op.Validator(validator)
}
if args.ExpireAfterSeconds != nil {
op.ExpireAfterSeconds(*args.ExpireAfterSeconds)
}
if args.TimeSeriesOptions != nil {
timeSeriesArgs, err := mongoutil.NewOptions[options.TimeSeriesOptions](args.TimeSeriesOptions)
if err != nil {
return nil, fmt.Errorf("failed to construct DefaultIndexArgs from options: %w", err)
}
idx, doc := bsoncore.AppendDocumentStart(nil)
doc = bsoncore.AppendStringElement(doc, "timeField", timeSeriesArgs.TimeField)
if timeSeriesArgs.MetaField != nil {
doc = bsoncore.AppendStringElement(doc, "metaField", *timeSeriesArgs.MetaField)
}
if timeSeriesArgs.Granularity != nil {
doc = bsoncore.AppendStringElement(doc, "granularity", *timeSeriesArgs.Granularity)
}
if timeSeriesArgs.BucketMaxSpan != nil {
bmss := int64(*timeSeriesArgs.BucketMaxSpan / time.Second)
doc = bsoncore.AppendInt64Element(doc, "bucketMaxSpanSeconds", bmss)
}
if timeSeriesArgs.BucketRounding != nil {
brs := int64(*timeSeriesArgs.BucketRounding / time.Second)
doc = bsoncore.AppendInt64Element(doc, "bucketRoundingSeconds", brs)
}
doc, err = bsoncore.AppendDocumentEnd(doc, idx)
if err != nil {
return nil, err
}
op.TimeSeries(doc)
}
if args.ClusteredIndex != nil {
clusteredIndex, err := marshal(args.ClusteredIndex, db.bsonOpts, db.registry)
if err != nil {
return nil, err
}
op.ClusteredIndex(clusteredIndex)
}
return op, nil
}
// CreateView creates a view on the server.
//
// The viewName parameter specifies the name of the view to create. The viewOn
// parameter specifies the name of the collection or view on which this view
// will be created. The pipeline parameter specifies an aggregation pipeline
// that will be exececuted against the source collection or view to create this
// view.
//
// MongoDB versions < 7.0 will return an error if the view already exists.
// MongoDB versions >= 7.0 will not return an error if an existing view created
// with the same name and options already exists.
//
// See https://www.mongodb.com/docs/manual/core/views/ for more information
// about views.
func (db *Database) CreateView(ctx context.Context, viewName, viewOn string, pipeline any,
opts ...options.Lister[options.CreateViewOptions],
) error {
pipelineArray, _, err := marshalAggregatePipeline(pipeline, db.bsonOpts, db.registry)
if err != nil {
return err
}
op := operation.NewCreate(viewName).
ViewOn(viewOn).
Pipeline(pipelineArray).
ServerAPI(db.client.serverAPI).Authenticator(db.client.authenticator)
args, err := mongoutil.NewOptions(opts...)
if err != nil {
return fmt.Errorf("failed to construct options from builder: %w", err)
}
if args.Collation != nil {
op.Collation(bsoncore.Document(toDocument(args.Collation)))
}
return db.executeCreateOperation(ctx, op)
}
func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Create) error {
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
defer sess.EndSession()
}
err := db.client.validSession(sess)
if err != nil {
return err
}
wc := db.writeConcern
if sess.TransactionRunning() {
wc = nil
}
if !wc.Acknowledged() {
sess = nil
}
selector := makePinnedSelector(sess, db.writeSelector)
op = op.Session(sess).
WriteConcern(wc).
CommandMonitor(db.client.monitor).
ServerSelector(selector).
ClusterClock(db.client.clock).
Database(db.name).
Deployment(db.client.deployment).
Crypt(db.client.cryptFLE)
return wrapErrors(op.Execute(ctx))
}
// GridFSBucket is used to construct a GridFS bucket which can be used as a
// container for files.
func (db *Database) GridFSBucket(opts ...options.Lister[options.BucketOptions]) *GridFSBucket {
b := &GridFSBucket{
name: "fs",
chunkSize: DefaultGridFSChunkSize,
db: db,
}
bo, _ := mongoutil.NewOptions[options.BucketOptions](opts...)
if bo.Name != nil {
b.name = *bo.Name
}
if bo.ChunkSizeBytes != nil {
b.chunkSize = *bo.ChunkSizeBytes
}
if bo.WriteConcern != nil {
b.wc = bo.WriteConcern
}
if bo.ReadConcern != nil {
b.rc = bo.ReadConcern
}
if bo.ReadPreference != nil {
b.rp = bo.ReadPreference
}
collOpts := options.Collection().SetWriteConcern(b.wc).SetReadConcern(b.rc).SetReadPreference(b.rp)
b.chunksColl = db.Collection(b.name+".chunks", collOpts)
b.filesColl = db.Collection(b.name+".files", collOpts)
b.readBuf = make([]byte, b.chunkSize)
b.writeBuf = make([]byte, b.chunkSize)
return b
}