ListMessages()
This commit is contained in:
133
server/db/cursortoken/token.go
Normal file
133
server/db/cursortoken/token.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package cursortoken
|
||||
|
||||
import (
|
||||
"encoding/base32"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Mode string
|
||||
|
||||
const (
|
||||
CTMStart = "START"
|
||||
CTMNormal = "NORMAL"
|
||||
CTMEnd = "END"
|
||||
)
|
||||
|
||||
type CursorToken struct {
|
||||
Mode Mode
|
||||
Timestamp int64
|
||||
Id int64
|
||||
Direction string
|
||||
}
|
||||
|
||||
type cursorTokenSerialize struct {
|
||||
Timestamp *int64 `json:"ts,omitempty"`
|
||||
Id *int64 `json:"id,omitempty"`
|
||||
Direction *string `json:"dir,omitempty"`
|
||||
}
|
||||
|
||||
func Start() CursorToken {
|
||||
return CursorToken{
|
||||
Mode: CTMStart,
|
||||
Timestamp: 0,
|
||||
Id: 0,
|
||||
Direction: "",
|
||||
}
|
||||
}
|
||||
|
||||
func End() CursorToken {
|
||||
return CursorToken{
|
||||
Mode: CTMEnd,
|
||||
Timestamp: 0,
|
||||
Id: 0,
|
||||
Direction: "",
|
||||
}
|
||||
}
|
||||
|
||||
func Normal(ts time.Time, id int64, dir string) CursorToken {
|
||||
return CursorToken{
|
||||
Mode: CTMNormal,
|
||||
Timestamp: ts.UnixMilli(),
|
||||
Id: id,
|
||||
Direction: dir,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CursorToken) Token() string {
|
||||
if c.Mode == CTMStart {
|
||||
return "@start"
|
||||
}
|
||||
if c.Mode == CTMEnd {
|
||||
return "@end"
|
||||
}
|
||||
|
||||
// We kinda manually implement omitempty for the CursorToken here
|
||||
// because omitempty does not work for time.Time and otherwise we would always
|
||||
// get weird time values when decoding a token that initially didn't have an Timestamp set
|
||||
// For this usecase we treat Unix=0 as an empty timestamp
|
||||
|
||||
sertok := cursorTokenSerialize{}
|
||||
|
||||
if c.Id != 0 {
|
||||
sertok.Id = &c.Id
|
||||
}
|
||||
|
||||
if c.Timestamp != 0 {
|
||||
sertok.Timestamp = &c.Timestamp
|
||||
}
|
||||
|
||||
if c.Direction != "" {
|
||||
sertok.Direction = &c.Direction
|
||||
}
|
||||
|
||||
body, err := json.Marshal(sertok)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return "tok_" + base32.StdEncoding.EncodeToString(body)
|
||||
}
|
||||
|
||||
func Decode(tok string) (CursorToken, error) {
|
||||
if tok == "" {
|
||||
return Start(), nil
|
||||
}
|
||||
if strings.ToLower(tok) == "@start" {
|
||||
return Start(), nil
|
||||
}
|
||||
if strings.ToLower(tok) == "@end" {
|
||||
return End(), nil
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(tok, "tok_") {
|
||||
return CursorToken{}, errors.New("could not decode token, missing prefix")
|
||||
}
|
||||
|
||||
body, err := base32.StdEncoding.DecodeString(tok[len("tok_"):])
|
||||
if err != nil {
|
||||
return CursorToken{}, err
|
||||
}
|
||||
|
||||
var tokenDeserialize cursorTokenSerialize
|
||||
err = json.Unmarshal(body, &tokenDeserialize)
|
||||
if err != nil {
|
||||
return CursorToken{}, err
|
||||
}
|
||||
|
||||
token := CursorToken{Mode: CTMNormal}
|
||||
|
||||
if tokenDeserialize.Timestamp != nil {
|
||||
token.Timestamp = *tokenDeserialize.Timestamp
|
||||
}
|
||||
if tokenDeserialize.Id != nil {
|
||||
token.Id = *tokenDeserialize.Id
|
||||
}
|
||||
if tokenDeserialize.Direction != nil {
|
||||
token.Direction = *tokenDeserialize.Direction
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
@@ -2,24 +2,15 @@ package db
|
||||
|
||||
import (
|
||||
scn "blackforestbytes.com/simplecloudnotifier"
|
||||
"blackforestbytes.com/simplecloudnotifier/db/schema"
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"time"
|
||||
)
|
||||
|
||||
//go:embed schema_1.ddl
|
||||
var schema1 string
|
||||
|
||||
//go:embed schema_2.ddl
|
||||
var schema2 string
|
||||
|
||||
//go:embed schema_3.ddl
|
||||
var schema3 string
|
||||
|
||||
type Database struct {
|
||||
db *sql.DB
|
||||
}
|
||||
@@ -37,24 +28,24 @@ func (db *Database) Migrate(ctx context.Context) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 24*time.Second)
|
||||
defer cancel()
|
||||
|
||||
schema, err := db.ReadSchema(ctx)
|
||||
if schema == 0 {
|
||||
currschema, err := db.ReadSchema(ctx)
|
||||
if currschema == 0 {
|
||||
|
||||
_, err = db.db.ExecContext(ctx, schema3)
|
||||
_, err = db.db.ExecContext(ctx, schema.Schema3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
} else if schema == 1 {
|
||||
} else if currschema == 1 {
|
||||
return errors.New("cannot autom. upgrade schema 1")
|
||||
} else if schema == 2 {
|
||||
} else if currschema == 2 {
|
||||
return errors.New("cannot autom. upgrade schema 2") //TODO
|
||||
} else if schema == 3 {
|
||||
} else if currschema == 3 {
|
||||
return nil // current
|
||||
} else {
|
||||
return errors.New(fmt.Sprintf("Unknown DB schema: %d", schema))
|
||||
return errors.New(fmt.Sprintf("Unknown DB schema: %d", currschema))
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -1,8 +1,10 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/db/cursortoken"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -103,3 +105,38 @@ func (db *Database) DeleteMessage(ctx TxContext, scnMessageID int64) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) ListMessages(ctx TxContext, userid int64, pageSize int, inTok cursortoken.CursorToken) ([]models.Message, cursortoken.CursorToken, error) {
|
||||
tx, err := ctx.GetOrCreateTransaction(db)
|
||||
if err != nil {
|
||||
return nil, cursortoken.CursorToken{}, err
|
||||
}
|
||||
|
||||
if inTok.Mode == cursortoken.CTMEnd {
|
||||
return make([]models.Message, 0), cursortoken.End(), nil
|
||||
}
|
||||
|
||||
pageCond := ""
|
||||
if inTok.Mode == cursortoken.CTMNormal {
|
||||
pageCond = fmt.Sprintf("AND ( timestamp_real < %d OR (timestamp_real = %d AND scn_message_id < %d ) )", inTok.Timestamp, inTok.Timestamp, inTok.Id)
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT messages.* FROM messages LEFT JOIN subscriptions subs on messages.channel_id = subs.channel_id WHERE subs.subscriber_user_id = ? AND subs.confirmed = 1 "+pageCond+" ORDER BY timestamp_real DESC LIMIT ?",
|
||||
userid,
|
||||
pageSize+1)
|
||||
if err != nil {
|
||||
return nil, cursortoken.CursorToken{}, err
|
||||
}
|
||||
|
||||
data, err := models.DecodeMessages(rows)
|
||||
if err != nil {
|
||||
return nil, cursortoken.CursorToken{}, err
|
||||
}
|
||||
|
||||
if len(data) <= pageSize {
|
||||
return data, cursortoken.End(), nil
|
||||
} else {
|
||||
outToken := cursortoken.Normal(data[pageSize-1].TimestampReal, data[pageSize-1].SCNMessageID, "DESC")
|
||||
return data[0:pageSize], outToken, nil
|
||||
}
|
||||
}
|
||||
|
12
server/db/schema/assets.go
Normal file
12
server/db/schema/assets.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package schema
|
||||
|
||||
import _ "embed"
|
||||
|
||||
//go:embed schema_1.ddl
|
||||
var Schema1 string
|
||||
|
||||
//go:embed schema_2.ddl
|
||||
var Schema2 string
|
||||
|
||||
//go:embed schema_3.ddl
|
||||
var Schema3 string
|
Reference in New Issue
Block a user