| package services |
|
|
| import ( |
| "github.com/libaxuan/cursor2api-go/models" |
| "github.com/libaxuan/cursor2api-go/utils" |
| "encoding/json" |
| "strings" |
| "time" |
|
|
| "github.com/gin-gonic/gin" |
| "github.com/sirupsen/logrus" |
| ) |
|
|
// responseMessageState tracks the assistant message output item that is
// currently being streamed, until it is closed and appended to the output list.
type responseMessageState struct {
	// id is the generated identifier of the message output item.
	id string
	// outputIndex is the item's position in the response output list;
	// contentIdx is the index of the text content part inside the item.
	outputIndex, contentIdx int
	// text accumulates every text delta emitted for this message so the
	// full text can be repeated in the closing events.
	text strings.Builder
}
|
|
| |
| func StreamResponse(c *gin.Context, chatGenerator <-chan interface{}, req *models.ResponseRequest, adapter *ResponseToolAdapter, responseID string) { |
| c.Header("Content-Type", "text/event-stream") |
| c.Header("Cache-Control", "no-cache") |
| c.Header("Connection", "keep-alive") |
| c.Header("Access-Control-Allow-Origin", "*") |
|
|
| if strings.TrimSpace(responseID) == "" { |
| responseID = utils.GenerateResponseID() |
| } |
| createdAt := time.Now().Unix() |
| sequence := 0 |
|
|
| outputItems := make([]interface{}, 0, 4) |
| outputText := strings.Builder{} |
| nextOutputIndex := 0 |
| var usage *models.ResponseUsage |
| var currentMsg *responseMessageState |
|
|
| sendEvent := func(event string, payload map[string]interface{}) { |
| sequence++ |
| payload["type"] = event |
| payload["sequence_number"] = sequence |
| if data, err := json.Marshal(payload); err == nil { |
| if err := utils.WriteSSEEvent(c.Writer, event, string(data)); err != nil { |
| logrus.WithError(err).Warn("failed to write response SSE event") |
| } |
| } |
| } |
|
|
| base := NewResponseFromRequest(req, responseID, createdAt, "in_progress", []interface{}{}, "", nil, nil) |
| sendEvent("response.created", map[string]interface{}{"response": base}) |
| sendEvent("response.in_progress", map[string]interface{}{"response": base}) |
|
|
| openMessage := func() { |
| if currentMsg != nil { |
| return |
| } |
| msgID := utils.GenerateResponseItemID("msg_") |
| idx := nextOutputIndex |
| nextOutputIndex++ |
| currentMsg = &responseMessageState{ |
| id: msgID, |
| outputIndex: idx, |
| contentIdx: 0, |
| } |
|
|
| item := models.ResponseOutputMessage{ |
| ID: msgID, |
| Type: "message", |
| Status: "in_progress", |
| Role: "assistant", |
| Content: []models.ResponseOutputTextContent{}, |
| } |
| sendEvent("response.output_item.added", map[string]interface{}{ |
| "output_index": idx, |
| "item": item, |
| }) |
|
|
| part := models.ResponseOutputTextContent{ |
| Type: "output_text", |
| Text: "", |
| Annotations: []interface{}{}, |
| } |
| sendEvent("response.content_part.added", map[string]interface{}{ |
| "output_index": idx, |
| "content_index": currentMsg.contentIdx, |
| "part": part, |
| }) |
| } |
|
|
| closeMessage := func() { |
| if currentMsg == nil { |
| return |
| } |
| text := currentMsg.text.String() |
| part := models.ResponseOutputTextContent{ |
| Type: "output_text", |
| Text: text, |
| Annotations: []interface{}{}, |
| } |
|
|
| sendEvent("response.output_text.done", map[string]interface{}{ |
| "output_index": currentMsg.outputIndex, |
| "content_index": currentMsg.contentIdx, |
| "text": text, |
| }) |
| sendEvent("response.content_part.done", map[string]interface{}{ |
| "output_index": currentMsg.outputIndex, |
| "content_index": currentMsg.contentIdx, |
| "part": part, |
| }) |
|
|
| item := models.ResponseOutputMessage{ |
| ID: currentMsg.id, |
| Type: "message", |
| Status: "completed", |
| Role: "assistant", |
| Content: []models.ResponseOutputTextContent{part}, |
| } |
|
|
| sendEvent("response.output_item.done", map[string]interface{}{ |
| "output_index": currentMsg.outputIndex, |
| "item": item, |
| }) |
|
|
| outputItems = append(outputItems, item) |
| currentMsg = nil |
| } |
|
|
| emitToolCall := func(toolCall models.ToolCall) { |
| closeMessage() |
| item := buildResponseToolCallItem(toolCall, adapter) |
| idx := nextOutputIndex |
| nextOutputIndex++ |
|
|
| added := withOutputItemStatus(item, "in_progress") |
| sendEvent("response.output_item.added", map[string]interface{}{ |
| "output_index": idx, |
| "item": added, |
| }) |
|
|
| if callInfo, ok := getFunctionCallInfo(item); ok { |
| sendEvent("response.function_call_arguments.delta", map[string]interface{}{ |
| "output_index": idx, |
| "item_id": callInfo.ID, |
| "delta": callInfo.Arguments, |
| }) |
| sendEvent("response.function_call_arguments.done", map[string]interface{}{ |
| "output_index": idx, |
| "item_id": callInfo.ID, |
| "arguments": callInfo.Arguments, |
| "name": callInfo.Name, |
| }) |
| } |
|
|
| done := withOutputItemStatus(item, "completed") |
| sendEvent("response.output_item.done", map[string]interface{}{ |
| "output_index": idx, |
| "item": done, |
| }) |
|
|
| outputItems = append(outputItems, done) |
| } |
|
|
| ctx := c.Request.Context() |
| for { |
| select { |
| case <-ctx.Done(): |
| logrus.Debug("client disconnected during responses streaming") |
| return |
| case data, ok := <-chatGenerator: |
| if !ok { |
| closeMessage() |
| completedAt := time.Now().Unix() |
| final := NewResponseFromRequest(req, responseID, createdAt, "completed", outputItems, outputText.String(), usage, &completedAt) |
| sendEvent("response.completed", map[string]interface{}{"response": final}) |
| return |
| } |
|
|
| switch v := data.(type) { |
| case models.AssistantEvent: |
| switch v.Kind { |
| case models.AssistantEventText: |
| if v.Text != "" { |
| openMessage() |
| currentMsg.text.WriteString(v.Text) |
| outputText.WriteString(v.Text) |
| sendEvent("response.output_text.delta", map[string]interface{}{ |
| "output_index": currentMsg.outputIndex, |
| "content_index": currentMsg.contentIdx, |
| "delta": v.Text, |
| }) |
| } |
| case models.AssistantEventToolCall: |
| if v.ToolCall != nil { |
| emitToolCall(*v.ToolCall) |
| } |
| } |
| case string: |
| if v != "" { |
| openMessage() |
| currentMsg.text.WriteString(v) |
| outputText.WriteString(v) |
| sendEvent("response.output_text.delta", map[string]interface{}{ |
| "output_index": currentMsg.outputIndex, |
| "content_index": currentMsg.contentIdx, |
| "delta": v, |
| }) |
| } |
| case models.Usage: |
| usage = BuildResponseUsage(v) |
| case error: |
| logrus.WithError(v).Error("responses stream generator error") |
| return |
| default: |
| continue |
| } |
| } |
| } |
| } |
|
|
// functionCallInfo is the subset of a function-call output item needed to
// emit the function_call_arguments streaming events.
type functionCallInfo struct {
	// ID is the item identifier, Name the called function's name, and
	// Arguments the call arguments as carried by the item.
	ID, Name, Arguments string
}
|
|
| func getFunctionCallInfo(item interface{}) (*functionCallInfo, bool) { |
| switch v := item.(type) { |
| case models.ResponseFunctionCall: |
| return &functionCallInfo{ID: v.ID, Name: v.Name, Arguments: v.Arguments}, true |
| default: |
| return nil, false |
| } |
| } |
|
|
| func withOutputItemStatus(item interface{}, status string) interface{} { |
| switch v := item.(type) { |
| case models.ResponseOutputMessage: |
| v.Status = status |
| return v |
| case models.ResponseFunctionCall: |
| v.Status = status |
| return v |
| case models.ResponseApplyPatchCall: |
| v.Status = status |
| return v |
| case models.ResponseShellCall: |
| v.Status = status |
| return v |
| case models.ResponseLocalShellCall: |
| v.Status = status |
| return v |
| default: |
| return item |
| } |
| } |
|
|