# JXJBing's picture
# Upload 45 files
# 1a9e2c2 verified
"""缓存服务模块 - 提供图片和视频的下载、缓存和清理功能"""
import asyncio
import base64
from pathlib import Path
from typing import Optional, Tuple
from curl_cffi.requests import AsyncSession
from app.core.config import setting
from app.core.logger import logger
from app.services.grok.statsig import get_dynamic_headers
# Constants

# File suffix (lowercase, with leading dot) -> MIME type used for data URIs.
MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
}

# MIME type used when a suffix is not listed in MIME_TYPES.
DEFAULT_MIME = "image/jpeg"

# Base URL that asset paths are appended to when downloading.
ASSETS_URL = "https://assets.grok.com"
class CacheService:
    """Base class for on-disk caches of files downloaded from ``ASSETS_URL``.

    Files are stored flat inside ``data/temp/<cache_type>``. After every
    successful download a background task trims the directory to the
    configured maximum size, deleting the oldest files first.
    """

    # Number of retries of the whole request after a retryable status code
    # (see ``retry_status_codes`` in the grok config) or an exception.
    MAX_OUTER_RETRY = 3
    # Number of immediate retries after a 403 response.
    MAX_403_RETRIES = 5

    def __init__(self, cache_type: str, timeout: float = 30.0):
        """Create the cache directory and store the default request timeout.

        Args:
            cache_type: Name of the cache; used as the directory name, in log
                prefixes and in the ``<cache_type>_cache_max_size_mb`` setting.
            timeout: Default request timeout passed to the HTTP client.
        """
        self.cache_type = cache_type
        self.cache_dir = Path(f"data/temp/{cache_type}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self._cleanup_lock = asyncio.Lock()
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it completes.
        self._background_tasks: set = set()

    def _get_path(self, file_path: str) -> Path:
        """Map a remote file path to its flat location in the cache directory.

        Leading slashes are dropped and the remaining ``/`` separators are
        replaced with ``-`` so every file lives directly in ``cache_dir``.
        """
        return self.cache_dir / file_path.lstrip('/').replace('/', '-')

    def _log(self, level: str, msg: str):
        """Log ``msg`` at ``level`` with a ``[<TYPE>Cache]`` prefix."""
        getattr(logger, level)(f"[{self.cache_type.upper()}Cache] {msg}")

    def _build_headers(self, file_path: str, auth_token: str) -> dict:
        """Build the request headers for downloading ``file_path``.

        Combines the headers from ``get_dynamic_headers`` with fixed
        browser-navigation headers. The ``Cookie`` header is ``auth_token``,
        followed by the configured ``cf_clearance`` value when one is set.
        """
        cf = setting.grok_config.get("cf_clearance", "")
        return {
            **get_dynamic_headers(pathname=file_path),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "same-site",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "Referer": "https://grok.com/",
            "Cookie": f"{auth_token};{cf}" if cf else auth_token,
        }

    async def download(self, file_path: str, auth_token: str, timeout: Optional[float] = None) -> Optional[Path]:
        """Download ``file_path`` into the cache and return the cached path.

        An already cached file is returned without any network request.

        Retry policy:
            * 403 responses are retried up to ``MAX_403_RETRIES`` times with a
              0.5s pause (inner loop).
            * Status codes listed in ``retry_status_codes`` (default 401/429)
              and exceptions are retried up to ``MAX_OUTER_RETRY`` times
              (outer loop).

        Args:
            file_path: Path of the asset, appended to ``ASSETS_URL``.
            auth_token: Value sent in the ``Cookie`` header.
            timeout: Per-request timeout; falls back to ``self.timeout``.

        Returns:
            The path of the cached file, or ``None`` when every attempt failed.
        """
        cache_path = self._get_path(file_path)
        if cache_path.exists():
            self._log("debug", "文件已缓存")
            return cache_path

        retry_codes = setting.grok_config.get("retry_status_codes", [401, 429])
        max_outer_retry = self.MAX_OUTER_RETRY
        max_403_retries = self.MAX_403_RETRIES
        url = f"{ASSETS_URL}{file_path}"

        # One initial attempt plus max_outer_retry retries.
        for outer_retry in range(max_outer_retry + 1):
            try:
                retry_403_count = 0

                while retry_403_count <= max_403_retries:
                    # Only the very first attempt logs the proxy and URL.
                    first_attempt = outer_retry == 0 and retry_403_count == 0

                    # The proxy is requested again before every attempt.
                    proxy = await setting.get_proxy_async("cache")
                    proxies = {"http": proxy, "https": proxy} if proxy else {}

                    if proxy and first_attempt:
                        # Strip "user:pass@" so credentials never reach the log.
                        self._log("debug", f"使用代理: {proxy.split('@')[-1] if '@' in proxy else proxy}")

                    async with AsyncSession() as session:
                        if first_attempt:
                            self._log("debug", f"下载: {url}")

                        response = await session.get(
                            url,
                            headers=self._build_headers(file_path, auth_token),
                            proxies=proxies,
                            timeout=timeout or self.timeout,
                            allow_redirects=True,
                            impersonate="chrome133a",
                        )

                        # 403: retry in the inner loop.
                        if response.status_code == 403:
                            retry_403_count += 1

                            if retry_403_count <= max_403_retries:
                                self._log("warning", f"遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...")
                                await asyncio.sleep(0.5)
                                continue

                            self._log("error", f"403错误,已重试{retry_403_count-1}次,放弃")
                            return None

                        # Configurable status codes: retry in the outer loop.
                        if response.status_code in retry_codes:
                            if outer_retry < max_outer_retry:
                                # Progressive delay: 0.1s, 0.2s, 0.3s. Rounded so
                                # the log does not show 0.30000000000000004.
                                delay = round((outer_retry + 1) * 0.1, 1)
                                self._log("warning", f"遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{max_outer_retry}),等待{delay}s...")
                                await asyncio.sleep(delay)
                                break  # leave the inner loop, go to the next outer attempt

                            self._log("error", f"{response.status_code}错误,已重试{outer_retry}次,放弃")
                            return None

                        response.raise_for_status()
                        await asyncio.to_thread(cache_path.write_bytes, response.content)

                        if outer_retry > 0 or retry_403_count > 0:
                            self._log("info", "重试成功!")
                        else:
                            self._log("debug", "缓存成功")

                        # Trim the cache in the background; keep a reference
                        # until the task finishes so it is not collected early.
                        task = asyncio.create_task(self._safe_cleanup())
                        self._background_tasks.add(task)
                        task.add_done_callback(self._background_tasks.discard)
                        return cache_path

            except Exception as e:
                # Same retry budget as the status-code path above.
                if outer_retry < max_outer_retry:
                    self._log("warning", f"下载异常: {e},外层重试 ({outer_retry+1}/{max_outer_retry})...")
                    await asyncio.sleep(0.5)
                    continue

                self._log("error", f"下载失败: {e}(已重试{outer_retry}次)")
                return None

        return None

    def get_cached(self, file_path: str) -> Optional[Path]:
        """Return the cached path for ``file_path``, or ``None`` if not cached."""
        path = self._get_path(file_path)
        return path if path.exists() else None

    async def _safe_cleanup(self):
        """Run ``cleanup`` and log, rather than propagate, any exception."""
        try:
            await self.cleanup()
        except Exception as e:
            self._log("error", f"后台清理失败: {e}")

    def _scan_cache_files(self) -> list:
        """Return ``(path, size, mtime)`` for every regular file in the cache."""
        entries = []
        for entry in self.cache_dir.glob("*"):
            try:
                if not entry.is_file():
                    continue
                info = entry.stat()
            except FileNotFoundError:
                # Deleted by another task between listing and stat; skip it.
                continue
            entries.append((entry, info.st_size, info.st_mtime))
        return entries

    async def cleanup(self):
        """Delete the oldest cached files until the cache fits its size limit.

        The limit is read from ``<cache_type>_cache_max_size_mb`` in the global
        config (default 500). If a cleanup is already running this call
        returns immediately. Errors are logged, not raised.
        """
        if self._cleanup_lock.locked():
            return

        async with self._cleanup_lock:
            try:
                max_mb = setting.global_config.get(f"{self.cache_type}_cache_max_size_mb", 500)
                max_bytes = max_mb * 1024 * 1024

                # Directory scan is blocking file I/O; keep it off the event loop.
                files = await asyncio.to_thread(self._scan_cache_files)
                total = sum(size for _, size, _ in files)

                if total <= max_bytes:
                    return

                self._log("info", f"清理缓存 {total/1024/1024:.1f}MB -> {max_mb}MB")

                # Oldest modification time first.
                for path, size, _ in sorted(files, key=lambda x: x[2]):
                    if total <= max_bytes:
                        break
                    # missing_ok: the file may have been removed concurrently.
                    await asyncio.to_thread(path.unlink, missing_ok=True)
                    total -= size

                self._log("info", f"清理完成: {total/1024/1024:.1f}MB")
            except Exception as e:
                self._log("error", f"清理失败: {e}")
class ImageCache(CacheService):
    """Cache service for image assets."""

    def __init__(self):
        """Set up the ``image`` cache with a 30-second default request timeout."""
        super().__init__("image", timeout=30.0)

    async def download_image(self, path: str, token: str) -> Optional[Path]:
        """Download an image into the cache; returns the result of ``download``."""
        return await self.download(path, token)

    @staticmethod
    def to_base64(image_path: Path) -> Optional[str]:
        """Encode an image file as a ``data:<mime>;base64,...`` URI.

        The MIME type is looked up from the file suffix in ``MIME_TYPES`` and
        falls back to ``DEFAULT_MIME``.

        Returns:
            The data URI, or ``None`` if the file is missing or cannot be read
            (the failure is logged).
        """
        try:
            # Read directly instead of checking exists() first: the file may
            # be removed between the check and the read.
            raw = image_path.read_bytes()
            data = base64.b64encode(raw).decode()
            mime = MIME_TYPES.get(image_path.suffix.lower(), DEFAULT_MIME)
            return f"data:{mime};base64,{data}"
        except FileNotFoundError:
            logger.error(f"[ImageCache] 文件不存在: {image_path}")
            return None
        except Exception as e:
            logger.error(f"[ImageCache] 转换失败: {e}")
            return None

    async def download_base64(self, path: str, token: str) -> Optional[str]:
        """Download an image and return it as a base64 data URI.

        The downloaded file is deleted after encoding, whether or not the
        encoding succeeded.

        Returns:
            The data URI, or ``None`` if the download or the encoding failed.
        """
        try:
            cache_path = await self.download(path, token)
            if not cache_path:
                return None

            # File read + base64 encoding is blocking work; run it in a thread
            # so the event loop stays responsive.
            result = await asyncio.to_thread(self.to_base64, cache_path)

            # Remove the temporary file; failure to delete is only a warning.
            try:
                await asyncio.to_thread(cache_path.unlink)
            except Exception as e:
                logger.warning(f"[ImageCache] 删除临时文件失败: {e}")

            return result
        except Exception as e:
            logger.error(f"[ImageCache] 下载base64失败: {e}")
            return None
class VideoCache(CacheService):
    """Cache service for video assets."""

    def __init__(self):
        """Set up the ``video`` cache with a 60-second default request timeout."""
        super().__init__(cache_type="video", timeout=60.0)

    async def download_video(self, path: str, token: str) -> Optional[Path]:
        """Download a video into the cache.

        Delegates to ``download`` and returns its result unchanged.
        """
        cached_file = await self.download(path, token)
        return cached_file
# Global instances, created at import time and shared by the rest of the app.
image_cache_service = ImageCache()
video_cache_service = VideoCache()