"""缓存服务模块 - 提供图片和视频的下载、缓存和清理功能"""
import asyncio
import base64
from pathlib import Path
from typing import Optional, Tuple
from curl_cffi.requests import AsyncSession
from app.core.config import setting
from app.core.logger import logger
from app.services.grok.statsig import get_dynamic_headers
# Constants
# Maps a lowercase file suffix to its MIME type, used when building
# base64 data URLs in ImageCache.to_base64.
MIME_TYPES = {
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
    '.gif': 'image/gif', '.webp': 'image/webp', '.bmp': 'image/bmp',
}
# Fallback MIME type for suffixes not present in MIME_TYPES.
DEFAULT_MIME = 'image/jpeg'
# Base URL all downloaded asset paths are appended to.
ASSETS_URL = "https://assets.grok.com"
class CacheService:
    """Base cache service.

    Downloads assets from ``ASSETS_URL`` into ``data/temp/<cache_type>`` and
    evicts the oldest cached files once the configured size limit is exceeded.
    """

    def __init__(self, cache_type: str, timeout: float = 30.0):
        """
        Args:
            cache_type: Cache category (e.g. "image"/"video"); selects both the
                cache sub-directory and the size-limit config key in cleanup().
            timeout: Default per-request download timeout in seconds.
        """
        self.cache_type = cache_type
        self.cache_dir = Path(f"data/temp/{cache_type}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self._cleanup_lock = asyncio.Lock()
        # Strong references to background cleanup tasks. The event loop keeps
        # only weak references to tasks, so a fire-and-forget task could be
        # garbage-collected before it finishes without this set.
        self._bg_tasks: set = set()

    def _get_path(self, file_path: str) -> Path:
        """Map a remote file path to a flat file name inside cache_dir."""
        return self.cache_dir / file_path.lstrip('/').replace('/', '-')

    def _log(self, level: str, msg: str):
        """Log msg at the given level, prefixed with the cache type."""
        getattr(logger, level)(f"[{self.cache_type.upper()}Cache] {msg}")

    def _build_headers(self, file_path: str, auth_token: str) -> dict:
        """Build browser-like request headers, including auth / cf_clearance cookies."""
        cf = setting.grok_config.get("cf_clearance", "")
        return {
            **get_dynamic_headers(pathname=file_path),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "same-site",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "Referer": "https://grok.com/",
            "Cookie": f"{auth_token};{cf}" if cf else auth_token
        }

    async def download(self, file_path: str, auth_token: str, timeout: Optional[float] = None) -> Optional[Path]:
        """Download file_path and cache it; return the cache path, or None on failure.

        Retry strategy:
          * inner loop — up to 5 retries on HTTP 403, fixed 0.5s delay
            (the "cache" proxy slot has no proxy pool to rotate through);
          * outer loop — up to MAX_OUTER_RETRY retries on configurable status
            codes (default 401/429, progressive delay) and on exceptions.
        """
        cache_path = self._get_path(file_path)
        if cache_path.exists():
            self._log("debug", "文件已缓存")
            return cache_path
        retry_codes = setting.grok_config.get("retry_status_codes", [401, 429])
        MAX_OUTER_RETRY = 3
        for outer_retry in range(MAX_OUTER_RETRY + 1):  # first attempt + 3 retries
            try:
                max_403_retries = 5
                retry_403_count = 0
                while retry_403_count <= max_403_retries:
                    proxy = await setting.get_proxy_async("cache")
                    proxies = {"http": proxy, "https": proxy} if proxy else {}
                    # Log proxy / URL only on the very first attempt to avoid spam.
                    if proxy and outer_retry == 0 and retry_403_count == 0:
                        self._log("debug", f"使用代理: {proxy.split('@')[-1] if '@' in proxy else proxy}")
                    async with AsyncSession() as session:
                        url = f"{ASSETS_URL}{file_path}"
                        if outer_retry == 0 and retry_403_count == 0:
                            self._log("debug", f"下载: {url}")
                        response = await session.get(
                            url,
                            headers=self._build_headers(file_path, auth_token),
                            proxies=proxies,
                            timeout=timeout or self.timeout,
                            allow_redirects=True,
                            impersonate="chrome133a"
                        )
                        # HTTP 403 -> inner retry with a fixed short delay.
                        if response.status_code == 403:
                            retry_403_count += 1
                            if retry_403_count <= max_403_retries:
                                self._log("warning", f"遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...")
                                await asyncio.sleep(0.5)
                                continue
                            self._log("error", f"403错误,已重试{retry_403_count-1}次,放弃")
                            return None
                        # Configurable status codes (401/429 by default) -> outer retry.
                        if response.status_code in retry_codes:
                            if outer_retry < MAX_OUTER_RETRY:
                                delay = (outer_retry + 1) * 0.1  # progressive: 0.1s, 0.2s, 0.3s
                                self._log("warning", f"遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...")
                                await asyncio.sleep(delay)
                                break  # leave inner loop, next outer attempt
                            else:
                                self._log("error", f"{response.status_code}错误,已重试{outer_retry}次,放弃")
                                return None
                        response.raise_for_status()
                        # Write the payload off the event-loop thread.
                        await asyncio.to_thread(cache_path.write_bytes, response.content)
                        if outer_retry > 0 or retry_403_count > 0:
                            self._log("info", f"重试成功!")
                        else:
                            self._log("debug", "缓存成功")
                        # Background cleanup; keep a strong reference so the task
                        # is not garbage-collected mid-run, discard it when done.
                        task = asyncio.create_task(self._safe_cleanup())
                        self._bg_tasks.add(task)
                        task.add_done_callback(self._bg_tasks.discard)
                        return cache_path
            except Exception as e:
                # BUGFIX: was `outer_retry < MAX_OUTER_RETRY - 1`, which gave the
                # exception path one fewer retry than the status-code path and a
                # misleading final retry count in the log.
                if outer_retry < MAX_OUTER_RETRY:
                    self._log("warning", f"下载异常: {e},外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY})...")
                    await asyncio.sleep(0.5)
                    continue
                self._log("error", f"下载失败: {e}(已重试{outer_retry}次)")
                return None
        return None

    def get_cached(self, file_path: str) -> Optional[Path]:
        """Return the cached Path for file_path, or None if not cached."""
        path = self._get_path(file_path)
        return path if path.exists() else None

    async def _safe_cleanup(self):
        """Run cleanup(), logging (never raising) any failure."""
        try:
            await self.cleanup()
        except Exception as e:
            self._log("error", f"后台清理失败: {e}")

    async def cleanup(self):
        """Evict oldest files until the cache fits the configured size limit."""
        if self._cleanup_lock.locked():
            return  # a cleanup pass is already running
        async with self._cleanup_lock:
            try:
                max_mb = setting.global_config.get(f"{self.cache_type}_cache_max_size_mb", 500)
                max_bytes = max_mb * 1024 * 1024
                # Collect (path, size, mtime) for every regular file in the cache dir.
                files = [(f, (s := f.stat()).st_size, s.st_mtime)
                         for f in self.cache_dir.glob("*") if f.is_file()]
                total = sum(size for _, size, _ in files)
                if total <= max_bytes:
                    return
                self._log("info", f"清理缓存 {total/1024/1024:.1f}MB -> {max_mb}MB")
                # Delete oldest first (ascending mtime).
                for path, size, _ in sorted(files, key=lambda x: x[2]):
                    if total <= max_bytes:
                        break
                    await asyncio.to_thread(path.unlink)
                    total -= size
                self._log("info", f"清理完成: {total/1024/1024:.1f}MB")
            except Exception as e:
                self._log("error", f"清理失败: {e}")
class ImageCache(CacheService):
    """Image cache built on CacheService (30s download timeout)."""

    def __init__(self):
        super().__init__("image", timeout=30.0)

    async def download_image(self, path: str, token: str) -> Optional[Path]:
        """Fetch an image into the cache and return its local path."""
        return await self.download(path, token)

    @staticmethod
    def to_base64(image_path: Path) -> Optional[str]:
        """Encode a cached image file as a base64 data URL, or None on error."""
        try:
            if not image_path.exists():
                logger.error(f"[ImageCache] 文件不存在: {image_path}")
                return None
            mime = MIME_TYPES.get(image_path.suffix.lower(), DEFAULT_MIME)
            encoded = base64.b64encode(image_path.read_bytes()).decode()
            return f"data:{mime};base64,{encoded}"
        except Exception as e:
            logger.error(f"[ImageCache] 转换失败: {e}")
            return None

    async def download_base64(self, path: str, token: str) -> Optional[str]:
        """Download an image and return it as a base64 data URL.

        The on-disk temp file is removed (best-effort) after encoding.
        """
        try:
            local_path = await self.download(path, token)
            if local_path is None:
                return None
            encoded = self.to_base64(local_path)
            try:
                local_path.unlink()
            except Exception as e:
                logger.warning(f"[ImageCache] 删除临时文件失败: {e}")
            return encoded
        except Exception as e:
            logger.error(f"[ImageCache] 下载base64失败: {e}")
            return None
class VideoCache(CacheService):
    """Video cache built on CacheService; uses a longer 60s timeout
    to accommodate larger files."""

    def __init__(self):
        super().__init__("video", timeout=60.0)

    async def download_video(self, path: str, token: str) -> Optional[Path]:
        """Fetch a video into the cache and return its local path."""
        return await self.download(path, token)
# Module-level singleton instances shared across the application.
image_cache_service = ImageCache()
video_cache_service = VideoCache()