cursor / services /response_stream.go
cacode's picture
Upload 48 files
1766992 verified
package services
import (
"github.com/libaxuan/cursor2api-go/models"
"github.com/libaxuan/cursor2api-go/utils"
"encoding/json"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
// responseMessageState tracks the assistant message output item that is
// currently being streamed, so deltas and the closing events can refer to it.
type responseMessageState struct {
	// id is the identifier of the message output item.
	id string
	// outputIndex is the item's position in the response output list;
	// contentIdx is the position of the text part inside the message.
	outputIndex, contentIdx int
	// text accumulates every text delta written to this message so far.
	text strings.Builder
}
// StreamResponse writes a Responses API server-sent event stream to the client,
// driven by the values received from chatGenerator.
//
// Parameters:
//   - c: the gin context whose writer receives the SSE events and whose request
//     context is watched for client disconnects.
//   - chatGenerator: source of stream values. Handled types are
//     models.AssistantEvent (text or tool call), string (raw text delta),
//     models.Usage and error; any other value is ignored.
//   - req: the original request, forwarded to NewResponseFromRequest when the
//     response snapshots are built.
//   - adapter: forwarded to buildResponseToolCallItem when a tool call is
//     converted into an output item.
//   - responseID: the response identifier; when blank a new one is generated.
//
// Event flow: response.created and response.in_progress are sent first. Text
// deltas lazily open a message output item; a tool call closes the open
// message (if any) and is emitted as its own output item. When the channel is
// closed the open message is finalized and response.completed is sent. When
// the generator yields an error, the error is logged, an "error" event is sent
// and the stream ends without response.completed. When the request context is
// cancelled the function returns without writing anything further.
func StreamResponse(c *gin.Context, chatGenerator <-chan interface{}, req *models.ResponseRequest, adapter *ResponseToolAdapter, responseID string) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("Access-Control-Allow-Origin", "*")

	if strings.TrimSpace(responseID) == "" {
		responseID = utils.GenerateResponseID()
	}

	createdAt := time.Now().Unix()
	sequence := 0
	outputItems := make([]interface{}, 0, 4)
	var outputText strings.Builder
	nextOutputIndex := 0
	var usage *models.ResponseUsage
	var currentMsg *responseMessageState

	// sendEvent stamps the payload with its event type and the next sequence
	// number, then writes it as one SSE event. A sequence number is consumed
	// only once the payload has been encoded, so an unencodable event does not
	// leave a gap in the numbers the client sees.
	sendEvent := func(event string, payload map[string]interface{}) {
		payload["type"] = event
		payload["sequence_number"] = sequence + 1
		data, err := json.Marshal(payload)
		if err != nil {
			logrus.WithError(err).WithField("event", event).Warn("failed to marshal response SSE event")
			return
		}
		sequence++
		if err := utils.WriteSSEEvent(c.Writer, event, string(data)); err != nil {
			logrus.WithError(err).Warn("failed to write response SSE event")
		}
	}

	base := NewResponseFromRequest(req, responseID, createdAt, "in_progress", []interface{}{}, "", nil, nil)
	sendEvent("response.created", map[string]interface{}{"response": base})
	sendEvent("response.in_progress", map[string]interface{}{"response": base})

	// openMessage starts a new assistant message output item with one empty
	// text part, unless a message is already open.
	openMessage := func() {
		if currentMsg != nil {
			return
		}
		msgID := utils.GenerateResponseItemID("msg_")
		idx := nextOutputIndex
		nextOutputIndex++
		currentMsg = &responseMessageState{
			id:          msgID,
			outputIndex: idx,
			contentIdx:  0,
		}
		item := models.ResponseOutputMessage{
			ID:      msgID,
			Type:    "message",
			Status:  "in_progress",
			Role:    "assistant",
			Content: []models.ResponseOutputTextContent{},
		}
		sendEvent("response.output_item.added", map[string]interface{}{
			"output_index": idx,
			"item":         item,
		})
		part := models.ResponseOutputTextContent{
			Type:        "output_text",
			Text:        "",
			Annotations: []interface{}{},
		}
		sendEvent("response.content_part.added", map[string]interface{}{
			"item_id":       msgID,
			"output_index":  idx,
			"content_index": currentMsg.contentIdx,
			"part":          part,
		})
	}

	// closeMessage finalizes the open message (if any): it emits the done
	// events for the text, the content part and the item, then records the
	// completed item for the final response snapshot.
	closeMessage := func() {
		if currentMsg == nil {
			return
		}
		msg := currentMsg
		text := msg.text.String()
		part := models.ResponseOutputTextContent{
			Type:        "output_text",
			Text:        text,
			Annotations: []interface{}{},
		}
		sendEvent("response.output_text.done", map[string]interface{}{
			"item_id":       msg.id,
			"output_index":  msg.outputIndex,
			"content_index": msg.contentIdx,
			"text":          text,
		})
		sendEvent("response.content_part.done", map[string]interface{}{
			"item_id":       msg.id,
			"output_index":  msg.outputIndex,
			"content_index": msg.contentIdx,
			"part":          part,
		})
		item := models.ResponseOutputMessage{
			ID:      msg.id,
			Type:    "message",
			Status:  "completed",
			Role:    "assistant",
			Content: []models.ResponseOutputTextContent{part},
		}
		sendEvent("response.output_item.done", map[string]interface{}{
			"output_index": msg.outputIndex,
			"item":         item,
		})
		outputItems = append(outputItems, item)
		currentMsg = nil
	}

	// emitText appends a text delta to the open message, opening one first if
	// needed, and forwards the delta to the client. Empty deltas are dropped.
	emitText := func(delta string) {
		if delta == "" {
			return
		}
		openMessage()
		currentMsg.text.WriteString(delta)
		outputText.WriteString(delta)
		sendEvent("response.output_text.delta", map[string]interface{}{
			"item_id":       currentMsg.id,
			"output_index":  currentMsg.outputIndex,
			"content_index": currentMsg.contentIdx,
			"delta":         delta,
		})
	}

	// emitToolCall closes any open message and emits the tool call as a
	// complete output item (added -> arguments -> done) in a single step.
	emitToolCall := func(toolCall models.ToolCall) {
		closeMessage()
		item := buildResponseToolCallItem(toolCall, adapter)
		idx := nextOutputIndex
		nextOutputIndex++
		added := withOutputItemStatus(item, "in_progress")
		sendEvent("response.output_item.added", map[string]interface{}{
			"output_index": idx,
			"item":         added,
		})
		// Only function-call items carry argument events; the arguments are
		// sent as one delta followed by the done event.
		if callInfo, ok := getFunctionCallInfo(item); ok {
			sendEvent("response.function_call_arguments.delta", map[string]interface{}{
				"output_index": idx,
				"item_id":      callInfo.ID,
				"delta":        callInfo.Arguments,
			})
			sendEvent("response.function_call_arguments.done", map[string]interface{}{
				"output_index": idx,
				"item_id":      callInfo.ID,
				"arguments":    callInfo.Arguments,
				"name":         callInfo.Name,
			})
		}
		done := withOutputItemStatus(item, "completed")
		sendEvent("response.output_item.done", map[string]interface{}{
			"output_index": idx,
			"item":         done,
		})
		outputItems = append(outputItems, done)
	}

	ctx := c.Request.Context()
	for {
		select {
		case <-ctx.Done():
			logrus.Debug("client disconnected during responses streaming")
			return
		case data, ok := <-chatGenerator:
			if !ok {
				// Generator finished: flush the open message and send the
				// final response snapshot.
				closeMessage()
				completedAt := time.Now().Unix()
				final := NewResponseFromRequest(req, responseID, createdAt, "completed", outputItems, outputText.String(), usage, &completedAt)
				sendEvent("response.completed", map[string]interface{}{"response": final})
				return
			}
			switch v := data.(type) {
			case models.AssistantEvent:
				switch v.Kind {
				case models.AssistantEventText:
					emitText(v.Text)
				case models.AssistantEventToolCall:
					if v.ToolCall != nil {
						emitToolCall(*v.ToolCall)
					}
				}
			case string:
				emitText(v)
			case models.Usage:
				usage = BuildResponseUsage(v)
			case error:
				logrus.WithError(v).Error("responses stream generator error")
				// Tell the client the stream failed instead of ending it
				// silently. The open message is deliberately not closed, so a
				// truncated message is never reported as completed. The
				// detailed error stays in the server log; the client gets a
				// generic message.
				sendEvent("error", map[string]interface{}{
					"code":    "server_error",
					"message": "upstream stream failed before the response completed",
					"param":   nil,
				})
				return
			}
		}
	}
}
// functionCallInfo holds the fields of a function-call output item that the
// streaming code needs when it emits the argument events: the item ID, the
// function name and the arguments string.
type functionCallInfo struct {
	ID, Name, Arguments string
}
// getFunctionCallInfo extracts the ID, name and arguments from item when it is
// a models.ResponseFunctionCall value. For any other dynamic type it returns
// (nil, false).
func getFunctionCallInfo(item interface{}) (*functionCallInfo, bool) {
	call, isFunctionCall := item.(models.ResponseFunctionCall)
	if !isFunctionCall {
		return nil, false
	}
	info := &functionCallInfo{
		ID:        call.ID,
		Name:      call.Name,
		Arguments: call.Arguments,
	}
	return info, true
}
// withOutputItemStatus returns a copy of item with its Status field set to
// status. The known output item types are matched by value, so the caller's
// item is not modified. Values of any other type are returned as-is.
func withOutputItemStatus(item interface{}, status string) interface{} {
	switch typed := item.(type) {
	case models.ResponseOutputMessage:
		typed.Status = status
		return typed
	case models.ResponseFunctionCall:
		typed.Status = status
		return typed
	case models.ResponseApplyPatchCall:
		typed.Status = status
		return typed
	case models.ResponseShellCall:
		typed.Status = status
		return typed
	case models.ResponseLocalShellCall:
		typed.Status = status
		return typed
	}
	// Unknown item type: there is no Status field to set here.
	return item
}