"""Grok API 客户端 - 处理OpenAI到Grok的请求转换和响应处理""" import asyncio import orjson from typing import Dict, List, Tuple, Any, Optional from curl_cffi.requests import AsyncSession as curl_AsyncSession from app.core.config import setting from app.core.logger import logger from app.models.grok_models import Models from app.services.grok.processer import GrokResponseProcessor from app.services.grok.statsig import get_dynamic_headers from app.services.grok.token import token_manager from app.services.grok.upload import ImageUploadManager from app.services.grok.create import PostCreateManager from app.core.exception import GrokApiException # 常量 API_ENDPOINT = "https://grok.com/rest/app-chat/conversations/new" TIMEOUT = 120 BROWSER = "chrome133a" MAX_RETRY = 3 MAX_UPLOADS = 20 # 提高并发上传限制以支持更高并发 class GrokClient: """Grok API 客户端""" _upload_sem = None # 延迟初始化 @staticmethod def _get_upload_semaphore(): """获取上传信号量(动态配置)""" if GrokClient._upload_sem is None: # 从配置读取,如果不可用则使用默认值 max_concurrency = setting.global_config.get("max_upload_concurrency", MAX_UPLOADS) GrokClient._upload_sem = asyncio.Semaphore(max_concurrency) logger.debug(f"[Client] 初始化上传并发限制: {max_concurrency}") return GrokClient._upload_sem @staticmethod async def openai_to_grok(request: dict): """转换OpenAI请求为Grok请求""" model = request["model"] content, images = GrokClient._extract_content(request["messages"]) stream = request.get("stream", False) # 获取模型信息 info = Models.get_model_info(model) grok_model, mode = Models.to_grok(model) is_video = info.get("is_video_model", False) # 视频模型限制 if is_video and len(images) > 1: logger.warning(f"[Client] 视频模型仅支持1张图片,已截取前1张") images = images[:1] return await GrokClient._retry(model, content, images, grok_model, mode, is_video, stream) @staticmethod async def _retry(model: str, content: str, images: List[str], grok_model: str, mode: str, is_video: bool, stream: bool): """重试请求""" last_err = None for i in range(MAX_RETRY): try: token = await token_manager.get_token(model) img_ids, img_uris = await GrokClient._upload(images, token) # 视频模型创建会话 post_id = None if is_video and img_ids and img_uris: post_id = await GrokClient._create_post(img_ids[0], img_uris[0], token) payload = GrokClient._build_payload(content, grok_model, mode, img_ids, img_uris, is_video, post_id) return await GrokClient._request(payload, token, model, stream, post_id) except GrokApiException as e: last_err = e # 检查是否可重试 if e.error_code not in ["HTTP_ERROR", "NO_AVAILABLE_TOKEN"]: raise status = e.context.get("status") if e.context else None retry_codes = setting.grok_config.get("retry_status_codes", [401, 429]) if status not in retry_codes: raise if i < MAX_RETRY - 1: logger.warning(f"[Client] 失败(状态:{status}), 重试 {i+1}/{MAX_RETRY}") await asyncio.sleep(0.5) raise last_err or GrokApiException("请求失败", "REQUEST_ERROR") @staticmethod def _extract_content(messages: List[Dict]) -> Tuple[str, List[str]]: """提取文本和图片,保留角色结构""" formatted_messages = [] images = [] # 角色映射 role_map = { "system": "系统", "user": "用户", "assistant": "grok" } for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") role_prefix = role_map.get(role, role) # 提取文本内容 text_parts = [] if isinstance(content, list): for item in content: if item.get("type") == "text": text_parts.append(item.get("text", "")) elif item.get("type") == "image_url": if url := item.get("image_url", {}).get("url"): images.append(url) else: text_parts.append(content) # 合并该消息的文本并添加角色前缀 msg_text = "".join(text_parts).strip() if msg_text: formatted_messages.append(f"{role_prefix}:{msg_text}") # 用换行符连接所有消息 return "\n".join(formatted_messages), images @staticmethod async def _upload(urls: List[str], token: str) -> Tuple[List[str], List[str]]: """并发上传图片""" if not urls: return [], [] async def upload_limited(url): async with GrokClient._get_upload_semaphore(): return await ImageUploadManager.upload(url, token) results = await asyncio.gather(*[upload_limited(u) for u in urls], return_exceptions=True) ids, uris = [], [] for url, result in zip(urls, results): if isinstance(result, Exception): logger.warning(f"[Client] 上传失败: {url} - {result}") elif isinstance(result, tuple) and len(result) == 2: fid, furi = result if fid: ids.append(fid) uris.append(furi) return ids, uris @staticmethod async def _create_post(file_id: str, file_uri: str, token: str) -> Optional[str]: """创建视频会话""" try: result = await PostCreateManager.create(file_id, file_uri, token) if result and result.get("success"): return result.get("post_id") except Exception as e: logger.warning(f"[Client] 创建会话失败: {e}") return None @staticmethod def _build_payload(content: str, model: str, mode: str, img_ids: List[str], img_uris: List[str], is_video: bool = False, post_id: str = None) -> Dict: """构建请求载荷""" # 视频模型特殊处理 if is_video and img_uris: img_msg = f"https://grok.com/imagine/{post_id}" if post_id else f"https://assets.grok.com/post/{img_uris[0]}" return { "temporary": True, "modelName": "grok-3", "message": f"{img_msg} {content} --mode=custom", "fileAttachments": img_ids, "toolOverrides": {"videoGen": True} } # 标准载荷 return { "temporary": setting.grok_config.get("temporary", True), "modelName": model, "message": content, "fileAttachments": img_ids, "imageAttachments": [], "disableSearch": False, "enableImageGeneration": True, "returnImageBytes": False, "returnRawGrokInXaiRequest": False, "enableImageStreaming": True, "imageGenerationCount": 2, "forceConcise": False, "toolOverrides": {}, "enableSideBySide": True, "sendFinalMetadata": True, "isReasoning": False, "webpageUrls": [], "disableTextFollowUps": True, "responseMetadata": {"requestModelDetails": {"modelId": model}}, "disableMemory": False, "forceSideBySide": False, "modelMode": mode, "isAsyncChat": False } @staticmethod async def _request(payload: dict, token: str, model: str, stream: bool, post_id: str = None): """发送请求""" if not token: raise GrokApiException("认证令牌缺失", "NO_AUTH_TOKEN") # 外层重试:可配置状态码(401/429等) retry_codes = setting.grok_config.get("retry_status_codes", [401, 429]) MAX_OUTER_RETRY = 3 for outer_retry in range(MAX_OUTER_RETRY + 1): # +1 确保实际重试3次 # 内层重试:403代理池重试 max_403_retries = 5 retry_403_count = 0 while retry_403_count <= max_403_retries: # 异步获取代理 from app.core.proxy_pool import proxy_pool # 如果是403重试且使用代理池,强制刷新代理 if retry_403_count > 0 and proxy_pool._enabled: logger.info(f"[Client] 403重试 {retry_403_count}/{max_403_retries},刷新代理...") proxy = await proxy_pool.force_refresh() else: proxy = await setting.get_proxy_async("service") proxies = {"http": proxy, "https": proxy} if proxy else None # 构建请求头(放在循环内以支持重试新Token) headers = GrokClient._build_headers(token) if model == "grok-imagine-0.9": file_attachments = payload.get("fileAttachments", []) ref_id = post_id or (file_attachments[0] if file_attachments else "") if ref_id: headers["Referer"] = f"https://grok.com/imagine/{ref_id}" # 创建会话并执行请求 session = curl_AsyncSession(impersonate=BROWSER) try: response = await session.post( API_ENDPOINT, headers=headers, data=orjson.dumps(payload), timeout=TIMEOUT, stream=True, proxies=proxies ) # 内层403重试:仅当有代理池时触发 if response.status_code == 403 and proxy_pool._enabled: retry_403_count += 1 if retry_403_count <= max_403_retries: logger.warning(f"[Client] 遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...") await session.close() await asyncio.sleep(0.5) continue logger.error(f"[Client] 403错误,已重试{retry_403_count-1}次,放弃") # 检查可配置状态码错误 - 外层重试 if response.status_code in retry_codes: if outer_retry < MAX_OUTER_RETRY: delay = (outer_retry + 1) * 0.1 logger.warning(f"[Client] 遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...") await session.close() await asyncio.sleep(delay) break # 跳出内层循环,进入外层重试 else: logger.error(f"[Client] {response.status_code}错误,已重试{outer_retry}次,放弃") try: GrokClient._handle_error(response, token) finally: await session.close() # 检查其他响应状态 if response.status_code != 200: try: GrokClient._handle_error(response, token) finally: await session.close() # 成功 - 重置失败计数 asyncio.create_task(token_manager.reset_failure(token)) if outer_retry > 0 or retry_403_count > 0: logger.info(f"[Client] 重试成功!") # 处理响应 if stream: # 流式响应由迭代器负责关闭 session result = GrokResponseProcessor.process_stream(response, token, session) else: # 普通响应处理完立即关闭 session try: result = await GrokResponseProcessor.process_normal(response, token, model) finally: await session.close() asyncio.create_task(GrokClient._update_limits(token, model)) return result except Exception as e: await session.close() if "RequestsError" in str(type(e)): logger.error(f"[Client] 网络错误: {e}") raise GrokApiException(f"网络错误: {e}", "NETWORK_ERROR") from e raise raise GrokApiException("请求失败:已达到最大重试次数", "MAX_RETRIES_EXCEEDED") @staticmethod def _build_headers(token: str) -> Dict[str, str]: """构建请求头""" headers = get_dynamic_headers("/rest/app-chat/conversations/new") cf = setting.grok_config.get("cf_clearance", "") headers["Cookie"] = f"{token};{cf}" if cf else token return headers @staticmethod def _handle_error(response, token: str): """处理错误""" if response.status_code == 403: msg = "您的IP被拦截,请尝试以下方法之一: 1.更换IP 2.使用代理 3.配置CF值" data = {"cf_blocked": True, "status": 403} logger.warning(f"[Client] {msg}") else: try: data = response.json() msg = str(data) except: data = response.text msg = data[:200] if data else "未知错误" asyncio.create_task(token_manager.record_failure(token, response.status_code, msg)) asyncio.create_task(token_manager.apply_cooldown(token, response.status_code)) raise GrokApiException( f"请求失败: {response.status_code} - {msg}", "HTTP_ERROR", {"status": response.status_code, "data": data} ) @staticmethod async def _update_limits(token: str, model: str): """更新速率限制""" try: await token_manager.check_limits(token, model) except Exception as e: logger.error(f"[Client] 更新限制失败: {e}")