SendMessage()

This commit is contained in:
2022-11-19 15:13:47 +01:00
parent fb37f94c0a
commit 85bfe79115
22 changed files with 1208 additions and 59 deletions

View File

@@ -1,7 +1,7 @@
package db
import (
"blackforestbytes.com/simplecloudnotifier/api/models"
"blackforestbytes.com/simplecloudnotifier/models"
"database/sql"
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"time"
@@ -42,8 +42,8 @@ func (db *Database) CreateUser(ctx TxContext, readKey string, sendKey string, ad
TimestampLastRead: nil,
TimestampLastSent: nil,
MessagesSent: 0,
QuotaToday: 0,
QuotaDay: nil,
QuotaUsed: 0,
QuotaUsedDay: nil,
IsPro: protoken != nil,
ProToken: protoken,
}, nil
@@ -254,3 +254,261 @@ func (db *Database) DeleteClient(ctx TxContext, clientid int64) error {
return nil
}
func (db *Database) GetMessageByUserMessageID(ctx TxContext, usrMsgId string) (*models.Message, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return nil, err
}
rows, err := tx.QueryContext(ctx, "SELECT * FROM messages WHERE usr_message_id = ? LIMIT 1", usrMsgId)
if err != nil {
return nil, err
}
msg, err := models.DecodeMessage(rows)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &msg, nil
}
func (db *Database) GetChannelByName(ctx TxContext, userid int64, chanName string) (*models.Channel, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return nil, err
}
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels WHERE owner_user_id = ? OR name = ? LIMIT 1", userid, chanName)
if err != nil {
return nil, err
}
channel, err := models.DecodeChannel(rows)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &channel, nil
}
func (db *Database) CreateChannel(ctx TxContext, userid int64, name string, subscribeKey string, sendKey string) (models.Channel, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return models.Channel{}, err
}
now := time.Now().UTC()
res, err := tx.ExecContext(ctx, "INSERT INTO channels (owner_user_id, name, subscribe_key, send_key, timestamp_created) VALUES (?, ?, ?, ?, ?)",
userid,
name,
subscribeKey,
sendKey,
time2DB(now))
if err != nil {
return models.Channel{}, err
}
liid, err := res.LastInsertId()
if err != nil {
return models.Channel{}, err
}
return models.Channel{
ChannelID: liid,
OwnerUserID: userid,
Name: name,
SubscribeKey: subscribeKey,
SendKey: sendKey,
TimestampCreated: now,
TimestampLastRead: nil,
TimestampLastSent: nil,
MessagesSent: 0,
}, nil
}
func (db *Database) CreateSubscribtion(ctx TxContext, subscriberUID int64, ownerUID int64, chanName string, chanID int64, confirmed bool) (models.Subscription, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return models.Subscription{}, err
}
now := time.Now().UTC()
res, err := tx.ExecContext(ctx, "INSERT INTO subscriptions (subscriber_user_id, channel_owner_user_id, channel_name, channel_id, timestamp_created, confirmed) VALUES (?, ?, ?, ?, ?, ?)",
subscriberUID,
ownerUID,
chanName,
chanID,
time2DB(now),
confirmed)
if err != nil {
return models.Subscription{}, err
}
liid, err := res.LastInsertId()
if err != nil {
return models.Subscription{}, err
}
return models.Subscription{
SubscriptionID: liid,
SubscriberUserID: subscriberUID,
ChannelOwnerUserID: ownerUID,
ChannelID: chanID,
ChannelName: chanName,
TimestampCreated: now,
Confirmed: confirmed,
}, nil
}
func (db *Database) CreateMessage(ctx TxContext, senderUserID int64, channel models.Channel, timestampSend *time.Time, title string, content *string, priority int, userMsgId *string) (models.Message, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return models.Message{}, err
}
now := time.Now().UTC()
res, err := tx.ExecContext(ctx, "INSERT INTO messages (sender_user_id, owner_user_id, channel_name, channel_id, timestamp_real, timestamp_client, title, content, priority, usr_message_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
senderUserID,
channel.OwnerUserID,
channel.Name,
channel.ChannelID,
time2DB(now),
time2DBOpt(timestampSend),
title,
content,
priority,
userMsgId)
if err != nil {
return models.Message{}, err
}
liid, err := res.LastInsertId()
if err != nil {
return models.Message{}, err
}
return models.Message{
SCNMessageID: liid,
SenderUserID: senderUserID,
OwnerUserID: channel.OwnerUserID,
ChannelName: channel.Name,
ChannelID: channel.ChannelID,
TimestampReal: now,
TimestampClient: timestampSend,
Title: title,
Content: content,
Priority: priority,
UserMessageID: userMsgId,
}, nil
}
func (db *Database) ListChannelSubscriptions(ctx TxContext, channelID int64) ([]models.Subscription, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return nil, err
}
rows, err := tx.QueryContext(ctx, "SELECT * FROM subscriptions WHERE channel_id = ?", channelID)
if err != nil {
return nil, err
}
data, err := models.DecodeSubscriptions(rows)
if err != nil {
return nil, err
}
return data, nil
}
func (db *Database) CreateRetryDelivery(ctx TxContext, client models.Client, msg models.Message) (models.Delivery, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return models.Delivery{}, err
}
now := time.Now().UTC()
next := now.Add(5 * time.Second)
res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?)",
msg.SCNMessageID,
client.UserID,
client.ClientID,
time2DB(now),
nil,
models.DeliveryStatusRetry,
nil,
time2DB(next))
if err != nil {
return models.Delivery{}, err
}
liid, err := res.LastInsertId()
if err != nil {
return models.Delivery{}, err
}
return models.Delivery{
DeliveryID: liid,
SCNMessageID: msg.SCNMessageID,
ReceiverUserID: client.UserID,
ReceiverClientID: client.ClientID,
TimestampCreated: now,
TimestampFinalized: nil,
Status: models.DeliveryStatusRetry,
RetryCount: 0,
NextDelivery: langext.Ptr(next),
FCMMessageID: nil,
}, nil
}
func (db *Database) CreateSuccessDelivery(ctx TxContext, client models.Client, msg models.Message, fcmDelivID string) (models.Delivery, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return models.Delivery{}, err
}
now := time.Now().UTC()
res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?)",
msg.SCNMessageID,
client.UserID,
client.ClientID,
time2DB(now),
time2DB(now),
models.DeliveryStatusSuccess,
fcmDelivID,
nil)
if err != nil {
return models.Delivery{}, err
}
liid, err := res.LastInsertId()
if err != nil {
return models.Delivery{}, err
}
return models.Delivery{
DeliveryID: liid,
SCNMessageID: msg.SCNMessageID,
ReceiverUserID: client.UserID,
ReceiverClientID: client.ClientID,
TimestampCreated: now,
TimestampFinalized: langext.Ptr(now),
Status: models.DeliveryStatusSuccess,
RetryCount: 0,
NextDelivery: nil,
FCMMessageID: langext.Ptr(fcmDelivID),
}, nil
}

View File

@@ -14,8 +14,8 @@ CREATE TABLE users
messages_sent INTEGER NOT NULL DEFAULT '0',
quota_today INTEGER NOT NULL DEFAULT '0',
quota_day TEXT NULL DEFAULT NULL,
quota_used INTEGER NOT NULL DEFAULT '0',
quota_used_day TEXT NULL DEFAULT NULL,
is_pro INTEGER CHECK(is_pro IN (0, 1)) NOT NULL DEFAULT 0,
pro_token TEXT NULL DEFAULT NULL
@@ -66,8 +66,11 @@ CREATE TABLE subscriptions
subscriber_user_id INTEGER NOT NULL,
channel_owner_user_id INTEGER NOT NULL,
channel_name TEXT NOT NULL,
channel_id INTEGER NOT NULL,
confirmed INTEGER CHECK(confirmed IN (0, 1)) NOT NULL DEFAULT 0
timestamp_created INTEGER NOT NULL,
confirmed INTEGER CHECK(confirmed IN (0, 1)) NOT NULL
);
CREATE UNIQUE INDEX "idx_subscriptions_ref" ON subscriptions (subscriber_user_id, channel_owner_user_id, channel_name);
@@ -76,8 +79,8 @@ CREATE TABLE messages
(
scn_message_id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_user_id INTEGER NOT NULL,
owner_user_id INTEGER NOT NULL,
channel_name TEXT NOT NULL,
channel_id INTEGER NOT NULL,
timestamp_real INTEGER NOT NULL,
@@ -88,8 +91,8 @@ CREATE TABLE messages
priority INTEGER CHECK(priority IN (0, 1, 2)) NOT NULL,
usr_message_id TEXT NULL
);
CREATE INDEX "idx_messages_channel" ON messages (sender_user_id, channel_name);
CREATE INDEX "idx_messages_idempotency" ON messages (sender_user_id, usr_message_id);
CREATE INDEX "idx_messages_channel" ON messages (owner_user_id, channel_name);
CREATE INDEX "idx_messages_idempotency" ON messages (owner_user_id, usr_message_id);
CREATE TABLE deliveries

View File

@@ -1,6 +1,9 @@
package db
import "time"
import (
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"time"
)
func bool2DB(b bool) int {
if b {
@@ -13,3 +16,10 @@ func bool2DB(b bool) int {
func time2DB(t time.Time) int64 {
return t.UnixMilli()
}
func time2DBOpt(t *time.Time) *int64 {
if t == nil {
return nil
}
return langext.Ptr(t.UnixMilli())
}