Compare commits

...

6 Commits

Author SHA1 Message Date
72883cf6bd v0.0.73 2023-01-31 10:56:30 +01:00
ff08d5f180 v0.0.72 2023-01-30 19:55:55 +01:00
72d6b538f7 v0.0.71 2023-01-29 22:28:08 +01:00
48dd30fb94 v0.0.70 2023-01-29 22:07:28 +01:00
b7c5756f11 v0.0.69 2023-01-29 22:00:40 +01:00
2070a432a5 v0.0.68 2023-01-29 21:27:55 +01:00
8 changed files with 552 additions and 102 deletions

73
cmdext/builder.go Normal file
View File

@@ -0,0 +1,73 @@
package cmdext
import (
"fmt"
"time"
)
type CommandRunner struct {
program string
args []string
timeout *time.Duration
env []string
listener []CommandListener
}
func Runner(program string) *CommandRunner {
return &CommandRunner{
program: program,
args: make([]string, 0),
timeout: nil,
env: make([]string, 0),
listener: make([]CommandListener, 0),
}
}
func (r *CommandRunner) Arg(arg string) *CommandRunner {
r.args = append(r.args, arg)
return r
}
func (r *CommandRunner) Args(arg []string) *CommandRunner {
r.args = append(r.args, arg...)
return r
}
func (r *CommandRunner) Timeout(timeout time.Duration) *CommandRunner {
r.timeout = &timeout
return r
}
func (r *CommandRunner) Env(key, value string) *CommandRunner {
r.env = append(r.env, fmt.Sprintf("%s=%s", key, value))
return r
}
func (r *CommandRunner) RawEnv(env string) *CommandRunner {
r.env = append(r.env, env)
return r
}
func (r *CommandRunner) Envs(env []string) *CommandRunner {
r.env = append(r.env, env...)
return r
}
func (r *CommandRunner) Listen(lstr CommandListener) *CommandRunner {
r.listener = append(r.listener, lstr)
return r
}
func (r *CommandRunner) ListenStdout(lstr func(string)) *CommandRunner {
r.listener = append(r.listener, genericCommandListener{_readStdoutLine: &lstr})
return r
}
func (r *CommandRunner) ListenStderr(lstr func(string)) *CommandRunner {
r.listener = append(r.listener, genericCommandListener{_readStderrLine: &lstr})
return r
}
func (r *CommandRunner) Run() (CommandResult, error) {
return run(*r)
}

View File

@@ -2,6 +2,7 @@ package cmdext
import (
"bufio"
"io"
"os/exec"
"time"
)
@@ -14,9 +15,10 @@ type CommandResult struct {
CommandTimedOut bool
}
func RunCommand(program string, args []string, timeout *time.Duration) (CommandResult, error) {
func run(opt CommandRunner) (CommandResult, error) {
cmd := exec.Command(opt.program, opt.args...)
cmd := exec.Command(program, args...)
cmd.Env = append(cmd.Env, opt.env...)
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
@@ -33,110 +35,160 @@ func RunCommand(program string, args []string, timeout *time.Duration) (CommandR
return CommandResult{}, err
}
errch := make(chan error, 1)
errch := make(chan error, 3)
go func() { errch <- cmd.Wait() }()
// [1] read raw stdout
stdoutBufferReader, stdoutBufferWriter := io.Pipe()
stdout := ""
go func() {
buf := make([]byte, 128)
for true {
n, out := stdoutPipe.Read(buf)
if n > 0 {
txt := string(buf[:n])
stdout += txt
_, _ = stdoutBufferWriter.Write(buf[:n])
for _, lstr := range opt.listener {
lstr.ReadRawStdout(buf[:n])
}
}
if out == io.EOF {
break
}
if out != nil {
errch <- out
_ = cmd.Process.Kill()
break
}
}
_ = stdoutBufferWriter.Close()
}()
// [2] read raw stderr
stderrBufferReader, stderrBufferWriter := io.Pipe()
stderr := ""
go func() {
buf := make([]byte, 128)
for true {
n, err := stderrPipe.Read(buf)
if n > 0 {
txt := string(buf[:n])
stderr += txt
_, _ = stderrBufferWriter.Write(buf[:n])
for _, lstr := range opt.listener {
lstr.ReadRawStderr(buf[:n])
}
}
if err == io.EOF {
break
}
if err != nil {
errch <- err
_ = cmd.Process.Kill()
break
}
}
_ = stderrBufferWriter.Close()
}()
combch := make(chan string, 32)
stopCombch := make(chan bool)
stdout := ""
// [3] collect stdout line-by-line
go func() {
scanner := bufio.NewScanner(stdoutPipe)
scanner := bufio.NewScanner(stdoutBufferReader)
for scanner.Scan() {
txt := scanner.Text()
stdout += txt
for _, lstr := range opt.listener {
lstr.ReadStdoutLine(txt)
}
combch <- txt
}
}()
stderr := ""
// [4] collect stderr line-by-line
go func() {
scanner := bufio.NewScanner(stderrPipe)
scanner := bufio.NewScanner(stderrBufferReader)
for scanner.Scan() {
txt := scanner.Text()
stderr += txt
for _, lstr := range opt.listener {
lstr.ReadStderrLine(txt)
}
combch <- txt
}
}()
defer func() {
stopCombch <- true
}()
defer func() { stopCombch <- true }()
// [5] combine stdcombined
stdcombined := ""
go func() {
for {
select {
case txt := <-combch:
stdcombined += txt
stdcombined += txt + "\n" // this comes from bufio.Scanner and has no newlines...
case <-stopCombch:
return
}
}
}()
if timeout != nil {
// [6] run
select {
var timeoutChan <-chan time.Time = make(chan time.Time, 1)
if opt.timeout != nil {
timeoutChan = time.After(*opt.timeout)
}
case <-time.After(*timeout):
_ = cmd.Process.Kill()
select {
case <-timeoutChan:
_ = cmd.Process.Kill()
for _, lstr := range opt.listener {
lstr.Timeout()
}
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: -1,
CommandTimedOut: true,
}, nil
case err := <-errch:
if exiterr, ok := err.(*exec.ExitError); ok {
excode := exiterr.ExitCode()
for _, lstr := range opt.listener {
lstr.Finished(excode)
}
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: -1,
CommandTimedOut: true,
ExitCode: excode,
CommandTimedOut: false,
}, nil
case err := <-errch:
if exiterr, ok := err.(*exec.ExitError); ok {
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: exiterr.ExitCode(),
CommandTimedOut: false,
}, nil
} else if err != nil {
return CommandResult{}, err
} else {
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: 0,
CommandTimedOut: false,
}, nil
} else if err != nil {
return CommandResult{}, err
} else {
for _, lstr := range opt.listener {
lstr.Finished(0)
}
}
} else {
select {
case err := <-errch:
if exiterr, ok := err.(*exec.ExitError); ok {
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: exiterr.ExitCode(),
CommandTimedOut: false,
}, nil
} else if err != nil {
return CommandResult{}, err
} else {
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: 0,
CommandTimedOut: false,
}, nil
}
return CommandResult{
StdOut: stdout,
StdErr: stderr,
StdCombined: stdcombined,
ExitCode: 0,
CommandTimedOut: false,
}, nil
}
}
}

59
cmdext/cmdrunner_test.go Normal file
View File

@@ -0,0 +1,59 @@
package cmdext
import "testing"
func TestStdout(t *testing.T) {
res1, err := Runner("printf").Arg("hello").Run()
if err != nil {
t.Errorf("%v", err)
}
if res1.StdErr != "" {
t.Errorf("res1.StdErr == '%v'", res1.StdErr)
}
if res1.StdOut != "hello" {
t.Errorf("res1.StdOut == '%v'", res1.StdOut)
}
if res1.StdCombined != "hello\n" {
t.Errorf("res1.StdCombined == '%v'", res1.StdCombined)
}
}
func TestStderr(t *testing.T) {
res1, err := Runner("python").Arg("-c").Arg("import sys; print(\"error\", file=sys.stderr, end='')").Run()
if err != nil {
t.Errorf("%v", err)
}
if res1.StdErr != "error" {
t.Errorf("res1.StdErr == '%v'", res1.StdErr)
}
if res1.StdOut != "" {
t.Errorf("res1.StdOut == '%v'", res1.StdOut)
}
if res1.StdCombined != "error\n" {
t.Errorf("res1.StdCombined == '%v'", res1.StdCombined)
}
}
func TestStdcombined(t *testing.T) {
res1, err := Runner("python").
Arg("-c").
Arg("import sys; import time; print(\"1\", file=sys.stderr, flush=True); time.sleep(0.1); print(\"2\", file=sys.stdout, flush=True); time.sleep(0.1); print(\"3\", file=sys.stderr, flush=True)").
Run()
if err != nil {
t.Errorf("%v", err)
}
if res1.StdErr != "1\n3\n" {
t.Errorf("res1.StdErr == '%v'", res1.StdErr)
}
if res1.StdOut != "2\n" {
t.Errorf("res1.StdOut == '%v'", res1.StdOut)
}
if res1.StdCombined != "1\n2\n3\n" {
t.Errorf("res1.StdCombined == '%v'", res1.StdCombined)
}
}

12
cmdext/helper.go Normal file
View File

@@ -0,0 +1,12 @@
package cmdext
import "time"
func RunCommand(program string, args []string, timeout *time.Duration) (CommandResult, error) {
b := Runner(program)
b = b.Args(args)
if timeout != nil {
b = b.Timeout(*timeout)
}
return b.Run()
}

57
cmdext/listener.go Normal file
View File

@@ -0,0 +1,57 @@
package cmdext
type CommandListener interface {
ReadRawStdout([]byte)
ReadRawStderr([]byte)
ReadStdoutLine(string)
ReadStderrLine(string)
Finished(int)
Timeout()
}
type genericCommandListener struct {
_readRawStdout *func([]byte)
_readRawStderr *func([]byte)
_readStdoutLine *func(string)
_readStderrLine *func(string)
_finished *func(int)
_timeout *func()
}
func (g genericCommandListener) ReadRawStdout(v []byte) {
if g._readRawStdout != nil {
(*g._readRawStdout)(v)
}
}
func (g genericCommandListener) ReadRawStderr(v []byte) {
if g._readRawStderr != nil {
(*g._readRawStderr)(v)
}
}
func (g genericCommandListener) ReadStdoutLine(v string) {
if g._readStdoutLine != nil {
(*g._readStdoutLine)(v)
}
}
func (g genericCommandListener) ReadStderrLine(v string) {
if g._readStderrLine != nil {
(*g._readStderrLine)(v)
}
}
func (g genericCommandListener) Finished(v int) {
if g._finished != nil {
(*g._finished)(v)
}
}
func (g genericCommandListener) Timeout() {
if g._timeout != nil {
(*g._timeout)()
}
}

View File

@@ -265,6 +265,16 @@ func ArrMap[T1 any, T2 any](arr []T1, conv func(v T1) T2) []T2 {
return r
}
func ArrFilterMap[T1 any, T2 any](arr []T1, filter func(v T1) bool, conv func(v T1) T2) []T2 {
r := make([]T2, 0, len(arr))
for _, v := range arr {
if filter(v) {
r = append(r, conv(v))
}
}
return r
}
func ArrSum[T NumberConstraint](arr []T) T {
var r T = 0
for _, v := range arr {

182
rfctime/rfc3339.go Normal file
View File

@@ -0,0 +1,182 @@
package rfctime
import (
"encoding/json"
"time"
)
type RFC3339Time time.Time
func (t RFC3339Time) Time() time.Time {
return time.Time(t)
}
func (t RFC3339Time) MarshalBinary() ([]byte, error) {
return (time.Time)(t).MarshalBinary()
}
func (t *RFC3339Time) UnmarshalBinary(data []byte) error {
return (*time.Time)(t).UnmarshalBinary(data)
}
func (t RFC3339Time) GobEncode() ([]byte, error) {
return (time.Time)(t).GobEncode()
}
func (t *RFC3339Time) GobDecode(data []byte) error {
return (*time.Time)(t).GobDecode(data)
}
func (t *RFC3339Time) UnmarshalJSON(data []byte) error {
str := ""
if err := json.Unmarshal(data, &str); err != nil {
return err
}
t0, err := time.Parse(t.FormatStr(), str)
if err != nil {
return err
}
*t = RFC3339Time(t0)
return nil
}
func (t RFC3339Time) MarshalJSON() ([]byte, error) {
str := t.Time().Format(t.FormatStr())
return json.Marshal(str)
}
func (t RFC3339Time) MarshalText() ([]byte, error) {
b := make([]byte, 0, len(t.FormatStr()))
return t.Time().AppendFormat(b, t.FormatStr()), nil
}
func (t *RFC3339Time) UnmarshalText(data []byte) error {
var err error
v, err := time.Parse(t.FormatStr(), string(data))
if err != nil {
return err
}
tt := RFC3339Time(v)
*t = tt
return nil
}
func (t RFC3339Time) Serialize() string {
return t.Time().Format(t.FormatStr())
}
func (t RFC3339Time) FormatStr() string {
return time.RFC3339
}
func (t RFC3339Time) After(u RFCTime) bool {
return t.Time().After(u.Time())
}
func (t RFC3339Time) Before(u RFCTime) bool {
return t.Time().Before(u.Time())
}
func (t RFC3339Time) Equal(u RFCTime) bool {
return t.Time().Equal(u.Time())
}
func (t RFC3339Time) IsZero() bool {
return t.Time().IsZero()
}
func (t RFC3339Time) Date() (year int, month time.Month, day int) {
return t.Time().Date()
}
func (t RFC3339Time) Year() int {
return t.Time().Year()
}
func (t RFC3339Time) Month() time.Month {
return t.Time().Month()
}
func (t RFC3339Time) Day() int {
return t.Time().Day()
}
func (t RFC3339Time) Weekday() time.Weekday {
return t.Time().Weekday()
}
func (t RFC3339Time) ISOWeek() (year, week int) {
return t.Time().ISOWeek()
}
func (t RFC3339Time) Clock() (hour, min, sec int) {
return t.Time().Clock()
}
func (t RFC3339Time) Hour() int {
return t.Time().Hour()
}
func (t RFC3339Time) Minute() int {
return t.Time().Minute()
}
func (t RFC3339Time) Second() int {
return t.Time().Second()
}
func (t RFC3339Time) Nanosecond() int {
return t.Time().Nanosecond()
}
func (t RFC3339Time) YearDay() int {
return t.Time().YearDay()
}
func (t RFC3339Time) Add(d time.Duration) RFC3339Time {
return RFC3339Time(t.Time().Add(d))
}
func (t RFC3339Time) Sub(u RFCTime) time.Duration {
return t.Time().Sub(u.Time())
}
func (t RFC3339Time) AddDate(years int, months int, days int) RFC3339Time {
return RFC3339Time(t.Time().AddDate(years, months, days))
}
func (t RFC3339Time) Unix() int64 {
return t.Time().Unix()
}
func (t RFC3339Time) UnixMilli() int64 {
return t.Time().UnixMilli()
}
func (t RFC3339Time) UnixMicro() int64 {
return t.Time().UnixMicro()
}
func (t RFC3339Time) UnixNano() int64 {
return t.Time().UnixNano()
}
func (t RFC3339Time) Format(layout string) string {
return t.Time().Format(layout)
}
func (t RFC3339Time) GoString() string {
return t.Time().GoString()
}
func (t RFC3339Time) String() string {
return t.Time().String()
}
func NewRFC3339(t time.Time) RFC3339Time {
return RFC3339Time(t)
}
func NowRFC3339() RFC3339Time {
return RFC3339Time(time.Now())
}

View File

@@ -2,58 +2,50 @@ package syncext
import (
"context"
"sync/atomic"
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"sync"
"time"
)
type AtomicBool struct {
v int32
waiter chan bool // unbuffered
v bool
listener map[string]chan bool
lock sync.Mutex
}
func NewAtomicBool(value bool) *AtomicBool {
if value {
return &AtomicBool{v: 1, waiter: make(chan bool)}
} else {
return &AtomicBool{v: 0, waiter: make(chan bool)}
return &AtomicBool{
v: value,
listener: make(map[string]chan bool),
lock: sync.Mutex{},
}
}
func (a *AtomicBool) Get() bool {
return atomic.LoadInt32(&a.v) == 1
a.lock.Lock()
defer a.lock.Unlock()
return a.v
}
func (a *AtomicBool) Set(value bool) {
if value {
atomic.StoreInt32(&a.v, 1)
} else {
atomic.StoreInt32(&a.v, 0)
}
a.lock.Lock()
defer a.lock.Unlock()
select {
case a.waiter <- value:
// message sent
default:
// no receiver on channel
a.v = value
for k, v := range a.listener {
select {
case v <- value:
// message sent
default:
// no receiver on channel
delete(a.listener, k)
}
}
}
func (a *AtomicBool) Wait(waitFor bool) {
if a.Get() == waitFor {
return
}
for {
if v, ok := ReadChannelWithTimeout(a.waiter, 128*time.Millisecond); ok {
if v == waitFor {
return
}
} else {
if a.Get() == waitFor {
return
}
}
}
_ = a.WaitWithContext(context.Background(), waitFor)
}
func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error {
@@ -71,12 +63,25 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error {
return nil
}
uuid, _ := langext.NewHexUUID()
waitchan := make(chan bool)
a.lock.Lock()
a.listener[uuid] = waitchan
a.lock.Unlock()
defer func() {
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
}()
for {
if err := ctx.Err(); err != nil {
return err
}
timeOut := 128 * time.Millisecond
timeOut := 1024 * time.Millisecond
if dl, ok := ctx.Deadline(); ok {
timeOutMax := dl.Sub(time.Now())
@@ -87,7 +92,7 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error {
}
}
if v, ok := ReadChannelWithTimeout(a.waiter, timeOut); ok {
if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok {
if v == waitFor {
return nil
}