Spaces:
Paused
Paused
| """缓存服务模块 - 提供图片和视频的下载、缓存和清理功能""" | |
| import asyncio | |
| import base64 | |
| from pathlib import Path | |
| from typing import Optional, Tuple | |
| from curl_cffi.requests import AsyncSession | |
| from app.core.config import setting | |
| from app.core.logger import logger | |
| from app.services.grok.statsig import get_dynamic_headers | |
# ---- Module constants ----

# File-extension -> MIME type lookup used when converting cached images
# to base64 data URLs; unknown extensions fall back to DEFAULT_MIME.
MIME_TYPES = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.webp': 'image/webp',
    '.bmp': 'image/bmp',
}
DEFAULT_MIME = 'image/jpeg'

# Base URL of the remote asset host that files are downloaded from.
ASSETS_URL = "https://assets.grok.com"
class CacheService:
    """Base cache service: downloads remote assets and caches them on disk.

    Subclasses choose a cache type (a subdirectory under data/temp/) and a
    default request timeout.
    """

    def __init__(self, cache_type: str, timeout: float = 30.0):
        self.cache_type = cache_type
        # Each cache type gets its own directory under data/temp/.
        self.cache_dir = Path(f"data/temp/{cache_type}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        # Serializes cleanup() so concurrent downloads don't race deletions.
        self._cleanup_lock = asyncio.Lock()
        # Strong reference to the most recent background cleanup task:
        # asyncio keeps only weak references to tasks, so an unreferenced
        # task may be garbage-collected before it finishes.
        self._cleanup_task: Optional[asyncio.Task] = None

    def _get_path(self, file_path: str) -> Path:
        """Map a remote file path to a flat on-disk cache path.

        Leading slashes are stripped and '/' becomes '-' so the whole
        remote path collapses into a single filename.
        """
        return self.cache_dir / file_path.lstrip('/').replace('/', '-')

    def _log(self, level: str, msg: str):
        """Log *msg* at *level* with a per-cache-type prefix."""
        getattr(logger, level)(f"[{self.cache_type.upper()}Cache] {msg}")

    def _build_headers(self, file_path: str, auth_token: str) -> dict:
        """Build browser-like request headers for the asset download.

        The cf_clearance cookie (when configured) is appended to the auth
        cookie so the request passes the Cloudflare check.
        """
        cf = setting.grok_config.get("cf_clearance", "")
        return {
            **get_dynamic_headers(pathname=file_path),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "same-site",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "Referer": "https://grok.com/",
            "Cookie": f"{auth_token};{cf}" if cf else auth_token
        }

    async def download(self, file_path: str, auth_token: str, timeout: Optional[float] = None) -> Optional[Path]:
        """Download *file_path* from the assets host and cache it locally.

        Returns the cached file's Path, or None on failure. Files already
        on disk are returned immediately without a network request.

        Retry strategy:
          * inner loop: up to 5 retries on HTTP 403 (fixed 0.5s delay);
          * outer loop: up to 3 retries on configurable status codes
            (default 401/429, progressive delay) and on transport errors.
        """
        cache_path = self._get_path(file_path)
        if cache_path.exists():
            self._log("debug", "文件已缓存")
            return cache_path
        # Outer retry: configurable status codes (e.g. 401/429).
        retry_codes = setting.grok_config.get("retry_status_codes", [401, 429])
        MAX_OUTER_RETRY = 3
        for outer_retry in range(MAX_OUTER_RETRY + 1):  # 1 initial attempt + 3 retries
            try:
                # Inner retry: HTTP 403, up to max_403_retries extra attempts.
                max_403_retries = 5
                retry_403_count = 0
                while retry_403_count <= max_403_retries:
                    proxy = await setting.get_proxy_async("cache")
                    proxies = {"http": proxy, "https": proxy} if proxy else {}
                    if proxy and outer_retry == 0 and retry_403_count == 0:
                        # Log only the host part so credentials never reach the log.
                        self._log("debug", f"使用代理: {proxy.split('@')[-1] if '@' in proxy else proxy}")
                    async with AsyncSession() as session:
                        url = f"{ASSETS_URL}{file_path}"
                        if outer_retry == 0 and retry_403_count == 0:
                            self._log("debug", f"下载: {url}")
                        response = await session.get(
                            url,
                            headers=self._build_headers(file_path, auth_token),
                            proxies=proxies,
                            timeout=timeout or self.timeout,
                            allow_redirects=True,
                            impersonate="chrome133a"
                        )
                        # HTTP 403 -> inner retry with a short fixed delay.
                        if response.status_code == 403:
                            retry_403_count += 1
                            if retry_403_count <= max_403_retries:
                                self._log("warning", f"遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...")
                                await asyncio.sleep(0.5)
                                continue
                            self._log("error", f"403错误,已重试{retry_403_count-1}次,放弃")
                            return None
                        # Configurable status codes -> outer retry.
                        if response.status_code in retry_codes:
                            if outer_retry < MAX_OUTER_RETRY:
                                delay = (outer_retry + 1) * 0.1  # progressive delay: 0.1s, 0.2s, 0.3s
                                self._log("warning", f"遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...")
                                await asyncio.sleep(delay)
                                break  # leave the inner loop and take the next outer attempt
                            else:
                                self._log("error", f"{response.status_code}错误,已重试{outer_retry}次,放弃")
                                return None
                        response.raise_for_status()
                        # Write in a worker thread so the event loop is not blocked.
                        await asyncio.to_thread(cache_path.write_bytes, response.content)
                        if outer_retry > 0 or retry_403_count > 0:
                            self._log("info", f"重试成功!")
                        else:
                            self._log("debug", "缓存成功")
                        # Background size-limit cleanup; keep a strong reference
                        # so the task is not garbage-collected before running.
                        self._cleanup_task = asyncio.create_task(self._safe_cleanup())
                        return cache_path
            except Exception as e:
                # FIX: was `outer_retry < MAX_OUTER_RETRY - 1`, which gave
                # transport errors one fewer retry than retryable status codes.
                if outer_retry < MAX_OUTER_RETRY:
                    self._log("warning", f"下载异常: {e},外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY})...")
                    await asyncio.sleep(0.5)
                    continue
                self._log("error", f"下载失败: {e}(已重试{outer_retry}次)")
                return None
        return None

    def get_cached(self, file_path: str) -> Optional[Path]:
        """Return the cached Path for *file_path*, or None if not cached."""
        path = self._get_path(file_path)
        return path if path.exists() else None

    async def _safe_cleanup(self):
        """Run cleanup(), logging (never raising) any exception."""
        try:
            await self.cleanup()
        except Exception as e:
            self._log("error", f"后台清理失败: {e}")

    async def cleanup(self):
        """Evict oldest files until the cache is within its size limit.

        The limit comes from global_config "<type>_cache_max_size_mb"
        (default 500 MB). No-op when another cleanup is already running.
        """
        if self._cleanup_lock.locked():
            return
        async with self._cleanup_lock:
            try:
                max_mb = setting.global_config.get(f"{self.cache_type}_cache_max_size_mb", 500)
                max_bytes = max_mb * 1024 * 1024
                # Collect (path, size, mtime) for every cached file.
                files = [(f, (s := f.stat()).st_size, s.st_mtime)
                         for f in self.cache_dir.glob("*") if f.is_file()]
                total = sum(size for _, size, _ in files)
                if total <= max_bytes:
                    return
                self._log("info", f"清理缓存 {total/1024/1024:.1f}MB -> {max_mb}MB")
                # Delete oldest-first (by mtime) until under the limit.
                for path, size, _ in sorted(files, key=lambda x: x[2]):
                    if total <= max_bytes:
                        break
                    await asyncio.to_thread(path.unlink)
                    total -= size
                self._log("info", f"清理完成: {total/1024/1024:.1f}MB")
            except Exception as e:
                self._log("error", f"清理失败: {e}")
class ImageCache(CacheService):
    """Image cache service: downloads images and converts them to base64."""

    def __init__(self):
        super().__init__("image", timeout=30.0)

    async def download_image(self, path: str, token: str) -> Optional[Path]:
        """Download an image; return its cached Path, or None on failure."""
        return await self.download(path, token)

    # FIX: was defined without `self`, so the bound call
    # `self.to_base64(cache_path)` in download_base64 raised
    # TypeError ("takes 1 positional argument but 2 were given").
    # @staticmethod keeps both instance and class call styles working.
    @staticmethod
    def to_base64(image_path: Path) -> Optional[str]:
        """Convert an image file to a base64 data URL; None on any error."""
        try:
            if not image_path.exists():
                logger.error(f"[ImageCache] 文件不存在: {image_path}")
                return None
            data = base64.b64encode(image_path.read_bytes()).decode()
            # MIME type is derived from the file extension, defaulting to JPEG.
            mime = MIME_TYPES.get(image_path.suffix.lower(), DEFAULT_MIME)
            return f"data:{mime};base64,{data}"
        except Exception as e:
            logger.error(f"[ImageCache] 转换失败: {e}")
            return None

    async def download_base64(self, path: str, token: str) -> Optional[str]:
        """Download an image and return it as a base64 data URL.

        The temporary cached file is deleted after conversion; a failed
        delete is logged but does not affect the returned value.
        """
        try:
            cache_path = await self.download(path, token)
            if not cache_path:
                return None
            result = self.to_base64(cache_path)
            # Best-effort removal of the temp file.
            try:
                cache_path.unlink()
            except Exception as e:
                logger.warning(f"[ImageCache] 删除临时文件失败: {e}")
            return result
        except Exception as e:
            logger.error(f"[ImageCache] 下载base64失败: {e}")
            return None
class VideoCache(CacheService):
    """Video cache service built on CacheService."""

    def __init__(self):
        # Videos are larger than images, so use a 60s request timeout.
        super().__init__("video", timeout=60.0)

    async def download_video(self, path: str, token: str) -> Optional[Path]:
        """Fetch a video and hand back its cached location, if any."""
        result = await self.download(path, token)
        return result
# Global singleton instances shared across the application.
image_cache_service = ImageCache()
video_cache_service = VideoCache()