Compare commits

...

3 Commits

Author SHA1 Message Date
bbb33e9fd6 v0.0.44 2022-12-22 15:49:10 +01:00
ac05eff1e8 v0.0.43 2022-12-22 10:23:34 +01:00
1aaad66233 v0.0.42 2022-12-22 10:06:25 +01:00
7 changed files with 343 additions and 70 deletions

View File

@@ -8,10 +8,10 @@ import (
type brcMode int type brcMode int
const ( const (
modeSourceReading = 0 modeSourceReading brcMode = 0
modeSourceFinished = 1 modeSourceFinished brcMode = 1
modeBufferReading = 2 modeBufferReading brcMode = 2
modeBufferFinished = 3 modeBufferFinished brcMode = 3
) )
type BufferedReadCloser interface { type BufferedReadCloser interface {

View File

@@ -19,40 +19,38 @@ import (
// There are also a bunch of unit tests to ensure that the cache is always in a consistent state // There are also a bunch of unit tests to ensure that the cache is always in a consistent state
// //
type LRUData interface{} type LRUMap[TData any] struct {
type LRUMap struct {
maxsize int maxsize int
lock sync.Mutex lock sync.Mutex
cache map[string]*cacheNode cache map[string]*cacheNode[TData]
lfuHead *cacheNode lfuHead *cacheNode[TData]
lfuTail *cacheNode lfuTail *cacheNode[TData]
} }
type cacheNode struct { type cacheNode[TData any] struct {
key string key string
data LRUData data TData
parent *cacheNode parent *cacheNode[TData]
child *cacheNode child *cacheNode[TData]
} }
func NewLRUMap(size int) *LRUMap { func NewLRUMap[TData any](size int) *LRUMap[TData] {
if size <= 2 && size != 0 { if size <= 2 && size != 0 {
panic("Size must be > 2 (or 0)") panic("Size must be > 2 (or 0)")
} }
return &LRUMap{ return &LRUMap[TData]{
maxsize: size, maxsize: size,
lock: sync.Mutex{}, lock: sync.Mutex{},
cache: make(map[string]*cacheNode, size+1), cache: make(map[string]*cacheNode[TData], size+1),
lfuHead: nil, lfuHead: nil,
lfuTail: nil, lfuTail: nil,
} }
} }
func (c *LRUMap) Put(key string, value LRUData) { func (c *LRUMap[TData]) Put(key string, value TData) {
if c.maxsize == 0 { if c.maxsize == 0 {
return // cache disabled return // cache disabled
} }
@@ -70,7 +68,7 @@ func (c *LRUMap) Put(key string, value LRUData) {
} }
// key does not exist: insert into map and add to top of LFU // key does not exist: insert into map and add to top of LFU
node = &cacheNode{ node = &cacheNode[TData]{
key: key, key: key,
data: value, data: value,
parent: nil, parent: nil,
@@ -95,9 +93,9 @@ func (c *LRUMap) Put(key string, value LRUData) {
} }
} }
func (c *LRUMap) TryGet(key string) (LRUData, bool) { func (c *LRUMap[TData]) TryGet(key string) (TData, bool) {
if c.maxsize == 0 { if c.maxsize == 0 {
return nil, false // cache disabled return *new(TData), false // cache disabled
} }
c.lock.Lock() c.lock.Lock()
@@ -105,13 +103,13 @@ func (c *LRUMap) TryGet(key string) (LRUData, bool) {
val, ok := c.cache[key] val, ok := c.cache[key]
if !ok { if !ok {
return nil, false return *new(TData), false
} }
c.moveNodeToTop(val) c.moveNodeToTop(val)
return val.data, ok return val.data, ok
} }
func (c *LRUMap) moveNodeToTop(node *cacheNode) { func (c *LRUMap[TData]) moveNodeToTop(node *cacheNode[TData]) {
// (only called in critical section !) // (only called in critical section !)
if c.lfuHead == node { // fast case if c.lfuHead == node { // fast case
@@ -144,7 +142,7 @@ func (c *LRUMap) moveNodeToTop(node *cacheNode) {
} }
} }
func (c *LRUMap) Size() int { func (c *LRUMap[TData]) Size() int {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
return len(c.cache) return len(c.cache)

View File

@@ -12,7 +12,7 @@ func init() {
} }
func TestResultCache1(t *testing.T) { func TestResultCache1(t *testing.T) {
cache := NewLRUMap(8) cache := NewLRUMap[string](8)
verifyLRUList(cache, t) verifyLRUList(cache, t)
key := randomKey() key := randomKey()
@@ -39,7 +39,7 @@ func TestResultCache1(t *testing.T) {
if !ok { if !ok {
t.Errorf("cache TryGet returned no value") t.Errorf("cache TryGet returned no value")
} }
if !eq(cacheval, val) { if cacheval != val {
t.Errorf("cache TryGet returned different value (%+v <> %+v)", cacheval, val) t.Errorf("cache TryGet returned different value (%+v <> %+v)", cacheval, val)
} }
@@ -50,7 +50,7 @@ func TestResultCache1(t *testing.T) {
} }
func TestResultCache2(t *testing.T) { func TestResultCache2(t *testing.T) {
cache := NewLRUMap(8) cache := NewLRUMap[string](8)
verifyLRUList(cache, t) verifyLRUList(cache, t)
key1 := "key1" key1 := "key1"
@@ -150,7 +150,7 @@ func TestResultCache2(t *testing.T) {
} }
func TestResultCache3(t *testing.T) { func TestResultCache3(t *testing.T) {
cache := NewLRUMap(8) cache := NewLRUMap[string](8)
verifyLRUList(cache, t) verifyLRUList(cache, t)
key1 := "key1" key1 := "key1"
@@ -160,20 +160,20 @@ func TestResultCache3(t *testing.T) {
cache.Put(key1, val1) cache.Put(key1, val1)
verifyLRUList(cache, t) verifyLRUList(cache, t)
if val, ok := cache.TryGet(key1); !ok || !eq(val, val1) { if val, ok := cache.TryGet(key1); !ok || val != val1 {
t.Errorf("Value in cache should be [val1]") t.Errorf("Value in cache should be [val1]")
} }
cache.Put(key1, val2) cache.Put(key1, val2)
verifyLRUList(cache, t) verifyLRUList(cache, t)
if val, ok := cache.TryGet(key1); !ok || !eq(val, val2) { if val, ok := cache.TryGet(key1); !ok || val != val2 {
t.Errorf("Value in cache should be [val2]") t.Errorf("Value in cache should be [val2]")
} }
} }
// does a basic consistency check over the internal cache representation // does a basic consistency check over the internal cache representation
func verifyLRUList(cache *LRUMap, t *testing.T) { func verifyLRUList[TData any](cache *LRUMap[TData], t *testing.T) {
size := 0 size := 0
tailFound := false tailFound := false
@@ -250,23 +250,10 @@ func randomKey() string {
return strconv.FormatInt(rand.Int63(), 16) return strconv.FormatInt(rand.Int63(), 16)
} }
func randomVal() LRUData { func randomVal() string {
v, err := langext.NewHexUUID() v, err := langext.NewHexUUID()
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &v return v
}
func eq(a LRUData, b LRUData) bool {
v1, ok1 := a.(*string)
v2, ok2 := b.(*string)
if ok1 && ok2 {
if v1 == nil || v2 == nil {
return false
}
return v1 == v2
}
return false
} }

View File

@@ -2,17 +2,17 @@ package dataext
import "sync" import "sync"
type SyncStringSet struct { type SyncSet[TData comparable] struct {
data map[string]bool data map[TData]bool
lock sync.Mutex lock sync.Mutex
} }
func (s *SyncStringSet) Add(value string) bool { func (s *SyncSet[TData]) Add(value TData) bool {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if s.data == nil { if s.data == nil {
s.data = make(map[string]bool) s.data = make(map[TData]bool)
} }
_, ok := s.data[value] _, ok := s.data[value]
@@ -21,12 +21,12 @@ func (s *SyncStringSet) Add(value string) bool {
return !ok return !ok
} }
func (s *SyncStringSet) AddAll(values []string) { func (s *SyncSet[TData]) AddAll(values []TData) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if s.data == nil { if s.data == nil {
s.data = make(map[string]bool) s.data = make(map[TData]bool)
} }
for _, value := range values { for _, value := range values {
@@ -34,12 +34,12 @@ func (s *SyncStringSet) AddAll(values []string) {
} }
} }
func (s *SyncStringSet) Contains(value string) bool { func (s *SyncSet[TData]) Contains(value TData) bool {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if s.data == nil { if s.data == nil {
s.data = make(map[string]bool) s.data = make(map[TData]bool)
} }
_, ok := s.data[value] _, ok := s.data[value]
@@ -47,15 +47,15 @@ func (s *SyncStringSet) Contains(value string) bool {
return ok return ok
} }
func (s *SyncStringSet) Get() []string { func (s *SyncSet[TData]) Get() []TData {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if s.data == nil { if s.data == nil {
s.data = make(map[string]bool) s.data = make(map[TData]bool)
} }
r := make([]string, 0, len(s.data)) r := make([]TData, 0, len(s.data))
for k := range s.data { for k := range s.data {
r = append(r, k) r = append(r, k)

View File

@@ -13,6 +13,7 @@ type DB interface {
Ping(ctx context.Context) error Ping(ctx context.Context) error
BeginTransaction(ctx context.Context, iso sql.IsolationLevel) (Tx, error) BeginTransaction(ctx context.Context, iso sql.IsolationLevel) (Tx, error)
AddListener(listener Listener) AddListener(listener Listener)
Exit() error
} }
type database struct { type database struct {
@@ -121,3 +122,7 @@ func (db *database) BeginTransaction(ctx context.Context, iso sql.IsolationLevel
return NewTransaction(xtx, txid, db.lstr), nil return NewTransaction(xtx, txid, db.lstr), nil
} }
func (db *database) Exit() error {
return db.db.Close()
}

View File

@@ -6,24 +6,75 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
func ScanSingle[TData any](rows *sqlx.Rows, close bool) (TData, error) { type StructScanMode string
const (
SModeFast StructScanMode = "FAST"
SModeExtended StructScanMode = "EXTENDED"
)
type StructScanSafety string
const (
Safe StructScanSafety = "SAFE"
Unsafe StructScanSafety = "UNSAFE"
)
func ScanSingle[TData any](rows *sqlx.Rows, mode StructScanMode, sec StructScanSafety, close bool) (TData, error) {
if rows.Next() { if rows.Next() {
var strscan *StructScanner
if sec == Safe {
strscan = NewStructScanner(rows, false)
var data TData var data TData
err := rows.StructScan(&data) err := strscan.Start(&data)
if err != nil { if err != nil {
return *new(TData), err return *new(TData), err
} }
} else if sec == Unsafe {
strscan = NewStructScanner(rows, true)
var data TData
err := strscan.Start(&data)
if err != nil {
return *new(TData), err
}
} else {
return *new(TData), errors.New("unknown value for <sec>")
}
var data TData
if mode == SModeFast {
err := strscan.StructScanBase(&data)
if err != nil {
return *new(TData), err
}
} else if mode == SModeExtended {
var data TData
err := strscan.StructScanExt(&data)
if err != nil {
return *new(TData), err
}
} else {
return *new(TData), errors.New("unknown value for <mode>")
}
if rows.Next() { if rows.Next() {
_ = rows.Close()
return *new(TData), errors.New("sql returned more than onw row")
}
if close { if close {
err = rows.Close() _ = rows.Close()
}
return *new(TData), errors.New("sql returned more than one row")
}
if close {
err := rows.Close()
if err != nil { if err != nil {
return *new(TData), err return *new(TData), err
} }
} }
return data, nil return data, nil
} else { } else {
if close { if close {
_ = rows.Close() _ = rows.Close()
@@ -32,18 +83,49 @@ func ScanSingle[TData any](rows *sqlx.Rows, close bool) (TData, error) {
} }
} }
func ScanAll[TData any](rows *sqlx.Rows, close bool) ([]TData, error) { func ScanAll[TData any](rows *sqlx.Rows, mode StructScanMode, sec StructScanSafety, close bool) ([]TData, error) {
var strscan *StructScanner
if sec == Safe {
strscan = NewStructScanner(rows, false)
var data TData
err := strscan.Start(&data)
if err != nil {
return nil, err
}
} else if sec == Unsafe {
strscan = NewStructScanner(rows, true)
var data TData
err := strscan.Start(&data)
if err != nil {
return nil, err
}
} else {
return nil, errors.New("unknown value for <sec>")
}
res := make([]TData, 0) res := make([]TData, 0)
for rows.Next() { for rows.Next() {
if mode == SModeFast {
var data TData var data TData
err := rows.StructScan(&data) err := strscan.StructScanBase(&data)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res = append(res, data) res = append(res, data)
} else if mode == SModeExtended {
var data TData
err := strscan.StructScanExt(&data)
if err != nil {
return nil, err
}
res = append(res, data)
} else {
return nil, errors.New("unknown value for <mode>")
}
} }
if close { if close {
err := rows.Close() err := strscan.rows.Close()
if err != nil { if err != nil {
return nil, err return nil, err
} }

201
sq/structscanner.go Normal file
View File

@@ -0,0 +1,201 @@
package sq
import (
"errors"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
"reflect"
)
// forked from sqlx, but added ability to unmarshal optional-nested structs
type StructScanner struct {
rows *sqlx.Rows
Mapper *reflectx.Mapper
unsafe bool
fields [][]int
values []any
columns []string
}
func NewStructScanner(rows *sqlx.Rows, unsafe bool) *StructScanner {
return &StructScanner{
rows: rows,
Mapper: reflectx.NewMapper("db"),
unsafe: unsafe,
}
}
func (r *StructScanner) Start(dest any) error {
v := reflect.ValueOf(dest)
if v.Kind() != reflect.Ptr {
return errors.New("must pass a pointer, not a value, to StructScan destination")
}
columns, err := r.rows.Columns()
if err != nil {
return err
}
r.columns = columns
r.fields = r.Mapper.TraversalsByName(v.Type(), columns)
// if we are not unsafe and are missing fields, return an error
if f, err := missingFields(r.fields); err != nil && !r.unsafe {
return fmt.Errorf("missing destination name %s in %T", columns[f], dest)
}
r.values = make([]interface{}, len(columns))
return nil
}
// StructScan forked from github.com/jmoiron/sqlx@v1.3.5/sqlx.go
// does also wok with nullabel structs (from LEFT JOIN's)
func (r *StructScanner) StructScanExt(dest any) error {
v := reflect.ValueOf(dest)
if v.Kind() != reflect.Ptr {
return errors.New("must pass a pointer, not a value, to StructScan destination")
}
// ========= STEP 1 :: =========
v = v.Elem()
err := fieldsByTraversal(v, r.fields, r.values)
if err != nil {
return err
}
// scan into the struct field pointers and append to our results
err = r.rows.Scan(r.values...)
if err != nil {
return err
}
nullStructs := make(map[string]bool)
for i, traversal := range r.fields {
if len(traversal) == 0 {
continue
}
isnsil := reflect.ValueOf(r.values[i]).Elem().IsNil()
for i := 1; i < len(traversal); i++ {
canParentNil := reflectx.FieldByIndexes(v, traversal[0:i]).Kind() == reflect.Pointer
k := fmt.Sprintf("%v", traversal[0:i])
if v, ok := nullStructs[k]; ok {
nullStructs[k] = canParentNil && v && isnsil
} else {
nullStructs[k] = canParentNil && isnsil
}
}
}
forcenulled := make(map[string]bool)
for i, traversal := range r.fields {
if len(traversal) == 0 {
continue
}
anyparentnull := false
for i := 1; i < len(traversal); i++ {
k := fmt.Sprintf("%v", traversal[0:i])
if nv, ok := nullStructs[k]; ok && nv {
if _, ok := forcenulled[k]; !ok {
f := reflectx.FieldByIndexes(v, traversal[0:i])
f.Set(reflect.New(f.Type().Elem())) // set to nil
forcenulled[k] = true
}
anyparentnull = true
break
}
}
if anyparentnull {
continue
}
f := reflectx.FieldByIndexes(v, traversal)
val1 := reflect.ValueOf(r.values[i])
val2 := val1.Elem()
val3 := val2.Elem()
if val2.IsNil() {
if f.Kind() != reflect.Pointer {
return errors.New(fmt.Sprintf("Cannot set field %v to NULL value from column '%s' (type: %s)", traversal, r.columns[i], f.Type().String()))
}
f.Set(reflect.New(f.Type().Elem())) // set to nil
} else {
f.Set(val3)
}
}
return r.rows.Err()
}
// StructScan forked from github.com/jmoiron/sqlx@v1.3.5/sqlx.go
// without (relevant) changes
func (r *StructScanner) StructScanBase(dest any) error {
v := reflect.ValueOf(dest)
if v.Kind() != reflect.Ptr {
return errors.New("must pass a pointer, not a value, to StructScan destination")
}
v = v.Elem()
err := fieldsByTraversal(v, r.fields, r.values)
if err != nil {
return err
}
// scan into the struct field pointers and append to our results
err = r.rows.Scan(r.values...)
if err != nil {
return err
}
return r.rows.Err()
}
// fieldsByTraversal forked from github.com/jmoiron/sqlx@v1.3.5/sqlx.go
func fieldsByTraversal(v reflect.Value, traversals [][]int, values []interface{}) error {
v = reflect.Indirect(v)
if v.Kind() != reflect.Struct {
return errors.New("argument not a struct")
}
for i, traversal := range traversals {
if len(traversal) == 0 {
values[i] = new(interface{})
continue
}
f := reflectx.FieldByIndexes(v, traversal)
values[i] = reflect.New(reflect.PointerTo(f.Type())).Interface()
}
return nil
}
// missingFields forked from github.com/jmoiron/sqlx@v1.3.5/sqlx.go
func missingFields(transversals [][]int) (field int, err error) {
for i, t := range transversals {
if len(t) == 0 {
return i, errors.New("missing field")
}
}
return 0, nil
}