Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import uuid | |
| import logging | |
| import time | |
| from flask import Flask, request, Response, jsonify, stream_with_context | |
| from flask_cors import CORS | |
| from dotenv import load_dotenv | |
| from curl_cffi import requests | |
# Logging setup
class CustomLogger:
    """Wrapper around the ``grok_api`` logger that prefixes messages with a component tag."""

    def __init__(self):
        self.logger = logging.getLogger("grok_api")
        self.logger.setLevel(logging.INFO)
        # logging.getLogger() hands back the same logger object for the same
        # name, so attach the console handler only once; otherwise every
        # additional CustomLogger() would print each record one more time.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(console_handler)

    def info(self, message, component="App"):
        """Log *message* at INFO level, prefixed with ``[component]``."""
        self.logger.info(f"[{component}] {message}")

    def error(self, message, component="App"):
        """Log *message* at ERROR level, prefixed with ``[component]``."""
        self.logger.error(f"[{component}] {message}")

    def request_logger(self):
        """Return a zero-argument callable that logs the current Flask request.

        The callable reads the Flask ``request`` proxy, so it must run inside a
        request context. It returns ``None`` so request handling continues.
        """
        def middleware():
            self.info(f"{request.method} {request.path}", "Request")
            return None

        return middleware
# Load environment variables (python-dotenv)
load_dotenv()


def _env_flag(name, default):
    """Return True when env var *name* (or *default* if unset) equals "true", ignoring case."""
    return os.getenv(name, default).lower() == "true"


# Global configuration. Besides static settings it also carries mutable
# runtime state (token pool, rotation index, per-response flags) that other
# functions in this file read and write.
CONFIG = {
    # Public model alias -> upstream model name
    "MODELS": {
        "grok-2": "grok-latest",
        "grok-2-imageGen": "grok-latest",
        "grok-2-search": "grok-latest",
        "grok-3": "grok-3",
        "grok-3-search": "grok-3",
        "grok-3-imageGen": "grok-3",
        "grok-3-deepsearch": "grok-3",
        "grok-3-reasoning": "grok-3",
    },
    "API": {
        "BASE_URL": "https://grok.com",
        "API_KEY": os.getenv("API_KEY", "sk-123456"),
        "IS_TEMP_CONVERSATION": _env_flag("IS_TEMP_CONVERSATION", "false"),
        "PICGO_KEY": os.getenv("PICGO_KEY", None),
        "SIGNATURE_COOKIE": "",
    },
    "SERVER": {
        "PORT": int(os.getenv("PORT", 3000)),
        "BODY_LIMIT": "5mb",
    },
    "RETRY": {
        "MAX_ATTEMPTS": 2,  # retry count
    },
    "DEFAULT_HEADERS": {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Content-Type": "text/plain;charset=UTF-8",
        "Connection": "keep-alive",
        "Origin": "https://grok.com",
        "Priority": "u=1, i",
        "Sec-Ch-Ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": '"Windows"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
        "Baggage": "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c",
    },
    "SIGNATUREARRAY": [],
    "SIGNATUREINDEX": 0,
    "SHOW_THINKING": _env_flag("SHOW_THINKING", "false"),
    "IS_THINKING": False,
    "IS_IMG_GEN": False,
    "IS_IMG_GEN2": False,
    "ISSHOW_SEARCH_RESULTS": _env_flag("ISSHOW_SEARCH_RESULTS", "true"),
}

# Shared logger instance
Logger = CustomLogger()
async def initialization():
    """Load SSO tokens from the ``SSO`` environment variable into the token pool.

    ``SSO`` is read as a comma-separated list. Each non-empty entry is stripped
    of surrounding whitespace and appended to ``CONFIG["SIGNATUREARRAY"]`` as a
    ``sso-rw=<token>;sso=<token>`` cookie string.
    """
    Logger.info("开始加载令牌", "Server")
    for raw_token in os.getenv("SSO", "").split(","):
        sso = raw_token.strip()
        if not sso:  # skip empty entries such as a trailing comma
            continue
        # Store the stripped value: the original tested strip() but stored the
        # raw entry, so "a, b" put a leading space inside the cookie.
        CONFIG["SIGNATUREARRAY"].append(f"sso-rw={sso};sso={sso}")
    # Only the count is logged; the cookie values are credentials and must not
    # be written to the log.
    Logger.info(f"令牌加载完成,共加载: {len(CONFIG['SIGNATUREARRAY'])}个令牌", "Server")
    Logger.info("初始化完成", "Server")
class Utils:
    """Stateless formatting helpers."""

    @staticmethod
    def organize_search_results(search_results):
        """Render web-search results as collapsible ``<details>`` blocks.

        Args:
            search_results: mapping with a ``"results"`` list of result
                mappings; each may carry ``title``, ``url`` and ``preview``.

        Returns:
            The formatted blocks joined by blank lines, or ``''`` when there
            is nothing to render (falsy input, missing or empty ``results``).
        """
        if not search_results or not search_results.get("results"):
            return ''

        formatted_results = []
        for index, result in enumerate(search_results["results"]):
            # `or` (rather than a .get default) also covers keys that are
            # present but null or empty.
            title = result.get("title") or "未知标题"
            url = result.get("url") or "#"
            preview = result.get("preview") or "无预览内容"
            formatted_results.append(
                f"\r\n<details><summary>资料[{index}]: {title}</summary>"
                f"\r\n{preview}\r\n\n[Link]({url})\r\n</details>"
            )
        return '\n\n'.join(formatted_results)
class GrokApiClient:
    """Builds the request payload sent to the Grok chat endpoint for one model alias."""

    def __init__(self, model_id):
        """Resolve *model_id* through ``CONFIG["MODELS"]``.

        Raises:
            ValueError: if *model_id* is not a configured alias.
        """
        if model_id not in CONFIG["MODELS"]:
            raise ValueError(f"不支持的模型: {model_id}")
        self.model_id = CONFIG["MODELS"][model_id]

    def process_message_content(self, content):
        """Return *content* when it is a string, otherwise ``None``."""
        if isinstance(content, str):
            return content
        return None

    def get_image_type(self, base64_string):
        """Derive MIME type and a file name from a base64 image string.

        Falls back to ``image/jpeg`` when no ``data:<mime>;base64,`` prefix is
        found.

        Returns:
            dict with ``mimeType`` and ``fileName`` (``image.<subtype>``).
        """
        import re

        mime_type = 'image/jpeg'
        if 'data:image' in base64_string:
            matches = re.search(r'data:([a-zA-Z0-9]+\/[a-zA-Z0-9-.+]+);base64,', base64_string)
            if matches:
                mime_type = matches.group(1)
        extension = mime_type.split('/')[1]
        return {
            "mimeType": mime_type,
            "fileName": f"image.{extension}"
        }

    async def upload_base64_image(self, base64_data, url):
        """Upload a base64 image via the ``uploadFile`` RPC at *url*.

        Returns:
            The ``fileMetadataId`` from the response, or ``''`` on any failure
            (non-200 status or exception; both are logged).
        """
        try:
            # Strip the data-URL prefix, if any, keeping only the payload.
            if 'data:image' in base64_data:
                image_buffer = base64_data.split(',')[1]
            else:
                image_buffer = base64_data
            image_info = self.get_image_type(base64_data)
            upload_data = {
                "rpc": "uploadFile",
                "req": {
                    "fileName": image_info["fileName"],
                    "fileMimeType": image_info["mimeType"],
                    "content": image_buffer
                }
            }
            Logger.info("发送图片请求", "Server")
            response = requests.post(
                url,
                headers={
                    **CONFIG["DEFAULT_HEADERS"],
                    "cookie": CONFIG["API"]["SIGNATURE_COOKIE"]
                },
                json=upload_data
            )
            if response.status_code != 200:
                Logger.error(f"上传图片失败,状态码:{response.status_code}", "Server")
                return ''
            result = response.json()
            Logger.info(f"上传图片成功: {result}", "Server")
            return result["fileMetadataId"]
        except Exception as error:
            Logger.error(str(error), "Server")
            return ''

    @staticmethod
    def _strip_think_and_inline_images(text):
        """Remove ``<think>…</think>`` sections and replace inline base64 images with a placeholder."""
        import re

        text = re.sub(r'<think>[\s\S]*?<\/think>', '', text).strip()
        text = re.sub(r'!\[image\]\(data:.*?base64,.*?\)', '[图片]', text)
        return text

    @staticmethod
    def _flatten_content(content):
        """Reduce a message ``content`` (str, part dict, or list of parts) to plain text.

        Image parts become the ``[图片]`` placeholder; text parts are cleaned
        with ``_strip_think_and_inline_images``. Anything else yields ``''``.
        """
        clean = GrokApiClient._strip_think_and_inline_images
        if isinstance(content, list):
            text_content = ''
            for item in content:
                kind = item.get("type")
                if kind == 'image_url':
                    piece = "[图片]"
                elif kind == 'text':
                    piece = clean(item["text"])
                else:
                    continue
                # Separate parts with a newline. The original added
                # `text_content + '\n'` here, which re-appended everything
                # accumulated so far and duplicated earlier parts.
                text_content += ('\n' if text_content else '') + piece
            return text_content
        if isinstance(content, dict):
            if content.get("type") == 'image_url':
                return "[图片]"
            if content.get("type") == 'text':
                return clean(content["text"])
        if isinstance(content, str):
            return clean(content)
        # None or an unrecognised shape: nothing to send instead of a
        # TypeError from running a regex over None.
        return ''

    async def _upload_image_item(self, item):
        """Upload an ``image_url`` part when it embeds base64 data; return its id, else ``None``."""
        image_url = item["image_url"]["url"]
        if 'data:image' not in image_url:
            return None
        return await self.upload_base64_image(
            image_url,
            f"{CONFIG['API']['BASE_URL']}/api/rpc"
        )

    async def prepare_chat_request(self, request_data):
        """Convert an OpenAI-style chat request into the upstream payload.

        Messages are flattened into a single ``ROLE: text`` transcript, with
        consecutive messages of the same role merged. Base64 images in the
        last message are uploaded and attached (at most four are sent). For
        the image-generation models only the last message is used.

        Args:
            request_data: mapping with ``"model"`` and ``"messages"``.

        Raises:
            ValueError: for an image-generation model whose last message is
                not a user message.
        """
        todo_messages = request_data["messages"]
        is_image_gen = request_data["model"] in ['grok-2-imageGen', 'grok-3-imageGen']
        if is_image_gen:
            last_message = todo_messages[-1]
            if last_message["role"] != 'user':
                raise ValueError('画图模型的最后一条消息必须是用户消息!')
            todo_messages = [last_message]
        search = request_data["model"] in ['grok-2-search', 'grok-3-search']

        file_attachments = []
        # Transcript kept as [role, text] pairs and joined at the end, so that
        # merging same-role messages never has to search the rendered string
        # (a "USER: " inside message text could be matched by rindex).
        turns = []
        last_index = len(todo_messages) - 1
        for index, current in enumerate(todo_messages):
            role = 'assistant' if current["role"] == 'assistant' else 'user'
            # Position, not equality: an earlier message that merely equals
            # the last one must not be treated as the last.
            is_last_message = index == last_index

            # Image attachments are taken from the last message only.
            if is_last_message:
                content = current.get("content")
                candidates = content if isinstance(content, list) else [content]
                for item in candidates:
                    if isinstance(item, dict) and item.get("type") == 'image_url':
                        processed_image = await self._upload_image_item(item)
                        if processed_image:
                            file_attachments.append(processed_image)

            text_content = self._flatten_content(current.get("content", ""))
            if not text_content and not (is_last_message and file_attachments):
                continue
            if turns and turns[-1][0] == role and text_content:
                turns[-1][1] += '\n' + text_content
            else:
                turns.append([role, text_content or '[图片]'])

        messages = ''.join(f"{role.upper()}: {text}\n" for role, text in turns)
        return {
            "temporary": CONFIG["API"]["IS_TEMP_CONVERSATION"],
            "modelName": self.model_id,
            "message": messages.strip(),
            "fileAttachments": file_attachments[:4],
            "imageAttachments": [],
            "disableSearch": False,
            "enableImageGeneration": True,
            "returnImageBytes": False,
            "returnRawGrokInXaiRequest": False,
            "enableImageStreaming": False,
            "imageGenerationCount": 1,
            "forceConcise": False,
            "toolOverrides": {
                "imageGen": is_image_gen,
                "webSearch": search,
                "xSearch": search,
                "xMediaSearch": search,
                "trendsSearch": search,
                "xPostAnalyze": search
            },
            "enableSideBySide": True,
            "isPreset": False,
            "sendFinalMetadata": True,
            "customInstructions": "",
            "deepsearchPreset": "default" if request_data["model"] == 'grok-3-deepsearch' else "",
            "isReasoning": request_data["model"] == 'grok-3-reasoning'
        }
class MessageProcessor:
    """Builds OpenAI-style chat completion response objects."""

    # The function takes no `self`, so it is declared static: calls through
    # the class behave as before, and calls through an instance no longer
    # bind the instance to `message`.
    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Wrap *message* in a chat completion (or streaming chunk) structure.

        Args:
            message: text placed in the response content.
            model: model name echoed back in the response.
            is_stream: when true, build a ``chat.completion.chunk`` with a
                ``delta``; otherwise a full ``chat.completion``.

        Returns:
            dict with a fresh ``chatcmpl-<uuid4>`` id and the current
            Unix timestamp in ``created``.
        """
        base_response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model
        }
        if is_stream:
            return {
                **base_response,
                "object": "chat.completion.chunk",
                "choices": [{
                    "index": 0,
                    "delta": {
                        "content": message
                    }
                }]
            }
        return {
            **base_response,
            "object": "chat.completion",
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": message
                },
                "finish_reason": "stop"
            }],
            "usage": None
        }
def process_model_response(response, model):
    """Extract the text token or image URL to emit for one upstream response item.

    Reads and updates the shared flags in ``CONFIG`` (``IS_IMG_GEN``,
    ``IS_IMG_GEN2``, ``IS_THINKING``) and honours ``SHOW_THINKING`` and
    ``ISSHOW_SEARCH_RESULTS``.

    Args:
        response: the ``response`` mapping of one streamed line (``None`` is
            treated as empty).
        model: the public model alias of the request.

    Returns:
        dict with ``token`` and ``imageUrl``; either may be ``None``.
    """
    result = {"token": None, "imageUrl": None}
    # Normalise once so every branch can call .get(); the original guarded
    # some branches with `response and ...` but not others.
    response = response or {}

    if CONFIG["IS_IMG_GEN"]:
        cached_image = response.get("cachedImageGenerationResponse")
        if cached_image and not CONFIG["IS_IMG_GEN2"]:
            result["imageUrl"] = cached_image["imageUrl"]
        return result

    # Handling for responses that are not image generation
    if model in ('grok-2', 'grok-3'):
        result["token"] = response.get("token")
    elif model in ('grok-2-search', 'grok-3-search'):
        if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]:
            # Search results are emitted wrapped in a <think> block.
            result["token"] = f"\r\n<think>{Utils.organize_search_results(response['webSearchResults'])}</think>\r\n"
        else:
            result["token"] = response.get("token")
    elif model == 'grok-3-deepsearch':
        # Only items tagged "final" contribute text.
        if response.get("messageTag") == "final":
            result["token"] = response.get("token")
    elif model == 'grok-3-reasoning':
        is_thinking = response.get("isThinking")
        if is_thinking and not CONFIG["SHOW_THINKING"]:
            return result
        if is_thinking and not CONFIG["IS_THINKING"]:
            # First thinking token: open the tag. `or ""` covers a null token.
            result["token"] = "<think>" + (response.get("token") or "")
            CONFIG["IS_THINKING"] = True
        elif response and not is_thinking and CONFIG["IS_THINKING"]:
            # First non-thinking token after thinking: close the tag.
            result["token"] = "</think>" + (response.get("token") or "")
            CONFIG["IS_THINKING"] = False
        else:
            result["token"] = response.get("token")
    return result
async def handle_stream_response(response, model, flask_response):
    """Turn the upstream line stream into a server-sent-events Flask response.

    Args:
        response: upstream HTTP response providing ``iter_lines()``.
        model: public model alias, passed to the response builders.
        flask_response: unused; kept so existing callers keep working.

    Returns:
        A ``text/event-stream`` ``Response`` whose body is produced lazily and
        ends with ``data: [DONE]``.

    NOTE(review): the per-response flags live in the module-wide CONFIG, so
    concurrent requests share them — confirm whether that is acceptable.
    """
    try:
        stream = response.iter_lines()
        CONFIG["IS_THINKING"] = False
        CONFIG["IS_IMG_GEN"] = False
        CONFIG["IS_IMG_GEN2"] = False
        Logger.info("开始处理流式响应", "Server")

        def generate():
            import asyncio

            try:
                for line in stream:
                    if not line:
                        continue
                    try:
                        # Parsing sits inside the try so that one malformed
                        # line is logged and skipped instead of aborting the
                        # whole stream.
                        line_json = json.loads(line.decode("utf-8").strip())
                        if line_json and line_json.get("error"):
                            raise ValueError("RateLimitError")
                        response_data = line_json.get("result", {}).get("response")
                        if not response_data:
                            continue
                        if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                            CONFIG["IS_IMG_GEN"] = True
                        result = process_model_response(response_data, model)
                        if result["token"]:
                            chat_response = MessageProcessor.create_chat_response(result["token"], model, True)
                            yield f"data: {json.dumps(chat_response)}\n\n"
                        if result["imageUrl"]:
                            CONFIG["IS_IMG_GEN2"] = True
                            # handle_image_response is a coroutine function and
                            # this generator is synchronous, so the coroutine
                            # is driven to completion here. Calling it bare
                            # produced a coroutine object that json.dumps
                            # rejected, so the image was never sent.
                            # NOTE(review): assumes no event loop is running in
                            # the thread that consumes this generator — confirm
                            # for the deployed server.
                            data_image = asyncio.run(handle_image_response(result["imageUrl"]))
                            image_response = MessageProcessor.create_chat_response(data_image, model, True)
                            yield f"data: {json.dumps(image_response)}\n\n"
                    except Exception as error:
                        Logger.error(str(error), "Server")
                        continue
                yield "data: [DONE]\n\n"
            finally:
                # Release the upstream connection when the stream ends or the
                # client goes away.
                close = getattr(response, "close", None)
                if callable(close):
                    close()

        return Response(generate(), mimetype="text/event-stream")
    except Exception as error:
        Logger.error(str(error), "Server")
        raise error
async def handle_non_stream_response(response, model):
    """Consume the whole upstream stream and build a single chat completion.

    Text tokens are concatenated into one reply. The first image URL ends
    processing early and its rendered result is returned instead.

    Returns:
        A chat completion dict, or ``None`` when an image was announced but no
        image reply was produced.
    """
    try:
        lines = response.iter_lines()
        collected = ""
        # Reset the shared per-response flags.
        for flag in ("IS_THINKING", "IS_IMG_GEN", "IS_IMG_GEN2"):
            CONFIG[flag] = False
        Logger.info("开始处理非流式响应", "Server")

        for raw_line in lines:
            if not raw_line:
                continue
            try:
                payload = json.loads(raw_line.decode("utf-8").strip())
                if payload and payload.get("error"):
                    raise ValueError("RateLimitError")

                response_data = payload.get("result", {}).get("response")
                if not response_data:
                    continue

                if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                    CONFIG["IS_IMG_GEN"] = True

                parsed = process_model_response(response_data, model)
                if parsed["token"]:
                    collected += parsed["token"]
                if parsed["imageUrl"]:
                    CONFIG["IS_IMG_GEN2"] = True
                    rendered_image = await handle_image_response(parsed["imageUrl"])
                    return MessageProcessor.create_chat_response(rendered_image, model)
            except Exception as error:
                # A bad line is logged and skipped; the rest is still read.
                Logger.error(str(error), "Server")
                continue

        if CONFIG["IS_IMG_GEN2"]:
            return None
        return MessageProcessor.create_chat_response(collected, model)
    except Exception as error:
        Logger.error(str(error), "Server")
        raise error
async def handle_image_response(image_url):
    """Download a generated image and, if configured, re-host it on PicGo.

    Args:
        image_url: path of the image relative to ``https://assets.grok.com/``.

    Returns:
        A user-facing string: a Markdown image link on success, otherwise a
        message describing why no image can be shown.

    Raises:
        Exception: the last download error, once all attempts have failed.
    """
    MAX_RETRIES = 2
    image_response = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            image_response = requests.get(
                f"https://assets.grok.com/{image_url}",
                headers={
                    **CONFIG["DEFAULT_HEADERS"],
                    "Cookie": CONFIG["API"]["SIGNATURE_COOKIE"]
                }
            )
            if image_response.status_code == 200:
                break
            raise ValueError(f"上游服务请求失败! status: {image_response.status_code}")
        except Exception as error:
            Logger.error(str(error), "Server")
            # Give up after the last attempt. The original counter was bumped
            # twice on a final non-200, so the failure was swallowed and the
            # error body was treated as image data.
            if attempt == MAX_RETRIES:
                raise
            time.sleep(1 * attempt)  # simple linear backoff

    image_buffer = image_response.content

    if not CONFIG["API"]["PICGO_KEY"]:
        # Without an image host there is nowhere to serve the image from.
        return "生图成功,但未配置PICGO图床,无法显示图片"

    files = {
        "source": ("image.jpg", image_buffer, "image/jpeg")
    }
    headers = {
        "X-API-Key": CONFIG["API"]["PICGO_KEY"]
    }
    try:
        pic_response = requests.post(
            "https://www.picgo.net/api/1/upload",
            headers=headers,
            files=files
        )
        if pic_response.status_code != 200:
            return "生图失败,请查看PICGO图床密钥是否设置正确"
        Logger.info("生图成功", "Server")
        result = pic_response.json()
        # The original returned an empty string here and never used `result`.
        # NOTE(review): assumes the upload response carries the hosted URL at
        # result["image"]["url"] — confirm against the PicGo API. A different
        # shape falls into the except branch below.
        return f"![image]({result['image']['url']})"
    except Exception as e:
        Logger.error(f"上传PICGO失败: {str(e)}", "Server")
        return "生图上传失败,请检查网络连接和PICGO配置"
# Create the Flask application and allow cross-origin requests on every path
app = Flask(__name__)
CORS(
    app,
    resources={
        r"/*": {
            "origins": "*",
            "methods": ["GET", "POST", "OPTIONS"],
            "allow_headers": ["Content-Type", "Authorization"],
        }
    },
)
# Registered as a before-request hook; without registration the function was
# never called and no request was logged.
@app.before_request
def log_request_info():
    """Log the method and path of every incoming request."""
    Logger.info(f"{request.method} {request.path}", "Request")
# NOTE(review): no route registration for this handler is visible in the file,
# so it was unreachable. The OpenAI-style path below is an assumption — confirm.
@app.route('/v1/models', methods=['GET'])
async def get_models():
    """Return the configured model aliases as an OpenAI-style model list."""
    created = int(time.time())
    return jsonify({
        "object": "list",
        "data": [
            {
                "id": model,
                "object": "model",
                "created": created,
                "owned_by": "grok"
            } for model in CONFIG["MODELS"]
        ]
    })
# NOTE(review): no route registration for this handler is visible in the file,
# so it was unreachable. The OpenAI-style path below is an assumption — confirm.
@app.route('/v1/chat/completions', methods=['POST'])
async def chat_completions():
    """Handle a chat completion request by proxying it to the Grok endpoint.

    Checks the bearer token against ``CONFIG["API"]["API_KEY"]``, builds the
    upstream payload, then tries up to ``CONFIG["RETRY"]["MAX_ATTEMPTS"]``
    times, moving to the next token in the pool after every attempt.

    Returns:
        A streaming or JSON Flask response on success; a JSON error with
        status 401 (bad key) or 500 (any other failure).
    """
    try:
        # Validate the API key
        auth_token = request.headers.get('Authorization', '').replace('Bearer ', '')
        if auth_token != CONFIG["API"]["API_KEY"]:
            return jsonify({"error": "Unauthorized"}), 401

        # Parse the request
        data = request.get_json()
        model = data.get("model")
        stream = data.get("stream", False)

        token_pool = CONFIG["SIGNATUREARRAY"]
        if not token_pool:
            # Explicit message instead of an IndexError from the lookup below.
            return jsonify({
                "error": {
                    "message": "未加载任何令牌,请检查SSO环境变量配置",
                    "type": "server_error"
                }
            }), 500

        # Select the cookie BEFORE preparing the request: preparation may
        # upload images, and that upload reads SIGNATURE_COOKIE.
        CONFIG["SIGNATUREINDEX"] %= len(token_pool)
        CONFIG["API"]["SIGNATURE_COOKIE"] = token_pool[CONFIG["SIGNATUREINDEX"]]

        grok_client = GrokApiClient(model)
        request_payload = await grok_client.prepare_chat_request(data)
        Logger.info(json.dumps(request_payload, indent=2), "Server")

        # Retry loop
        retry_count = 0
        while retry_count < CONFIG["RETRY"]["MAX_ATTEMPTS"]:
            retry_count += 1
            Logger.info("开始请求", "Server")
            CONFIG["API"]["SIGNATURE_COOKIE"] = token_pool[CONFIG["SIGNATUREINDEX"]]
            response = requests.post(
                f"{CONFIG['API']['BASE_URL']}/rest/app-chat/conversations/new",
                headers={
                    "Accept": "text/event-stream",
                    "Baggage": "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c",
                    "Content-Type": "text/plain;charset=UTF-8",
                    "Connection": "keep-alive",
                    "Cookie": CONFIG["API"]["SIGNATURE_COOKIE"]
                },
                data=json.dumps(request_payload),
                impersonate="chrome110",
                stream=True
            )
            # Rotate exactly once per attempt. The original rotated twice when
            # a 200 non-stream reply yielded no result, which with two tokens
            # retried the same token.
            CONFIG["SIGNATUREINDEX"] = (CONFIG["SIGNATUREINDEX"] + 1) % len(token_pool)

            if response.status_code != 200:
                continue
            Logger.info("请求成功", "Server")
            if stream:
                return await handle_stream_response(response, model, Response())
            non_stream_result = await handle_non_stream_response(response, model)
            if non_stream_result:
                return jsonify(non_stream_result)

        # Every attempt failed
        return jsonify({
            "error": {
                "message": "请求失败,所有令牌均已尝试",
                "type": "server_error"
            }
        }), 500
    except Exception as error:
        Logger.error(str(error), "ChatAPI")
        return jsonify({
            "error": {
                "message": str(error),
                "type": "server_error"
            }
        }), 500
# NOTE(review): no route registration for this handler is visible in the file,
# so it was unreachable. The root + catch-all rules below are inferred from the
# `path` parameter — confirm.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def catch_all(path):
    """Answer any other path with a plain health message and status 200."""
    return "api运行正常", 200
# Program entry point
if __name__ == "__main__":
    import asyncio

    # Load the token pool before accepting traffic
    asyncio.run(initialization())

    # Start the server (waitress) on all interfaces
    listen_port = CONFIG["SERVER"]["PORT"]
    Logger.info(f"服务器已启动,监听端口: {listen_port}", "Server")
    from waitress import serve

    serve(app, host="0.0.0.0", port=listen_port)