requests-log db
This commit is contained in:
@@ -3,58 +3,106 @@ package jobs
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/logic"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DeliveryRetryJob struct {
|
||||
app *logic.Application
|
||||
running bool
|
||||
stopChannel chan bool
|
||||
app *logic.Application
|
||||
name string
|
||||
isRunning *syncext.AtomicBool
|
||||
isStarted bool
|
||||
sigChannel chan string
|
||||
}
|
||||
|
||||
func NewDeliveryRetryJob(app *logic.Application) *DeliveryRetryJob {
|
||||
return &DeliveryRetryJob{
|
||||
app: app,
|
||||
running: true,
|
||||
stopChannel: make(chan bool, 8),
|
||||
app: app,
|
||||
name: "DeliveryRetryJob",
|
||||
isRunning: syncext.NewAtomicBool(false),
|
||||
isStarted: false,
|
||||
sigChannel: make(chan string),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) Start() {
|
||||
if !j.running {
|
||||
panic("cannot re-start job")
|
||||
func (j *DeliveryRetryJob) Start() error {
|
||||
if j.isRunning.Get() {
|
||||
return errors.New("job already running")
|
||||
}
|
||||
if j.isStarted {
|
||||
return errors.New("job was already started") // re-start after stop is not allowed
|
||||
}
|
||||
|
||||
j.isStarted = true
|
||||
|
||||
go j.mainLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) Stop() {
|
||||
j.running = false
|
||||
log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
|
||||
syncext.WriteNonBlocking(j.sigChannel, "stop")
|
||||
j.isRunning.Wait(false)
|
||||
log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) Running() bool {
|
||||
return j.isRunning.Get()
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) mainLoop() {
|
||||
fastRerun := false
|
||||
j.isRunning.Set(true)
|
||||
|
||||
for j.running {
|
||||
var fastRerun bool = false
|
||||
var err error = nil
|
||||
|
||||
for {
|
||||
interval := 30 * time.Second
|
||||
if fastRerun {
|
||||
j.sleep(1 * time.Second)
|
||||
} else {
|
||||
j.sleep(30 * time.Second)
|
||||
}
|
||||
if !j.running {
|
||||
return
|
||||
interval = 1 * time.Second
|
||||
}
|
||||
|
||||
fastRerun = j.run()
|
||||
signal, okay := syncext.ReadChannelWithTimeout(j.sigChannel, interval)
|
||||
if okay {
|
||||
if signal == "stop" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <stop> signal", j.name))
|
||||
break
|
||||
} else if signal == "run" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <run> signal", j.name))
|
||||
continue
|
||||
} else {
|
||||
log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name))
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msg(fmt.Sprintf("Run job [%s]", j.name))
|
||||
|
||||
t0 := time.Now()
|
||||
fastRerun, err = j.execute()
|
||||
if err != nil {
|
||||
log.Err(err).Msg(fmt.Sprintf("Failed to execute job [%s]: %s", j.name, err.Error()))
|
||||
} else {
|
||||
t1 := time.Now()
|
||||
log.Debug().Msg(fmt.Sprintf("Job [%s] finished successfully after %f minutes", j.name, (t1.Sub(t0)).Minutes()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] exiting main-loop", j.name))
|
||||
|
||||
j.isRunning.Set(false)
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) run() bool {
|
||||
func (j *DeliveryRetryJob) execute() (fastrr bool, err error) {
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
log.Error().Interface("recover", rec).Msg("Recovered panic in DeliveryRetryJob")
|
||||
err = errors.New(fmt.Sprintf("Panic recovered: %v", rec))
|
||||
fastrr = false
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -63,14 +111,12 @@ func (j *DeliveryRetryJob) run() bool {
|
||||
|
||||
deliveries, err := j.app.Database.Primary.ListRetrieableDeliveries(ctx, 32)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to query retrieable deliveries")
|
||||
return false
|
||||
return false, err
|
||||
}
|
||||
|
||||
err = ctx.CommitTransaction()
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to commit")
|
||||
return false
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(deliveries) == 32 {
|
||||
@@ -81,7 +127,7 @@ func (j *DeliveryRetryJob) run() bool {
|
||||
j.redeliver(ctx, delivery)
|
||||
}
|
||||
|
||||
return len(deliveries) == 32
|
||||
return len(deliveries) == 32, nil
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) redeliver(ctx *logic.SimpleContext, delivery models.Delivery) {
|
||||
@@ -139,19 +185,3 @@ func (j *DeliveryRetryJob) redeliver(ctx *logic.SimpleContext, delivery models.D
|
||||
err = ctx.CommitTransaction()
|
||||
|
||||
}
|
||||
|
||||
func (j *DeliveryRetryJob) sleep(d time.Duration) {
|
||||
if !j.running {
|
||||
return
|
||||
}
|
||||
afterCh := time.After(d)
|
||||
for {
|
||||
select {
|
||||
case <-j.stopChannel:
|
||||
j.stopChannel <- true
|
||||
return
|
||||
case <-afterCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
114
scnserver/jobs/RequestLogCleanupJob.go
Normal file
114
scnserver/jobs/RequestLogCleanupJob.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/logic"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RequestLogCleanupJob struct {
|
||||
app *logic.Application
|
||||
name string
|
||||
isRunning *syncext.AtomicBool
|
||||
isStarted bool
|
||||
sigChannel chan string
|
||||
}
|
||||
|
||||
func NewRequestLogCleanupJob(app *logic.Application) *DeliveryRetryJob {
|
||||
return &DeliveryRetryJob{
|
||||
app: app,
|
||||
name: "RequestLogCleanupJob",
|
||||
isRunning: syncext.NewAtomicBool(false),
|
||||
isStarted: false,
|
||||
sigChannel: make(chan string),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *RequestLogCleanupJob) Start() error {
|
||||
if j.isRunning.Get() {
|
||||
return errors.New("job already running")
|
||||
}
|
||||
if j.isStarted {
|
||||
return errors.New("job was already started") // re-start after stop is not allowed
|
||||
}
|
||||
|
||||
j.isStarted = true
|
||||
|
||||
go j.mainLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *RequestLogCleanupJob) Stop() {
|
||||
log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
|
||||
syncext.WriteNonBlocking(j.sigChannel, "stop")
|
||||
j.isRunning.Wait(false)
|
||||
log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
|
||||
}
|
||||
|
||||
func (j *RequestLogCleanupJob) Running() bool {
|
||||
return j.isRunning.Get()
|
||||
}
|
||||
|
||||
func (j *RequestLogCleanupJob) mainLoop() {
|
||||
j.isRunning.Set(true)
|
||||
|
||||
var err error = nil
|
||||
|
||||
for {
|
||||
interval := 1 * time.Hour
|
||||
|
||||
signal, okay := syncext.ReadChannelWithTimeout(j.sigChannel, interval)
|
||||
if okay {
|
||||
if signal == "stop" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <stop> signal", j.name))
|
||||
break
|
||||
} else if signal == "run" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <run> signal", j.name))
|
||||
continue
|
||||
} else {
|
||||
log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name))
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msg(fmt.Sprintf("Run job [%s]", j.name))
|
||||
|
||||
t0 := time.Now()
|
||||
err = j.execute()
|
||||
if err != nil {
|
||||
log.Err(err).Msg(fmt.Sprintf("Failed to execute job [%s]: %s", j.name, err.Error()))
|
||||
} else {
|
||||
t1 := time.Now()
|
||||
log.Debug().Msg(fmt.Sprintf("Job [%s] finished successfully after %f minutes", j.name, (t1.Sub(t0)).Minutes()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] exiting main-loop", j.name))
|
||||
|
||||
j.isRunning.Set(false)
|
||||
}
|
||||
|
||||
func (j *RequestLogCleanupJob) execute() (err error) {
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
log.Error().Interface("recover", rec).Msg("Recovered panic in DeliveryRetryJob")
|
||||
err = errors.New(fmt.Sprintf("Panic recovered: %v", rec))
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := j.app.NewSimpleTransactionContext(10 * time.Second)
|
||||
defer ctx.Cancel()
|
||||
|
||||
deleted, err := j.app.Database.Requests.Cleanup(ctx, j.app.Config.ReqLogHistoryMaxCount, j.app.Config.ReqLogHistoryMaxDuration)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Warn().Msgf("Deleted %d entries from the request-log table", deleted)
|
||||
|
||||
return nil
|
||||
}
|
100
scnserver/jobs/RequestLogCollectorJob.go
Normal file
100
scnserver/jobs/RequestLogCollectorJob.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/logic"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RequestLogCollectorJob struct {
|
||||
app *logic.Application
|
||||
name string
|
||||
isRunning *syncext.AtomicBool
|
||||
isStarted bool
|
||||
sigChannel chan string
|
||||
}
|
||||
|
||||
func NewRequestLogCollectorJob(app *logic.Application) *RequestLogCollectorJob {
|
||||
return &RequestLogCollectorJob{
|
||||
app: app,
|
||||
name: "RequestLogCollectorJob",
|
||||
isRunning: syncext.NewAtomicBool(false),
|
||||
isStarted: false,
|
||||
sigChannel: make(chan string),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *RequestLogCollectorJob) Start() error {
|
||||
if j.isRunning.Get() {
|
||||
return errors.New("job already running")
|
||||
}
|
||||
if j.isStarted {
|
||||
return errors.New("job was already started") // re-start after stop is not allowed
|
||||
}
|
||||
|
||||
j.isStarted = true
|
||||
|
||||
go j.mainLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *RequestLogCollectorJob) Stop() {
|
||||
log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
|
||||
syncext.WriteNonBlocking(j.sigChannel, "stop")
|
||||
j.isRunning.Wait(false)
|
||||
log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
|
||||
}
|
||||
|
||||
func (j *RequestLogCollectorJob) Running() bool {
|
||||
return j.isRunning.Get()
|
||||
}
|
||||
|
||||
func (j *RequestLogCollectorJob) mainLoop() {
|
||||
j.isRunning.Set(true)
|
||||
|
||||
mainLoop:
|
||||
for {
|
||||
select {
|
||||
case signal := <-j.sigChannel:
|
||||
if signal == "stop" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <stop> signal", j.name))
|
||||
break mainLoop
|
||||
} else if signal == "run" {
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] received <run> signal", j.name))
|
||||
continue
|
||||
} else {
|
||||
log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name))
|
||||
}
|
||||
case obj := <-j.app.RequestLogQueue:
|
||||
err := j.insertLog(obj)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg(fmt.Sprintf("Failed to insert RequestLog {%s} into DB", obj.RequestID))
|
||||
} else {
|
||||
log.Debug().Msg(fmt.Sprintf("Inserted RequestLog '%s' into DB", obj.RequestID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Job [%s] exiting main-loop", j.name))
|
||||
|
||||
j.isRunning.Set(false)
|
||||
}
|
||||
|
||||
func (j *RequestLogCollectorJob) insertLog(rl models.RequestLog) error {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err := j.app.Database.Requests.InsertRequestLog(ctx, rl.DB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user