Spaces:
Running
Running
File size: 2,986 Bytes
69fb140 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
"""
回應壓縮中間件
使用 gzip 壓縮 API 回應,減少傳輸大小
"""
import gzip
from typing import Callable
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from core.logging import get_logger
logger = get_logger("middleware.compression")
# 最小壓縮大小(bytes)
MIN_COMPRESS_SIZE = 500
# 可壓縮的 Content-Type
COMPRESSIBLE_TYPES = {
"application/json",
"text/html",
"text/plain",
"text/css",
"text/javascript",
"application/javascript",
}
class GzipMiddleware(BaseHTTPMiddleware):
"""
Gzip 壓縮中間件
功能:
1. 檢查客戶端是否支援 gzip
2. 壓縮大於閾值的回應
3. 只壓縮可壓縮的 Content-Type
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
# 檢查客戶端是否支援 gzip
accept_encoding = request.headers.get("accept-encoding", "")
supports_gzip = "gzip" in accept_encoding.lower()
response = await call_next(request)
# 不支援 gzip 或已經壓縮,直接返回
if not supports_gzip:
return response
if response.headers.get("content-encoding"):
return response
# 檢查 Content-Type 是否可壓縮
content_type = response.headers.get("content-type", "")
base_type = content_type.split(";")[0].strip()
if base_type not in COMPRESSIBLE_TYPES:
return response
# 讀取回應內容
body = b""
async for chunk in response.body_iterator:
body += chunk
# 檢查大小是否值得壓縮
if len(body) < MIN_COMPRESS_SIZE:
# 重建回應
return Response(
content=body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type,
)
# 壓縮內容
compressed = gzip.compress(body, compresslevel=6)
# 只有壓縮後更小才使用
if len(compressed) >= len(body):
return Response(
content=body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type,
)
# 更新標頭
headers = dict(response.headers)
headers["content-encoding"] = "gzip"
headers["content-length"] = str(len(compressed))
# 移除可能衝突的標頭
headers.pop("transfer-encoding", None)
logger.debug(
f"壓縮回應: {len(body)} -> {len(compressed)} bytes "
f"({100 - len(compressed) * 100 // len(body)}% 減少)"
)
return Response(
content=compressed,
status_code=response.status_code,
headers=headers,
media_type=response.media_type,
)
|