diff --git a/server/README.md b/server/README.md index 6f39202..d33efea 100644 --- a/server/README.md +++ b/server/README.md @@ -3,9 +3,8 @@ //TODO - - background job for re-delivery - POST::/messages - - https://firebase.google.com/docs/cloud-messaging/send-message#rest + - https://firebase.google.com/docs/cloud-messaging/send-message#rest !! - List subscriptions on all owned channels (RESTful?) - deploy - full-text-search: https://www.sqlite.org/fts5.html#contentless_tables diff --git a/server/api/handler/api.go b/server/api/handler/api.go index 5f94803..724713c 100644 --- a/server/api/handler/api.go +++ b/server/api/handler/api.go @@ -1110,6 +1110,11 @@ func (h APIHandler) DeleteMessage(g *gin.Context) ginresp.HTTPResponse { return ginresp.InternAPIError(500, apierr.DATABASE_ERROR, "Failed to delete message", err) } + err = h.database.CancelPendingDeliveries(ctx, msg.SCNMessageID) + if err != nil { + return ginresp.InternAPIError(500, apierr.DATABASE_ERROR, "Failed to cancel deliveries", err) + } + return ctx.FinishSuccess(ginresp.JSON(http.StatusOK, msg.FullJSON())) } diff --git a/server/api/handler/message.go b/server/api/handler/message.go index b7bc0ac..7f57516 100644 --- a/server/api/handler/message.go +++ b/server/api/handler/message.go @@ -5,7 +5,6 @@ import ( "blackforestbytes.com/simplecloudnotifier/common/ginresp" "blackforestbytes.com/simplecloudnotifier/db" "blackforestbytes.com/simplecloudnotifier/logic" - "blackforestbytes.com/simplecloudnotifier/models" "database/sql" "fmt" "github.com/gin-gonic/gin" @@ -277,7 +276,7 @@ func (h MessageHandler) sendMessageInternal(ctx *logic.AppContext, UserID *int64 for _, client := range clients { - fcmDelivID, err := h.deliverMessage(ctx, client, msg) + fcmDelivID, err := h.app.DeliverMessage(ctx, client, msg) if err != nil { _, err = h.database.CreateRetryDelivery(ctx, client, msg) if err != nil { @@ -306,15 +305,3 @@ func (h MessageHandler) sendMessageInternal(ctx *logic.AppContext, UserID *int64 SCNMessageID: msg.SCNMessageID, })) } - -func (h MessageHandler) deliverMessage(ctx *logic.AppContext, client models.Client, msg models.Message) (*string, error) { - if client.FCMToken != nil { - fcmDelivID, err := h.app.Firebase.SendNotification(ctx, client, msg) - if err != nil { - return nil, err - } - return langext.Ptr(fcmDelivID), nil - } else { - return langext.Ptr(""), nil - } -} diff --git a/server/cmd/scnserver/main.go b/server/cmd/scnserver/main.go index 135d51f..55a3aac 100644 --- a/server/cmd/scnserver/main.go +++ b/server/cmd/scnserver/main.go @@ -7,6 +7,7 @@ import ( "blackforestbytes.com/simplecloudnotifier/common/ginext" "blackforestbytes.com/simplecloudnotifier/db" "blackforestbytes.com/simplecloudnotifier/firebase" + "blackforestbytes.com/simplecloudnotifier/jobs" "blackforestbytes.com/simplecloudnotifier/logic" "fmt" "github.com/rs/zerolog/log" @@ -36,7 +37,9 @@ func main() { fb := firebase.NewFirebaseApp() - app.Init(conf, ginengine, &fb) + jobRetry := jobs.NewDeliveryRetryJob(app) + + app.Init(conf, ginengine, &fb, []logic.Job{jobRetry}) router.Init(ginengine) diff --git a/server/db/deliveries.go b/server/db/deliveries.go index 3105ce9..0f7effb 100644 --- a/server/db/deliveries.go +++ b/server/db/deliveries.go @@ -1,6 +1,7 @@ package db import ( + scn "blackforestbytes.com/simplecloudnotifier" "blackforestbytes.com/simplecloudnotifier/models" "gogs.mikescher.com/BlackForestBytes/goext/langext" "time" @@ -13,7 +14,7 @@ func (db *Database) CreateRetryDelivery(ctx TxContext, client models.Client, msg } now := time.Now().UTC() - next := now.Add(5 * time.Second) + next := scn.NextDeliveryTimestamp(now) res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", msg.SCNMessageID, @@ -86,3 +87,92 @@ func (db *Database) CreateSuccessDelivery(ctx TxContext, client models.Client, m FCMMessageID: langext.Ptr(fcmDelivID), }, nil } + +func (db *Database) ListRetrieableDeliveries(ctx TxContext, pageSize int) ([]models.Delivery, error) { + tx, err := ctx.GetOrCreateTransaction(db) + if err != nil { + return nil, err + } + + rows, err := tx.QueryContext(ctx, "SELECT * FROM deliveries WHERE status = 'RETRY' AND next_delivery < ? LIMIT ?", + time2DB(time.Now()), + pageSize) + if err != nil { + return nil, err + } + + data, err := models.DecodeDeliveries(rows) + if err != nil { + return nil, err + } + + return data, nil +} + +func (db *Database) SetDeliverySuccess(ctx TxContext, delivery models.Delivery, fcmDelivID string) error { + tx, err := ctx.GetOrCreateTransaction(db) + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'SUCCESS', next_delivery = NULL, retry_count = ?, timestamp_finalized = ?, fcm_message_id = ? WHERE delivery_id = ?", + delivery.RetryCount+1, + time2DB(time.Now()), + fcmDelivID, + delivery.DeliveryID) + if err != nil { + return err + } + + return nil +} + +func (db *Database) SetDeliveryFailed(ctx TxContext, delivery models.Delivery) error { + tx, err := ctx.GetOrCreateTransaction(db) + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, retry_count = ?, timestamp_finalized = ? WHERE delivery_id = ?", + delivery.RetryCount+1, + time2DB(time.Now()), + delivery.DeliveryID) + if err != nil { + return err + } + + return nil +} + +func (db *Database) SetDeliveryRetry(ctx TxContext, delivery models.Delivery) error { + tx, err := ctx.GetOrCreateTransaction(db) + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'RETRY', next_delivery = ?, retry_count = ? WHERE delivery_id = ?", + scn.NextDeliveryTimestamp(time.Now()), + delivery.RetryCount+1, + delivery.DeliveryID) + if err != nil { + return err + } + + return nil +} + +func (db *Database) CancelPendingDeliveries(ctx TxContext, scnMessageID int64) error { + tx, err := ctx.GetOrCreateTransaction(db) + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, timestamp_finalized = ? WHERE scn_message_id = ? AND status = 'RETRY'", + time.Now(), + scnMessageID) + if err != nil { + return err + } + + return nil +} diff --git a/server/jobs/DeliveryRetryJob.go b/server/jobs/DeliveryRetryJob.go new file mode 100644 index 0000000..5aea16d --- /dev/null +++ b/server/jobs/DeliveryRetryJob.go @@ -0,0 +1,145 @@ +package jobs + +import ( + "blackforestbytes.com/simplecloudnotifier/logic" + "blackforestbytes.com/simplecloudnotifier/models" + "github.com/rs/zerolog/log" + "time" +) + +type DeliveryRetryJob struct { + app *logic.Application + running bool + stopChannel chan bool +} + +func NewDeliveryRetryJob(app *logic.Application) *DeliveryRetryJob { + return &DeliveryRetryJob{ + app: app, + running: true, + stopChannel: make(chan bool, 8), + } +} + +func (j *DeliveryRetryJob) Start() { + if !j.running { + panic("cannot re-start job") + } + + go j.mainLoop() +} + +func (j *DeliveryRetryJob) Stop() { + j.running = false +} + +func (j *DeliveryRetryJob) mainLoop() { + fastRerun := false + + for j.running { + if fastRerun { + j.sleep(1 * time.Second) + } else { + j.sleep(30 * time.Second) + } + if !j.running { + return + } + + fastRerun = j.run() + + } +} + +func (j *DeliveryRetryJob) run() bool { + defer func() { + if rec := recover(); rec != nil { + log.Error().Interface("rec", rec).Msg("Recovered panic in DeliveryRetryJob") + } + }() + + ctx := j.app.NewSimpleTransactionContext(10 * time.Second) + defer ctx.Cancel() + + deliveries, err := j.app.Database.ListRetrieableDeliveries(ctx, 32) + if err != nil { + log.Err(err).Msg("Failed to query retrieable deliveries") + return false + } + + err = ctx.CommitTransaction() + if err != nil { + log.Err(err).Msg("Failed to commit") + return false + } + + if len(deliveries) == 32 { + log.Warn().Msg("The delivery pipeline is greater than 32 (too much for a single cycle)") + } + + for _, delivery := range deliveries { + j.redeliver(ctx, delivery) + } + + return len(deliveries) == 32 +} + +func (j *DeliveryRetryJob) redeliver(ctx *logic.SimpleContext, delivery models.Delivery) { + + client, err := j.app.Database.GetClient(ctx, delivery.ReceiverUserID, delivery.ReceiverClientID) + if err != nil { + log.Err(err).Int64("ReceiverUserID", delivery.ReceiverUserID).Int64("ReceiverClientID", delivery.ReceiverClientID).Msg("Failed to get client") + ctx.RollbackTransaction() + return + } + + msg, err := j.app.Database.GetMessage(ctx, delivery.SCNMessageID) + if err != nil { + log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Msg("Failed to get message") + ctx.RollbackTransaction() + return + } + + fcmDelivID, err := j.app.DeliverMessage(ctx, client, msg) + if err != nil { + err = j.app.Database.SetDeliverySuccess(ctx, delivery, *fcmDelivID) + if err != nil { + log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery") + ctx.RollbackTransaction() + return + } + } else if delivery.RetryCount+1 > delivery.MaxRetryCount() { + err = j.app.Database.SetDeliveryFailed(ctx, delivery) + if err != nil { + log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery") + ctx.RollbackTransaction() + return + } + } else { + err = j.app.Database.SetDeliveryRetry(ctx, delivery) + if err != nil { + log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery") + ctx.RollbackTransaction() + return + } + } + + err = ctx.CommitTransaction() + +} + +func (j *DeliveryRetryJob) sleep(d time.Duration) { + if !j.running { + return + } + afterCh := time.After(d) + for { + select { + case <-j.stopChannel: + j.stopChannel <- true + return + case <-afterCh: + return + } + } +} diff --git a/server/logic/context.go b/server/logic/appcontext.go similarity index 100% rename from server/logic/context.go rename to server/logic/appcontext.go diff --git a/server/logic/application.go b/server/logic/application.go index aa71694..3a30540 100644 --- a/server/logic/application.go +++ b/server/logic/application.go @@ -29,6 +29,7 @@ type Application struct { Database *db.Database Firebase *firebase.App DefaultChannel string + Jobs []Job } func NewApp(db *db.Database) *Application { @@ -38,10 +39,11 @@ func NewApp(db *db.Database) *Application { } } -func (app *Application) Init(cfg scn.Config, g *gin.Engine, fb *firebase.App) { +func (app *Application) Init(cfg scn.Config, g *gin.Engine, fb *firebase.App, jobs []Job) { app.Config = cfg app.Gin = g app.Firebase = fb + app.Jobs = jobs } func (app *Application) Run() { @@ -59,22 +61,33 @@ func (app *Application) Run() { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + for _, job := range app.Jobs { + job.Start() + } + select { case <-stop: ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + log.Info().Msg("Stopping HTTP-Server") + err := httpserver.Shutdown(ctx) + if err != nil { log.Info().Err(err).Msg("Error while stopping the http-server") - return + } else { + log.Info().Msg("Stopped HTTP-Server") } - log.Info().Msg("Stopped HTTP-Server") case err := <-errChan: log.Error().Err(err).Msg("HTTP-Server failed") } + for _, job := range app.Jobs { + job.Start() + } } func (app *Application) GenerateRandomAuthKey() string { @@ -147,6 +160,11 @@ func (app *Application) StartRequest(g *gin.Context, uri any, query any, body an return actx, nil } +func (app *Application) NewSimpleTransactionContext(timeout time.Duration) *SimpleContext { + ictx, cancel := context.WithTimeout(context.Background(), timeout) + return CreateSimpleContext(ictx, cancel) +} + func (app *Application) getPermissions(ctx *AppContext, hdr string) (PermissionSet, error) { if hdr == "" { return NewEmptyPermissions(), nil @@ -222,3 +240,15 @@ func (app *Application) NormalizeUsername(v string) string { return v } + +func (app *Application) DeliverMessage(ctx context.Context, client models.Client, msg models.Message) (*string, error) { + if client.FCMToken != nil { + fcmDelivID, err := app.Firebase.SendNotification(ctx, client, msg) + if err != nil { + return nil, err + } + return langext.Ptr(fcmDelivID), nil + } else { + return langext.Ptr(""), nil + } +} diff --git a/server/logic/jobs.go b/server/logic/jobs.go new file mode 100644 index 0000000..8f7cc6a --- /dev/null +++ b/server/logic/jobs.go @@ -0,0 +1,6 @@ +package logic + +type Job interface { + Start() + Stop() +} diff --git a/server/logic/simplecontext.go b/server/logic/simplecontext.go new file mode 100644 index 0000000..ac3a112 --- /dev/null +++ b/server/logic/simplecontext.go @@ -0,0 +1,94 @@ +package logic + +import ( + "blackforestbytes.com/simplecloudnotifier/db" + "context" + "database/sql" + "errors" + "github.com/rs/zerolog/log" + "time" +) + +type SimpleContext struct { + inner context.Context + cancelFunc context.CancelFunc + cancelled bool + transaction *sql.Tx +} + +func CreateSimpleContext(innerCtx context.Context, cancelFn context.CancelFunc) *SimpleContext { + return &SimpleContext{ + inner: innerCtx, + cancelFunc: cancelFn, + cancelled: false, + transaction: nil, + } +} + +func (sc *SimpleContext) Deadline() (deadline time.Time, ok bool) { + return sc.inner.Deadline() +} + +func (sc *SimpleContext) Done() <-chan struct{} { + return sc.inner.Done() +} + +func (sc *SimpleContext) Err() error { + return sc.inner.Err() +} + +func (sc *SimpleContext) Value(key any) any { + return sc.inner.Value(key) +} + +func (sc *SimpleContext) Cancel() { + sc.cancelled = true + if sc.transaction != nil { + log.Error().Msg("Rollback transaction") + err := sc.transaction.Rollback() + if err != nil { + panic("failed to rollback transaction: " + err.Error()) + } + sc.transaction = nil + } + sc.cancelFunc() +} + +func (sc *SimpleContext) GetOrCreateTransaction(db *db.Database) (*sql.Tx, error) { + if sc.cancelled { + return nil, errors.New("context cancelled") + } + if sc.transaction != nil { + return sc.transaction, nil + } + tx, err := db.BeginTx(sc) + if err != nil { + return nil, err + } + sc.transaction = tx + return tx, nil +} + +func (sc *SimpleContext) CommitTransaction() error { + if sc.transaction == nil { + return nil + } + err := sc.transaction.Commit() + if err != nil { + return err + } + sc.transaction = nil + return nil +} + +func (sc *SimpleContext) RollbackTransaction() { + if sc.transaction == nil { + return + } + err := sc.transaction.Rollback() + if err != nil { + panic(err) + } + sc.transaction = nil + return +} diff --git a/server/models/delivery.go b/server/models/delivery.go index 6aaa645..c16cb8f 100644 --- a/server/models/delivery.go +++ b/server/models/delivery.go @@ -43,6 +43,10 @@ func (d Delivery) JSON() DeliveryJSON { } } +func (d Delivery) MaxRetryCount() int { + return 5 +} + type DeliveryJSON struct { DeliveryID int64 `json:"delivery_id"` SCNMessageID int64 `json:"scn_message_id"` diff --git a/server/util.go b/server/util.go index 885d186..9d75dbc 100644 --- a/server/util.go +++ b/server/util.go @@ -8,3 +8,7 @@ import ( func QuotaDayString() string { return time.Now().In(timeext.TimezoneBerlin).Format("2006-01-02") } + +func NextDeliveryTimestamp(now time.Time) time.Time { + return now.Add(5 * time.Second) +}