caidaohz commited on
Commit
d213bd6
·
1 Parent(s): 85d0131

✨ 优化Go版本API代理服务器的流式响应处理

Browse files

- 根据Content-Type动态调整缓冲区大小,提升流式响应的性能
- 优化URL重写模式,减少字符串操作次数
- 增加服务器优雅关闭功能,确保在接收中断信号时安全关闭
- 更新日志信息,提供更清晰的服务器状态反馈

Files changed (1) hide show
  1. main.go +82 -7
main.go CHANGED
@@ -11,8 +11,10 @@ import (
11
  "net/http"
12
  "net/url"
13
  "os"
 
14
  "strings"
15
  "sync/atomic"
 
16
  "time"
17
 
18
  "github.com/gin-contrib/cors"
@@ -271,7 +273,26 @@ func apc_handleAsyncAPIRequest(asyncCtx *AsyncProxyContext, c *gin.Context, pref
271
 
272
  // apc_streamResponseBody 流式转发响应体
273
  func apc_streamResponseBody(asyncCtx *AsyncProxyContext, resp *http.Response) error {
274
- buffer := make([]byte, 32*1024) // 32KB 缓冲区
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
275
 
276
  for {
277
  select {
@@ -427,11 +448,26 @@ func apc_handleHTMLStreamRewrite(asyncCtx *AsyncProxyContext, resp *http.Respons
427
  asyncCtx: asyncCtx,
428
  targetURL: targetURL,
429
  proxyBase: proxyBase,
430
- buffer: make([]byte, 0, 4096),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
431
  }
432
 
433
  // 流式处理HTML内容
434
- buffer := make([]byte, 8*1024) // 8KB 缓冲区
435
 
436
  for {
437
  select {
@@ -486,7 +522,7 @@ func (h *AsyncHTMLRewriter) processBuffer() []byte {
486
  baseURL := h.targetURL.Scheme + "://" + h.targetURL.Host
487
  content := string(h.buffer)
488
 
489
- // URL重写模式
490
  patterns := []struct{ old, new string }{
491
  {`href="` + baseURL, `href="` + h.proxyBase + baseURL},
492
  {`src="` + baseURL, `src="` + h.proxyBase + baseURL},
@@ -494,14 +530,24 @@ func (h *AsyncHTMLRewriter) processBuffer() []byte {
494
  {`href='` + baseURL, `href='` + h.proxyBase + baseURL},
495
  {`src='` + baseURL, `src='` + h.proxyBase + baseURL},
496
  {`action='` + baseURL, `action='` + h.proxyBase + baseURL},
 
 
 
497
  }
498
 
 
499
  for _, pattern := range patterns {
500
  content = strings.ReplaceAll(content, pattern.old, pattern.new)
501
  }
502
 
503
- // 保留缓冲区末尾防止URL跨块
504
- const keepSize = 1024
 
 
 
 
 
 
505
  if len(h.buffer) > keepSize {
506
  processed := []byte(content[:len(content)-keepSize])
507
  h.buffer = []byte(content[len(content)-keepSize:])
@@ -629,8 +675,37 @@ func main() {
629
  log.Printf("🚀 API代理服务器已启动 (Go优化版) 端口:%s", port)
630
  log.Printf("🕒 统计数据每分钟自动刷新页面")
631
  log.Printf("⚡ 性能优化:异步统计、内存优化、锁竞争减少")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
632
 
633
- r.Run(":" + port)
634
  }
635
 
636
  // handleIndex 处理首页
 
11
  "net/http"
12
  "net/url"
13
  "os"
14
+ "os/signal"
15
  "strings"
16
  "sync/atomic"
17
+ "syscall"
18
  "time"
19
 
20
  "github.com/gin-contrib/cors"
 
273
 
274
  // apc_streamResponseBody 流式转发响应体
275
  func apc_streamResponseBody(asyncCtx *AsyncProxyContext, resp *http.Response) error {
276
+ // 检测是否为流式响应
277
+ contentType := resp.Header.Get("Content-Type")
278
+ isStreaming := strings.Contains(contentType, "text/event-stream") ||
279
+ strings.Contains(contentType, "application/stream+json") ||
280
+ strings.Contains(contentType, "text/plain") // OpenAI的流式响应通常是text/plain
281
+
282
+ var bufferSize int
283
+ if isStreaming {
284
+ bufferSize = 2 * 1024 // 2KB for streaming APIs (OpenAI, Claude等)
285
+ } else if strings.Contains(contentType, "text/html") {
286
+ bufferSize = 8 * 1024 // 8KB for HTML pages
287
+ } else if strings.Contains(contentType, "application/json") {
288
+ bufferSize = 16 * 1024 // 16KB for JSON APIs
289
+ } else if strings.Contains(contentType, "image/") || strings.Contains(contentType, "video/") {
290
+ bufferSize = 64 * 1024 // 64KB for media files
291
+ } else {
292
+ bufferSize = 32 * 1024 // 32KB for other content
293
+ }
294
+
295
+ buffer := make([]byte, bufferSize)
296
 
297
  for {
298
  select {
 
448
  asyncCtx: asyncCtx,
449
  targetURL: targetURL,
450
  proxyBase: proxyBase,
451
+ buffer: make([]byte, 0, 4096), // 初始4KB,动态增长
452
+ }
453
+
454
+ // 根据Content-Length动态调整缓冲区大小
455
+ contentLength := resp.ContentLength
456
+ var bufferSize int
457
+ if contentLength > 0 {
458
+ if contentLength < 1024*1024 { // 小于1MB
459
+ bufferSize = 4 * 1024 // 4KB
460
+ } else if contentLength < 10*1024*1024 { // 小于10MB
461
+ bufferSize = 16 * 1024 // 16KB
462
+ } else {
463
+ bufferSize = 64 * 1024 // 64KB for large pages
464
+ }
465
+ } else {
466
+ bufferSize = 8 * 1024 // 默认8KB
467
  }
468
 
469
  // 流式处理HTML内容
470
+ buffer := make([]byte, bufferSize)
471
 
472
  for {
473
  select {
 
522
  baseURL := h.targetURL.Scheme + "://" + h.targetURL.Host
523
  content := string(h.buffer)
524
 
525
+ // 优化的URL重写模式 - 使用更高效的字符串替换
526
  patterns := []struct{ old, new string }{
527
  {`href="` + baseURL, `href="` + h.proxyBase + baseURL},
528
  {`src="` + baseURL, `src="` + h.proxyBase + baseURL},
 
530
  {`href='` + baseURL, `href='` + h.proxyBase + baseURL},
531
  {`src='` + baseURL, `src='` + h.proxyBase + baseURL},
532
  {`action='` + baseURL, `action='` + h.proxyBase + baseURL},
533
+ // 添加更多常见的URL模式
534
+ {`url("` + baseURL, `url("` + h.proxyBase + baseURL},
535
+ {`url('` + baseURL, `url('` + h.proxyBase + baseURL},
536
  }
537
 
538
+ // 批量替换,减少字符串操作次数
539
  for _, pattern := range patterns {
540
  content = strings.ReplaceAll(content, pattern.old, pattern.new)
541
  }
542
 
543
+ // 动态调整保留大小,根据缓冲区大小优化
544
+ keepSize := 1024 // 默认保留1KB
545
+ if len(h.buffer) > 32*1024 { // 如果缓冲区大于32KB
546
+ keepSize = 2048 // 保留2KB,减少处理频率
547
+ } else if len(h.buffer) < 4*1024 { // 如果缓冲区小于4KB
548
+ keepSize = 512 // 只保留512B,更快处理
549
+ }
550
+
551
  if len(h.buffer) > keepSize {
552
  processed := []byte(content[:len(content)-keepSize])
553
  h.buffer = []byte(content[len(content)-keepSize:])
 
675
  log.Printf("🚀 API代理服务器已启动 (Go优化版) 端口:%s", port)
676
  log.Printf("🕒 统计数据每分钟自动刷新页面")
677
  log.Printf("⚡ 性能优化:异步统计、内存优化、锁竞争减少")
678
+ log.Printf("📊 访问 http://localhost:%s 查看统计信息", port)
679
+
680
+ // 使用自定义HTTP服务器以更好地控制
681
+ srv := &http.Server{
682
+ Addr: ":" + port,
683
+ Handler: r,
684
+ }
685
+
686
+ // 启动服务器
687
+ go func() {
688
+ if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
689
+ log.Fatalf("服务器启动失败: %v", err)
690
+ }
691
+ }()
692
+
693
+ // 等待中断信号
694
+ quit := make(chan os.Signal, 1)
695
+ signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
696
+ <-quit
697
+
698
+ log.Println("正在关闭服务器...")
699
+
700
+ // 优雅关闭
701
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
702
+ defer cancel()
703
+
704
+ if err := srv.Shutdown(ctx); err != nil {
705
+ log.Fatal("服务器强制关闭:", err)
706
+ }
707
 
708
+ log.Println("服务器已关闭")
709
  }
710
 
711
  // handleIndex 处理首页