| """Grok API 客户端 - 处理OpenAI到Grok的请求转换和响应处理""" |
|
|
| import asyncio |
| import orjson |
| from typing import Dict, List, Tuple, Any, Optional |
| from curl_cffi.requests import AsyncSession as curl_AsyncSession |
|
|
| from app.core.config import setting |
| from app.core.logger import logger |
| from app.models.grok_models import Models |
| from app.services.grok.processer import GrokResponseProcessor |
| from app.services.grok.statsig import get_dynamic_headers |
| from app.services.grok.token import token_manager |
| from app.services.grok.upload import ImageUploadManager |
| from app.services.grok.create import PostCreateManager |
| from app.core.exception import GrokApiException |
|
|
|
|
| |
| API_ENDPOINT = "https://grok.com/rest/app-chat/conversations/new" |
| TIMEOUT = 120 |
| BROWSER = "chrome133a" |
| MAX_RETRY = 3 |
| MAX_UPLOADS = 20 |
|
|
|
|
| class GrokClient: |
| """Grok API 客户端""" |
| |
| _upload_sem = None |
|
|
| @staticmethod |
| def _get_upload_semaphore(): |
| """获取上传信号量(动态配置)""" |
| if GrokClient._upload_sem is None: |
| |
| max_concurrency = setting.global_config.get("max_upload_concurrency", MAX_UPLOADS) |
| GrokClient._upload_sem = asyncio.Semaphore(max_concurrency) |
| logger.debug(f"[Client] 初始化上传并发限制: {max_concurrency}") |
| return GrokClient._upload_sem |
|
|
| @staticmethod |
| async def openai_to_grok(request: dict): |
| """转换OpenAI请求为Grok请求""" |
| model = request["model"] |
| content, images = GrokClient._extract_content(request["messages"]) |
| stream = request.get("stream", False) |
| |
| |
| info = Models.get_model_info(model) |
| grok_model, mode = Models.to_grok(model) |
| is_video = info.get("is_video_model", False) |
| |
| |
| if is_video and len(images) > 1: |
| logger.warning(f"[Client] 视频模型仅支持1张图片,已截取前1张") |
| images = images[:1] |
| |
| return await GrokClient._retry(model, content, images, grok_model, mode, is_video, stream) |
|
|
| @staticmethod |
| async def _retry(model: str, content: str, images: List[str], grok_model: str, mode: str, is_video: bool, stream: bool): |
| """重试请求""" |
| last_err = None |
|
|
| for i in range(MAX_RETRY): |
| try: |
| token = await token_manager.get_token(model) |
| img_ids, img_uris = await GrokClient._upload(images, token) |
|
|
| |
| post_id = None |
| if is_video and img_ids and img_uris: |
| post_id = await GrokClient._create_post(img_ids[0], img_uris[0], token) |
|
|
| payload = GrokClient._build_payload(content, grok_model, mode, img_ids, img_uris, is_video, post_id) |
| return await GrokClient._request(payload, token, model, stream, post_id) |
|
|
| except GrokApiException as e: |
| last_err = e |
| |
| if e.error_code not in ["HTTP_ERROR", "NO_AVAILABLE_TOKEN"]: |
| raise |
|
|
| status = e.context.get("status") if e.context else None |
| retry_codes = setting.grok_config.get("retry_status_codes", [401, 429]) |
| |
| if status not in retry_codes: |
| raise |
|
|
| if i < MAX_RETRY - 1: |
| logger.warning(f"[Client] 失败(状态:{status}), 重试 {i+1}/{MAX_RETRY}") |
| await asyncio.sleep(0.5) |
|
|
| raise last_err or GrokApiException("请求失败", "REQUEST_ERROR") |
|
|
| @staticmethod |
| def _extract_content(messages: List[Dict]) -> Tuple[str, List[str]]: |
| """提取文本和图片,保留角色结构""" |
| formatted_messages = [] |
| images = [] |
|
|
| |
| role_map = { |
| "system": "系统", |
| "user": "用户", |
| "assistant": "grok" |
| } |
| |
| for msg in messages: |
| role = msg.get("role", "user") |
| content = msg.get("content", "") |
| role_prefix = role_map.get(role, role) |
| |
| |
| text_parts = [] |
| if isinstance(content, list): |
| for item in content: |
| if item.get("type") == "text": |
| text_parts.append(item.get("text", "")) |
| elif item.get("type") == "image_url": |
| if url := item.get("image_url", {}).get("url"): |
| images.append(url) |
| else: |
| text_parts.append(content) |
| |
| |
| msg_text = "".join(text_parts).strip() |
| if msg_text: |
| formatted_messages.append(f"{role_prefix}:{msg_text}") |
| |
| |
| return "\n".join(formatted_messages), images |
|
|
| @staticmethod |
| async def _upload(urls: List[str], token: str) -> Tuple[List[str], List[str]]: |
| """并发上传图片""" |
| if not urls: |
| return [], [] |
| |
| async def upload_limited(url): |
| async with GrokClient._get_upload_semaphore(): |
| return await ImageUploadManager.upload(url, token) |
| |
| results = await asyncio.gather(*[upload_limited(u) for u in urls], return_exceptions=True) |
| |
| ids, uris = [], [] |
| for url, result in zip(urls, results): |
| if isinstance(result, Exception): |
| logger.warning(f"[Client] 上传失败: {url} - {result}") |
| elif isinstance(result, tuple) and len(result) == 2: |
| fid, furi = result |
| if fid: |
| ids.append(fid) |
| uris.append(furi) |
| |
| return ids, uris |
|
|
| @staticmethod |
| async def _create_post(file_id: str, file_uri: str, token: str) -> Optional[str]: |
| """创建视频会话""" |
| try: |
| result = await PostCreateManager.create(file_id, file_uri, token) |
| if result and result.get("success"): |
| return result.get("post_id") |
| except Exception as e: |
| logger.warning(f"[Client] 创建会话失败: {e}") |
| return None |
|
|
| @staticmethod |
| def _build_payload(content: str, model: str, mode: str, img_ids: List[str], img_uris: List[str], is_video: bool = False, post_id: str = None) -> Dict: |
| """构建请求载荷""" |
| |
| if is_video and img_uris: |
| img_msg = f"https://grok.com/imagine/{post_id}" if post_id else f"https://assets.grok.com/post/{img_uris[0]}" |
| return { |
| "temporary": True, |
| "modelName": "grok-3", |
| "message": f"{img_msg} {content} --mode=custom", |
| "fileAttachments": img_ids, |
| "toolOverrides": {"videoGen": True} |
| } |
| |
| |
| return { |
| "temporary": setting.grok_config.get("temporary", True), |
| "modelName": model, |
| "message": content, |
| "fileAttachments": img_ids, |
| "imageAttachments": [], |
| "disableSearch": False, |
| "enableImageGeneration": True, |
| "returnImageBytes": False, |
| "returnRawGrokInXaiRequest": False, |
| "enableImageStreaming": True, |
| "imageGenerationCount": 2, |
| "forceConcise": False, |
| "toolOverrides": {}, |
| "enableSideBySide": True, |
| "sendFinalMetadata": True, |
| "isReasoning": False, |
| "webpageUrls": [], |
| "disableTextFollowUps": True, |
| "responseMetadata": {"requestModelDetails": {"modelId": model}}, |
| "disableMemory": False, |
| "forceSideBySide": False, |
| "modelMode": mode, |
| "isAsyncChat": False |
| } |
|
|
| @staticmethod |
| async def _request(payload: dict, token: str, model: str, stream: bool, post_id: str = None): |
| """发送请求""" |
| if not token: |
| raise GrokApiException("认证令牌缺失", "NO_AUTH_TOKEN") |
|
|
| |
| retry_codes = setting.grok_config.get("retry_status_codes", [401, 429]) |
| MAX_OUTER_RETRY = 3 |
| |
| for outer_retry in range(MAX_OUTER_RETRY + 1): |
| |
| max_403_retries = 5 |
| retry_403_count = 0 |
| |
| while retry_403_count <= max_403_retries: |
| |
| from app.core.proxy_pool import proxy_pool |
| |
| |
| if retry_403_count > 0 and proxy_pool._enabled: |
| logger.info(f"[Client] 403重试 {retry_403_count}/{max_403_retries},刷新代理...") |
| proxy = await proxy_pool.force_refresh() |
| else: |
| proxy = await setting.get_proxy_async("service") |
| |
| proxies = {"http": proxy, "https": proxy} if proxy else None |
| |
| |
| headers = GrokClient._build_headers(token) |
| if model == "grok-imagine-0.9": |
| file_attachments = payload.get("fileAttachments", []) |
| ref_id = post_id or (file_attachments[0] if file_attachments else "") |
| if ref_id: |
| headers["Referer"] = f"https://grok.com/imagine/{ref_id}" |
|
|
| |
| session = curl_AsyncSession(impersonate=BROWSER) |
| try: |
| response = await session.post( |
| API_ENDPOINT, |
| headers=headers, |
| data=orjson.dumps(payload), |
| timeout=TIMEOUT, |
| stream=True, |
| proxies=proxies |
| ) |
| |
| |
| if response.status_code == 403 and proxy_pool._enabled: |
| retry_403_count += 1 |
| if retry_403_count <= max_403_retries: |
| logger.warning(f"[Client] 遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...") |
| await session.close() |
| await asyncio.sleep(0.5) |
| continue |
| logger.error(f"[Client] 403错误,已重试{retry_403_count-1}次,放弃") |
| |
| |
| if response.status_code in retry_codes: |
| if outer_retry < MAX_OUTER_RETRY: |
| delay = (outer_retry + 1) * 0.1 |
| logger.warning(f"[Client] 遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...") |
| await session.close() |
| await asyncio.sleep(delay) |
| break |
| else: |
| logger.error(f"[Client] {response.status_code}错误,已重试{outer_retry}次,放弃") |
| try: |
| GrokClient._handle_error(response, token) |
| finally: |
| await session.close() |
| |
| |
| if response.status_code != 200: |
| try: |
| GrokClient._handle_error(response, token) |
| finally: |
| await session.close() |
| |
| |
| asyncio.create_task(token_manager.reset_failure(token)) |
| |
| if outer_retry > 0 or retry_403_count > 0: |
| logger.info(f"[Client] 重试成功!") |
| |
| |
| if stream: |
| |
| result = GrokResponseProcessor.process_stream(response, token, session) |
| else: |
| |
| try: |
| result = await GrokResponseProcessor.process_normal(response, token, model) |
| finally: |
| await session.close() |
| |
| asyncio.create_task(GrokClient._update_limits(token, model)) |
| return result |
| |
| except Exception as e: |
| await session.close() |
| if "RequestsError" in str(type(e)): |
| logger.error(f"[Client] 网络错误: {e}") |
| raise GrokApiException(f"网络错误: {e}", "NETWORK_ERROR") from e |
| raise |
| |
| raise GrokApiException("请求失败:已达到最大重试次数", "MAX_RETRIES_EXCEEDED") |
|
|
|
|
| @staticmethod |
| def _build_headers(token: str) -> Dict[str, str]: |
| """构建请求头""" |
| headers = get_dynamic_headers("/rest/app-chat/conversations/new") |
| cf = setting.grok_config.get("cf_clearance", "") |
| headers["Cookie"] = f"{token};{cf}" if cf else token |
| return headers |
|
|
| @staticmethod |
| def _handle_error(response, token: str): |
| """处理错误""" |
| if response.status_code == 403: |
| msg = "您的IP被拦截,请尝试以下方法之一: 1.更换IP 2.使用代理 3.配置CF值" |
| data = {"cf_blocked": True, "status": 403} |
| logger.warning(f"[Client] {msg}") |
| else: |
| try: |
| data = response.json() |
| msg = str(data) |
| except: |
| data = response.text |
| msg = data[:200] if data else "未知错误" |
| |
| asyncio.create_task(token_manager.record_failure(token, response.status_code, msg)) |
| asyncio.create_task(token_manager.apply_cooldown(token, response.status_code)) |
| raise GrokApiException( |
| f"请求失败: {response.status_code} - {msg}", |
| "HTTP_ERROR", |
| {"status": response.status_code, "data": data} |
| ) |
|
|
| @staticmethod |
| async def _update_limits(token: str, model: str): |
| """更新速率限制""" |
| try: |
| await token_manager.check_limits(token, model) |
| except Exception as e: |
| logger.error(f"[Client] 更新限制失败: {e}") |