package services import ( "github.com/libaxuan/cursor2api-go/models" "github.com/libaxuan/cursor2api-go/utils" "encoding/json" "strings" "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) type responseMessageState struct { id string outputIndex int contentIdx int text strings.Builder } // StreamResponse streams a Responses API SSE stream from the Cursor generator. func StreamResponse(c *gin.Context, chatGenerator <-chan interface{}, req *models.ResponseRequest, adapter *ResponseToolAdapter, responseID string) { c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("Access-Control-Allow-Origin", "*") if strings.TrimSpace(responseID) == "" { responseID = utils.GenerateResponseID() } createdAt := time.Now().Unix() sequence := 0 outputItems := make([]interface{}, 0, 4) outputText := strings.Builder{} nextOutputIndex := 0 var usage *models.ResponseUsage var currentMsg *responseMessageState sendEvent := func(event string, payload map[string]interface{}) { sequence++ payload["type"] = event payload["sequence_number"] = sequence if data, err := json.Marshal(payload); err == nil { if err := utils.WriteSSEEvent(c.Writer, event, string(data)); err != nil { logrus.WithError(err).Warn("failed to write response SSE event") } } } base := NewResponseFromRequest(req, responseID, createdAt, "in_progress", []interface{}{}, "", nil, nil) sendEvent("response.created", map[string]interface{}{"response": base}) sendEvent("response.in_progress", map[string]interface{}{"response": base}) openMessage := func() { if currentMsg != nil { return } msgID := utils.GenerateResponseItemID("msg_") idx := nextOutputIndex nextOutputIndex++ currentMsg = &responseMessageState{ id: msgID, outputIndex: idx, contentIdx: 0, } item := models.ResponseOutputMessage{ ID: msgID, Type: "message", Status: "in_progress", Role: "assistant", Content: []models.ResponseOutputTextContent{}, } sendEvent("response.output_item.added", map[string]interface{}{ "output_index": idx, "item": item, }) part := models.ResponseOutputTextContent{ Type: "output_text", Text: "", Annotations: []interface{}{}, } sendEvent("response.content_part.added", map[string]interface{}{ "output_index": idx, "content_index": currentMsg.contentIdx, "part": part, }) } closeMessage := func() { if currentMsg == nil { return } text := currentMsg.text.String() part := models.ResponseOutputTextContent{ Type: "output_text", Text: text, Annotations: []interface{}{}, } sendEvent("response.output_text.done", map[string]interface{}{ "output_index": currentMsg.outputIndex, "content_index": currentMsg.contentIdx, "text": text, }) sendEvent("response.content_part.done", map[string]interface{}{ "output_index": currentMsg.outputIndex, "content_index": currentMsg.contentIdx, "part": part, }) item := models.ResponseOutputMessage{ ID: currentMsg.id, Type: "message", Status: "completed", Role: "assistant", Content: []models.ResponseOutputTextContent{part}, } sendEvent("response.output_item.done", map[string]interface{}{ "output_index": currentMsg.outputIndex, "item": item, }) outputItems = append(outputItems, item) currentMsg = nil } emitToolCall := func(toolCall models.ToolCall) { closeMessage() item := buildResponseToolCallItem(toolCall, adapter) idx := nextOutputIndex nextOutputIndex++ added := withOutputItemStatus(item, "in_progress") sendEvent("response.output_item.added", map[string]interface{}{ "output_index": idx, "item": added, }) if callInfo, ok := getFunctionCallInfo(item); ok { sendEvent("response.function_call_arguments.delta", map[string]interface{}{ "output_index": idx, "item_id": callInfo.ID, "delta": callInfo.Arguments, }) sendEvent("response.function_call_arguments.done", map[string]interface{}{ "output_index": idx, "item_id": callInfo.ID, "arguments": callInfo.Arguments, "name": callInfo.Name, }) } done := withOutputItemStatus(item, "completed") sendEvent("response.output_item.done", map[string]interface{}{ "output_index": idx, "item": done, }) outputItems = append(outputItems, done) } ctx := c.Request.Context() for { select { case <-ctx.Done(): logrus.Debug("client disconnected during responses streaming") return case data, ok := <-chatGenerator: if !ok { closeMessage() completedAt := time.Now().Unix() final := NewResponseFromRequest(req, responseID, createdAt, "completed", outputItems, outputText.String(), usage, &completedAt) sendEvent("response.completed", map[string]interface{}{"response": final}) return } switch v := data.(type) { case models.AssistantEvent: switch v.Kind { case models.AssistantEventText: if v.Text != "" { openMessage() currentMsg.text.WriteString(v.Text) outputText.WriteString(v.Text) sendEvent("response.output_text.delta", map[string]interface{}{ "output_index": currentMsg.outputIndex, "content_index": currentMsg.contentIdx, "delta": v.Text, }) } case models.AssistantEventToolCall: if v.ToolCall != nil { emitToolCall(*v.ToolCall) } } case string: if v != "" { openMessage() currentMsg.text.WriteString(v) outputText.WriteString(v) sendEvent("response.output_text.delta", map[string]interface{}{ "output_index": currentMsg.outputIndex, "content_index": currentMsg.contentIdx, "delta": v, }) } case models.Usage: usage = BuildResponseUsage(v) case error: logrus.WithError(v).Error("responses stream generator error") return default: continue } } } } type functionCallInfo struct { ID string Name string Arguments string } func getFunctionCallInfo(item interface{}) (*functionCallInfo, bool) { switch v := item.(type) { case models.ResponseFunctionCall: return &functionCallInfo{ID: v.ID, Name: v.Name, Arguments: v.Arguments}, true default: return nil, false } } func withOutputItemStatus(item interface{}, status string) interface{} { switch v := item.(type) { case models.ResponseOutputMessage: v.Status = status return v case models.ResponseFunctionCall: v.Status = status return v case models.ResponseApplyPatchCall: v.Status = status return v case models.ResponseShellCall: v.Status = status return v case models.ResponseLocalShellCall: v.Status = status return v default: return item } }