add support for page-based cursortokens (like goext)
Some checks failed
Build Docker and Deploy / Build Docker Container (push) Successful in 52s
Build Docker and Deploy / Run Unit-Tests (push) Failing after 2m49s
Build Docker and Deploy / Deploy to Server (push) Has been skipped

This commit is contained in:
2025-12-03 19:46:09 +01:00
parent d26f18f356
commit 4b8ebf15d2
4 changed files with 339 additions and 33 deletions

View File

@@ -4,21 +4,27 @@ import (
"encoding/base32"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
)
type Mode string //@enum:type
const (
CTMStart = "START"
CTMNormal = "NORMAL"
CTMEnd = "END"
CTMStart = "START"
CTMNormal = "NORMAL"
CTMPaginated = "PAGINATED"
CTMEnd = "END"
)
type CursorToken struct {
Mode Mode
Timestamp int64
Timestamp *int64
Page *int
Id string
Direction string
FilterHash string
@@ -34,7 +40,8 @@ type cursorTokenSerialize struct {
func Start() CursorToken {
return CursorToken{
Mode: CTMStart,
Timestamp: 0,
Timestamp: langext.Ptr[int64](0),
Page: nil,
Id: "",
Direction: "",
FilterHash: "",
@@ -44,7 +51,8 @@ func Start() CursorToken {
func End() CursorToken {
return CursorToken{
Mode: CTMEnd,
Timestamp: 0,
Timestamp: nil,
Page: nil,
Id: "",
Direction: "",
FilterHash: "",
@@ -54,13 +62,22 @@ func End() CursorToken {
func Normal(ts time.Time, id string, dir string, filter string) CursorToken {
return CursorToken{
Mode: CTMNormal,
Timestamp: ts.UnixMilli(),
Timestamp: langext.Ptr(ts.UnixMilli()),
Page: nil,
Id: id,
Direction: dir,
FilterHash: filter,
}
}
func Paginated(p int) CursorToken {
return CursorToken{
Mode: CTMPaginated,
Timestamp: nil,
Page: langext.Ptr(p),
}
}
func (c *CursorToken) Token() string {
if c.Mode == CTMStart {
return "@start"
@@ -69,6 +86,10 @@ func (c *CursorToken) Token() string {
return "@end"
}
if c.Page != nil {
return fmt.Sprintf("$%d", *c.Page)
}
// We kinda manually implement omitempty for the CursorToken here
// because omitempty does not work for time.Time and otherwise we would always
// get weird time values when decoding a token that initially didn't have an Timestamp set
@@ -80,8 +101,8 @@ func (c *CursorToken) Token() string {
sertok.Id = &c.Id
}
if c.Timestamp != 0 {
sertok.Timestamp = &c.Timestamp
if c.Timestamp != nil && *c.Timestamp != 0 {
sertok.Timestamp = langext.Ptr(*c.Timestamp)
}
if c.Direction != "" {
@@ -111,6 +132,14 @@ func Decode(tok string) (CursorToken, error) {
return End(), nil
}
if strings.HasPrefix(tok, "$") {
p, err := strconv.ParseInt(tok[1:], 10, 0)
if err != nil {
return CursorToken{}, errors.New("could not decode paginated token")
}
return Paginated(int(p)), nil
}
if !strings.HasPrefix(tok, "tok_") {
return CursorToken{}, errors.New("could not decode token, missing prefix")
}
@@ -129,7 +158,7 @@ func Decode(tok string) (CursorToken, error) {
token := CursorToken{Mode: CTMNormal}
if tokenDeserialize.Timestamp != nil {
token.Timestamp = *tokenDeserialize.Timestamp
token.Timestamp = langext.Ptr(*tokenDeserialize.Timestamp)
}
if tokenDeserialize.Id != nil {
token.Id = *tokenDeserialize.Id

View File

@@ -1,12 +1,13 @@
package primary
import (
"errors"
"time"
"blackforestbytes.com/simplecloudnotifier/db"
ct "blackforestbytes.com/simplecloudnotifier/db/cursortoken"
"blackforestbytes.com/simplecloudnotifier/models"
"errors"
"git.blackforestbytes.com/BlackForestBytes/goext/sq"
"time"
)
func (db *Database) GetMessageByUserMessageID(ctx db.TxContext, usrMsgId string) (*models.Message, error) {
@@ -87,26 +88,32 @@ func (db *Database) ListMessages(ctx db.TxContext, filter models.MessageFilter,
return nil, ct.CursorToken{}, 0, err
}
pageCond := "1=1"
if inTok.Mode == ct.CTMNormal {
pageCond = "timestamp_real < :tokts OR (timestamp_real = :tokts AND message_id < :tokid )"
}
filterCond, filterJoin, prepParams, err := filter.SQL()
orderClause := ""
pageCond := "1=1"
limitCond := ""
if pageSize != nil {
orderClause = "ORDER BY COALESCE(timestamp_client, timestamp_real) DESC, message_id DESC LIMIT :lim"
limitCond = "LIMIT :lim"
prepParams["lim"] = *pageSize + 1
} else {
orderClause = "ORDER BY COALESCE(timestamp_client, timestamp_real) DESC, message_id DESC"
}
sqlQueryList := "SELECT " + "messages.*" + " FROM messages " + filterJoin + " WHERE ( " + pageCond + " ) AND ( " + filterCond + " ) " + orderClause
sqlQueryCount := "SELECT " + " COUNT(*) AS count FROM messages " + filterJoin + " WHERE ( " + filterCond + " ) "
if inTok.Mode == ct.CTMNormal {
pageCond = "timestamp_real < :tokts OR (timestamp_real = :tokts AND message_id < :tokid )"
prepParams["tokts"] = inTok.Timestamp
prepParams["tokid"] = inTok.Id
} else if inTok.Mode == ct.CTMPaginated {
if pageSize != nil {
limitCond = "LIMIT :lim OFFSET :off"
prepParams["lim"] = *pageSize + 1
prepParams["off"] = (*pageSize) * (*inTok.Page)
}
}
prepParams["tokts"] = inTok.Timestamp
prepParams["tokid"] = inTok.Id
orderClause := "ORDER BY COALESCE(timestamp_client, timestamp_real) DESC, message_id DESC"
sqlQueryList := "SELECT " + "messages.*" + " FROM messages " + filterJoin + " WHERE ( " + pageCond + " ) AND ( " + filterCond + " ) " + orderClause + " " + limitCond
sqlQueryCount := "SELECT " + " COUNT(*) AS count FROM messages " + filterJoin + " WHERE ( " + filterCond + " ) "
if inTok.Mode == ct.CTMEnd {
@@ -132,7 +139,12 @@ func (db *Database) ListMessages(ctx db.TxContext, filter models.MessageFilter,
return nil, ct.CursorToken{}, 0, err
}
outToken := ct.Normal(dataList[*pageSize-1].Timestamp(), dataList[*pageSize-1].MessageID.String(), "DESC", filter.Hash())
var outToken ct.CursorToken
if inTok.Mode == ct.CTMPaginated {
outToken = ct.Paginated(*inTok.Page + 1)
} else {
outToken = ct.Normal(dataList[*pageSize-1].Timestamp(), dataList[*pageSize-1].MessageID.String(), "DESC", filter.Hash())
}
return dataList[0:*pageSize], outToken, dataCount.Count, nil
}

View File

@@ -52,14 +52,29 @@ func (db *Database) ListRequestLogs(ctx context.Context, filter models.RequestLo
return make([]models.RequestLog, 0), ct.End(), nil
}
pageCond := "1=1"
if inTok.Mode == ct.CTMNormal {
pageCond = "timestamp_created < :tokts OR (timestamp_created = :tokts AND request_id < :tokid )"
}
filterCond, filterJoin, prepParams, err := filter.SQL()
orderClause := ""
pageCond := "1=1"
limitCond := ""
if pageSize != nil {
limitCond = "LIMIT :lim"
prepParams["lim"] = *pageSize + 1
}
if inTok.Mode == ct.CTMNormal {
pageCond = "timestamp_created < :tokts OR (timestamp_created = :tokts AND request_id < :tokid )"
prepParams["tokts"] = inTok.Timestamp
prepParams["tokid"] = inTok.Id
} else if inTok.Mode == ct.CTMPaginated {
if pageSize != nil {
limitCond = "LIMIT :lim OFFSET :off"
prepParams["lim"] = *pageSize + 1
prepParams["off"] = (*pageSize) * (*inTok.Page)
}
}
orderClause := "ORDER BY timestamp_created DESC, request_id DESC"
if pageSize != nil {
orderClause = "ORDER BY timestamp_created DESC, request_id DESC LIMIT :lim"
prepParams["lim"] = *pageSize + 1
@@ -67,7 +82,7 @@ func (db *Database) ListRequestLogs(ctx context.Context, filter models.RequestLo
orderClause = "ORDER BY timestamp_created DESC, request_id DESC"
}
sqlQuery := "SELECT " + "requests.*" + " FROM requests " + filterJoin + " WHERE ( " + pageCond + " ) AND ( " + filterCond + " ) " + orderClause
sqlQuery := "SELECT " + "requests.*" + " FROM requests " + filterJoin + " WHERE ( " + pageCond + " ) AND ( " + filterCond + " ) " + orderClause + " " + limitCond
prepParams["tokts"] = inTok.Timestamp
prepParams["tokid"] = inTok.Id

View File

@@ -1313,3 +1313,253 @@ func TestListMessagesSubscriptionStatusAllNoSubscription(t *testing.T) {
}
}
func TestListMessagesPaginatedDirect(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
type msg struct {
ChannelId string `json:"channel_id"`
ChannelInternalName string `json:"channel_internal_name"`
Content string `json:"content"`
MessageId string `json:"message_id"`
OwnerUserId string `json:"owner_user_id"`
Priority int `json:"priority"`
SenderIp string `json:"sender_ip"`
SenderName string `json:"sender_name"`
SenderUserId string `json:"sender_user_id"`
Timestamp string `json:"timestamp"`
Title string `json:"title"`
Trimmed bool `json:"trimmed"`
UsrMessageId string `json:"usr_message_id"`
}
type mglist struct {
Messages []msg `json:"messages"`
NPT string `json:"next_page_token"`
PageSize int `json:"page_size"`
}
// User 16 has 23 messages: "Lorem Ipsum 01" through "Lorem Ipsum 23"
// With page_size=10:
// Page 0 ($0): messages 23-14 (10 items)
// Page 1 ($1): messages 13-04 (10 items)
// Page 2 ($2): messages 03-01 (3 items)
// Test $0 - first page (same as @start)
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$0"))
tt.AssertEqual(t, "msgList.len", 10, len(msgList.Messages))
tt.AssertEqual(t, "msgList.PageSize", 10, msgList.PageSize)
tt.AssertEqual(t, "msgList[0]", "Lorem Ipsum 23", msgList.Messages[0].Title)
tt.AssertEqual(t, "msgList[9]", "Lorem Ipsum 14", msgList.Messages[9].Title)
tt.AssertEqual(t, "msgList.NPT", "$1", msgList.NPT)
}
// Test $1 - second page
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$1"))
tt.AssertEqual(t, "msgList.len", 10, len(msgList.Messages))
tt.AssertEqual(t, "msgList.PageSize", 10, msgList.PageSize)
tt.AssertEqual(t, "msgList[0]", "Lorem Ipsum 13", msgList.Messages[0].Title)
tt.AssertEqual(t, "msgList[9]", "Lorem Ipsum 04", msgList.Messages[9].Title)
tt.AssertEqual(t, "msgList.NPT", "$2", msgList.NPT)
}
// Test $2 - third/last page
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$2"))
tt.AssertEqual(t, "msgList.len", 3, len(msgList.Messages))
tt.AssertEqual(t, "msgList.PageSize", 10, msgList.PageSize)
tt.AssertEqual(t, "msgList[0]", "Lorem Ipsum 03", msgList.Messages[0].Title)
tt.AssertEqual(t, "msgList[2]", "Lorem Ipsum 01", msgList.Messages[2].Title)
tt.AssertEqual(t, "msgList.NPT", "@end", msgList.NPT)
}
}
func TestListMessagesPaginatedDirectJumpToMiddle(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
type msg struct {
ChannelId string `json:"channel_id"`
ChannelInternalName string `json:"channel_internal_name"`
Content string `json:"content"`
MessageId string `json:"message_id"`
OwnerUserId string `json:"owner_user_id"`
Priority int `json:"priority"`
SenderIp string `json:"sender_ip"`
SenderName string `json:"sender_name"`
SenderUserId string `json:"sender_user_id"`
Timestamp string `json:"timestamp"`
Title string `json:"title"`
Trimmed bool `json:"trimmed"`
UsrMessageId string `json:"usr_message_id"`
}
type mglist struct {
Messages []msg `json:"messages"`
NPT string `json:"next_page_token"`
PageSize int `json:"page_size"`
}
// Jump directly to page 1 (second page) without going through page 0
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$1"))
tt.AssertEqual(t, "msgList.len", 10, len(msgList.Messages))
tt.AssertEqual(t, "msgList[0]", "Lorem Ipsum 13", msgList.Messages[0].Title)
tt.AssertEqual(t, "msgList.NPT", "$2", msgList.NPT)
}
// Jump directly to page 2 (third page) without going through pages 0 and 1
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$2"))
tt.AssertEqual(t, "msgList.len", 3, len(msgList.Messages))
tt.AssertEqual(t, "msgList[0]", "Lorem Ipsum 03", msgList.Messages[0].Title)
tt.AssertEqual(t, "msgList.NPT", "@end", msgList.NPT)
}
}
func TestListMessagesPaginatedDirectBeyondEnd(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
type msg struct {
ChannelId string `json:"channel_id"`
ChannelInternalName string `json:"channel_internal_name"`
Content string `json:"content"`
MessageId string `json:"message_id"`
OwnerUserId string `json:"owner_user_id"`
Priority int `json:"priority"`
SenderIp string `json:"sender_ip"`
SenderName string `json:"sender_name"`
SenderUserId string `json:"sender_user_id"`
Timestamp string `json:"timestamp"`
Title string `json:"title"`
Trimmed bool `json:"trimmed"`
UsrMessageId string `json:"usr_message_id"`
}
type mglist struct {
Messages []msg `json:"messages"`
NPT string `json:"next_page_token"`
PageSize int `json:"page_size"`
}
// Request a page beyond the last available data (page 10 when only 3 pages exist)
{
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$10"))
tt.AssertEqual(t, "msgList.len", 0, len(msgList.Messages))
tt.AssertEqual(t, "msgList.NPT", "@end", msgList.NPT)
}
}
func TestListMessagesPaginatedDirectWithFilters(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
type msg struct {
ChannelId string `json:"channel_id"`
ChannelInternalName string `json:"channel_internal_name"`
Content string `json:"content"`
MessageId string `json:"message_id"`
OwnerUserId string `json:"owner_user_id"`
Priority int `json:"priority"`
SenderIp string `json:"sender_ip"`
SenderName string `json:"sender_name"`
SenderUserId string `json:"sender_user_id"`
Timestamp string `json:"timestamp"`
Title string `json:"title"`
Trimmed bool `json:"trimmed"`
UsrMessageId string `json:"usr_message_id"`
}
type mglist struct {
Messages []msg `json:"messages"`
NPT string `json:"next_page_token"`
PageSize int `json:"page_size"`
TotalCount int `json:"total_count"`
}
// Test pagination with a filter applied
// User 0 has 22 messages, filter by priority=1 should give 11 messages
{
// First page
msgList0 := tt.RequestAuthGet[mglist](t, data.User[0].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s&priority=1", 5, "$0"))
tt.AssertEqual(t, "msgList0.len", 5, len(msgList0.Messages))
tt.AssertEqual(t, "msgList0.NPT", "$1", msgList0.NPT)
// Second page
msgList1 := tt.RequestAuthGet[mglist](t, data.User[0].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s&priority=1", 5, "$1"))
tt.AssertEqual(t, "msgList1.len", 5, len(msgList1.Messages))
tt.AssertEqual(t, "msgList1.NPT", "$2", msgList1.NPT)
// Third page (last)
msgList2 := tt.RequestAuthGet[mglist](t, data.User[0].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s&priority=1", 5, "$2"))
tt.AssertEqual(t, "msgList2.len", 1, len(msgList2.Messages))
tt.AssertEqual(t, "msgList2.NPT", "@end", msgList2.NPT)
}
}
func TestListMessagesPaginatedDirectChainedNavigation(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
type msg struct {
ChannelId string `json:"channel_id"`
ChannelInternalName string `json:"channel_internal_name"`
Content string `json:"content"`
MessageId string `json:"message_id"`
OwnerUserId string `json:"owner_user_id"`
Priority int `json:"priority"`
SenderIp string `json:"sender_ip"`
SenderName string `json:"sender_name"`
SenderUserId string `json:"sender_user_id"`
Timestamp string `json:"timestamp"`
Title string `json:"title"`
Trimmed bool `json:"trimmed"`
UsrMessageId string `json:"usr_message_id"`
}
type mglist struct {
Messages []msg `json:"messages"`
NPT string `json:"next_page_token"`
PageSize int `json:"page_size"`
}
// Test that following the returned NPT from $0 correctly navigates through all pages
var allMessages []msg
npt := "$0"
for npt != "@end" {
msgList := tt.RequestAuthGet[mglist](t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, npt))
allMessages = append(allMessages, msgList.Messages...)
npt = msgList.NPT
}
// User 16 has 23 messages total
tt.AssertEqual(t, "total messages", 23, len(allMessages))
tt.AssertEqual(t, "first message", "Lorem Ipsum 23", allMessages[0].Title)
tt.AssertEqual(t, "last message", "Lorem Ipsum 01", allMessages[22].Title)
}
func TestListMessagesPaginatedDirectInvalidToken(t *testing.T) {
ws, baseUrl, stop := tt.StartSimpleWebserver(t)
defer stop()
data := tt.InitDefaultData(t, ws)
// Test invalid paginated token (non-numeric after $)
tt.RequestAuthGetShouldFail(t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$abc"), 400, apierr.PAGETOKEN_ERROR)
// Test invalid paginated token (empty after $)
tt.RequestAuthGetShouldFail(t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$"), 400, apierr.PAGETOKEN_ERROR)
// Test invalid paginated token (float)
tt.RequestAuthGetShouldFail(t, data.User[16].AdminKey, baseUrl, fmt.Sprintf("/api/v2/messages?page_size=%d&next_page_token=%s", 10, "$1.5"), 400, apierr.PAGETOKEN_ERROR)
}