Files
goext/cmdext/pipereader.go
T
2026-04-21 11:06:01 +02:00

156 lines
3.0 KiB
Go

package cmdext
import (
"bufio"
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
"io"
"strings"
"sync"
)
type pipeReader struct {
lineBufferSize *int
stdout io.ReadCloser
stderr io.ReadCloser
}
// Read ready stdout and stdin until finished
// also splits both pipes into lines and calld the listener
func (pr *pipeReader) Read(listener []CommandListener) (string, string, string, error) {
type combevt struct {
line string
stop bool
}
errch := make(chan error, 8)
wg := sync.WaitGroup{}
// [1] read raw stdout
wg.Add(1)
stdoutBufferReader, stdoutBufferWriter := io.Pipe()
var stdout strings.Builder
go func() {
buf := make([]byte, 128)
for {
n, err := pr.stdout.Read(buf)
if n > 0 {
txt := string(buf[:n])
stdout.WriteString(txt)
_, _ = stdoutBufferWriter.Write(buf[:n])
for _, lstr := range listener {
lstr.ReadRawStdout(buf[:n])
}
}
if err == io.EOF {
break
}
if err != nil {
errch <- err
break
}
}
_ = stdoutBufferWriter.Close()
wg.Done()
}()
// [2] read raw stderr
wg.Add(1)
stderrBufferReader, stderrBufferWriter := io.Pipe()
var stderr strings.Builder
go func() {
buf := make([]byte, 128)
for {
n, err := pr.stderr.Read(buf)
if n > 0 {
txt := string(buf[:n])
stderr.WriteString(txt)
_, _ = stderrBufferWriter.Write(buf[:n])
for _, lstr := range listener {
lstr.ReadRawStderr(buf[:n])
}
}
if err == io.EOF {
break
}
if err != nil {
errch <- err
break
}
}
_ = stderrBufferWriter.Close()
wg.Done()
}()
combch := make(chan combevt, 32)
// [3] collect stdout line-by-line
wg.Go(func() {
scanner := bufio.NewScanner(stdoutBufferReader)
if pr.lineBufferSize != nil {
scanner.Buffer([]byte{}, *pr.lineBufferSize)
}
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range listener {
lstr.ReadStdoutLine(txt)
}
combch <- combevt{txt, false}
}
if err := scanner.Err(); err != nil {
errch <- err
}
combch <- combevt{"", true}
})
// [4] collect stderr line-by-line
wg.Go(func() {
scanner := bufio.NewScanner(stderrBufferReader)
if pr.lineBufferSize != nil {
scanner.Buffer([]byte{}, *pr.lineBufferSize)
}
for scanner.Scan() {
txt := scanner.Text()
for _, lstr := range listener {
lstr.ReadStderrLine(txt)
}
combch <- combevt{txt, false}
}
if err := scanner.Err(); err != nil {
errch <- err
}
combch <- combevt{"", true}
})
// [5] combine stdcombined
wg.Add(1)
var stdcombined strings.Builder
go func() {
stopctr := 0
for stopctr < 2 {
vvv := <-combch
if vvv.stop {
stopctr++
} else {
stdcombined.WriteString(vvv.line + "\n") // this comes from bufio.Scanner and has no newlines...
}
}
wg.Done()
}()
// wait for all (5) goroutines to finish
wg.Wait()
if err, ok := syncext.ReadNonBlocking(errch); ok {
return "", "", "", err
}
return stdout.String(), stderr.String(), stdcombined.String(), nil
}