File size: 24,971 Bytes
69b897d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c6056a
69b897d
6c6056a
69b897d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const logger = require('../utils/logger')
const config = require('../../config/config')

// Normalize a model name by stripping the 'azure/' routing prefix, if present.
// Returns the input unchanged (including null/undefined) when no prefix exists.
function normalizeModelName(model) {
  const prefix = 'azure/'
  if (!model || !model.startsWith(prefix)) {
    return model
  }
  return model.slice(prefix.length)
}

// Forward a request to Azure OpenAI on behalf of the given account.
//
// @param {Object} account - Azure account record: azureEndpoint, deploymentName,
//   apiVersion, apiKey, and an optional proxy configuration.
// @param {Object} requestBody - JSON body to forward (model name is normalized).
// @param {Object} headers - incoming headers; currently unused, kept for interface stability.
// @param {boolean} isStream - when true, request an SSE stream from Azure.
// @param {string} endpoint - Azure path suffix: 'chat/completions', 'responses',
//   or any other deployment-scoped endpoint.
// @returns {Promise<Object>} the raw axios response. validateStatus accepts every
//   HTTP status, so error statuses resolve rather than throw.
// @throws transport-level errors from axios (DNS, timeout, reset, SSL, ...).
async function handleAzureOpenAIRequest({
  account,
  requestBody,
  headers: _headers = {}, // underscore prefix: intentionally unused
  isStream = false,
  endpoint = 'chat/completions'
}) {
  // Declared up front so the catch block can log them even when the failure
  // happens before (or while) they are being computed.
  let requestUrl = ''
  let proxyAgent = null
  let deploymentName = ''
  // Effective timeout; reused in the timeout error log below so the reported
  // value always matches what was actually configured (was hard-coded 600000).
  const timeoutMs = config.requestTimeout || 600000

  try {
    // Build the Azure OpenAI request URL.
    const baseUrl = account.azureEndpoint
    deploymentName = account.deploymentName || 'default'
    // Azure Responses API requires preview versions; fall back appropriately
    const apiVersion =
      account.apiVersion || (endpoint === 'responses' ? '2025-04-01-preview' : '2024-02-01')
    if (endpoint === 'chat/completions') {
      requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/chat/completions?api-version=${apiVersion}`
    } else if (endpoint === 'responses') {
      requestUrl = `${baseUrl}/openai/responses?api-version=${apiVersion}`
    } else {
      requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/${endpoint}?api-version=${apiVersion}`
    }

    // Azure authenticates via the 'api-key' header. The object is built fresh,
    // so Anthropic-style headers (x-api-key, anthropic-version, host) are never
    // present and need no explicit removal.
    const requestHeaders = {
      'Content-Type': 'application/json',
      'api-key': account.apiKey
    }

    // Shallow-copy the body so the caller's object is never mutated.
    const processedBody = { ...requestBody }

    // Normalize the model name: the Responses API wants the deployment name;
    // other endpoints strip an optional 'azure/' prefix and default to 'gpt-4'.
    if (endpoint === 'responses') {
      processedBody.model = deploymentName
    } else if (processedBody.model) {
      processedBody.model = normalizeModelName(processedBody.model)
    } else {
      processedBody.model = 'gpt-4'
    }

    // Unified proxy agent creation (null when no proxy is configured).
    proxyAgent = ProxyHelper.createProxyAgent(account.proxy)

    // Request options
    const axiosConfig = {
      method: 'POST',
      url: requestUrl,
      headers: requestHeaders,
      data: processedBody,
      timeout: timeoutMs,
      validateStatus: () => true, // resolve on every HTTP status; caller inspects it
      // Connection keep-alive options
      keepAlive: true,
      maxRedirects: 5,
      // Guard against 'socket hang up'
      socketKeepAlive: true
    }

    // Route through the proxy when one is configured.
    if (proxyAgent) {
      axiosConfig.httpAgent = proxyAgent
      axiosConfig.httpsAgent = proxyAgent
      axiosConfig.proxy = false
      // Extra keep-alive settings for the proxy agent
      if (proxyAgent.options) {
        proxyAgent.options.keepAlive = true
        proxyAgent.options.keepAliveMsecs = 1000
      }
      logger.debug(
        `Using proxy for Azure OpenAI request: ${ProxyHelper.getProxyDescription(account.proxy)}`
      )
    }

    // Streaming requests need a stream response type and SSE accept header.
    if (isStream) {
      axiosConfig.responseType = 'stream'
      requestHeaders.accept = 'text/event-stream'
    } else {
      requestHeaders.accept = 'application/json'
    }

    logger.debug(`Making Azure OpenAI request`, {
      requestUrl,
      method: 'POST',
      endpoint,
      deploymentName,
      apiVersion,
      hasProxy: !!proxyAgent,
      proxyInfo: ProxyHelper.maskProxyInfo(account.proxy),
      isStream,
      requestBodySize: JSON.stringify(processedBody).length
    })

    logger.debug('Azure OpenAI request headers', {
      'content-type': requestHeaders['Content-Type'],
      'user-agent': requestHeaders['user-agent'] || 'not-set',
      customHeaders: Object.keys(requestHeaders).filter(
        (key) => !['Content-Type', 'user-agent'].includes(key)
      )
    })

    logger.debug('Azure OpenAI request body', {
      model: processedBody.model,
      messages: processedBody.messages?.length || 0,
      otherParams: Object.keys(processedBody).filter((key) => !['model', 'messages'].includes(key))
    })

    const requestStartTime = Date.now()
    logger.debug(`🔄 Starting Azure OpenAI HTTP request at ${new Date().toISOString()}`)

    // Send the request
    const response = await axios(axiosConfig)

    const requestDuration = Date.now() - requestStartTime
    logger.debug(`✅ Azure OpenAI HTTP request completed at ${new Date().toISOString()}`)

    logger.debug(`Azure OpenAI response received`, {
      status: response.status,
      statusText: response.statusText,
      duration: `${requestDuration}ms`,
      responseHeaders: Object.keys(response.headers || {}),
      hasData: !!response.data,
      contentType: response.headers?.['content-type'] || 'unknown'
    })

    return response
  } catch (error) {
    const errorDetails = {
      message: error.message,
      code: error.code,
      status: error.response?.status,
      statusText: error.response?.statusText,
      responseData: error.response?.data,
      requestUrl: requestUrl || 'unknown',
      endpoint,
      deploymentName: deploymentName || account?.deploymentName || 'unknown',
      hasProxy: !!proxyAgent,
      proxyType: account?.proxy?.type || 'none',
      isTimeout: error.code === 'ECONNABORTED',
      isNetworkError: !error.response,
      stack: error.stack
    }

    // Detailed logging per known failure class
    if (error.code === 'ENOTFOUND') {
      logger.error('DNS Resolution Failed for Azure OpenAI', {
        ...errorDetails,
        hostname: requestUrl && requestUrl !== 'unknown' ? new URL(requestUrl).hostname : 'unknown',
        suggestion: 'Check if Azure endpoint URL is correct and accessible'
      })
    } else if (error.code === 'ECONNREFUSED') {
      logger.error('Connection Refused by Azure OpenAI', {
        ...errorDetails,
        suggestion: 'Check if proxy settings are correct or Azure service is accessible'
      })
    } else if (error.code === 'ECONNRESET' || error.message.includes('socket hang up')) {
      logger.error('🚨 Azure OpenAI Connection Reset / Socket Hang Up', {
        ...errorDetails,
        suggestion:
          'Connection was dropped by Azure OpenAI or proxy. This might be due to long request processing time, proxy timeout, or network instability. Try reducing request complexity or check proxy settings.'
      })
    } else if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') {
      logger.error('🚨 Azure OpenAI Request Timeout', {
        ...errorDetails,
        timeoutMs,
        suggestion: `Request exceeded ${timeoutMs}ms timeout. Consider reducing model complexity or check if Azure service is responding slowly.`
      })
    } else if (
      error.code === 'CERT_AUTHORITY_INVALID' ||
      error.code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE'
    ) {
      logger.error('SSL Certificate Error for Azure OpenAI', {
        ...errorDetails,
        suggestion: 'SSL certificate validation failed - check proxy SSL settings'
      })
    } else if (error.response?.status === 401) {
      logger.error('Azure OpenAI Authentication Failed', {
        ...errorDetails,
        suggestion: 'Check if Azure OpenAI API key is valid and not expired'
      })
    } else if (error.response?.status === 404) {
      logger.error('Azure OpenAI Deployment Not Found', {
        ...errorDetails,
        suggestion: 'Check if deployment name and Azure endpoint are correct'
      })
    } else {
      logger.error('Azure OpenAI Request Failed', errorDetails)
    }

    throw error
  }
}

// Tracks in-flight SSE streams so each can be torn down safely exactly once.
class StreamManager {
  constructor() {
    this.activeStreams = new Set()
    this.cleanupCallbacks = new Map()
  }

  // Remember a stream and the callback that tears it down.
  registerStream(streamId, cleanup) {
    this.activeStreams.add(streamId)
    this.cleanupCallbacks.set(streamId, cleanup)
  }

  // Invoke the stream's teardown callback (if any) and forget the stream.
  // Callback errors are logged and never propagated; the stream is always
  // deregistered, even when teardown throws.
  cleanup(streamId) {
    if (!this.activeStreams.has(streamId)) {
      return
    }
    const teardown = this.cleanupCallbacks.get(streamId)
    try {
      if (teardown) {
        teardown()
      }
    } catch (error) {
      logger.warn(`Stream cleanup error for ${streamId}:`, error.message)
    } finally {
      this.activeStreams.delete(streamId)
      this.cleanupCallbacks.delete(streamId)
    }
  }

  // Number of streams currently registered and not yet cleaned up.
  getActiveStreamCount() {
    return this.activeStreams.size
  }
}

// Module-wide singleton used by handleStreamResponse to track active streams.
const streamManager = new StreamManager()

// SSE buffer size limits
const MAX_BUFFER_SIZE = 64 * 1024 // 64KB cap on the usage-parsing buffer
const MAX_EVENT_SIZE = 16 * 1024 // 16KB maximum size for a single SSE event

// Handle a streaming (SSE) response: pipe upstream chunks straight through to
// the client while parsing them on the side to capture token-usage statistics
// and the actual model name.
//
// @param {Object} upstreamResponse - axios response whose .data is a readable stream.
// @param {Object} clientResponse - Express-style response the SSE is written to.
// @param {Object} options - optional { onData, onEnd, onError } callbacks.
// @returns {Promise<{usageData: Object|null, actualModel: string|null}>}
function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
  const { onData, onEnd, onError } = options
  const streamId = `stream_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`

  logger.info(`Starting Azure OpenAI stream handling`, {
    streamId,
    upstreamStatus: upstreamResponse.status,
    upstreamHeaders: Object.keys(upstreamResponse.headers || {}),
    clientRemoteAddress: clientResponse.req?.connection?.remoteAddress,
    hasOnData: !!onData,
    hasOnEnd: !!onEnd,
    hasOnError: !!onError
  })

  return new Promise((resolve, reject) => {
    let buffer = ''
    let usageData = null
    let actualModel = null
    let hasEnded = false
    let eventCount = 0
    const maxEvents = 10000 // hard cap on the number of processed chunks

    // Dedicated buffer that keeps the last few chunks for usage extraction
    let finalChunksBuffer = ''
    const FINAL_CHUNKS_SIZE = 32 * 1024 // keep up to 32KB of trailing chunks
    const allParsedEvents = [] // every parsed event, for final usage extraction

    // Set SSE response headers
    clientResponse.setHeader('Content-Type', 'text/event-stream')
    clientResponse.setHeader('Cache-Control', 'no-cache')
    clientResponse.setHeader('Connection', 'keep-alive')
    clientResponse.setHeader('X-Accel-Buffering', 'no')

    // Pass through selected upstream headers
    const passThroughHeaders = [
      'x-request-id',
      'x-ratelimit-remaining-requests',
      'x-ratelimit-remaining-tokens'
    ]
    passThroughHeaders.forEach((header) => {
      const value = upstreamResponse.headers[header]
      if (value) {
        clientResponse.setHeader(header, value)
      }
    })

    // Flush headers immediately so the client starts receiving the stream
    if (typeof clientResponse.flushHeaders === 'function') {
      clientResponse.flushHeaders()
    }

    // Robust SSE event parser; records every event for end-of-stream processing
    const parseSSEForUsage = (data, isFromFinalBuffer = false) => {
      const lines = data.split('\n')

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          try {
            const jsonStr = line.slice(6) // strip the 'data: ' prefix
            if (jsonStr.trim() === '[DONE]') {
              continue
            }
            const eventData = JSON.parse(jsonStr)

            // Keep every successfully parsed event
            allParsedEvents.push(eventData)

            // Capture the model name when present
            if (eventData.model) {
              actualModel = eventData.model
            }

            // Primary path: the robust multi-strategy usage extractor
            const { usageData: extractedUsage, actualModel: extractedModel } =
              extractUsageDataRobust(
                eventData,
                `stream-event-${isFromFinalBuffer ? 'final' : 'normal'}`
              )

            if (extractedUsage && !usageData) {
              usageData = extractedUsage
              if (extractedModel) {
                actualModel = extractedModel
              }
              logger.debug(`🎯 Stream usage captured via robust extraction`, {
                isFromFinalBuffer,
                usageData,
                actualModel
              })
            }

            // Fallback: the original simple extraction paths
            if (!usageData) {
              // Usage stats (Responses API: response.completed -> response.usage)
              if (eventData.type === 'response.completed' && eventData.response) {
                if (eventData.response.model) {
                  actualModel = eventData.response.model
                }
                if (eventData.response.usage) {
                  usageData = eventData.response.usage
                  logger.debug('🎯 Stream usage (backup method - response.usage):', usageData)
                }
              }

              // Chat Completions style compatibility (top-level usage)
              if (!usageData && eventData.usage) {
                usageData = eventData.usage
                logger.debug('🎯 Stream usage (backup method - top-level):', usageData)
              }
            }
          } catch (e) {
            logger.debug('SSE parsing error (expected for incomplete chunks):', e.message)
          }
        }
      }
    }

    // Register stream teardown: destroy upstream and terminate the client
    // response exactly once, regardless of which side fails.
    const cleanup = () => {
      if (!hasEnded) {
        hasEnded = true
        try {
          upstreamResponse.data?.removeAllListeners?.()
          upstreamResponse.data?.destroy?.()

          if (!clientResponse.headersSent) {
            clientResponse.status(502).end()
          } else if (!clientResponse.destroyed) {
            clientResponse.end()
          }
        } catch (error) {
          logger.warn('Stream cleanup error:', error.message)
        }
      }
    }

    streamManager.registerStream(streamId, cleanup)

    upstreamResponse.data.on('data', (chunk) => {
      try {
        if (hasEnded || clientResponse.destroyed) {
          return
        }

        eventCount++
        if (eventCount > maxEvents) {
          logger.warn(`Stream ${streamId} exceeded max events limit`)
          cleanup()
          return
        }

        const chunkStr = chunk.toString()

        // Forward the raw chunk to the client
        if (!clientResponse.destroyed) {
          clientResponse.write(chunk)
        }

        // Simultaneously parse a bounded buffer to capture usage info
        buffer += chunkStr

        // Keep the trailing chunks for final usage extraction (immune to truncation)
        finalChunksBuffer += chunkStr
        if (finalChunksBuffer.length > FINAL_CHUNKS_SIZE) {
          finalChunksBuffer = finalChunksBuffer.slice(-FINAL_CHUNKS_SIZE)
        }

        // Keep the main buffer bounded - but preserve the tail for usage parsing
        if (buffer.length > MAX_BUFFER_SIZE) {
          logger.warn(
            `Stream ${streamId} buffer exceeded limit, truncating main buffer but preserving final chunks`
          )
          // Keep the last 1/4 (rather than 1/2) to leave more room for usage data
          buffer = buffer.slice(-MAX_BUFFER_SIZE / 4)
        }

        // Process complete SSE events (separated by blank lines)
        if (buffer.includes('\n\n')) {
          const events = buffer.split('\n\n')
          buffer = events.pop() || '' // retain the possibly-incomplete last event

          for (const event of events) {
            if (event.trim() && event.length <= MAX_EVENT_SIZE) {
              parseSSEForUsage(event)
            }
          }
        }

        if (onData) {
          onData(chunk, { usageData, actualModel })
        }
      } catch (error) {
        logger.error('Error processing Azure OpenAI stream chunk:', error)
        if (!hasEnded) {
          cleanup()
          reject(error)
        }
      }
    })

    upstreamResponse.data.on('end', () => {
      if (hasEnded) {
        return
      }

      streamManager.cleanup(streamId)
      hasEnded = true

      try {
        logger.debug(`🔚 Stream ended, performing comprehensive usage extraction for ${streamId}`, {
          mainBufferSize: buffer.length,
          finalChunksBufferSize: finalChunksBuffer.length,
          parsedEventsCount: allParsedEvents.length,
          hasUsageData: !!usageData
        })

        // Layered final usage-extraction strategies
        if (!usageData) {
          logger.debug('🔍 No usage found during stream, trying final extraction methods...')

          // Method 1: parse whatever remains in the main buffer
          if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) {
            parseSSEForUsage(buffer, false)
          }

          // Method 2: parse the preserved final-chunks buffer
          if (!usageData && finalChunksBuffer.trim()) {
            logger.debug('🔍 Trying final chunks buffer for usage extraction...')
            parseSSEForUsage(finalChunksBuffer, true)
          }

          // Method 3: re-scan every parsed event for usage
          if (!usageData && allParsedEvents.length > 0) {
            logger.debug('🔍 Searching through all parsed events for usage...')

            // Scan backwards, since usage typically arrives last
            for (let i = allParsedEvents.length - 1; i >= 0; i--) {
              const { usageData: foundUsage, actualModel: foundModel } = extractUsageDataRobust(
                allParsedEvents[i],
                `final-event-scan-${i}`
              )
              if (foundUsage) {
                usageData = foundUsage
                if (foundModel) {
                  actualModel = foundModel
                }
                logger.debug(`🎯 Usage found in event ${i} during final scan!`)
                break
              }
            }
          }

          // Method 4: combine all events into one object and search that
          if (!usageData && allParsedEvents.length > 0) {
            logger.debug('🔍 Trying combined events analysis...')
            const combinedData = {
              events: allParsedEvents,
              lastEvent: allParsedEvents[allParsedEvents.length - 1],
              eventCount: allParsedEvents.length
            }

            const { usageData: combinedUsage } = extractUsageDataRobust(
              combinedData,
              'combined-events'
            )
            if (combinedUsage) {
              usageData = combinedUsage
              logger.debug('🎯 Usage found via combined events analysis!')
            }
          }
        }

        // Final usage status report
        if (usageData) {
          logger.debug('✅ Final stream usage extraction SUCCESS', {
            streamId,
            usageData,
            actualModel,
            totalEvents: allParsedEvents.length,
            finalBufferSize: finalChunksBuffer.length
          })
        } else {
          logger.warn('❌ Final stream usage extraction FAILED', {
            streamId,
            totalEvents: allParsedEvents.length,
            finalBufferSize: finalChunksBuffer.length,
            mainBufferSize: buffer.length,
            lastFewEvents: allParsedEvents.slice(-3).map((e) => ({
              type: e.type,
              hasUsage: !!e.usage,
              hasResponse: !!e.response,
              keys: Object.keys(e)
            }))
          })
        }

        if (onEnd) {
          onEnd({ usageData, actualModel })
        }

        if (!clientResponse.destroyed) {
          clientResponse.end()
        }

        resolve({ usageData, actualModel })
      } catch (error) {
        logger.error('Stream end handling error:', error)
        reject(error)
      }
    })

    upstreamResponse.data.on('error', (error) => {
      if (hasEnded) {
        return
      }

      streamManager.cleanup(streamId)
      hasEnded = true

      logger.error('Upstream stream error:', error)

      try {
        if (onError) {
          onError(error)
        }

        if (!clientResponse.headersSent) {
          clientResponse.status(502).json({ error: { message: 'Upstream stream error' } })
        } else if (!clientResponse.destroyed) {
          clientResponse.end()
        }
      } catch (cleanupError) {
        logger.warn('Error during stream error cleanup:', cleanupError.message)
      }

      reject(error)
    })

    // Clean up when the client disconnects
    const clientCleanup = () => {
      streamManager.cleanup(streamId)
    }

    clientResponse.on('close', clientCleanup)
    clientResponse.on('aborted', clientCleanup)
    clientResponse.on('error', clientCleanup)
  })
}

// Robust usage-data extraction: tries several strategies, in order, to pull
// token-usage statistics and the actual model name out of an Azure OpenAI
// response payload (streamed event or full body).
//
// @param {Object} responseData - parsed response/event object to inspect.
// @param {string} context - label used in logs to identify the call site.
// @returns {{usageData: Object|null, actualModel: string|null}} both null when
//   no usage object could be found; extraction errors are logged, not thrown.
function extractUsageDataRobust(responseData, context = 'unknown') {
  logger.debug(`🔍 Attempting usage extraction for ${context}`, {
    responseDataKeys: Object.keys(responseData || {}),
    responseDataType: typeof responseData,
    hasUsage: !!responseData?.usage,
    hasResponse: !!responseData?.response
  })

  let usageData = null
  let actualModel = null

  try {
    // Strategy 1: top-level usage (standard Chat Completions)
    if (responseData?.usage) {
      usageData = responseData.usage
      actualModel = responseData.model
      logger.debug('✅ Usage extracted via Strategy 1 (top-level)', { usageData, actualModel })
    }

    // Strategy 2: response.usage (Responses API)
    else if (responseData?.response?.usage) {
      usageData = responseData.response.usage
      actualModel = responseData.response.model || responseData.model
      logger.debug('✅ Usage extracted via Strategy 2 (response.usage)', { usageData, actualModel })
    }

    // Strategy 3: nested search - recursively look for a 'usage' field
    else {
      const findUsageRecursive = (obj, path = '') => {
        if (!obj || typeof obj !== 'object') {
          return null
        }

        for (const [key, value] of Object.entries(obj)) {
          const currentPath = path ? `${path}.${key}` : key

          if (key === 'usage' && value && typeof value === 'object') {
            logger.debug(`✅ Usage found at path: ${currentPath}`, value)
            return { usage: value, path: currentPath }
          }

          if (typeof value === 'object' && value !== null) {
            const nested = findUsageRecursive(value, currentPath)
            if (nested) {
              return nested
            }
          }
        }
        return null
      }

      const found = findUsageRecursive(responseData)
      if (found) {
        usageData = found.usage
        // Try to find model in the same parent object
        const pathParts = found.path.split('.')
        pathParts.pop() // remove 'usage'
        let modelParent = responseData
        for (const part of pathParts) {
          modelParent = modelParent?.[part]
        }
        actualModel = modelParent?.model || responseData?.model
        logger.debug('✅ Usage extracted via Strategy 3 (recursive)', {
          usageData,
          actualModel,
          foundPath: found.path
        })
      }
    }

    // Strategy 4: special response formats
    if (!usageData) {
      // A choices array may carry usage on its last element
      if (responseData?.choices?.length > 0) {
        const lastChoice = responseData.choices[responseData.choices.length - 1]
        if (lastChoice?.usage) {
          usageData = lastChoice.usage
          actualModel = responseData.model || lastChoice.model
          logger.debug('✅ Usage extracted via Strategy 4 (choices)', { usageData, actualModel })
        }
      }
    }

    // Final verification and logging
    if (usageData) {
      logger.debug('🎯 Final usage extraction result', {
        context,
        usageData,
        actualModel,
        inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
        outputTokens: usageData.completion_tokens || usageData.output_tokens || 0,
        totalTokens: usageData.total_tokens || 0
      })
    } else {
      logger.warn('❌ Failed to extract usage data', {
        context,
        responseDataStructure: `${JSON.stringify(responseData, null, 2).substring(0, 1000)}...`,
        availableKeys: Object.keys(responseData || {}),
        responseSize: JSON.stringify(responseData || {}).length
      })
    }
  } catch (extractionError) {
    logger.error('🚨 Error during usage extraction', {
      context,
      error: extractionError.message,
      stack: extractionError.stack,
      responseDataType: typeof responseData
    })
  }

  return { usageData, actualModel }
}

// Relay a non-streaming upstream response to the client and extract usage data.
// Mirrors the upstream status, forwards selected headers, sends the body as
// JSON, then mines it for token-usage statistics and the actual model name.
function handleNonStreamResponse(upstreamResponse, clientResponse) {
  try {
    // Mirror the upstream status code and declare a JSON body.
    clientResponse.status(upstreamResponse.status)
    clientResponse.setHeader('Content-Type', 'application/json')

    // Forward a whitelist of informational headers when present upstream.
    for (const header of [
      'x-request-id',
      'x-ratelimit-remaining-requests',
      'x-ratelimit-remaining-tokens'
    ]) {
      const value = upstreamResponse.headers[header]
      if (value) {
        clientResponse.setHeader(header, value)
      }
    }

    // Send the body back verbatim.
    const responseData = upstreamResponse.data
    clientResponse.json(responseData)

    // Mine the payload for usage statistics via the robust extractor.
    const { usageData, actualModel } = extractUsageDataRobust(responseData, 'non-stream')

    return { usageData, actualModel, responseData }
  } catch (error) {
    logger.error('Error handling Azure OpenAI non-stream response:', error)
    throw error
  }
}

// Public API of the Azure OpenAI relay module
module.exports = {
  handleAzureOpenAIRequest,
  handleStreamResponse,
  handleNonStreamResponse,
  normalizeModelName
}