| """缓存服务模块 - 提供图片和视频的下载、缓存和清理功能""" |
|
|
| import asyncio |
| import base64 |
| from pathlib import Path |
| from typing import Optional, Tuple |
| from curl_cffi.requests import AsyncSession |
|
|
| from app.core.config import setting |
| from app.core.logger import logger |
| from app.services.grok.statsig import get_dynamic_headers |
|
|
|
|
| |
# File-extension -> MIME type map used when encoding images as data URLs.
MIME_TYPES = {
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
    '.gif': 'image/gif', '.webp': 'image/webp', '.bmp': 'image/bmp',
}
# Fallback MIME type for unrecognized extensions.
DEFAULT_MIME = 'image/jpeg'
# Base URL of the remote asset host files are downloaded from.
ASSETS_URL = "https://assets.grok.com"
|
|
|
|
class CacheService:
    """Base cache service: downloads remote assets into a local directory,
    serves cached copies and evicts the oldest files once the cache exceeds
    its configured size limit."""

    def __init__(self, cache_type: str, timeout: float = 30.0):
        """
        Args:
            cache_type: Cache sub-directory name under data/temp (e.g. "image").
            timeout: Default per-request timeout in seconds.
        """
        self.cache_type = cache_type
        self.cache_dir = Path(f"data/temp/{cache_type}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self._cleanup_lock = asyncio.Lock()
        # Strong references to fire-and-forget cleanup tasks; the event loop
        # keeps only weak references, so an otherwise-unreferenced task could
        # be garbage-collected before it finishes.
        self._bg_tasks: set = set()

    def _get_path(self, file_path: str) -> Path:
        """Map a remote file path to a flat cache path ('/' becomes '-')."""
        return self.cache_dir / file_path.lstrip('/').replace('/', '-')

    def _log(self, level: str, msg: str):
        """Log *msg* at *level* with a per-cache-type prefix."""
        getattr(logger, level)(f"[{self.cache_type.upper()}Cache] {msg}")

    def _build_headers(self, file_path: str, auth_token: str) -> dict:
        """Build browser-like request headers: dynamic statsig headers plus
        the auth token (and cf_clearance, when configured) as the Cookie."""
        cf = setting.grok_config.get("cf_clearance", "")
        return {
            **get_dynamic_headers(pathname=file_path),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "same-site",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "Referer": "https://grok.com/",
            "Cookie": f"{auth_token};{cf}" if cf else auth_token
        }

    async def download(self, file_path: str, auth_token: str, timeout: Optional[float] = None) -> Optional[Path]:
        """Download *file_path* from the assets host and cache it locally.

        Retry policy:
          * 403 responses are retried up to 5 times within one outer attempt.
          * Status codes listed in ``retry_status_codes`` and transport
            exceptions restart the outer attempt, up to MAX_OUTER_RETRY times.

        Returns:
            The cached file path, or None on failure.
        """
        cache_path = self._get_path(file_path)
        if cache_path.exists():
            self._log("debug", "文件已缓存")
            return cache_path

        retry_codes = setting.grok_config.get("retry_status_codes", [401, 429])
        MAX_OUTER_RETRY = 3

        for outer_retry in range(MAX_OUTER_RETRY + 1):
            try:
                max_403_retries = 5
                retry_403_count = 0

                while retry_403_count <= max_403_retries:
                    proxy = await setting.get_proxy_async("cache")
                    proxies = {"http": proxy, "https": proxy} if proxy else {}

                    # Only log proxy/URL details on the very first attempt.
                    if proxy and outer_retry == 0 and retry_403_count == 0:
                        self._log("debug", f"使用代理: {proxy.split('@')[-1] if '@' in proxy else proxy}")

                    async with AsyncSession() as session:
                        url = f"{ASSETS_URL}{file_path}"
                        if outer_retry == 0 and retry_403_count == 0:
                            self._log("debug", f"下载: {url}")

                        response = await session.get(
                            url,
                            headers=self._build_headers(file_path, auth_token),
                            proxies=proxies,
                            timeout=timeout or self.timeout,
                            allow_redirects=True,
                            impersonate="chrome133a"
                        )

                        if response.status_code == 403:
                            retry_403_count += 1
                            if retry_403_count <= max_403_retries:
                                self._log("warning", f"遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...")
                                await asyncio.sleep(0.5)
                                continue
                            self._log("error", f"403错误,已重试{retry_403_count-1}次,放弃")
                            return None

                        if response.status_code in retry_codes:
                            if outer_retry < MAX_OUTER_RETRY:
                                delay = (outer_retry + 1) * 0.1
                                self._log("warning", f"遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...")
                                await asyncio.sleep(delay)
                                break  # restart the outer attempt
                            else:
                                self._log("error", f"{response.status_code}错误,已重试{outer_retry}次,放弃")
                                return None

                        response.raise_for_status()
                        # Write atomically (temp file + rename) so a concurrent
                        # get_cached()/exists() never observes a partial file.
                        tmp_path = cache_path.with_name(cache_path.name + ".part")
                        await asyncio.to_thread(tmp_path.write_bytes, response.content)
                        await asyncio.to_thread(tmp_path.replace, cache_path)

                        if outer_retry > 0 or retry_403_count > 0:
                            self._log("info", "重试成功!")
                        else:
                            self._log("debug", "缓存成功")

                        # Fire-and-forget cleanup; keep a strong reference so
                        # the task is not garbage-collected mid-run.
                        task = asyncio.create_task(self._safe_cleanup())
                        self._bg_tasks.add(task)
                        task.add_done_callback(self._bg_tasks.discard)
                        return cache_path

            except Exception as e:
                # Bug fix: this was `outer_retry < MAX_OUTER_RETRY - 1`, which
                # abandoned the exception path one attempt earlier than the
                # status-code retry path above.
                if outer_retry < MAX_OUTER_RETRY:
                    self._log("warning", f"下载异常: {e},外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY})...")
                    await asyncio.sleep(0.5)
                    continue

                self._log("error", f"下载失败: {e}(已重试{outer_retry}次)")
                return None

        return None

    def get_cached(self, file_path: str) -> Optional[Path]:
        """Return the cached path for *file_path*, or None if not cached."""
        path = self._get_path(file_path)
        return path if path.exists() else None

    async def _safe_cleanup(self):
        """Run cleanup(), logging (never raising) any failure."""
        try:
            await self.cleanup()
        except Exception as e:
            self._log("error", f"后台清理失败: {e}")

    async def cleanup(self):
        """Evict oldest files (by mtime) until the cache is within its
        configured size limit (``<cache_type>_cache_max_size_mb``)."""
        if self._cleanup_lock.locked():
            return  # another cleanup is already running

        async with self._cleanup_lock:
            try:
                max_mb = setting.global_config.get(f"{self.cache_type}_cache_max_size_mb", 500)
                max_bytes = max_mb * 1024 * 1024

                files = [(f, (s := f.stat()).st_size, s.st_mtime)
                         for f in self.cache_dir.glob("*") if f.is_file()]
                total = sum(size for _, size, _ in files)

                if total <= max_bytes:
                    return

                self._log("info", f"清理缓存 {total/1024/1024:.1f}MB -> {max_mb}MB")

                # Delete oldest first until we drop under the limit.
                for path, size, _ in sorted(files, key=lambda x: x[2]):
                    if total <= max_bytes:
                        break
                    await asyncio.to_thread(path.unlink)
                    total -= size

                self._log("info", f"清理完成: {total/1024/1024:.1f}MB")
            except Exception as e:
                self._log("error", f"清理失败: {e}")
|
|
|
class ImageCache(CacheService):
    """Image cache built on CacheService, with base64 data-URL helpers."""

    def __init__(self):
        super().__init__("image", timeout=30.0)

    async def download_image(self, path: str, token: str) -> Optional[Path]:
        """Download an image and return its cached path (None on failure)."""
        return await self.download(path, token)

    @staticmethod
    def to_base64(image_path: Path) -> Optional[str]:
        """Encode an image file as a ``data:<mime>;base64,...`` string."""
        try:
            if not image_path.exists():
                logger.error(f"[ImageCache] 文件不存在: {image_path}")
                return None
            raw = image_path.read_bytes()
            encoded = base64.b64encode(raw).decode()
            mime = MIME_TYPES.get(image_path.suffix.lower(), DEFAULT_MIME)
            return f"data:{mime};base64,{encoded}"
        except Exception as e:
            logger.error(f"[ImageCache] 转换失败: {e}")
            return None

    async def download_base64(self, path: str, token: str) -> Optional[str]:
        """Download an image, return it as base64, then drop the temp file."""
        try:
            cached = await self.download(path, token)
            if not cached:
                return None
            encoded = self.to_base64(cached)
            try:
                cached.unlink()
            except Exception as e:
                logger.warning(f"[ImageCache] 删除临时文件失败: {e}")
            return encoded
        except Exception as e:
            logger.error(f"[ImageCache] 下载base64失败: {e}")
            return None
|
|
|
|
class VideoCache(CacheService):
    """Video cache built on CacheService; uses a longer download timeout."""

    def __init__(self):
        super().__init__("video", timeout=60.0)

    async def download_video(self, path: str, token: str) -> Optional[Path]:
        """Download a video and return its cached path (None on failure)."""
        result = await self.download(path, token)
        return result
|
|
|
|
| |
# Module-level singleton instances shared across the application.
image_cache_service = ImageCache()
video_cache_service = VideoCache()
|
|