Fix failing tests and SQLITE_BUSY errors
This commit is contained in:
@@ -10,6 +10,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.blackforestbytes.com/BlackForestBytes/goext/dataext"
|
||||
"git.blackforestbytes.com/BlackForestBytes/goext/exerr"
|
||||
"git.blackforestbytes.com/BlackForestBytes/goext/ginext"
|
||||
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -23,26 +25,28 @@ import (
|
||||
)
|
||||
|
||||
type Application struct {
|
||||
Config scn.Config
|
||||
Gin *ginext.GinWrapper
|
||||
Database *DBPool
|
||||
Pusher push.NotificationClient
|
||||
AndroidPublisher google.AndroidPublisherClient
|
||||
Jobs []Job
|
||||
stopChan chan bool
|
||||
Port string
|
||||
IsRunning *syncext.AtomicBool
|
||||
RequestLogQueue chan models.RequestLog
|
||||
MainDatabaseLock golock.RWMutex
|
||||
Config scn.Config
|
||||
Gin *ginext.GinWrapper
|
||||
Database *DBPool
|
||||
Pusher push.NotificationClient
|
||||
AndroidPublisher google.AndroidPublisherClient
|
||||
Jobs []Job
|
||||
stopChan chan bool
|
||||
Port string
|
||||
IsRunning *syncext.AtomicBool
|
||||
RequestLogQueue chan models.RequestLog
|
||||
MainDatabaseLock golock.RWMutex
|
||||
keyTokenLastUsedDCI *dataext.SyncMap[models.KeyTokenID, *dataext.DelayedCombiningInvoker]
|
||||
}
|
||||
|
||||
func NewApp(db *DBPool) *Application {
|
||||
return &Application{
|
||||
Database: db,
|
||||
stopChan: make(chan bool),
|
||||
IsRunning: syncext.NewAtomicBool(false),
|
||||
RequestLogQueue: make(chan models.RequestLog, 8192),
|
||||
MainDatabaseLock: golock.NewCASMutex(),
|
||||
Database: db,
|
||||
stopChan: make(chan bool),
|
||||
IsRunning: syncext.NewAtomicBool(false),
|
||||
RequestLogQueue: make(chan models.RequestLog, 8192),
|
||||
MainDatabaseLock: golock.NewCASMutex(),
|
||||
keyTokenLastUsedDCI: dataext.NewSyncMap[models.KeyTokenID, *dataext.DelayedCombiningInvoker](),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +121,7 @@ func (app *Application) Run() {
|
||||
} else {
|
||||
log.Info().Msg("Manually stopped HTTP-Server")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ================== STOP JOBS ==================
|
||||
@@ -125,6 +130,25 @@ func (app *Application) Run() {
|
||||
job.Stop()
|
||||
}
|
||||
|
||||
// ================== STOP Keytoken DCI ==================
|
||||
|
||||
func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for i, dci := range app.keyTokenLastUsedDCI.GetAllValues() {
|
||||
running := dci.ExecuteNow()
|
||||
if running {
|
||||
err := dci.WaitForCompletion(ctx)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to wait for keytoken dci job")
|
||||
} else {
|
||||
log.Info().Msg(fmt.Sprintf("Succesfully waited for finishing of KeyToken-DCI job [%d]", i))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// ================== STOP DB ==================
|
||||
|
||||
{
|
||||
@@ -223,11 +247,7 @@ func (app *Application) getPermissions(ctx db.TxContext, hdr string) (models.Per
|
||||
}
|
||||
|
||||
if tok != nil {
|
||||
|
||||
err = app.Database.Primary.UpdateKeyTokenLastUsed(ctx, tok.KeyTokenID)
|
||||
if err != nil {
|
||||
return models.PermissionSet{}, err
|
||||
}
|
||||
go app.updateKeyTokenLastUsed(tok.KeyTokenID)
|
||||
|
||||
return models.PermissionSet{Token: tok}, nil
|
||||
}
|
||||
@@ -235,6 +255,37 @@ func (app *Application) getPermissions(ctx db.TxContext, hdr string) (models.Per
|
||||
return models.NewEmptyPermissions(), nil
|
||||
}
|
||||
|
||||
func (app *Application) updateKeyTokenLastUsed(keyTokenID models.KeyTokenID) {
|
||||
app.keyTokenLastUsedDCI.GetAndSetIfNotContainsFunc(keyTokenID, func() *dataext.DelayedCombiningInvoker {
|
||||
return dataext.NewDelayedCombiningInvoker(func() {
|
||||
islock := app.MainDatabaseLock.TryLockWithTimeout(10 * time.Second)
|
||||
if !islock {
|
||||
exerr.New(exerr.TypeInternal, "Failed to lock {MainDatabaseLock} [rw] for {updateKeyTokenLastUsed}").Id("KeyTokenID", keyTokenID).Print()
|
||||
return
|
||||
}
|
||||
defer app.MainDatabaseLock.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, _ = simplectx.Run(ctx, func(ctx db.TxContext) (any, error) {
|
||||
err := app.Database.Primary.UpdateKeyTokenLastUsed(ctx, keyTokenID)
|
||||
if err != nil {
|
||||
exerr.Wrap(err, "Failed to update keytoken last used").Id("KeyTokenID", keyTokenID).Print()
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
}, 5*time.Second, 30*time.Second)
|
||||
}).Request()
|
||||
|
||||
if app.keyTokenLastUsedDCI.Count() > 1024 {
|
||||
rmCount := app.keyTokenLastUsedDCI.DeleteIf(func(key models.KeyTokenID, val *dataext.DelayedCombiningInvoker) bool {
|
||||
return !val.HasPendingRequests()
|
||||
})
|
||||
log.Debug().Msg(fmt.Sprintf("Cleaned up %d old keytoken-DCI jobs", rmCount))
|
||||
}
|
||||
}
|
||||
|
||||
func (app *Application) GetOrCreateChannel(ctx *AppContext, userid models.UserID, displayChanName string, intChanName string) (models.Channel, error) {
|
||||
existingChan, err := app.Database.Primary.GetChannelByName(ctx, userid, intChanName)
|
||||
if err != nil {
|
||||
|
@@ -55,7 +55,6 @@ func (app *Application) DoRequest(gectx *ginext.AppContext, g *gin.Context, lock
|
||||
defer app.MainDatabaseLock.RUnlock()
|
||||
|
||||
} else if lockmode == models.TLockReadWrite {
|
||||
|
||||
islock := app.MainDatabaseLock.TryLockWithTimeout(dl.Sub(time.Now()))
|
||||
if !islock {
|
||||
return ginresp.APIError(g, 500, apierr.INTERNAL_EXCEPTION, "Failed to lock {MainDatabaseLock} [rw]", nil)
|
||||
|
Reference in New Issue
Block a user