KaThaNg commited on
Commit
123cdd5
·
verified ·
1 Parent(s): cf220f2

Update sse.go

Browse files
Files changed (1) hide show
  1. sse.go +284 -105
sse.go CHANGED
@@ -20,6 +20,10 @@ func streamOpenAIResponseToClaudeSSE(
20
  requestedModel string, // Model name determined in handler
21
  originalClaudeRequest *ClaudeRequest,
22
  ) {
 
 
 
 
23
  c.Writer.Header().Set("Content-Type", "text/event-stream")
24
  c.Writer.Header().Set("Cache-Control", "no-cache")
25
  c.Writer.Header().Set("Connection", "keep-alive")
@@ -34,12 +38,11 @@ func streamOpenAIResponseToClaudeSSE(
34
  startTime := time.Now()
35
 
36
  calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
37
- // Giữ lại: Log INFO quan trọng về việc bắt đầu SSE
38
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
39
 
40
  inputTokens := calculatedInputTokens
41
  outputTokens := 0
42
- finalUsageReceivedFromStream := false
43
  eventIndex := 0
44
 
45
  doneChan := make(chan struct{})
@@ -53,7 +56,6 @@ func streamOpenAIResponseToClaudeSSE(
53
  for scanner.Scan() {
54
  select {
55
  case <-c.Request.Context().Done():
56
- // Giữ lại: Log INFO khi client ngắt kết nối
57
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Client ngắt kết nối trong vòng lặp đọc.", messageID)
58
  return
59
  default:
@@ -67,15 +69,13 @@ func streamOpenAIResponseToClaudeSSE(
67
  if strings.HasPrefix(line, "data:") {
68
  dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
69
  if dataStr == "[DONE]" {
70
- // Giữ lại: Log INFO quan trọng
71
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Nhận được dấu hiệu [DONE].", messageID)
72
- return
73
  }
74
 
75
  var chunk OpenAIStreamChunk
76
  if err := json.Unmarshal([]byte(dataStr), &chunk); err != nil {
77
- // Giữ lại: Log WARN quan trọng
78
- log.Printf("WARN: [%s] (OpenAI->Claude SSE) SSE: Không thể giải mã chunk JSON: %v. Data: %s", messageID, err, dataStr)
79
  continue
80
  }
81
 
@@ -87,94 +87,85 @@ func streamOpenAIResponseToClaudeSSE(
87
  if choice.Delta.Content != nil {
88
  contentChunk := *choice.Delta.Content
89
  accumulatedContent += contentChunk
90
- currentOutputTokens := estimateTokens(accumulatedContent)
91
-
92
  deltaPayload := ClaudeSSEEvent{
93
  Type: "content_block_delta",
94
- Index: func() *int { i := 0; return &i }(),
95
  Delta: &ClaudeSSEDelta{
96
  Type: "text_delta",
97
  Text: &contentChunk,
98
  },
99
  }
100
- // Không log event này (shouldLog = false)
101
  if !sendSSEEvent(c, "content_block_delta", deltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)") {
102
- return
103
  }
104
  eventIndex++
105
 
 
 
106
  if currentOutputTokens != outputTokens {
107
  outputTokens = currentOutputTokens
108
  intermediateUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
109
  intermediateDeltaPayload := ClaudeSSEEvent{
110
  Type: "message_delta",
111
- Delta: &ClaudeSSEDelta{},
112
  Usage: &intermediateUsage,
113
  }
114
- // Không log event này (shouldLog = false)
115
  if !sendSSEEvent(c, "message_delta", intermediateDeltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)") {
116
- return
117
  }
118
  eventIndex++
119
  }
120
  }
121
  }
122
- // Xử lý Usage nếu trong chunk (phiên bản mới của API OpenAI thể gửi)
123
- if chunk.Usage != nil {
124
- // Vô hiệu hóa: Log INFO này có thể quá chi tiết nếu block usage xuất hiện thường xuyên
125
- // log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Nhận được block usage từ OpenAI stream: %+v", messageID, *chunk.Usage)
126
- if chunk.Usage.CompletionTokens > 0 { // OpenAI gọi là completion_tokens
127
  outputTokens = chunk.Usage.CompletionTokens
128
- finalUsageReceivedFromStream = true // Đánh dấu đã nhận usage cuối cùng từ stream
129
  }
130
- if chunk.Usage.PromptTokens != inputTokens && chunk.Usage.PromptTokens > 0 { // OpenAI gọi là prompt_tokens
131
- inputTokens = chunk.Usage.PromptTokens
132
  }
133
  }
134
  }
135
  }
136
-
137
  if err := scanner.Err(); err != nil {
138
  select {
139
  case <-c.Request.Context().Done():
140
- // Giữ lại: Log INFO
141
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Đọc upstream bị gián đoạn bởi client ngắt kết nối: %v", messageID, c.Request.Context().Err())
142
  default:
143
- // Giữ lại: Log ERROR quan trọng
144
  log.Printf("ERROR: [%s] (OpenAI->Claude SSE) SSE: Lỗi đọc nội dung phản hồi upstream: %v", messageID, err)
145
  errChan <- fmt.Errorf("lỗi đọc upstream: %w", err)
146
  }
147
  }
148
  }()
149
 
 
150
  startUsage := ClaudeUsage{InputTokens: calculatedInputTokens, OutputTokens: 0}
151
  startMessage := ClaudeSSEMessage{ID: messageID, Type: "message", Role: "assistant", Content: []ClaudeContentBlock{}, Model: requestedModel, Usage: startUsage}
152
  startEvent := ClaudeSSEEvent{Type: "message_start", Message: &startMessage}
153
- // Log event quan trọng này (shouldLog = true)
154
  if !sendSSEEvent(c, "message_start", startEvent, messageID, eventIndex, true, "(OpenAI->Claude SSE)") {
155
  return
156
  }
157
  eventIndex++
158
 
159
- contentStartBlock := ClaudeSSEContentBlock{Type: "text", Text: ""}
 
160
  contentStartEvent := ClaudeSSEEvent{Type: "content_block_start", Index: func() *int { i := 0; return &i }(), ContentBlock: &contentStartBlock}
161
- // Log event quan trọng này (shouldLog = true)
162
  if !sendSSEEvent(c, "content_block_start", contentStartEvent, messageID, eventIndex, true, "(OpenAI->Claude SSE)") {
163
  return
164
  }
165
  eventIndex++
166
 
 
167
  select {
168
  case <-doneChan:
169
- // hiệu hóa: Log DEBUG không cần thiết
170
- // log.Printf("DEBUG: [%s] (OpenAI->Claude SSE) SSE: Đọc upstream hoàn tất.", messageID)
171
  case err := <-errChan:
172
- // Giữ lại: Log ERROR quan trọng
173
  log.Printf("ERROR: [%s] (OpenAI->Claude SSE) SSE: Nhận lỗi từ goroutine đọc: %v", messageID, err)
174
  streamErrorOccurred = true
175
  errorDetails = &ClaudeError{Type: "api_error", Message: fmt.Sprintf("Lỗi đọc phản hồi upstream: %v", err)}
176
  case <-c.Request.Context().Done():
177
- // Giữ lại: Log INFO quan trọng
178
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Client ngắt kết nối trong quá trình xử lý stream: %v", messageID, c.Request.Context().Err())
179
  streamErrorOccurred = true
180
  errorDetails = &ClaudeError{Type: "client_disconnect", Message: "Client ngắt kết nối trong quá trình stream"}
@@ -182,137 +173,324 @@ func streamOpenAIResponseToClaudeSSE(
182
 
183
  var claudeStopReason string
184
  if streamErrorOccurred && errorDetails != nil && errorDetails.Type == "client_disconnect" {
185
- claudeStopReason = "client_disconnect" // Specific reason for client disconnect
186
  } else if streamErrorOccurred {
187
- claudeStopReason = "error" // Generic error if stream failed for other reasons
188
  } else {
189
  claudeStopReason = mapOpenAIFinishReasonToClaude(openAIFinishReason)
190
  }
191
 
192
  finalInputTokens := inputTokens
193
  finalOutputTokens := outputTokens
194
-
195
- if !finalUsageReceivedFromStream { // Nếu không nhận được usage từ stream (ví dụ API cũ hoặc không có)
196
- estimatedOutput := estimateTokens(accumulatedContent)
197
- finalOutputTokens = max(0, estimatedOutput)
198
- if accumulatedContent == "" {
199
- finalOutputTokens = 0
200
- }
201
- } else {
202
- finalOutputTokens = max(0, outputTokens) // Sử dụng giá trị đã nhận từ stream nếu có
203
  }
204
- finalInputTokens = max(0, finalInputTokens) // Đảm bảo không âm
 
 
205
 
206
- // Giữ lại: Log INFO tóm tắt quan trọng khi stream hoàn tất
207
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE Stream hoàn tất. Lý do dừng: %s. Input: %d, Output: %d. Thời gian: %v. OpenAI Finish Reason: %s",
208
  messageID, claudeStopReason, finalInputTokens, finalOutputTokens, time.Since(startTime), safeDeref(openAIFinishReason))
209
 
210
- // Final events to send to client
211
- finalHackUsageData := ClaudeSSEUsage{OutputTokens: finalOutputTokens} // For message_delta
212
- finalStopUsageData := ClaudeSSEUsage{InputTokens: &finalInputTokens, OutputTokens: finalOutputTokens} // For message_stop
213
-
214
  finalDeltaStopReason := claudeStopReason
215
  priorityFinalDeltaPayload := ClaudeSSEEvent{
216
  Type: "message_delta",
217
  Delta: &ClaudeSSEDelta{
218
  StopReason: &finalDeltaStopReason,
219
- StopSequence: nil, // OpenAI doesn't provide this directly in stream delta
220
  },
221
- Usage: &finalHackUsageData, // Claude expects usage in message_delta before stop
222
  }
223
- // Không log event này (shouldLog = false)
224
  _ = sendSSEEvent(c, "message_delta", priorityFinalDeltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)")
225
  eventIndex++
226
 
 
227
  contentStopPayload := ClaudeSSEEvent{Type: "content_block_stop", Index: func() *int { i := 0; return &i }()}
228
- // Không log event này (shouldLog = false)
229
  _ = sendSSEEvent(c, "content_block_stop", contentStopPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)")
230
  eventIndex++
231
 
232
- messageStopPayload := ClaudeSSEEvent{Type: "message_stop", Usage: &finalStopUsageData} // Claude expects final usage in message_stop
233
- // Log event quan trọng này (shouldLog = true)
 
234
  _ = sendSSEEvent(c, "message_stop", messageStopPayload, messageID, eventIndex, true, "(OpenAI->Claude SSE)")
235
  eventIndex++
236
 
237
  if streamErrorOccurred && errorDetails != nil && errorDetails.Type != "client_disconnect" {
238
  errorPayload := ClaudeSSEEvent{Type: "error", Error: errorDetails}
239
- // Log event lỗi (shouldLog = true)
240
  _ = sendSSEEvent(c, "error", errorPayload, messageID, eventIndex, true, "(OpenAI->Claude SSE)")
241
- // eventIndex++ // Không cần tăng nữa vì đây là event cuối cùng trong trường hợp lỗi
242
  }
243
  }
244
 
245
- // proxyClaudeNativeSSE directly proxies SSE events from a Claude-native upstream
246
  func proxyClaudeNativeSSE(
247
  c *gin.Context,
248
  upstreamResp *http.Response,
249
  claudeRequestID string,
 
250
  ) {
251
  c.Writer.Header().Set("Content-Type", "text/event-stream")
252
  c.Writer.Header().Set("Cache-Control", "no-cache")
253
  c.Writer.Header().Set("Connection", "keep-alive")
254
  c.Writer.Header().Set("X-Content-Type-Options", "nosniff")
255
- // Copy Anthropic-Version header if present in upstream response
256
  if anthropicVersion := upstreamResp.Header.Get("anthropic-version"); anthropicVersion != "" {
257
  c.Writer.Header().Set("anthropic-version", anthropicVersion)
258
  }
259
  c.Writer.Flush()
260
 
261
- // Giữ lại: Log INFO quan trọng
262
- log.Printf("INFO: [%s] (Claude Native SSE) SSE Proxy: Bắt đầu proxying SSE từ upstream Claude native.", claudeRequestID)
 
 
 
263
  startTime := time.Now()
264
 
265
- clientGone := c.Request.Context().Done()
266
- scanner := bufio.NewScanner(upstreamResp.Body)
267
- defer upstreamResp.Body.Close()
268
-
269
- eventCount := 0
270
- lineBuffer := "" // Buffer for multi-line data fields (though rare in Claude SSE)
271
-
272
- for {
273
- select {
274
- case <-clientGone:
275
- // Giữ lại: Log INFO
276
- log.Printf("INFO: [%s] (Claude Native SSE) SSE Proxy: Client ngắt kết nối.", claudeRequestID)
277
- return
278
- default:
279
- if !scanner.Scan() {
280
- if err := scanner.Err(); err != nil {
281
- // Giữ lại: Log ERROR
282
- log.Printf("ERROR: [%s] (Claude Native SSE) SSE Proxy: Lỗi đọc từ upstream: %v", claudeRequestID, err)
283
- // Optionally send a Claude error event if the protocol allows and client is still connected
284
- // For now, just close.
285
- } else {
286
- // Giữ lại: Log INFO
287
- log.Printf("INFO: [%s] (Claude Native SSE) SSE Proxy: Upstream đóng kết nối.", claudeRequestID)
288
- }
289
- // Giữ lại: Log INFO
290
- log.Printf("INFO: [%s] (Claude Native SSE) SSE Proxy: Hoàn thành sau %v, %d events processed.", claudeRequestID, time.Since(startTime), eventCount)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
291
  return
 
292
  }
293
 
294
  line := scanner.Text()
295
- lineBuffer += line + "\n" // Append line to buffer
 
 
296
 
297
- // An empty line signifies end of an event in SSE protocol
298
- if line == "" {
299
- _, err := fmt.Fprint(c.Writer, lineBuffer) // Write the buffered event (includes the final empty line)
300
- if err != nil {
301
- // Giữ lại: Log WARN
302
- log.Printf("WARN: [%s] (Claude Native SSE) SSE Proxy: Lỗi ghi cho client: %v. Client có thể đã ngắt kết nối.", claudeRequestID, err)
303
- return // Stop if we can't write to client
304
  }
305
- c.Writer.Flush()
306
- lineBuffer = "" // Reset buffer for the next event
307
- // Vô hiệu hóa: Log TRACE quá chi tiết
308
- // log.Printf("TRACE: [%s] (Claude Native SSE) SSE Proxy: Flushed event %d", claudeRequestID, eventCount)
309
- }
310
 
311
- // Increment event count when an "event:" line is encountered (heuristic)
312
- if strings.HasPrefix(line, "event:") {
313
- eventCount++
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
  }
315
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
316
  }
317
  }
318
 
@@ -330,10 +508,11 @@ func sendSSEEvent(c *gin.Context, eventName string, data interface{}, requestID
330
  log.Printf("ERROR: [%s] %s Không thể marshal SSE event %d (%s): %v", requestID, logPrefix, eventIndex, eventName, err)
331
  return true // Continue trying to send other events if this one fails to marshal
332
  }
 
333
  _, err = fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", eventName, string(jsonData))
334
  if err != nil {
335
  if shouldLog || eventName == "message_stop" || eventName == "error" {
336
- log.Printf("WARN: [%s] %s Không thể ghi SSE event %d (%s) cho client: %v. Client có thể đã ngắt kết nối.", requestID, logPrefix, eventIndex, eventName, err)
337
  }
338
  return false // Stop if writing to client fails
339
  }
 
20
  requestedModel string, // Model name determined in handler
21
  originalClaudeRequest *ClaudeRequest,
22
  ) {
23
+ // --- Phần này giữ nguyên logic như phiên bản trước của bạn ---
24
+ // ... (logic gửi message_start, content_block_start, xử lý delta, gửi message_delta "hack", message_stop)
25
+ // ... đảm bảo rằng "hack" gửi output_tokens trong message_delta trung gian được giữ lại.
26
+
27
  c.Writer.Header().Set("Content-Type", "text/event-stream")
28
  c.Writer.Header().Set("Cache-Control", "no-cache")
29
  c.Writer.Header().Set("Connection", "keep-alive")
 
38
  startTime := time.Now()
39
 
40
  calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
 
41
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
42
 
43
  inputTokens := calculatedInputTokens
44
  outputTokens := 0
45
+ finalUsageReceivedFromStream := false // Flag to check if usage is explicitly received from OpenAI stream
46
  eventIndex := 0
47
 
48
  doneChan := make(chan struct{})
 
56
  for scanner.Scan() {
57
  select {
58
  case <-c.Request.Context().Done():
 
59
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Client ngắt kết nối trong vòng lặp đọc.", messageID)
60
  return
61
  default:
 
69
  if strings.HasPrefix(line, "data:") {
70
  dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
71
  if dataStr == "[DONE]" {
 
72
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Nhận được dấu hiệu [DONE].", messageID)
73
+ return // OpenAI stream finished
74
  }
75
 
76
  var chunk OpenAIStreamChunk
77
  if err := json.Unmarshal([]byte(dataStr), &chunk); err != nil {
78
+ log.Printf("WARN: [%s] (OpenAI->Claude SSE) SSE: Không thể giải mã chunk JSON OpenAI: %v. Data: %s", messageID, err, dataStr)
 
79
  continue
80
  }
81
 
 
87
  if choice.Delta.Content != nil {
88
  contentChunk := *choice.Delta.Content
89
  accumulatedContent += contentChunk
90
+ // Gửi content_block_delta cho client
 
91
  deltaPayload := ClaudeSSEEvent{
92
  Type: "content_block_delta",
93
+ Index: func() *int { i := 0; return &i }(), // Claude spec: index of the content block
94
  Delta: &ClaudeSSEDelta{
95
  Type: "text_delta",
96
  Text: &contentChunk,
97
  },
98
  }
 
99
  if !sendSSEEvent(c, "content_block_delta", deltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)") {
100
+ return // Client disconnected
101
  }
102
  eventIndex++
103
 
104
+ // "Hack" để gửi message_delta với output_tokens cập nhật
105
+ currentOutputTokens := estimateTokens(accumulatedContent)
106
  if currentOutputTokens != outputTokens {
107
  outputTokens = currentOutputTokens
108
  intermediateUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
109
  intermediateDeltaPayload := ClaudeSSEEvent{
110
  Type: "message_delta",
111
+ Delta: &ClaudeSSEDelta{}, // Delta rỗng cho loại event này
112
  Usage: &intermediateUsage,
113
  }
 
114
  if !sendSSEEvent(c, "message_delta", intermediateDeltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)") {
115
+ return // Client disconnected
116
  }
117
  eventIndex++
118
  }
119
  }
120
  }
121
+ if chunk.Usage != nil { // Some OpenAI versions might send usage in chunks
122
+ if chunk.Usage.CompletionTokens > 0 {
 
 
 
123
  outputTokens = chunk.Usage.CompletionTokens
124
+ finalUsageReceivedFromStream = true
125
  }
126
+ if chunk.Usage.PromptTokens > 0 {
127
+ inputTokens = chunk.Usage.PromptTokens // Update input tokens if provided by OpenAI
128
  }
129
  }
130
  }
131
  }
 
132
  if err := scanner.Err(); err != nil {
133
  select {
134
  case <-c.Request.Context().Done():
 
135
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Đọc upstream bị gián đoạn bởi client ngắt kết nối: %v", messageID, c.Request.Context().Err())
136
  default:
 
137
  log.Printf("ERROR: [%s] (OpenAI->Claude SSE) SSE: Lỗi đọc nội dung phản hồi upstream: %v", messageID, err)
138
  errChan <- fmt.Errorf("lỗi đọc upstream: %w", err)
139
  }
140
  }
141
  }()
142
 
143
+ // Gửi message_start
144
  startUsage := ClaudeUsage{InputTokens: calculatedInputTokens, OutputTokens: 0}
145
  startMessage := ClaudeSSEMessage{ID: messageID, Type: "message", Role: "assistant", Content: []ClaudeContentBlock{}, Model: requestedModel, Usage: startUsage}
146
  startEvent := ClaudeSSEEvent{Type: "message_start", Message: &startMessage}
 
147
  if !sendSSEEvent(c, "message_start", startEvent, messageID, eventIndex, true, "(OpenAI->Claude SSE)") {
148
  return
149
  }
150
  eventIndex++
151
 
152
+ // Gửi content_block_start
153
+ contentStartBlock := ClaudeSSEContentBlock{Type: "text", Text: ""} // Text ban đầu rỗng
154
  contentStartEvent := ClaudeSSEEvent{Type: "content_block_start", Index: func() *int { i := 0; return &i }(), ContentBlock: &contentStartBlock}
 
155
  if !sendSSEEvent(c, "content_block_start", contentStartEvent, messageID, eventIndex, true, "(OpenAI->Claude SSE)") {
156
  return
157
  }
158
  eventIndex++
159
 
160
+ // Chờ goroutine đọc xong hoặc có lỗi
161
  select {
162
  case <-doneChan:
163
+ // Normal completion
 
164
  case err := <-errChan:
 
165
  log.Printf("ERROR: [%s] (OpenAI->Claude SSE) SSE: Nhận lỗi từ goroutine đọc: %v", messageID, err)
166
  streamErrorOccurred = true
167
  errorDetails = &ClaudeError{Type: "api_error", Message: fmt.Sprintf("Lỗi đọc phản hồi upstream: %v", err)}
168
  case <-c.Request.Context().Done():
 
169
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE: Client ngắt kết nối trong quá trình xử lý stream: %v", messageID, c.Request.Context().Err())
170
  streamErrorOccurred = true
171
  errorDetails = &ClaudeError{Type: "client_disconnect", Message: "Client ngắt kết nối trong quá trình stream"}
 
173
 
174
  var claudeStopReason string
175
  if streamErrorOccurred && errorDetails != nil && errorDetails.Type == "client_disconnect" {
176
+ claudeStopReason = "client_disconnect"
177
  } else if streamErrorOccurred {
178
+ claudeStopReason = "error"
179
  } else {
180
  claudeStopReason = mapOpenAIFinishReasonToClaude(openAIFinishReason)
181
  }
182
 
183
  finalInputTokens := inputTokens
184
  finalOutputTokens := outputTokens
185
+ if !finalUsageReceivedFromStream { // If OpenAI didn't send final usage, estimate from accumulated content
186
+ finalOutputTokens = estimateTokens(accumulatedContent)
 
 
 
 
 
 
 
187
  }
188
+ finalOutputTokens = max(0, finalOutputTokens)
189
+ finalInputTokens = max(0, finalInputTokens)
190
+
191
 
 
192
  log.Printf("INFO: [%s] (OpenAI->Claude SSE) SSE Stream hoàn tất. Lý do dừng: %s. Input: %d, Output: %d. Thời gian: %v. OpenAI Finish Reason: %s",
193
  messageID, claudeStopReason, finalInputTokens, finalOutputTokens, time.Since(startTime), safeDeref(openAIFinishReason))
194
 
195
+ // "Hack" message_delta cuối cùng với stop_reason và usage (chỉ output_tokens)
196
+ finalHackUsageData := ClaudeSSEUsage{OutputTokens: finalOutputTokens}
 
 
197
  finalDeltaStopReason := claudeStopReason
198
  priorityFinalDeltaPayload := ClaudeSSEEvent{
199
  Type: "message_delta",
200
  Delta: &ClaudeSSEDelta{
201
  StopReason: &finalDeltaStopReason,
202
+ StopSequence: nil,
203
  },
204
+ Usage: &finalHackUsageData,
205
  }
 
206
  _ = sendSSEEvent(c, "message_delta", priorityFinalDeltaPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)")
207
  eventIndex++
208
 
209
+ // Gửi content_block_stop
210
  contentStopPayload := ClaudeSSEEvent{Type: "content_block_stop", Index: func() *int { i := 0; return &i }()}
 
211
  _ = sendSSEEvent(c, "content_block_stop", contentStopPayload, messageID, eventIndex, false, "(OpenAI->Claude SSE)")
212
  eventIndex++
213
 
214
+ // Gửi message_stop với usage đầy đủ
215
+ finalStopUsageData := ClaudeSSEUsage{InputTokens: &finalInputTokens, OutputTokens: finalOutputTokens}
216
+ messageStopPayload := ClaudeSSEEvent{Type: "message_stop", Usage: &finalStopUsageData}
217
  _ = sendSSEEvent(c, "message_stop", messageStopPayload, messageID, eventIndex, true, "(OpenAI->Claude SSE)")
218
  eventIndex++
219
 
220
  if streamErrorOccurred && errorDetails != nil && errorDetails.Type != "client_disconnect" {
221
  errorPayload := ClaudeSSEEvent{Type: "error", Error: errorDetails}
 
222
  _ = sendSSEEvent(c, "error", errorPayload, messageID, eventIndex, true, "(OpenAI->Claude SSE)")
 
223
  }
224
  }
225
 
226
+ // proxyClaudeNativeSSE xử SSE từ upstream Claude native và **thêm "hack" usage**
227
  func proxyClaudeNativeSSE(
228
  c *gin.Context,
229
  upstreamResp *http.Response,
230
  claudeRequestID string,
231
+ originalClaudeRequest *ClaudeRequest, // Cần để tính input tokens và lấy model
232
  ) {
233
  c.Writer.Header().Set("Content-Type", "text/event-stream")
234
  c.Writer.Header().Set("Cache-Control", "no-cache")
235
  c.Writer.Header().Set("Connection", "keep-alive")
236
  c.Writer.Header().Set("X-Content-Type-Options", "nosniff")
 
237
  if anthropicVersion := upstreamResp.Header.Get("anthropic-version"); anthropicVersion != "" {
238
  c.Writer.Header().Set("anthropic-version", anthropicVersion)
239
  }
240
  c.Writer.Flush()
241
 
242
+ messageID := claudeRequestID
243
+ requestedModel := originalClaudeRequest.Model
244
+ accumulatedContent := ""
245
+ streamErrorOccurred := false
246
+ var errorDetails *ClaudeError
247
  startTime := time.Now()
248
 
249
+ // Input tokens được tính từ yêu cầu gốc, giống như khi chuyển đổi từ OpenAI
250
+ calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
251
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
252
+
253
+ inputTokens := calculatedInputTokens // inputTokens sẽ không thay đổi trong quá trình stream này
254
+ outputTokens := 0
255
+ eventIndex := 0
256
+
257
+ var upstreamFinalStopReason string
258
+ var upstreamFinalUsage *ClaudeUsage
259
+
260
+ doneChan := make(chan struct{})
261
+ errChan := make(chan error, 1)
262
+
263
+ // Gửi message_start cho client (tự tạo)
264
+ startUsage := ClaudeUsage{InputTokens: calculatedInputTokens, OutputTokens: 0}
265
+ startMessage := ClaudeSSEMessage{ID: messageID, Type: "message", Role: "assistant", Content: []ClaudeContentBlock{}, Model: requestedModel, Usage: startUsage}
266
+ startEvent := ClaudeSSEEvent{Type: "message_start", Message: &startMessage}
267
+ if !sendSSEEvent(c, "message_start", startEvent, messageID, eventIndex, true, "(Claude Native SSE with Hack)") {
268
+ return // Client disconnected
269
+ }
270
+ eventIndex++
271
+
272
+ // Gửi content_block_start cho client (tự tạo)
273
+ // Claude native thường gửi content_block_start với index 0 và content_block.type="text", content_block.text=""
274
+ contentStartBlock := ClaudeSSEContentBlock{Type: "text", Text: ""}
275
+ contentStartEvent := ClaudeSSEEvent{Type: "content_block_start", Index: func() *int { i := 0; return &i }(), ContentBlock: &contentStartBlock}
276
+ if !sendSSEEvent(c, "content_block_start", contentStartEvent, messageID, eventIndex, true, "(Claude Native SSE with Hack)") {
277
+ return // Client disconnected
278
+ }
279
+ eventIndex++
280
+
281
+ go func() {
282
+ defer close(doneChan)
283
+ defer upstreamResp.Body.Close()
284
+
285
+ scanner := bufio.NewScanner(upstreamResp.Body)
286
+ for scanner.Scan() {
287
+ select {
288
+ case <-c.Request.Context().Done():
289
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) Client ngắt kết nối trong vòng lặp đọc.", messageID)
290
  return
291
+ default:
292
  }
293
 
294
  line := scanner.Text()
295
+ if line == "" { // Skip empty lines between events
296
+ continue
297
+ }
298
 
299
+ // Chỉ xử các dòng data, bỏ qua event: lines chúng ta sẽ tự tạo/forward có chọn lọc
300
+ if strings.HasPrefix(line, "data:") {
301
+ dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
302
+ var upstreamEvent ClaudeSSEEvent // Sử dụng struct ClaudeSSEEvent để parse
303
+ if err := json.Unmarshal([]byte(dataStr), &upstreamEvent); err != nil {
304
+ log.Printf("WARN: [%s] (Claude Native SSE with Hack) Không thể giải chunk JSON Claude native: %v. Data: %s", messageID, err, dataStr)
305
+ continue
306
  }
 
 
 
 
 
307
 
308
+ // Xử các loại event từ Claude native upstream
309
+ switch upstreamEvent.Type {
310
+ case "content_block_delta":
311
+ if upstreamEvent.Delta != nil && upstreamEvent.Delta.Text != nil {
312
+ textChunk := *upstreamEvent.Delta.Text
313
+ accumulatedContent += textChunk
314
+
315
+ // Forward content_block_delta đến client
316
+ // Đảm bảo index được set (thường là 0 cho một content block đơn giản)
317
+ clientContentDelta := ClaudeSSEEvent{
318
+ Type: "content_block_delta",
319
+ Index: upstreamEvent.Index, // Sử dụng index từ upstream nếu có, nếu không thì 0
320
+ Delta: &ClaudeSSEDelta{
321
+ Type: "text_delta",
322
+ Text: &textChunk,
323
+ },
324
+ }
325
+ if clientContentDelta.Index == nil { clientContentDelta.Index = func() *int { i := 0; return &i }() }
326
+
327
+ if !sendSSEEvent(c, "content_block_delta", clientContentDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
328
+ return // Client disconnected
329
+ }
330
+ eventIndex++
331
+
332
+ // "Hack" để gửi message_delta với output_tokens cập nhật
333
+ currentOutputTokens := estimateTokens(accumulatedContent)
334
+ if currentOutputTokens != outputTokens {
335
+ outputTokens = currentOutputTokens
336
+ intermediateUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
337
+ intermediateDeltaPayload := ClaudeSSEEvent{
338
+ Type: "message_delta",
339
+ Delta: &ClaudeSSEDelta{}, // Delta rỗng
340
+ Usage: &intermediateUsage,
341
+ }
342
+ if !sendSSEEvent(c, "message_delta", intermediateDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
343
+ return // Client disconnected
344
+ }
345
+ eventIndex++
346
+ }
347
+ }
348
+ case "message_delta": // Claude native có thể gửi message_delta với usage
349
+ if upstreamEvent.Usage != nil {
350
+ // Cập nhật outputTokens nếu upstream cung cấp
351
+ // Đây là thông tin chính xác hơn là ước tính
352
+ if upstreamEvent.Usage.OutputTokens > outputTokens {
353
+ outputTokens = upstreamEvent.Usage.OutputTokens
354
+ }
355
+ // Gửi message_delta "hack" với usage từ upstream (nếu có)
356
+ // hoặc usage đã tính toán nếu upstream không gửi
357
+ currentHackUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
358
+ hackMessageDelta := ClaudeSSEEvent{
359
+ Type: "message_delta",
360
+ Delta: &ClaudeSSEDelta{
361
+ // Claude native message_delta có thể có stop_reason/sequence
362
+ StopReason: upstreamEvent.Delta.StopReason,
363
+ StopSequence: upstreamEvent.Delta.StopSequence,
364
+ },
365
+ Usage: &currentHackUsage,
366
+ }
367
+ if !sendSSEEvent(c, "message_delta", hackMessageDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack - from upstream delta)") {
368
+ return
369
+ }
370
+ eventIndex++
371
+ }
372
+ if upstreamEvent.Delta != nil && upstreamEvent.Delta.StopReason != nil {
373
+ upstreamFinalStopReason = *upstreamEvent.Delta.StopReason
374
+ }
375
+
376
+ case "message_stop":
377
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) Nhận message_stop từ upstream.", messageID)
378
+ if upstreamEvent.Message != nil && upstreamEvent.Message.StopReason != nil { // Older format
379
+ upstreamFinalStopReason = *upstreamEvent.Message.StopReason
380
+ }
381
+ // Newer Claude API sends stop_reason in message_delta, and message_stop has final usage.
382
+ if upstreamEvent.Usage != nil { // Final usage from upstream
383
+ upstreamFinalUsage = upstreamEvent.Usage
384
+ if upstreamEvent.Usage.OutputTokens > outputTokens { // Prefer upstream's final count
385
+ outputTokens = upstreamEvent.Usage.OutputTokens
386
+ }
387
+ }
388
+ return // Kết thúc vòng lặp khi nhận message_stop
389
+
390
+ case "error":
391
+ log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ upstream: %+v", messageID, upstreamEvent.Error)
392
+ streamErrorOccurred = true
393
+ errorDetails = upstreamEvent.Error
394
+ return // Kết thúc vòng lặp
395
+
396
+ // Các event khác như message_start, content_block_start, content_block_stop từ upstream sẽ được bỏ qua
397
+ // vì chúng ta đã tự tạo các event tương ứng cho client ở đầu luồng.
398
+ // Ping events có thể được forward nếu cần.
399
+ case "ping":
400
+ pingEvent := ClaudeSSEEvent{Type: "ping"}
401
+ if !sendSSEEvent(c, "ping", pingEvent, messageID, eventIndex, false, "(Claude Native SSE with Hack - ping)") {
402
+ return
403
+ }
404
+ eventIndex++
405
+
406
+ default:
407
+ // log.Printf("DEBUG: [%s] (Claude Native SSE with Hack) Bỏ qua event upstream không xác định: %s", messageID, upstreamEvent.Type)
408
+ }
409
  }
410
  }
411
+ if err := scanner.Err(); err != nil {
412
+ select {
413
+ case <-c.Request.Context().Done():
414
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) Đọc upstream bị gián đoạn bởi client ngắt kết nối: %v", messageID, c.Request.Context().Err())
415
+ default:
416
+ log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Lỗi đọc nội dung phản hồi upstream: %v", messageID, err)
417
+ errChan <- fmt.Errorf("lỗi đọc upstream: %w", err)
418
+ }
419
+ }
420
+ }()
421
+
422
+ select {
423
+ case <-doneChan:
424
+ // Normal completion of upstream read
425
+ case err := <-errChan:
426
+ log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ goroutine đọc: %v", messageID, err)
427
+ streamErrorOccurred = true
428
+ if errorDetails == nil { // Ensure errorDetails is set
429
+ errorDetails = &ClaudeError{Type: "api_error", Message: fmt.Sprintf("Lỗi đọc phản hồi upstream: %v", err)}
430
+ }
431
+ case <-c.Request.Context().Done():
432
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) Client ngắt kết nối trong quá trình xử lý stream: %v", messageID, c.Request.Context().Err())
433
+ streamErrorOccurred = true
434
+ if errorDetails == nil { // Ensure errorDetails is set
435
+ errorDetails = &ClaudeError{Type: "client_disconnect", Message: "Client ngắt kết nối trong quá trình stream"}
436
+ }
437
+ }
438
+
439
+ var finalStopReasonClient string
440
+ if streamErrorOccurred && errorDetails != nil && errorDetails.Type == "client_disconnect" {
441
+ finalStopReasonClient = "client_disconnect"
442
+ } else if streamErrorOccurred {
443
+ finalStopReasonClient = "error"
444
+ } else if upstreamFinalStopReason != "" {
445
+ finalStopReasonClient = upstreamFinalStopReason
446
+ } else {
447
+ finalStopReasonClient = "end_turn" // Default if no specific reason from upstream
448
+ }
449
+
450
+ // Xác định finalOutputTokens
451
+ finalOutputTokens := outputTokens
452
+ if upstreamFinalUsage != nil && upstreamFinalUsage.OutputTokens > finalOutputTokens {
453
+ finalOutputTokens = upstreamFinalUsage.OutputTokens // Ưu tiên usage từ message_stop của upstream
454
+ } else {
455
+ finalOutputTokens = estimateTokens(accumulatedContent) // Nếu không có, ước tính lại
456
+ }
457
+ finalOutputTokens = max(0, finalOutputTokens)
458
+
459
+
460
+ log.Printf("INFO: [%s] (Claude Native SSE with Hack) SSE Stream hoàn tất. Lý do dừng: %s. Input: %d, Output: %d. Thời gian: %v. Upstream Stop Reason: %s",
461
+ messageID, finalStopReasonClient, calculatedInputTokens, finalOutputTokens, time.Since(startTime), upstreamFinalStopReason)
462
+
463
+ // Gửi "Hack" message_delta cuối cùng với stop_reason và usage (chỉ output_tokens)
464
+ finalHackUsageData := ClaudeSSEUsage{OutputTokens: finalOutputTokens}
465
+ finalDeltaStopReason := finalStopReasonClient
466
+ priorityFinalDeltaPayload := ClaudeSSEEvent{
467
+ Type: "message_delta",
468
+ Delta: &ClaudeSSEDelta{
469
+ StopReason: &finalDeltaStopReason,
470
+ StopSequence: nil, // Claude native không thường gửi stop_sequence trong delta này
471
+ },
472
+ Usage: &finalHackUsageData,
473
+ }
474
+ _ = sendSSEEvent(c, "message_delta", priorityFinalDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
475
+ eventIndex++
476
+
477
+ // Gửi content_block_stop cho client
478
+ contentStopPayload := ClaudeSSEEvent{Type: "content_block_stop", Index: func() *int { i := 0; return &i }()}
479
+ _ = sendSSEEvent(c, "content_block_stop", contentStopPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
480
+ eventIndex++
481
+
482
+ // Gửi message_stop cho client với usage đầy đủ
483
+ // Input tokens là cái đã tính toán, output tokens là cái đã chốt cuối cùng
484
+ finalClientInputTokens := calculatedInputTokens
485
+ finalStopUsageData := ClaudeSSEUsage{InputTokens: &finalClientInputTokens, OutputTokens: finalOutputTokens}
486
+ messageStopPayload := ClaudeSSEEvent{Type: "message_stop", Usage: &finalStopUsageData}
487
+ _ = sendSSEEvent(c, "message_stop", messageStopPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
488
+ eventIndex++
489
+
490
+ if streamErrorOccurred && errorDetails != nil && errorDetails.Type != "client_disconnect" {
491
+ // Nếu lỗi không phải do client disconnect, gửi event error cho client
492
+ errorPayload := ClaudeSSEEvent{Type: "error", Error: errorDetails}
493
+ _ = sendSSEEvent(c, "error", errorPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
494
  }
495
  }
496
 
 
508
  log.Printf("ERROR: [%s] %s Không thể marshal SSE event %d (%s): %v", requestID, logPrefix, eventIndex, eventName, err)
509
  return true // Continue trying to send other events if this one fails to marshal
510
  }
511
+ // Đảm bảo có một dòng trống sau mỗi sự kiện data:
512
  _, err = fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", eventName, string(jsonData))
513
  if err != nil {
514
  if shouldLog || eventName == "message_stop" || eventName == "error" {
515
+ log.Printf("WARN: [%s] %s Không thể ghi SSE event %d (%s) cho client: %v. Client có thể đã ngắt kết nối.", requestID, logPrefix, eventIndex, err)
516
  }
517
  return false // Stop if writing to client fails
518
  }