Compare commits
	
		
			7 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 5ba84bd8ee | |||
| 1260b2dc77 | |||
| 7d18b913c6 | |||
| d1f9069f2f | |||
| fa6d73301e | |||
| bfe62799d3 | |||
| ede912eb7b | 
| @@ -6,7 +6,12 @@ | ||||
| name: Build Docker and Deploy | ||||
| run-name: Build & Deploy ${{ gitea.ref }} on ${{ gitea.actor }} | ||||
|  | ||||
| on: [push] | ||||
| on: | ||||
|   push: | ||||
|     branches: | ||||
|       - '*' | ||||
|       - '**' | ||||
|  | ||||
|  | ||||
| jobs: | ||||
|   run_tests: | ||||
| @@ -34,3 +39,17 @@ jobs: | ||||
|       - name: Run tests | ||||
|         run: cd "${{ gitea.workspace }}" && make test | ||||
|  | ||||
|       - name: Send failure mail | ||||
|         if: failure() | ||||
|         uses: dawidd6/action-send-mail@v3 | ||||
|         with: | ||||
|           server_address: smtp.fastmail.com | ||||
|           server_port: 465 | ||||
|           secure: true | ||||
|           username: ${{secrets.MAIL_USERNAME}} | ||||
|           password: ${{secrets.MAIL_PASSWORD}} | ||||
|           subject: Pipeline on '${{ gitea.repository }}' failed | ||||
|           to: ${{ steps.commiter_info.outputs.MAIL }} | ||||
|           from: Gitea Actions <gitea_actions@blackforestbytes.de> | ||||
|           body: "Go to https://gogs.blackforestbytes.com/${{ gitea.repository }}/actions" | ||||
|  | ||||
|   | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,6 +1,6 @@ | ||||
| module gogs.mikescher.com/BlackForestBytes/goext | ||||
|  | ||||
| go 1.19 | ||||
| go 1.21 | ||||
|  | ||||
| require ( | ||||
| 	github.com/gin-gonic/gin v1.9.1 | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| package goext | ||||
|  | ||||
| const GoextVersion = "0.0.312" | ||||
| const GoextVersion = "0.0.319" | ||||
|  | ||||
| const GoextVersionTimestamp = "2023-11-10T10:16:31+0100" | ||||
| const GoextVersionTimestamp = "2023-11-13T16:45:00+0100" | ||||
|   | ||||
| @@ -5,6 +5,7 @@ import ( | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||
| 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||
| 	"gogs.mikescher.com/BlackForestBytes/goext/langext" | ||||
| 	"gogs.mikescher.com/BlackForestBytes/goext/rfctime" | ||||
| 	"reflect" | ||||
| ) | ||||
| @@ -18,9 +19,21 @@ func CreateGoExtBsonRegistry() *bsoncodec.Registry { | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.RFC3339NanoTime{}), rfctime.RFC3339NanoTime{}) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(&rfctime.RFC3339NanoTime{}), rfctime.RFC3339NanoTime{}) | ||||
|  | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.UnixTime{}), rfctime.UnixTime{}) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(&rfctime.UnixTime{}), rfctime.UnixTime{}) | ||||
|  | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.UnixMilliTime{}), rfctime.UnixMilliTime{}) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(&rfctime.UnixMilliTime{}), rfctime.UnixMilliTime{}) | ||||
|  | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.UnixNanoTime{}), rfctime.UnixNanoTime{}) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(&rfctime.UnixNanoTime{}), rfctime.UnixNanoTime{}) | ||||
|  | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.Date{}), rfctime.Date{}) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(&rfctime.Date{}), rfctime.Date{}) | ||||
|  | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(rfctime.SecondsF64(0)), rfctime.SecondsF64(0)) | ||||
| 	rb.RegisterTypeDecoder(reflect.TypeOf(langext.Ptr(rfctime.SecondsF64(0))), rfctime.SecondsF64(0)) | ||||
|  | ||||
| 	bsoncodec.DefaultValueEncoders{}.RegisterDefaultEncoders(rb) | ||||
| 	bsoncodec.DefaultValueDecoders{}.RegisterDefaultDecoders(rb) | ||||
|  | ||||
|   | ||||
| @@ -2,7 +2,14 @@ package rfctime | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||
| 	"gogs.mikescher.com/BlackForestBytes/goext/timeext" | ||||
| 	"reflect" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| @@ -54,6 +61,63 @@ func (d SecondsF64) MarshalJSON() ([]byte, error) { | ||||
| 	return json.Marshal(secs) | ||||
| } | ||||
|  | ||||
| func (d *SecondsF64) UnmarshalBSONValue(bt bsontype.Type, data []byte) error { | ||||
| 	if bt == bson.TypeNull { | ||||
| 		// we can't set nil in UnmarshalBSONValue (so we use default(struct)) | ||||
| 		// Use mongoext.CreateGoExtBsonRegistry if you need to unmarsh pointer values | ||||
| 		// https://stackoverflow.com/questions/75167597 | ||||
| 		// https://jira.mongodb.org/browse/GODRIVER-2252 | ||||
| 		*d = SecondsF64(0) | ||||
| 		return nil | ||||
| 	} | ||||
| 	if bt != bson.TypeDouble { | ||||
| 		return errors.New(fmt.Sprintf("cannot unmarshal %v into SecondsF64", bt)) | ||||
| 	} | ||||
| 	var tt float64 | ||||
| 	err := bson.RawValue{Type: bt, Value: data}.Unmarshal(&tt) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	*d = SecondsF64(tt) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (d SecondsF64) MarshalBSONValue() (bsontype.Type, []byte, error) { | ||||
| 	return bson.MarshalValue(d.Seconds()) | ||||
| } | ||||
|  | ||||
| func (d SecondsF64) DecodeValue(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { | ||||
| 	if val.Kind() == reflect.Ptr && val.IsNil() { | ||||
| 		if !val.CanSet() { | ||||
| 			return errors.New("ValueUnmarshalerDecodeValue") | ||||
| 		} | ||||
| 		val.Set(reflect.New(val.Type().Elem())) | ||||
| 	} | ||||
|  | ||||
| 	tp, src, err := bsonrw.Copier{}.CopyValueToBytes(vr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr && len(src) == 0 { | ||||
| 		val.Set(reflect.Zero(val.Type())) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	err = d.UnmarshalBSONValue(tp, src) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr { | ||||
| 		val.Set(reflect.ValueOf(&d)) | ||||
| 	} else { | ||||
| 		val.Set(reflect.ValueOf(d)) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func NewSecondsF64(t time.Duration) SecondsF64 { | ||||
| 	return SecondsF64(t) | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,13 @@ package rfctime | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -59,6 +66,63 @@ func (t *UnixTime) UnmarshalText(data []byte) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *UnixTime) UnmarshalBSONValue(bt bsontype.Type, data []byte) error { | ||||
| 	if bt == bson.TypeNull { | ||||
| 		// we can't set nil in UnmarshalBSONValue (so we use default(struct)) | ||||
| 		// Use mongoext.CreateGoExtBsonRegistry if you need to unmarsh pointer values | ||||
| 		// https://stackoverflow.com/questions/75167597 | ||||
| 		// https://jira.mongodb.org/browse/GODRIVER-2252 | ||||
| 		*t = UnixTime{} | ||||
| 		return nil | ||||
| 	} | ||||
| 	if bt != bson.TypeDateTime { | ||||
| 		return errors.New(fmt.Sprintf("cannot unmarshal %v into UnixTime", bt)) | ||||
| 	} | ||||
| 	var tt time.Time | ||||
| 	err := bson.RawValue{Type: bt, Value: data}.Unmarshal(&tt) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	*t = UnixTime(tt) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixTime) MarshalBSONValue() (bsontype.Type, []byte, error) { | ||||
| 	return bson.MarshalValue(time.Time(t)) | ||||
| } | ||||
|  | ||||
| func (t UnixTime) DecodeValue(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { | ||||
| 	if val.Kind() == reflect.Ptr && val.IsNil() { | ||||
| 		if !val.CanSet() { | ||||
| 			return errors.New("ValueUnmarshalerDecodeValue") | ||||
| 		} | ||||
| 		val.Set(reflect.New(val.Type().Elem())) | ||||
| 	} | ||||
|  | ||||
| 	tp, src, err := bsonrw.Copier{}.CopyValueToBytes(vr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr && len(src) == 0 { | ||||
| 		val.Set(reflect.Zero(val.Type())) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	err = t.UnmarshalBSONValue(tp, src) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr { | ||||
| 		val.Set(reflect.ValueOf(&t)) | ||||
| 	} else { | ||||
| 		val.Set(reflect.ValueOf(t)) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixTime) Serialize() string { | ||||
| 	return strconv.FormatInt(t.Time().Unix(), 10) | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,13 @@ package rfctime | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -59,6 +66,63 @@ func (t *UnixMilliTime) UnmarshalText(data []byte) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *UnixMilliTime) UnmarshalBSONValue(bt bsontype.Type, data []byte) error { | ||||
| 	if bt == bson.TypeNull { | ||||
| 		// we can't set nil in UnmarshalBSONValue (so we use default(struct)) | ||||
| 		// Use mongoext.CreateGoExtBsonRegistry if you need to unmarsh pointer values | ||||
| 		// https://stackoverflow.com/questions/75167597 | ||||
| 		// https://jira.mongodb.org/browse/GODRIVER-2252 | ||||
| 		*t = UnixMilliTime{} | ||||
| 		return nil | ||||
| 	} | ||||
| 	if bt != bson.TypeDateTime { | ||||
| 		return errors.New(fmt.Sprintf("cannot unmarshal %v into UnixMilliTime", bt)) | ||||
| 	} | ||||
| 	var tt time.Time | ||||
| 	err := bson.RawValue{Type: bt, Value: data}.Unmarshal(&tt) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	*t = UnixMilliTime(tt) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixMilliTime) MarshalBSONValue() (bsontype.Type, []byte, error) { | ||||
| 	return bson.MarshalValue(time.Time(t)) | ||||
| } | ||||
|  | ||||
| func (t UnixMilliTime) DecodeValue(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { | ||||
| 	if val.Kind() == reflect.Ptr && val.IsNil() { | ||||
| 		if !val.CanSet() { | ||||
| 			return errors.New("ValueUnmarshalerDecodeValue") | ||||
| 		} | ||||
| 		val.Set(reflect.New(val.Type().Elem())) | ||||
| 	} | ||||
|  | ||||
| 	tp, src, err := bsonrw.Copier{}.CopyValueToBytes(vr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr && len(src) == 0 { | ||||
| 		val.Set(reflect.Zero(val.Type())) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	err = t.UnmarshalBSONValue(tp, src) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr { | ||||
| 		val.Set(reflect.ValueOf(&t)) | ||||
| 	} else { | ||||
| 		val.Set(reflect.ValueOf(t)) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixMilliTime) Serialize() string { | ||||
| 	return strconv.FormatInt(t.Time().UnixMilli(), 10) | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,13 @@ package rfctime | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -59,6 +66,63 @@ func (t *UnixNanoTime) UnmarshalText(data []byte) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *UnixNanoTime) UnmarshalBSONValue(bt bsontype.Type, data []byte) error { | ||||
| 	if bt == bson.TypeNull { | ||||
| 		// we can't set nil in UnmarshalBSONValue (so we use default(struct)) | ||||
| 		// Use mongoext.CreateGoExtBsonRegistry if you need to unmarsh pointer values | ||||
| 		// https://stackoverflow.com/questions/75167597 | ||||
| 		// https://jira.mongodb.org/browse/GODRIVER-2252 | ||||
| 		*t = UnixNanoTime{} | ||||
| 		return nil | ||||
| 	} | ||||
| 	if bt != bson.TypeDateTime { | ||||
| 		return errors.New(fmt.Sprintf("cannot unmarshal %v into UnixNanoTime", bt)) | ||||
| 	} | ||||
| 	var tt time.Time | ||||
| 	err := bson.RawValue{Type: bt, Value: data}.Unmarshal(&tt) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	*t = UnixNanoTime(tt) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixNanoTime) MarshalBSONValue() (bsontype.Type, []byte, error) { | ||||
| 	return bson.MarshalValue(time.Time(t)) | ||||
| } | ||||
|  | ||||
| func (t UnixNanoTime) DecodeValue(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { | ||||
| 	if val.Kind() == reflect.Ptr && val.IsNil() { | ||||
| 		if !val.CanSet() { | ||||
| 			return errors.New("ValueUnmarshalerDecodeValue") | ||||
| 		} | ||||
| 		val.Set(reflect.New(val.Type().Elem())) | ||||
| 	} | ||||
|  | ||||
| 	tp, src, err := bsonrw.Copier{}.CopyValueToBytes(vr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr && len(src) == 0 { | ||||
| 		val.Set(reflect.Zero(val.Type())) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	err = t.UnmarshalBSONValue(tp, src) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if val.Kind() == reflect.Ptr { | ||||
| 		val.Set(reflect.ValueOf(&t)) | ||||
| 	} else { | ||||
| 		val.Set(reflect.ValueOf(t)) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t UnixNanoTime) Serialize() string { | ||||
| 	return strconv.FormatInt(t.Time().UnixNano(), 10) | ||||
| } | ||||
|   | ||||
| @@ -1,33 +1,28 @@ | ||||
| package syncext | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"gogs.mikescher.com/BlackForestBytes/goext/langext" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type AtomicBool struct { | ||||
| 	v        bool | ||||
| 	listener map[string]chan bool | ||||
| 	lock     sync.Mutex | ||||
| type Atomic[T any] struct { | ||||
| 	v    T | ||||
| 	lock sync.RWMutex | ||||
| } | ||||
|  | ||||
| func NewAtomicBool(value bool) *AtomicBool { | ||||
| 	return &AtomicBool{ | ||||
| 		v:        value, | ||||
| 		listener: make(map[string]chan bool), | ||||
| 		lock:     sync.Mutex{}, | ||||
| func NewAtomic[T any](value T) *Atomic[T] { | ||||
| 	return &Atomic[T]{ | ||||
| 		v:    value, | ||||
| 		lock: sync.RWMutex{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Get() bool { | ||||
| 	a.lock.Lock() | ||||
| 	defer a.lock.Unlock() | ||||
| func (a *Atomic[T]) Get() T { | ||||
| 	a.lock.RLock() | ||||
| 	defer a.lock.RUnlock() | ||||
| 	return a.v | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Set(value bool) bool { | ||||
| func (a *Atomic[T]) Set(value T) T { | ||||
| 	a.lock.Lock() | ||||
| 	defer a.lock.Unlock() | ||||
|  | ||||
| @@ -35,79 +30,5 @@ func (a *AtomicBool) Set(value bool) bool { | ||||
|  | ||||
| 	a.v = value | ||||
|  | ||||
| 	for k, v := range a.listener { | ||||
| 		select { | ||||
| 		case v <- value: | ||||
| 			// message sent | ||||
| 		default: | ||||
| 			// no receiver on channel | ||||
| 			delete(a.listener, k) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return oldValue | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Wait(waitFor bool) { | ||||
| 	_ = a.WaitWithContext(context.Background(), waitFor) | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||
| 	defer cancel() | ||||
| 	return a.WaitWithContext(ctx, waitFor) | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error { | ||||
| 	if err := ctx.Err(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if a.Get() == waitFor { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	uuid, _ := langext.NewHexUUID() | ||||
|  | ||||
| 	waitchan := make(chan bool) | ||||
|  | ||||
| 	a.lock.Lock() | ||||
| 	a.listener[uuid] = waitchan | ||||
| 	a.lock.Unlock() | ||||
| 	defer func() { | ||||
| 		a.lock.Lock() | ||||
| 		delete(a.listener, uuid) | ||||
| 		a.lock.Unlock() | ||||
| 	}() | ||||
|  | ||||
| 	for { | ||||
| 		if err := ctx.Err(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		timeOut := 1024 * time.Millisecond | ||||
|  | ||||
| 		if dl, ok := ctx.Deadline(); ok { | ||||
| 			timeOutMax := dl.Sub(time.Now()) | ||||
| 			if timeOutMax <= 0 { | ||||
| 				timeOut = 0 | ||||
| 			} else if 0 < timeOutMax && timeOutMax < timeOut { | ||||
| 				timeOut = timeOutMax | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok { | ||||
| 			if v == waitFor { | ||||
| 				return nil | ||||
| 			} | ||||
| 		} else { | ||||
| 			if err := ctx.Err(); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			if a.Get() == waitFor { | ||||
| 				return nil | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										113
									
								
								syncext/bool.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								syncext/bool.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,113 @@ | ||||
| package syncext | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"gogs.mikescher.com/BlackForestBytes/goext/langext" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type AtomicBool struct { | ||||
| 	v        bool | ||||
| 	listener map[string]chan bool | ||||
| 	lock     sync.Mutex | ||||
| } | ||||
|  | ||||
| func NewAtomicBool(value bool) *AtomicBool { | ||||
| 	return &AtomicBool{ | ||||
| 		v:        value, | ||||
| 		listener: make(map[string]chan bool), | ||||
| 		lock:     sync.Mutex{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Get() bool { | ||||
| 	a.lock.Lock() | ||||
| 	defer a.lock.Unlock() | ||||
| 	return a.v | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Set(value bool) bool { | ||||
| 	a.lock.Lock() | ||||
| 	defer a.lock.Unlock() | ||||
|  | ||||
| 	oldValue := a.v | ||||
|  | ||||
| 	a.v = value | ||||
|  | ||||
| 	for k, v := range a.listener { | ||||
| 		select { | ||||
| 		case v <- value: | ||||
| 			// message sent | ||||
| 		default: | ||||
| 			// no receiver on channel | ||||
| 			delete(a.listener, k) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return oldValue | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) Wait(waitFor bool) { | ||||
| 	_ = a.WaitWithContext(context.Background(), waitFor) | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||
| 	defer cancel() | ||||
| 	return a.WaitWithContext(ctx, waitFor) | ||||
| } | ||||
|  | ||||
| func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error { | ||||
| 	if err := ctx.Err(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if a.Get() == waitFor { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	uuid, _ := langext.NewHexUUID() | ||||
|  | ||||
| 	waitchan := make(chan bool) | ||||
|  | ||||
| 	a.lock.Lock() | ||||
| 	a.listener[uuid] = waitchan | ||||
| 	a.lock.Unlock() | ||||
| 	defer func() { | ||||
| 		a.lock.Lock() | ||||
| 		delete(a.listener, uuid) | ||||
| 		a.lock.Unlock() | ||||
| 	}() | ||||
|  | ||||
| 	for { | ||||
| 		if err := ctx.Err(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		timeOut := 1024 * time.Millisecond | ||||
|  | ||||
| 		if dl, ok := ctx.Deadline(); ok { | ||||
| 			timeOutMax := dl.Sub(time.Now()) | ||||
| 			if timeOutMax <= 0 { | ||||
| 				timeOut = 0 | ||||
| 			} else if 0 < timeOutMax && timeOutMax < timeOut { | ||||
| 				timeOut = timeOutMax | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok { | ||||
| 			if v == waitFor { | ||||
| 				return nil | ||||
| 			} | ||||
| 		} else { | ||||
| 			if err := ctx.Err(); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			if a.Get() == waitFor { | ||||
| 				return nil | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -81,13 +81,13 @@ func (c *Coll[TData]) findOneInternal(ctx context.Context, filter bson.M, allowN | ||||
| 		if cursor.Next(ctx) { | ||||
| 			v, err := c.decodeSingle(ctx, cursor) | ||||
| 			if err != nil { | ||||
| 				return nil, exerr.Wrap(err, "mongo-aggregation [find-one] failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() | ||||
| 				return nil, exerr.Wrap(err, "mongo-aggregation [find-one] failed to decode results").Any("pipeline", pipeline).Str("collection", c.Name()).Build() | ||||
| 			} | ||||
| 			return &v, nil | ||||
| 		} else if allowNull { | ||||
| 			return nil, nil | ||||
| 		} else { | ||||
| 			return nil, exerr.Wrap(err, "mongo-aggregation [find-one] failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() | ||||
| 			return nil, exerr.Wrap(mongo.ErrNoDocuments, "mongo-aggregation [find-one] returned no documents").Any("pipeline", pipeline).Str("collection", c.Name()).Build() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -42,8 +42,8 @@ func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.Filter, page int, | ||||
| 	pipelineCount := mongo.Pipeline{} | ||||
| 	pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) | ||||
|  | ||||
| 	pipelineList := langext.ArrConcat(mongo.Pipeline{}, pipelineFilter, pipelinePaginate, c.extraModPipeline) | ||||
| 	pipelineTotalCount := langext.ArrConcat(mongo.Pipeline{}, pipelineFilter, pipelineCount) | ||||
| 	pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, c.extraModPipeline) | ||||
| 	pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount) | ||||
|  | ||||
| 	cursorList, err := c.coll.Aggregate(ctx, pipelineList) | ||||
| 	if err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user