Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
be9b9e8ccf
|
|||
28cdfc5bd2
|
|||
10a6627323
|
12
go.mod
12
go.mod
@@ -21,17 +21,17 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bytedance/sonic v1.12.7 // indirect
|
||||
github.com/bytedance/sonic/loader v0.2.2 // indirect
|
||||
github.com/cloudwego/base64x v0.1.4 // indirect
|
||||
github.com/bytedance/sonic v1.12.8 // indirect
|
||||
github.com/bytedance/sonic/loader v0.2.3 // indirect
|
||||
github.com/cloudwego/base64x v0.1.5 // indirect
|
||||
github.com/cloudwego/iasm v0.2.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/gin-contrib/sse v1.0.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.23.0 // indirect
|
||||
github.com/goccy/go-json v0.10.4 // indirect
|
||||
github.com/go-playground/validator/v10 v10.24.0 // indirect
|
||||
github.com/goccy/go-json v0.10.5 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/uuid v1.5.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
@@ -55,7 +55,7 @@ require (
|
||||
golang.org/x/image v0.23.0 // indirect
|
||||
golang.org/x/net v0.34.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
google.golang.org/protobuf v1.36.2 // indirect
|
||||
google.golang.org/protobuf v1.36.4 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
modernc.org/libc v1.37.6 // indirect
|
||||
modernc.org/mathutil v1.6.0 // indirect
|
||||
|
12
go.sum
12
go.sum
@@ -11,6 +11,8 @@ github.com/bytedance/sonic v1.12.6 h1:/isNmCUF2x3Sh8RAp/4mh4ZGkcFAX/hLrzrK3AvpRz
|
||||
github.com/bytedance/sonic v1.12.6/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk=
|
||||
github.com/bytedance/sonic v1.12.7 h1:CQU8pxOy9HToxhndH0Kx/S1qU/CuS9GnKYrGioDcU1Q=
|
||||
github.com/bytedance/sonic v1.12.7/go.mod h1:tnbal4mxOMju17EGfknm2XyYcpyCnIROYOEYuemj13I=
|
||||
github.com/bytedance/sonic v1.12.8 h1:4xYRVRlXIgvSZ4e8iVTlMF5szgpXd4AfvuWgA8I8lgs=
|
||||
github.com/bytedance/sonic v1.12.8/go.mod h1:uVvFidNmlt9+wa31S1urfwwthTWteBgG0hWuoKAXTx8=
|
||||
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
|
||||
github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM=
|
||||
github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
|
||||
@@ -18,8 +20,12 @@ github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iC
|
||||
github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
|
||||
github.com/bytedance/sonic/loader v0.2.2 h1:jxAJuN9fOot/cyz5Q6dUuMJF5OqQ6+5GfA8FjjQ0R4o=
|
||||
github.com/bytedance/sonic/loader v0.2.2/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/bytedance/sonic/loader v0.2.3 h1:yctD0Q3v2NOGfSWPLPvG2ggA2kV6TS6s4wioyEqssH0=
|
||||
github.com/bytedance/sonic/loader v0.2.3/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
|
||||
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
|
||||
github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
|
||||
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
@@ -56,12 +62,16 @@ github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27
|
||||
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o=
|
||||
github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/go-playground/validator/v10 v10.24.0 h1:KHQckvo8G6hlWnrPX4NJJ+aBfWNAE/HH+qdL2cBpCmg=
|
||||
github.com/go-playground/validator/v10 v10.24.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus=
|
||||
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
|
||||
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
|
||||
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
|
||||
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
|
||||
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
|
||||
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
@@ -255,6 +265,8 @@ google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/g
|
||||
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
|
||||
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
|
||||
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@@ -1,5 +1,5 @@
|
||||
package goext
|
||||
|
||||
const GoextVersion = "0.0.558"
|
||||
const GoextVersion = "0.0.561"
|
||||
|
||||
const GoextVersionTimestamp = "2025-01-10T15:36:23+0100"
|
||||
const GoextVersionTimestamp = "2025-01-29T11:02:41+0100"
|
||||
|
21
langext/iter.go
Normal file
21
langext/iter.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package langext
|
||||
|
||||
import (
|
||||
"iter"
|
||||
)
|
||||
|
||||
func IterSingleValueSeq[T any](value T) iter.Seq[T] {
|
||||
return func(yield func(T) bool) {
|
||||
if !yield(value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func IterSingleValueSeq2[T1 any, T2 any](v1 T1, v2 T2) iter.Seq2[T1, T2] {
|
||||
return func(yield func(T1, T2) bool) {
|
||||
if !yield(v1, v2) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@@ -7,9 +7,10 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"iter"
|
||||
)
|
||||
|
||||
func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options.FindOptions) ([]TData, error) {
|
||||
func (c *Coll[TData]) createFindQuery(ctx context.Context, filter bson.M, opts ...*options.FindOptions) (*mongo.Cursor, error) {
|
||||
|
||||
pipeline := mongo.Pipeline{}
|
||||
pipeline = append(pipeline, bson.D{{Key: "$match", Value: filter}})
|
||||
@@ -64,6 +65,16 @@ func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options.
|
||||
return nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build()
|
||||
}
|
||||
|
||||
return cursor, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options.FindOptions) ([]TData, error) {
|
||||
|
||||
cursor, err := c.createFindQuery(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
res, err := c.decodeAll(ctx, cursor)
|
||||
@@ -74,6 +85,57 @@ func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options.
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) FindIterateFunc(ctx context.Context, filter bson.M, fn func(v TData) error, opts ...*options.FindOptions) error {
|
||||
|
||||
cursor, err := c.createFindQuery(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
for cursor.Next(ctx) {
|
||||
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
err = fn(v)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) FindIterate(ctx context.Context, filter bson.M, opts ...*options.FindOptions) iter.Seq2[TData, error] {
|
||||
cursor, err := c.createFindQuery(ctx, filter, opts...)
|
||||
if err != nil {
|
||||
return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build())
|
||||
}
|
||||
|
||||
return func(yield func(TData, error) bool) {
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
for cursor.Next(ctx) {
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
if !yield(nil, err) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !yield(v, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// converts FindOptions to AggregateOptions
|
||||
func convertFindOpt(v *options.FindOptions) (*options.AggregateOptions, error) {
|
||||
if v == nil {
|
||||
|
345
wmo/queryList.go
345
wmo/queryList.go
@@ -7,6 +7,7 @@ import (
|
||||
ct "gogs.mikescher.com/BlackForestBytes/goext/cursortoken"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"iter"
|
||||
)
|
||||
|
||||
func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) ([]TData, ct.CursorToken, error) {
|
||||
@@ -20,8 +21,8 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
|
||||
return nil, ct.End(), err
|
||||
}
|
||||
return d, tok, nil
|
||||
} else if ctks, ok := inTok.(ct.CTPaginated); ok {
|
||||
d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctks)
|
||||
} else if ctpag, ok := inTok.(ct.CTPaginated); ok {
|
||||
d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctpag)
|
||||
if err != nil {
|
||||
return nil, ct.End(), err
|
||||
}
|
||||
@@ -31,159 +32,78 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) {
|
||||
if inTok.Mode == ct.CTMEnd {
|
||||
return make([]TData, 0), ct.End(), nil
|
||||
}
|
||||
func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken, fn func(v TData) error) error {
|
||||
var cursor *mongo.Cursor
|
||||
var err error
|
||||
|
||||
if pageSize != nil && *pageSize == 0 {
|
||||
return make([]TData, 0), inTok, nil // fast track, we return an empty list and do not advance the cursor token
|
||||
}
|
||||
|
||||
pipeline := mongo.Pipeline{}
|
||||
pf1 := "_id"
|
||||
pd1 := ct.SortASC
|
||||
pf2 := "_id"
|
||||
pd2 := ct.SortASC
|
||||
|
||||
if filter != nil {
|
||||
pipeline = filter.FilterQuery(ctx)
|
||||
pf1, pd1, pf2, pd2 = filter.Pagination(ctx)
|
||||
}
|
||||
|
||||
sortPrimary := pf1
|
||||
sortDirPrimary := pd1
|
||||
sortSecondary := &pf2
|
||||
sortDirSecondary := &pd2
|
||||
|
||||
if pf1 == pf2 {
|
||||
sortSecondary = nil
|
||||
sortDirSecondary = nil
|
||||
}
|
||||
|
||||
paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize)
|
||||
if ctks, ok := inTok.(ct.CTKeySort); ok {
|
||||
_, _, _, _, cursor, err = c.createKSListQuery(ctx, filter, pageSize, ctks)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.
|
||||
Wrap(err, "failed to create pagination").
|
||||
WithType(exerr.TypeCursorTokenDecode).
|
||||
Str("collection", c.Name()).
|
||||
Any("inTok", inTok).
|
||||
Any("sortPrimary", sortPrimary).
|
||||
Any("sortDirPrimary", sortDirPrimary).
|
||||
Any("sortSecondary", sortSecondary).
|
||||
Any("sortDirSecondary", sortDirSecondary).
|
||||
Any("pageSize", pageSize).
|
||||
Build()
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
pipeline = append(pipeline, paginationPipeline...)
|
||||
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
pipeline = langext.ArrConcat(pipeline, ppl(ctx))
|
||||
}
|
||||
|
||||
if c.needsDoubleSort(ctx) {
|
||||
pipeline = langext.ArrConcat(pipeline, doubleSortPipeline)
|
||||
}
|
||||
|
||||
cursor, err := c.coll.Aggregate(ctx, pipeline)
|
||||
} else if ctpag, ok := inTok.(ct.CTPaginated); ok {
|
||||
_, cursor, err = c.createPaginatedListQuery(ctx, filter, pageSize, ctpag)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build()
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
} else {
|
||||
return exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build()
|
||||
}
|
||||
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
// fast branch
|
||||
if pageSize == nil {
|
||||
entries, err := c.decodeAll(ctx, cursor)
|
||||
for cursor.Next(ctx) {
|
||||
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
|
||||
}
|
||||
return entries, ct.End(), nil
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
entities := make([]TData, 0, cursor.RemainingBatchLength())
|
||||
for (pageSize == nil || len(entities) != *pageSize) && cursor.Next(ctx) {
|
||||
var entry TData
|
||||
entry, err = c.decodeSingle(ctx, cursor)
|
||||
err = fn(v)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to decode entity").Build()
|
||||
}
|
||||
entities = append(entities, entry)
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
if pageSize == nil || len(entities) < *pageSize || !cursor.TryNext(ctx) {
|
||||
return entities, ct.End(), nil
|
||||
}
|
||||
|
||||
last := entities[len(entities)-1]
|
||||
|
||||
c.EnsureInitializedReflection(last)
|
||||
|
||||
nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build()
|
||||
}
|
||||
|
||||
return entities, nextToken, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) {
|
||||
func (c *Coll[TData]) ListIterate(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) iter.Seq2[TData, error] {
|
||||
var cursor *mongo.Cursor
|
||||
var err error
|
||||
|
||||
page := inTok.Page
|
||||
|
||||
if page < 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineSort := mongo.Pipeline{}
|
||||
pipelineFilter := mongo.Pipeline{}
|
||||
|
||||
if filter != nil {
|
||||
pipelineFilter = filter.FilterQuery(ctx)
|
||||
pf1, pd1, pf2, pd2 := filter.Pagination(ctx)
|
||||
|
||||
pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2)
|
||||
if ctks, ok := inTok.(ct.CTKeySort); ok {
|
||||
_, _, _, _, cursor, err = c.createKSListQuery(ctx, filter, pageSize, ctks)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to create sort pipeline").Build()
|
||||
return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build())
|
||||
}
|
||||
} else if ctpag, ok := inTok.(ct.CTPaginated); ok {
|
||||
_, cursor, err = c.createPaginatedListQuery(ctx, filter, pageSize, ctpag)
|
||||
if err != nil {
|
||||
return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build())
|
||||
}
|
||||
|
||||
pipelinePaginate := mongo.Pipeline{}
|
||||
if pageSize != nil {
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}})
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}})
|
||||
} else {
|
||||
page = 1
|
||||
return langext.IterSingleValueSeq2[TData, error](nil, exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build())
|
||||
}
|
||||
|
||||
pipelineCount := mongo.Pipeline{}
|
||||
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
|
||||
return func(yield func(TData, error) bool) {
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
extrModPipelineResolved := mongo.Pipeline{}
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
|
||||
}
|
||||
|
||||
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
|
||||
|
||||
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
|
||||
for cursor.Next(ctx) {
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
|
||||
if !yield(nil, err) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
entities, err := c.decodeAll(ctx, cursorList)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
|
||||
if !yield(v, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
tokOut := ct.Page(page + 1)
|
||||
if pageSize == nil || len(entities) < *pageSize {
|
||||
tokOut = ct.PageEnd()
|
||||
}
|
||||
|
||||
return entities, tokOut, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) Count(ctx context.Context, filter ct.RawFilter) (int64, error) {
|
||||
@@ -291,6 +211,185 @@ func (c *Coll[TData]) ListAllIDs(ctx context.Context, filter ct.RawFilter) ([]st
|
||||
return langext.ArrMap(res, func(v idObject) string { return v.ID }), nil
|
||||
}
|
||||
|
||||
// =====================================================================================================================
|
||||
|
||||
func (c *Coll[TData]) createKSListQuery(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) (string, ct.SortDirection, *string, *ct.SortDirection, *mongo.Cursor, error) {
|
||||
pipeline := mongo.Pipeline{}
|
||||
pf1 := "_id"
|
||||
pd1 := ct.SortASC
|
||||
pf2 := "_id"
|
||||
pd2 := ct.SortASC
|
||||
|
||||
if filter != nil {
|
||||
pipeline = filter.FilterQuery(ctx)
|
||||
pf1, pd1, pf2, pd2 = filter.Pagination(ctx)
|
||||
}
|
||||
|
||||
sortPrimary := pf1
|
||||
sortDirPrimary := pd1
|
||||
sortSecondary := &pf2
|
||||
sortDirSecondary := &pd2
|
||||
|
||||
if pf1 == pf2 {
|
||||
sortSecondary = nil
|
||||
sortDirSecondary = nil
|
||||
}
|
||||
|
||||
paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize)
|
||||
if err != nil {
|
||||
return "", "", nil, nil, nil, exerr.
|
||||
Wrap(err, "failed to create pagination").
|
||||
WithType(exerr.TypeCursorTokenDecode).
|
||||
Str("collection", c.Name()).
|
||||
Any("inTok", inTok).
|
||||
Any("sortPrimary", sortPrimary).
|
||||
Any("sortDirPrimary", sortDirPrimary).
|
||||
Any("sortSecondary", sortSecondary).
|
||||
Any("sortDirSecondary", sortDirSecondary).
|
||||
Any("pageSize", pageSize).
|
||||
Build()
|
||||
}
|
||||
|
||||
pipeline = append(pipeline, paginationPipeline...)
|
||||
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
pipeline = langext.ArrConcat(pipeline, ppl(ctx))
|
||||
}
|
||||
|
||||
if c.needsDoubleSort(ctx) {
|
||||
pipeline = langext.ArrConcat(pipeline, doubleSortPipeline)
|
||||
}
|
||||
|
||||
cursor, err := c.coll.Aggregate(ctx, pipeline)
|
||||
if err != nil {
|
||||
return "", "", nil, nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build()
|
||||
}
|
||||
|
||||
return sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, cursor, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) createPaginatedListQuery(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) (int, *mongo.Cursor, error) {
|
||||
var err error
|
||||
|
||||
page := inTok.Page
|
||||
|
||||
pipelineSort := mongo.Pipeline{}
|
||||
pipelineFilter := mongo.Pipeline{}
|
||||
|
||||
if filter != nil {
|
||||
pipelineFilter = filter.FilterQuery(ctx)
|
||||
pf1, pd1, pf2, pd2 := filter.Pagination(ctx)
|
||||
|
||||
pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2)
|
||||
if err != nil {
|
||||
return 0, nil, exerr.Wrap(err, "failed to create sort pipeline").Build()
|
||||
}
|
||||
}
|
||||
|
||||
pipelinePaginate := mongo.Pipeline{}
|
||||
if pageSize != nil {
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}})
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}})
|
||||
} else {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineCount := mongo.Pipeline{}
|
||||
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
|
||||
|
||||
extrModPipelineResolved := mongo.Pipeline{}
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
|
||||
}
|
||||
|
||||
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
|
||||
|
||||
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
|
||||
if err != nil {
|
||||
return 0, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
|
||||
}
|
||||
|
||||
return page, cursorList, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) {
|
||||
if inTok.Mode == ct.CTMEnd {
|
||||
return make([]TData, 0), ct.End(), nil
|
||||
}
|
||||
|
||||
if pageSize != nil && *pageSize == 0 {
|
||||
return make([]TData, 0), inTok, nil // fast track, we return an empty list and do not advance the cursor token
|
||||
}
|
||||
|
||||
sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, cursor, err := c.createKSListQuery(ctx, filter, pageSize, inTok)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
// fast branch
|
||||
if pageSize == nil {
|
||||
entries, err := c.decodeAll(ctx, cursor)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
|
||||
}
|
||||
return entries, ct.End(), nil
|
||||
}
|
||||
|
||||
entities := make([]TData, 0, cursor.RemainingBatchLength())
|
||||
for (pageSize == nil || len(entities) != *pageSize) && cursor.Next(ctx) {
|
||||
var entry TData
|
||||
entry, err = c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to decode entity").Build()
|
||||
}
|
||||
entities = append(entities, entry)
|
||||
}
|
||||
|
||||
if pageSize == nil || len(entities) < *pageSize || !cursor.TryNext(ctx) {
|
||||
return entities, ct.End(), nil
|
||||
}
|
||||
|
||||
last := entities[len(entities)-1]
|
||||
|
||||
c.EnsureInitializedReflection(last)
|
||||
|
||||
nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build()
|
||||
}
|
||||
|
||||
return entities, nextToken, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) {
|
||||
var err error
|
||||
|
||||
page := inTok.Page
|
||||
|
||||
if page < 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
page, cursorList, err := c.createPaginatedListQuery(ctx, filter, pageSize, inTok)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
entities, err := c.decodeAll(ctx, cursorList)
|
||||
if err != nil {
|
||||
return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
|
||||
}
|
||||
|
||||
tokOut := ct.Page(page + 1)
|
||||
if pageSize == nil || len(entities) < *pageSize {
|
||||
tokOut = ct.PageEnd()
|
||||
}
|
||||
|
||||
return entities, tokOut, nil
|
||||
}
|
||||
|
||||
func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CTKeySort, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) {
|
||||
|
||||
cond := bson.A{}
|
||||
|
@@ -7,54 +7,19 @@ import (
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
pag "gogs.mikescher.com/BlackForestBytes/goext/pagination"
|
||||
"iter"
|
||||
)
|
||||
|
||||
func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) ([]TData, pag.Pagination, error) {
|
||||
page, cursorList, pipelineTotalCount, err := c.createPaginatedQuery(ctx, filter, page, limit)
|
||||
if err != nil {
|
||||
return nil, pag.Pagination{}, exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
type totalCountResult struct {
|
||||
Count int `bson:"count"`
|
||||
}
|
||||
|
||||
if page < 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineSort := mongo.Pipeline{}
|
||||
pipelineFilter := mongo.Pipeline{}
|
||||
sort := bson.D{}
|
||||
|
||||
if filter != nil {
|
||||
pipelineFilter = filter.FilterQuery(ctx)
|
||||
sort = filter.Sort(ctx)
|
||||
}
|
||||
|
||||
if len(sort) != 0 {
|
||||
pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}})
|
||||
}
|
||||
|
||||
pipelinePaginate := mongo.Pipeline{}
|
||||
if limit != nil {
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}})
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}})
|
||||
} else {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineCount := mongo.Pipeline{}
|
||||
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
|
||||
|
||||
extrModPipelineResolved := mongo.Pipeline{}
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
|
||||
}
|
||||
|
||||
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
|
||||
pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount)
|
||||
|
||||
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
|
||||
if err != nil {
|
||||
return nil, pag.Pagination{}, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
|
||||
}
|
||||
|
||||
entities, err := c.decodeAll(ctx, cursorList)
|
||||
if err != nil {
|
||||
return nil, pag.Pagination{}, exerr.Wrap(err, "failed to all-decode entities").Build()
|
||||
@@ -93,3 +58,100 @@ func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page
|
||||
|
||||
return entities, paginationObj, nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) PaginateIterateFunc(ctx context.Context, filter pag.MongoFilter, page int, limit *int, fn func(v TData) error) error {
|
||||
page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
for cursor.Next(ctx) {
|
||||
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
err = fn(v)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "").Build()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coll[TData]) PaginateIterate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) iter.Seq2[TData, error] {
|
||||
page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit)
|
||||
if err != nil {
|
||||
return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build())
|
||||
}
|
||||
|
||||
return func(yield func(TData, error) bool) {
|
||||
defer func() { _ = cursor.Close(ctx) }()
|
||||
|
||||
for cursor.Next(ctx) {
|
||||
v, err := c.decodeSingle(ctx, cursor)
|
||||
if err != nil {
|
||||
if !yield(nil, err) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !yield(v, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =====================================================================================================================
|
||||
|
||||
func (c *Coll[TData]) createPaginatedQuery(ctx context.Context, filter pag.MongoFilter, page int, limit *int) (int, *mongo.Cursor, mongo.Pipeline, error) {
|
||||
if page < 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineSort := mongo.Pipeline{}
|
||||
pipelineFilter := mongo.Pipeline{}
|
||||
sort := bson.D{}
|
||||
|
||||
if filter != nil {
|
||||
pipelineFilter = filter.FilterQuery(ctx)
|
||||
sort = filter.Sort(ctx)
|
||||
}
|
||||
|
||||
if len(sort) != 0 {
|
||||
pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}})
|
||||
}
|
||||
|
||||
pipelinePaginate := mongo.Pipeline{}
|
||||
if limit != nil {
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}})
|
||||
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}})
|
||||
} else {
|
||||
page = 1
|
||||
}
|
||||
|
||||
pipelineCount := mongo.Pipeline{}
|
||||
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
|
||||
|
||||
extrModPipelineResolved := mongo.Pipeline{}
|
||||
for _, ppl := range c.extraModPipeline {
|
||||
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
|
||||
}
|
||||
|
||||
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
|
||||
pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount)
|
||||
|
||||
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
|
||||
if err != nil {
|
||||
return 0, nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
|
||||
}
|
||||
|
||||
return page, cursorList, pipelineTotalCount, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user