Plandex_backup / app /cli /api /stream.go
google-labs-jules[bot]
Final deployment for HF with landing page
93d826e
package api
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"plandex-cli/types"
"time"
shared "plandex-shared"
)
// 3 heartbeat misses = timeout
const HeartbeatTimeout = 16 * time.Second
func connectPlanRespStream(body io.ReadCloser, onStream types.OnStreamPlan) {
reader := bufio.NewReader(body)
timer := time.NewTimer(HeartbeatTimeout)
defer timer.Stop()
go func() {
for {
select {
case <-timer.C:
log.Println("Connection to plan stream timed out due to missing heartbeats")
onStream(types.OnStreamPlanParams{Msg: nil, Err: fmt.Errorf("connection to plan stream timed out due to missing heartbeats")})
body.Close()
return
default:
}
s, err := readUntilSeparator(reader, shared.STREAM_MESSAGE_SEPARATOR)
if err != nil {
log.Println("Error reading line:", err)
onStream(types.OnStreamPlanParams{Msg: nil, Err: err})
body.Close()
return
}
timer.Reset(HeartbeatTimeout)
// ignore heartbeats
if s == string(shared.StreamMessageHeartbeat) {
continue
}
var msg shared.StreamMessage
err = json.Unmarshal([]byte(s), &msg)
if err != nil {
log.Println("Error unmarshalling message:", err)
onStream(types.OnStreamPlanParams{Msg: nil, Err: err})
body.Close()
return
}
// log.Println("connectPlanRespStream: received message:", msg)
onStream(types.OnStreamPlanParams{Msg: &msg, Err: nil})
if msg.Type == shared.StreamMessageFinished || msg.Type == shared.StreamMessageError || msg.Type == shared.StreamMessageAborted {
body.Close()
return
}
}
}()
}
func readUntilSeparator(reader *bufio.Reader, separator string) (string, error) {
var result []byte
sepBytes := []byte(separator)
for {
b, err := reader.ReadByte()
if err != nil {
return string(result), err
}
result = append(result, b)
if len(result) >= len(sepBytes) && bytes.HasSuffix(result, sepBytes) {
return string(result[:len(result)-len(separator)]), nil
}
}
}