Spaces:
Paused
Paused
| package api | |
| import ( | |
| "bufio" | |
| "bytes" | |
| "encoding/json" | |
| "fmt" | |
| "io" | |
| "log" | |
| "plandex-cli/types" | |
| "time" | |
| shared "plandex-shared" | |
| ) | |
// HeartbeatTimeout is the longest period the plan stream may stay silent
// before the client treats the connection as dead.
//
// Per the original note this corresponds to three missed heartbeats; the
// heartbeat interval itself is not defined in this file (TODO confirm against
// the server).
const HeartbeatTimeout time.Duration = 16 * time.Second
| func connectPlanRespStream(body io.ReadCloser, onStream types.OnStreamPlan) { | |
| reader := bufio.NewReader(body) | |
| timer := time.NewTimer(HeartbeatTimeout) | |
| defer timer.Stop() | |
| go func() { | |
| for { | |
| select { | |
| case <-timer.C: | |
| log.Println("Connection to plan stream timed out due to missing heartbeats") | |
| onStream(types.OnStreamPlanParams{Msg: nil, Err: fmt.Errorf("connection to plan stream timed out due to missing heartbeats")}) | |
| body.Close() | |
| return | |
| default: | |
| } | |
| s, err := readUntilSeparator(reader, shared.STREAM_MESSAGE_SEPARATOR) | |
| if err != nil { | |
| log.Println("Error reading line:", err) | |
| onStream(types.OnStreamPlanParams{Msg: nil, Err: err}) | |
| body.Close() | |
| return | |
| } | |
| timer.Reset(HeartbeatTimeout) | |
| // ignore heartbeats | |
| if s == string(shared.StreamMessageHeartbeat) { | |
| continue | |
| } | |
| var msg shared.StreamMessage | |
| err = json.Unmarshal([]byte(s), &msg) | |
| if err != nil { | |
| log.Println("Error unmarshalling message:", err) | |
| onStream(types.OnStreamPlanParams{Msg: nil, Err: err}) | |
| body.Close() | |
| return | |
| } | |
| // log.Println("connectPlanRespStream: received message:", msg) | |
| onStream(types.OnStreamPlanParams{Msg: &msg, Err: nil}) | |
| if msg.Type == shared.StreamMessageFinished || msg.Type == shared.StreamMessageError || msg.Type == shared.StreamMessageAborted { | |
| body.Close() | |
| return | |
| } | |
| } | |
| }() | |
| } | |
// readUntilSeparator reads from reader one byte at a time until the bytes
// read so far end with separator, and returns everything before the
// separator.
//
// If a read fails first (including io.EOF), it returns the bytes accumulated
// up to that point together with the read error.
func readUntilSeparator(reader *bufio.Reader, separator string) (string, error) {
	var (
		buf   []byte
		delim = []byte(separator)
	)

	for {
		next, err := reader.ReadByte()
		if err != nil {
			return string(buf), err
		}

		buf = append(buf, next)

		// HasSuffix is false whenever buf is still shorter than delim, so no
		// separate length check is needed.
		if !bytes.HasSuffix(buf, delim) {
			continue
		}

		end := len(buf) - len(delim)
		return string(buf[:end]), nil
	}
}