Update sse.go
Browse files
sse.go
CHANGED
|
@@ -40,7 +40,8 @@ func streamOpenAIResponseToClaudeSSE(
|
|
| 40 |
calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
|
| 41 |
log.Printf("INFO: [%s] (OpenAI->Claude SSE) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
|
| 42 |
|
| 43 |
-
inputTokens
|
|
|
|
| 44 |
outputTokens := 0
|
| 45 |
finalUsageReceivedFromStream := false // Flag to check if usage is explicitly received from OpenAI stream
|
| 46 |
eventIndex := 0
|
|
@@ -120,11 +121,11 @@ func streamOpenAIResponseToClaudeSSE(
|
|
| 120 |
}
|
| 121 |
if chunk.Usage != nil { // Some OpenAI versions might send usage in chunks
|
| 122 |
if chunk.Usage.CompletionTokens > 0 {
|
| 123 |
-
outputTokens = chunk.Usage.CompletionTokens
|
| 124 |
finalUsageReceivedFromStream = true
|
| 125 |
}
|
| 126 |
if chunk.Usage.PromptTokens > 0 {
|
| 127 |
-
|
| 128 |
}
|
| 129 |
}
|
| 130 |
}
|
|
@@ -180,7 +181,7 @@ func streamOpenAIResponseToClaudeSSE(
|
|
| 180 |
claudeStopReason = mapOpenAIFinishReasonToClaude(openAIFinishReason)
|
| 181 |
}
|
| 182 |
|
| 183 |
-
finalInputTokens :=
|
| 184 |
finalOutputTokens := outputTokens
|
| 185 |
if !finalUsageReceivedFromStream { // If OpenAI didn't send final usage, estimate from accumulated content
|
| 186 |
finalOutputTokens = estimateTokens(accumulatedContent)
|
|
@@ -250,12 +251,12 @@ func proxyClaudeNativeSSE(
|
|
| 250 |
calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
|
| 251 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
|
| 252 |
|
| 253 |
-
inputTokens := calculatedInputTokens //
|
| 254 |
outputTokens := 0
|
| 255 |
eventIndex := 0
|
| 256 |
|
| 257 |
var upstreamFinalStopReason string
|
| 258 |
-
var upstreamFinalUsage *ClaudeUsage
|
| 259 |
|
| 260 |
doneChan := make(chan struct{})
|
| 261 |
errChan := make(chan error, 1)
|
|
@@ -270,7 +271,6 @@ func proxyClaudeNativeSSE(
|
|
| 270 |
eventIndex++
|
| 271 |
|
| 272 |
// Gửi content_block_start cho client (tự tạo)
|
| 273 |
-
// Claude native thường gửi content_block_start với index 0 và content_block.type="text", content_block.text=""
|
| 274 |
contentStartBlock := ClaudeSSEContentBlock{Type: "text", Text: ""}
|
| 275 |
contentStartEvent := ClaudeSSEEvent{Type: "content_block_start", Index: func() *int { i := 0; return &i }(), ContentBlock: &contentStartBlock}
|
| 276 |
if !sendSSEEvent(c, "content_block_start", contentStartEvent, messageID, eventIndex, true, "(Claude Native SSE with Hack)") {
|
|
@@ -296,27 +296,23 @@ func proxyClaudeNativeSSE(
|
|
| 296 |
continue
|
| 297 |
}
|
| 298 |
|
| 299 |
-
// Chỉ xử lý các dòng data, bỏ qua event: lines vì chúng ta sẽ tự tạo/forward có chọn lọc
|
| 300 |
if strings.HasPrefix(line, "data:") {
|
| 301 |
dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
| 302 |
-
var upstreamEvent ClaudeSSEEvent
|
| 303 |
if err := json.Unmarshal([]byte(dataStr), &upstreamEvent); err != nil {
|
| 304 |
log.Printf("WARN: [%s] (Claude Native SSE with Hack) Không thể giải mã chunk JSON Claude native: %v. Data: %s", messageID, err, dataStr)
|
| 305 |
continue
|
| 306 |
}
|
| 307 |
|
| 308 |
-
// Xử lý các loại event từ Claude native upstream
|
| 309 |
switch upstreamEvent.Type {
|
| 310 |
case "content_block_delta":
|
| 311 |
if upstreamEvent.Delta != nil && upstreamEvent.Delta.Text != nil {
|
| 312 |
textChunk := *upstreamEvent.Delta.Text
|
| 313 |
accumulatedContent += textChunk
|
| 314 |
|
| 315 |
-
// Forward content_block_delta đến client
|
| 316 |
-
// Đảm bảo index được set (thường là 0 cho một content block đơn giản)
|
| 317 |
clientContentDelta := ClaudeSSEEvent{
|
| 318 |
Type: "content_block_delta",
|
| 319 |
-
Index: upstreamEvent.Index,
|
| 320 |
Delta: &ClaudeSSEDelta{
|
| 321 |
Type: "text_delta",
|
| 322 |
Text: &textChunk,
|
|
@@ -325,43 +321,41 @@ func proxyClaudeNativeSSE(
|
|
| 325 |
if clientContentDelta.Index == nil { clientContentDelta.Index = func() *int { i := 0; return &i }() }
|
| 326 |
|
| 327 |
if !sendSSEEvent(c, "content_block_delta", clientContentDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
|
| 328 |
-
return
|
| 329 |
}
|
| 330 |
eventIndex++
|
| 331 |
|
| 332 |
-
// "Hack" để gửi message_delta với output_tokens cập nhật
|
| 333 |
currentOutputTokens := estimateTokens(accumulatedContent)
|
| 334 |
if currentOutputTokens != outputTokens {
|
| 335 |
outputTokens = currentOutputTokens
|
| 336 |
intermediateUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
|
| 337 |
intermediateDeltaPayload := ClaudeSSEEvent{
|
| 338 |
Type: "message_delta",
|
| 339 |
-
Delta: &ClaudeSSEDelta{},
|
| 340 |
Usage: &intermediateUsage,
|
| 341 |
}
|
| 342 |
if !sendSSEEvent(c, "message_delta", intermediateDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
|
| 343 |
-
return
|
| 344 |
}
|
| 345 |
eventIndex++
|
| 346 |
}
|
| 347 |
}
|
| 348 |
-
case "message_delta":
|
| 349 |
-
if upstreamEvent.Usage != nil {
|
| 350 |
-
// Cập nhật outputTokens nếu upstream cung cấp
|
| 351 |
-
// Đây là thông tin chính xác hơn là ước tính
|
| 352 |
if upstreamEvent.Usage.OutputTokens > outputTokens {
|
| 353 |
outputTokens = upstreamEvent.Usage.OutputTokens
|
| 354 |
}
|
| 355 |
-
|
| 356 |
-
//
|
| 357 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 358 |
hackMessageDelta := ClaudeSSEEvent{
|
| 359 |
-
Type:
|
| 360 |
-
Delta: &
|
| 361 |
-
// Claude native message_delta có thể có stop_reason/sequence
|
| 362 |
-
StopReason: upstreamEvent.Delta.StopReason,
|
| 363 |
-
StopSequence: upstreamEvent.Delta.StopSequence,
|
| 364 |
-
},
|
| 365 |
Usage: ¤tHackUsage,
|
| 366 |
}
|
| 367 |
if !sendSSEEvent(c, "message_delta", hackMessageDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack - from upstream delta)") {
|
|
@@ -369,42 +363,46 @@ func proxyClaudeNativeSSE(
|
|
| 369 |
}
|
| 370 |
eventIndex++
|
| 371 |
}
|
|
|
|
| 372 |
if upstreamEvent.Delta != nil && upstreamEvent.Delta.StopReason != nil {
|
| 373 |
upstreamFinalStopReason = *upstreamEvent.Delta.StopReason
|
| 374 |
}
|
| 375 |
|
| 376 |
case "message_stop":
|
| 377 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Nhận message_stop từ upstream.", messageID)
|
| 378 |
-
if upstreamEvent.Message != nil && upstreamEvent.Message.StopReason != nil {
|
| 379 |
upstreamFinalStopReason = *upstreamEvent.Message.StopReason
|
| 380 |
}
|
| 381 |
-
|
| 382 |
-
if upstreamEvent.Usage != nil { //
|
| 383 |
-
|
| 384 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 385 |
outputTokens = upstreamEvent.Usage.OutputTokens
|
| 386 |
}
|
| 387 |
}
|
| 388 |
-
return
|
| 389 |
|
| 390 |
case "error":
|
| 391 |
log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ upstream: %+v", messageID, upstreamEvent.Error)
|
| 392 |
streamErrorOccurred = true
|
| 393 |
errorDetails = upstreamEvent.Error
|
| 394 |
-
return
|
| 395 |
|
| 396 |
-
// Các event khác như message_start, content_block_start, content_block_stop từ upstream sẽ được bỏ qua
|
| 397 |
-
// vì chúng ta đã tự tạo các event tương ứng cho client ở đầu luồng.
|
| 398 |
-
// Ping events có thể được forward nếu cần.
|
| 399 |
case "ping":
|
| 400 |
pingEvent := ClaudeSSEEvent{Type: "ping"}
|
| 401 |
if !sendSSEEvent(c, "ping", pingEvent, messageID, eventIndex, false, "(Claude Native SSE with Hack - ping)") {
|
| 402 |
return
|
| 403 |
}
|
| 404 |
eventIndex++
|
| 405 |
-
|
| 406 |
-
default:
|
| 407 |
-
// log.Printf("DEBUG: [%s] (Claude Native SSE with Hack) Bỏ qua event upstream không xác định: %s", messageID, upstreamEvent.Type)
|
| 408 |
}
|
| 409 |
}
|
| 410 |
}
|
|
@@ -421,17 +419,17 @@ func proxyClaudeNativeSSE(
|
|
| 421 |
|
| 422 |
select {
|
| 423 |
case <-doneChan:
|
| 424 |
-
// Normal completion
|
| 425 |
case err := <-errChan:
|
| 426 |
log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ goroutine đọc: %v", messageID, err)
|
| 427 |
streamErrorOccurred = true
|
| 428 |
-
if errorDetails == nil {
|
| 429 |
errorDetails = &ClaudeError{Type: "api_error", Message: fmt.Sprintf("Lỗi đọc phản hồi upstream: %v", err)}
|
| 430 |
}
|
| 431 |
case <-c.Request.Context().Done():
|
| 432 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Client ngắt kết nối trong quá trình xử lý stream: %v", messageID, c.Request.Context().Err())
|
| 433 |
streamErrorOccurred = true
|
| 434 |
-
if errorDetails == nil {
|
| 435 |
errorDetails = &ClaudeError{Type: "client_disconnect", Message: "Client ngắt kết nối trong quá trình stream"}
|
| 436 |
}
|
| 437 |
}
|
|
@@ -444,51 +442,51 @@ func proxyClaudeNativeSSE(
|
|
| 444 |
} else if upstreamFinalStopReason != "" {
|
| 445 |
finalStopReasonClient = upstreamFinalStopReason
|
| 446 |
} else {
|
| 447 |
-
finalStopReasonClient = "end_turn"
|
| 448 |
}
|
| 449 |
|
| 450 |
-
// Xác định finalOutputTokens
|
| 451 |
finalOutputTokens := outputTokens
|
| 452 |
-
if upstreamFinalUsage != nil && upstreamFinalUsage.OutputTokens > finalOutputTokens {
|
| 453 |
-
finalOutputTokens = upstreamFinalUsage.OutputTokens
|
| 454 |
-
} else {
|
| 455 |
-
finalOutputTokens = estimateTokens(accumulatedContent)
|
| 456 |
}
|
| 457 |
finalOutputTokens = max(0, finalOutputTokens)
|
| 458 |
|
| 459 |
-
|
| 460 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) SSE Stream hoàn tất. Lý do dừng: %s. Input: %d, Output: %d. Thời gian: %v. Upstream Stop Reason: %s",
|
| 461 |
messageID, finalStopReasonClient, calculatedInputTokens, finalOutputTokens, time.Since(startTime), upstreamFinalStopReason)
|
| 462 |
|
| 463 |
-
// Gửi "Hack" message_delta cuối cùng với stop_reason và usage (chỉ output_tokens)
|
| 464 |
finalHackUsageData := ClaudeSSEUsage{OutputTokens: finalOutputTokens}
|
| 465 |
finalDeltaStopReason := finalStopReasonClient
|
| 466 |
priorityFinalDeltaPayload := ClaudeSSEEvent{
|
| 467 |
Type: "message_delta",
|
| 468 |
Delta: &ClaudeSSEDelta{
|
| 469 |
StopReason: &finalDeltaStopReason,
|
| 470 |
-
StopSequence: nil,
|
| 471 |
},
|
| 472 |
Usage: &finalHackUsageData,
|
| 473 |
}
|
| 474 |
_ = sendSSEEvent(c, "message_delta", priorityFinalDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
|
| 475 |
eventIndex++
|
| 476 |
|
| 477 |
-
// Gửi content_block_stop cho client
|
| 478 |
contentStopPayload := ClaudeSSEEvent{Type: "content_block_stop", Index: func() *int { i := 0; return &i }()}
|
| 479 |
_ = sendSSEEvent(c, "content_block_stop", contentStopPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
|
| 480 |
eventIndex++
|
| 481 |
|
| 482 |
-
//
|
| 483 |
-
//
|
| 484 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 485 |
finalStopUsageData := ClaudeSSEUsage{InputTokens: &finalClientInputTokens, OutputTokens: finalOutputTokens}
|
| 486 |
messageStopPayload := ClaudeSSEEvent{Type: "message_stop", Usage: &finalStopUsageData}
|
| 487 |
_ = sendSSEEvent(c, "message_stop", messageStopPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
|
| 488 |
eventIndex++
|
| 489 |
|
| 490 |
if streamErrorOccurred && errorDetails != nil && errorDetails.Type != "client_disconnect" {
|
| 491 |
-
// Nếu lỗi không phải do client disconnect, gửi event error cho client
|
| 492 |
errorPayload := ClaudeSSEEvent{Type: "error", Error: errorDetails}
|
| 493 |
_ = sendSSEEvent(c, "error", errorPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
|
| 494 |
}
|
|
@@ -506,21 +504,16 @@ func sendSSEEvent(c *gin.Context, eventName string, data interface{}, requestID
|
|
| 506 |
jsonData, err := json.Marshal(data)
|
| 507 |
if err != nil {
|
| 508 |
log.Printf("ERROR: [%s] %s Không thể marshal SSE event %d (%s): %v", requestID, logPrefix, eventIndex, eventName, err)
|
| 509 |
-
return true
|
| 510 |
}
|
| 511 |
-
// Đảm bảo có một dòng trống sau mỗi sự kiện data:
|
| 512 |
_, err = fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", eventName, string(jsonData))
|
| 513 |
if err != nil {
|
| 514 |
if shouldLog || eventName == "message_stop" || eventName == "error" {
|
| 515 |
log.Printf("WARN: [%s] %s Không thể ghi SSE event %d (%s) cho client: %v. Client có thể đã ngắt kết nối.", requestID, logPrefix, eventIndex, err)
|
| 516 |
}
|
| 517 |
-
return false
|
| 518 |
}
|
| 519 |
c.Writer.Flush()
|
| 520 |
-
// Vô hiệu hóa: Log TRACE này quá chi tiết
|
| 521 |
-
// if shouldLog {
|
| 522 |
-
// log.Printf("TRACE: [%s] %s SSE: Đã gửi Event %d (%s)", requestID, logPrefix, eventIndex, eventName)
|
| 523 |
-
// }
|
| 524 |
return true
|
| 525 |
}
|
| 526 |
}
|
|
|
|
| 40 |
calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
|
| 41 |
log.Printf("INFO: [%s] (OpenAI->Claude SSE) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
|
| 42 |
|
| 43 |
+
// inputTokens from OpenAI stream can update this if OpenAI provides it
|
| 44 |
+
currentOpenAIInputTokens := calculatedInputTokens
|
| 45 |
outputTokens := 0
|
| 46 |
finalUsageReceivedFromStream := false // Flag to check if usage is explicitly received from OpenAI stream
|
| 47 |
eventIndex := 0
|
|
|
|
| 121 |
}
|
| 122 |
if chunk.Usage != nil { // Some OpenAI versions might send usage in chunks
|
| 123 |
if chunk.Usage.CompletionTokens > 0 {
|
| 124 |
+
outputTokens = chunk.Usage.CompletionTokens // Use OpenAI's count if available
|
| 125 |
finalUsageReceivedFromStream = true
|
| 126 |
}
|
| 127 |
if chunk.Usage.PromptTokens > 0 {
|
| 128 |
+
currentOpenAIInputTokens = chunk.Usage.PromptTokens // Update input tokens if provided by OpenAI
|
| 129 |
}
|
| 130 |
}
|
| 131 |
}
|
|
|
|
| 181 |
claudeStopReason = mapOpenAIFinishReasonToClaude(openAIFinishReason)
|
| 182 |
}
|
| 183 |
|
| 184 |
+
finalInputTokens := currentOpenAIInputTokens // Use potentially updated input tokens from OpenAI stream
|
| 185 |
finalOutputTokens := outputTokens
|
| 186 |
if !finalUsageReceivedFromStream { // If OpenAI didn't send final usage, estimate from accumulated content
|
| 187 |
finalOutputTokens = estimateTokens(accumulatedContent)
|
|
|
|
| 251 |
calculatedInputTokens := calculateInputTokensFromClaudeRequest(originalClaudeRequest)
|
| 252 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Bắt đầu SSE. Input tokens ước tính: %d. Model: %s", messageID, calculatedInputTokens, requestedModel)
|
| 253 |
|
| 254 |
+
// removed: inputTokens := calculatedInputTokens // This variable was unused
|
| 255 |
outputTokens := 0
|
| 256 |
eventIndex := 0
|
| 257 |
|
| 258 |
var upstreamFinalStopReason string
|
| 259 |
+
var upstreamFinalUsage *ClaudeUsage // This is of type *ClaudeUsage
|
| 260 |
|
| 261 |
doneChan := make(chan struct{})
|
| 262 |
errChan := make(chan error, 1)
|
|
|
|
| 271 |
eventIndex++
|
| 272 |
|
| 273 |
// Gửi content_block_start cho client (tự tạo)
|
|
|
|
| 274 |
contentStartBlock := ClaudeSSEContentBlock{Type: "text", Text: ""}
|
| 275 |
contentStartEvent := ClaudeSSEEvent{Type: "content_block_start", Index: func() *int { i := 0; return &i }(), ContentBlock: &contentStartBlock}
|
| 276 |
if !sendSSEEvent(c, "content_block_start", contentStartEvent, messageID, eventIndex, true, "(Claude Native SSE with Hack)") {
|
|
|
|
| 296 |
continue
|
| 297 |
}
|
| 298 |
|
|
|
|
| 299 |
if strings.HasPrefix(line, "data:") {
|
| 300 |
dataStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
| 301 |
+
var upstreamEvent ClaudeSSEEvent
|
| 302 |
if err := json.Unmarshal([]byte(dataStr), &upstreamEvent); err != nil {
|
| 303 |
log.Printf("WARN: [%s] (Claude Native SSE with Hack) Không thể giải mã chunk JSON Claude native: %v. Data: %s", messageID, err, dataStr)
|
| 304 |
continue
|
| 305 |
}
|
| 306 |
|
|
|
|
| 307 |
switch upstreamEvent.Type {
|
| 308 |
case "content_block_delta":
|
| 309 |
if upstreamEvent.Delta != nil && upstreamEvent.Delta.Text != nil {
|
| 310 |
textChunk := *upstreamEvent.Delta.Text
|
| 311 |
accumulatedContent += textChunk
|
| 312 |
|
|
|
|
|
|
|
| 313 |
clientContentDelta := ClaudeSSEEvent{
|
| 314 |
Type: "content_block_delta",
|
| 315 |
+
Index: upstreamEvent.Index,
|
| 316 |
Delta: &ClaudeSSEDelta{
|
| 317 |
Type: "text_delta",
|
| 318 |
Text: &textChunk,
|
|
|
|
| 321 |
if clientContentDelta.Index == nil { clientContentDelta.Index = func() *int { i := 0; return &i }() }
|
| 322 |
|
| 323 |
if !sendSSEEvent(c, "content_block_delta", clientContentDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
|
| 324 |
+
return
|
| 325 |
}
|
| 326 |
eventIndex++
|
| 327 |
|
|
|
|
| 328 |
currentOutputTokens := estimateTokens(accumulatedContent)
|
| 329 |
if currentOutputTokens != outputTokens {
|
| 330 |
outputTokens = currentOutputTokens
|
| 331 |
intermediateUsage := ClaudeSSEUsage{OutputTokens: outputTokens}
|
| 332 |
intermediateDeltaPayload := ClaudeSSEEvent{
|
| 333 |
Type: "message_delta",
|
| 334 |
+
Delta: &ClaudeSSEDelta{},
|
| 335 |
Usage: &intermediateUsage,
|
| 336 |
}
|
| 337 |
if !sendSSEEvent(c, "message_delta", intermediateDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)") {
|
| 338 |
+
return
|
| 339 |
}
|
| 340 |
eventIndex++
|
| 341 |
}
|
| 342 |
}
|
| 343 |
+
case "message_delta":
|
| 344 |
+
if upstreamEvent.Usage != nil { // upstreamEvent.Usage is *ClaudeSSEUsage
|
|
|
|
|
|
|
| 345 |
if upstreamEvent.Usage.OutputTokens > outputTokens {
|
| 346 |
outputTokens = upstreamEvent.Usage.OutputTokens
|
| 347 |
}
|
| 348 |
+
currentHackUsage := ClaudeSSEUsage{OutputTokens: outputTokens} // Use updated outputTokens
|
| 349 |
+
// Preserve stop_reason/sequence from upstream's message_delta if present
|
| 350 |
+
var deltaDetails ClaudeSSEDelta
|
| 351 |
+
if upstreamEvent.Delta != nil {
|
| 352 |
+
deltaDetails.StopReason = upstreamEvent.Delta.StopReason
|
| 353 |
+
deltaDetails.StopSequence = upstreamEvent.Delta.StopSequence
|
| 354 |
+
}
|
| 355 |
+
|
| 356 |
hackMessageDelta := ClaudeSSEEvent{
|
| 357 |
+
Type: "message_delta",
|
| 358 |
+
Delta: &deltaDetails,
|
|
|
|
|
|
|
|
|
|
|
|
|
| 359 |
Usage: ¤tHackUsage,
|
| 360 |
}
|
| 361 |
if !sendSSEEvent(c, "message_delta", hackMessageDelta, messageID, eventIndex, false, "(Claude Native SSE with Hack - from upstream delta)") {
|
|
|
|
| 363 |
}
|
| 364 |
eventIndex++
|
| 365 |
}
|
| 366 |
+
// Capture stop reason if it's in this message_delta
|
| 367 |
if upstreamEvent.Delta != nil && upstreamEvent.Delta.StopReason != nil {
|
| 368 |
upstreamFinalStopReason = *upstreamEvent.Delta.StopReason
|
| 369 |
}
|
| 370 |
|
| 371 |
case "message_stop":
|
| 372 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Nhận message_stop từ upstream.", messageID)
|
| 373 |
+
if upstreamEvent.Message != nil && upstreamEvent.Message.StopReason != nil {
|
| 374 |
upstreamFinalStopReason = *upstreamEvent.Message.StopReason
|
| 375 |
}
|
| 376 |
+
|
| 377 |
+
if upstreamEvent.Usage != nil { // upstreamEvent.Usage is *ClaudeSSEUsage
|
| 378 |
+
// Convert ClaudeSSEUsage to ClaudeUsage for upstreamFinalUsage
|
| 379 |
+
tempUsage := &ClaudeUsage{
|
| 380 |
+
OutputTokens: upstreamEvent.Usage.OutputTokens,
|
| 381 |
+
}
|
| 382 |
+
if upstreamEvent.Usage.InputTokens != nil {
|
| 383 |
+
tempUsage.InputTokens = *upstreamEvent.Usage.InputTokens
|
| 384 |
+
}
|
| 385 |
+
// Assign the converted *ClaudeUsage to upstreamFinalUsage
|
| 386 |
+
upstreamFinalUsage = tempUsage
|
| 387 |
+
|
| 388 |
+
if upstreamEvent.Usage.OutputTokens > outputTokens {
|
| 389 |
outputTokens = upstreamEvent.Usage.OutputTokens
|
| 390 |
}
|
| 391 |
}
|
| 392 |
+
return
|
| 393 |
|
| 394 |
case "error":
|
| 395 |
log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ upstream: %+v", messageID, upstreamEvent.Error)
|
| 396 |
streamErrorOccurred = true
|
| 397 |
errorDetails = upstreamEvent.Error
|
| 398 |
+
return
|
| 399 |
|
|
|
|
|
|
|
|
|
|
| 400 |
case "ping":
|
| 401 |
pingEvent := ClaudeSSEEvent{Type: "ping"}
|
| 402 |
if !sendSSEEvent(c, "ping", pingEvent, messageID, eventIndex, false, "(Claude Native SSE with Hack - ping)") {
|
| 403 |
return
|
| 404 |
}
|
| 405 |
eventIndex++
|
|
|
|
|
|
|
|
|
|
| 406 |
}
|
| 407 |
}
|
| 408 |
}
|
|
|
|
| 419 |
|
| 420 |
select {
|
| 421 |
case <-doneChan:
|
| 422 |
+
// Normal completion
|
| 423 |
case err := <-errChan:
|
| 424 |
log.Printf("ERROR: [%s] (Claude Native SSE with Hack) Nhận lỗi từ goroutine đọc: %v", messageID, err)
|
| 425 |
streamErrorOccurred = true
|
| 426 |
+
if errorDetails == nil {
|
| 427 |
errorDetails = &ClaudeError{Type: "api_error", Message: fmt.Sprintf("Lỗi đọc phản hồi upstream: %v", err)}
|
| 428 |
}
|
| 429 |
case <-c.Request.Context().Done():
|
| 430 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) Client ngắt kết nối trong quá trình xử lý stream: %v", messageID, c.Request.Context().Err())
|
| 431 |
streamErrorOccurred = true
|
| 432 |
+
if errorDetails == nil {
|
| 433 |
errorDetails = &ClaudeError{Type: "client_disconnect", Message: "Client ngắt kết nối trong quá trình stream"}
|
| 434 |
}
|
| 435 |
}
|
|
|
|
| 442 |
} else if upstreamFinalStopReason != "" {
|
| 443 |
finalStopReasonClient = upstreamFinalStopReason
|
| 444 |
} else {
|
| 445 |
+
finalStopReasonClient = "end_turn"
|
| 446 |
}
|
| 447 |
|
|
|
|
| 448 |
finalOutputTokens := outputTokens
|
| 449 |
+
if upstreamFinalUsage != nil && upstreamFinalUsage.OutputTokens > finalOutputTokens { // upstreamFinalUsage is *ClaudeUsage
|
| 450 |
+
finalOutputTokens = upstreamFinalUsage.OutputTokens
|
| 451 |
+
} else if accumulatedContent != "" { // Only estimate if content was accumulated and no definitive upstream usage
|
| 452 |
+
finalOutputTokens = estimateTokens(accumulatedContent)
|
| 453 |
}
|
| 454 |
finalOutputTokens = max(0, finalOutputTokens)
|
| 455 |
|
|
|
|
| 456 |
log.Printf("INFO: [%s] (Claude Native SSE with Hack) SSE Stream hoàn tất. Lý do dừng: %s. Input: %d, Output: %d. Thời gian: %v. Upstream Stop Reason: %s",
|
| 457 |
messageID, finalStopReasonClient, calculatedInputTokens, finalOutputTokens, time.Since(startTime), upstreamFinalStopReason)
|
| 458 |
|
|
|
|
| 459 |
finalHackUsageData := ClaudeSSEUsage{OutputTokens: finalOutputTokens}
|
| 460 |
finalDeltaStopReason := finalStopReasonClient
|
| 461 |
priorityFinalDeltaPayload := ClaudeSSEEvent{
|
| 462 |
Type: "message_delta",
|
| 463 |
Delta: &ClaudeSSEDelta{
|
| 464 |
StopReason: &finalDeltaStopReason,
|
| 465 |
+
StopSequence: nil,
|
| 466 |
},
|
| 467 |
Usage: &finalHackUsageData,
|
| 468 |
}
|
| 469 |
_ = sendSSEEvent(c, "message_delta", priorityFinalDeltaPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
|
| 470 |
eventIndex++
|
| 471 |
|
|
|
|
| 472 |
contentStopPayload := ClaudeSSEEvent{Type: "content_block_stop", Index: func() *int { i := 0; return &i }()}
|
| 473 |
_ = sendSSEEvent(c, "content_block_stop", contentStopPayload, messageID, eventIndex, false, "(Claude Native SSE with Hack)")
|
| 474 |
eventIndex++
|
| 475 |
|
| 476 |
+
finalClientInputTokens := calculatedInputTokens // Use the initially calculated input tokens for consistency
|
| 477 |
+
// If upstream provided definitive input tokens in its final usage, we could consider using it,
|
| 478 |
+
// but calculatedInputTokens is based on the original request, which is reliable for the client's perspective.
|
| 479 |
+
if upstreamFinalUsage != nil && upstreamFinalUsage.InputTokens > 0 {
|
| 480 |
+
// Potentially use upstreamFinalUsage.InputTokens if it's considered more accurate
|
| 481 |
+
// For now, sticking to calculatedInputTokens for the client-facing message_stop.
|
| 482 |
+
}
|
| 483 |
+
|
| 484 |
finalStopUsageData := ClaudeSSEUsage{InputTokens: &finalClientInputTokens, OutputTokens: finalOutputTokens}
|
| 485 |
messageStopPayload := ClaudeSSEEvent{Type: "message_stop", Usage: &finalStopUsageData}
|
| 486 |
_ = sendSSEEvent(c, "message_stop", messageStopPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
|
| 487 |
eventIndex++
|
| 488 |
|
| 489 |
if streamErrorOccurred && errorDetails != nil && errorDetails.Type != "client_disconnect" {
|
|
|
|
| 490 |
errorPayload := ClaudeSSEEvent{Type: "error", Error: errorDetails}
|
| 491 |
_ = sendSSEEvent(c, "error", errorPayload, messageID, eventIndex, true, "(Claude Native SSE with Hack)")
|
| 492 |
}
|
|
|
|
| 504 |
jsonData, err := json.Marshal(data)
|
| 505 |
if err != nil {
|
| 506 |
log.Printf("ERROR: [%s] %s Không thể marshal SSE event %d (%s): %v", requestID, logPrefix, eventIndex, eventName, err)
|
| 507 |
+
return true
|
| 508 |
}
|
|
|
|
| 509 |
_, err = fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", eventName, string(jsonData))
|
| 510 |
if err != nil {
|
| 511 |
if shouldLog || eventName == "message_stop" || eventName == "error" {
|
| 512 |
log.Printf("WARN: [%s] %s Không thể ghi SSE event %d (%s) cho client: %v. Client có thể đã ngắt kết nối.", requestID, logPrefix, eventIndex, err)
|
| 513 |
}
|
| 514 |
+
return false
|
| 515 |
}
|
| 516 |
c.Writer.Flush()
|
|
|
|
|
|
|
|
|
|
|
|
|
| 517 |
return true
|
| 518 |
}
|
| 519 |
}
|