Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| from typing import Optional | |
| from curl_cffi import AsyncSession, Response | |
| from fastapi import FastAPI, Depends, HTTPException | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from loguru import logger | |
| from starlette.middleware.cors import CORSMiddleware | |
| from app.config import SCRIPT_URL, FP, API_KEY, MODELS, SYSTEM_PROMPT_INJECT, TIMEOUT | |
| from app.errors import CursorWebError | |
| from app.models import ChatCompletionRequest, Message, ModelsResponse, Model, Usage | |
| from app.utils import error_wrapper, to_async, generate_random_string, non_stream_chat_completion, \ | |
| stream_chat_completion, safe_stream_wrapper | |
# Load the JavaScript templates once at import time. The paths are relative to
# the process working directory. Context managers make sure the file handles
# are closed immediately instead of being left to the garbage collector.
with open('./jscode/main.js', 'r', encoding='utf-8') as _main_js_file:
    main_code = _main_js_file.read()
with open('./jscode/env.js', 'r', encoding='utf-8') as _env_js_file:
    env_code = _env_js_file.read()

app = FastAPI()
# Bearer-token extractor used as a dependency by the request handlers.
security = HTTPBearer()

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# Confirm this is intended; the FastAPI CORS documentation advises listing
# origins explicitly when credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def chat_completions(
        request: ChatCompletionRequest,
        credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Handle a chat completion request.

    The bearer token must equal ``API_KEY``; otherwise an HTTP 401 is raised.
    The upstream generator from ``cursor_chat`` is then handed to either the
    streaming or the non-streaming completion helper, always through
    ``error_wrapper``.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm that it is registered on ``app``.

    Args:
        request: Parsed chat completion request body.
        credentials: Bearer credentials extracted by ``security``.

    Returns:
        Whatever ``error_wrapper`` returns for the selected completion helper.

    Raises:
        HTTPException: With status 401 when the API key does not match.
    """
    api_key_matches = credentials.credentials == API_KEY
    if not api_key_matches:
        raise HTTPException(401, 'api key 错误')

    chat_generator = cursor_chat(request)

    if not request.stream:
        return await error_wrapper(non_stream_chat_completion, request, chat_generator)
    return await error_wrapper(safe_stream_wrapper, stream_chat_completion, request, chat_generator)
async def list_models(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the configured models as a ``ModelsResponse`` list.

    ``MODELS`` is a comma-separated string. Whitespace around each entry is
    stripped and empty entries (for example from a trailing comma or an empty
    setting) are skipped, so no model with a blank or padded id is advertised.

    NOTE(review): a bearer token is required by the ``security`` dependency but
    is not compared against ``API_KEY`` here, unlike ``chat_completions``;
    confirm that this is intended.

    Args:
        credentials: Bearer credentials extracted by ``security`` (unused).

    Returns:
        A ``ModelsResponse`` whose ``data`` holds one ``Model`` per configured id.
    """
    created = int(time.time())
    model_ids = [model_id.strip() for model_id in MODELS.split(',')]
    model_list = [
        Model(
            id=model_id,  # the model name doubles as the public id
            object="model",
            created=created,
            owned_by='',
        )
        for model_id in model_ids
        if model_id
    ]
    return ModelsResponse(object="list", data=model_list)
def to_cursor_messages(list_openai_message: list[Message]):
    """Convert OpenAI-style messages into the message dicts sent to Cursor.

    ``SYSTEM_PROMPT_INJECT`` is added to the conversation first: when the first
    message is a system message with string content, the text is appended to
    it; when the first message is not a system message, a new system message
    carrying the text is inserted at the front. Both steps modify the list
    (or its first message) that was passed in.

    Each message is then flattened to a single text part. String content is
    used as-is; otherwise the ``text`` of every content part that has one is
    concatenated. A message whose content is ``None`` yields an empty text
    instead of raising ``TypeError``.

    Args:
        list_openai_message: Messages of the incoming request; ``None`` is
            treated as an empty list.

    Returns:
        A list of ``{'role': ..., 'parts': [{'type': 'text', 'text': ...}]}``
        dicts, one per non-falsy input message.
    """
    if list_openai_message is None:
        list_openai_message = []

    if list_openai_message:
        first = list_openai_message[0]
        if first.role == 'system':
            if isinstance(first.content, str):
                first.content += f'\n{SYSTEM_PROMPT_INJECT}'
        else:
            list_openai_message.insert(0, Message(role='system', content=f'\n{SYSTEM_PROMPT_INJECT}',
                                                  tool_call_id=None,
                                                  tool_calls=None))

    result = []
    for m in list_openai_message:
        if not m:
            continue
        if isinstance(m.content, str):
            text = m.content
        else:
            # Content may be None (no body at all) or a list of parts; parts
            # without text are skipped.
            text = ''.join(part.text for part in (m.content or []) if part.text)
        result.append({
            'role': m.role,
            'parts': [{
                'type': 'text',
                'text': text,
            }],
        })
    return result
def parse_sse_line(line: str) -> Optional[str]:
    """Extract the payload of a server-sent-events ``data`` line.

    Surrounding whitespace (including the line terminator) is stripped first.
    The field name ``data:`` may be followed by one optional space, which is
    not part of the payload; both ``data: x`` and ``data:x`` are accepted.

    Args:
        line: One decoded line of the event stream.

    Returns:
        The payload after the ``data:`` prefix (possibly an empty string), or
        ``None`` when the line is not a ``data`` line.
    """
    prefix = "data:"
    line = line.strip()
    if not line.startswith(prefix):
        return None
    payload = line[len(prefix):]
    # Only a single leading space belongs to the field syntax.
    if payload.startswith(" "):
        payload = payload[1:]
    return payload
async def cursor_chat(request: ChatCompletionRequest):
    """Send the chat request to ``https://cursor.com/api/chat`` and stream the reply.

    The request messages are converted with ``to_cursor_messages``, an
    ``x-is-human`` header value is obtained from ``get_x_is_human``, and the
    response body is read line by line as a server-sent-events stream.

    Lines that are not ``data`` lines, carry no payload, are not valid JSON or
    do not decode to a JSON object are skipped.

    Args:
        request: The chat completion request; ``model`` and ``messages`` are read.

    Yields:
        Every non-empty ``delta`` value of the stream, in order. When a
        ``finish`` event carries usage metadata, a final ``Usage`` object is
        yielded and the generator ends.

    Raises:
        CursorWebError: When the response status is not 200, or when the
            stream contains an event of type ``error``.
    """
    json_data = {
        "context": [],
        "model": request.model,
        "id": generate_random_string(16),
        "messages": to_cursor_messages(request.messages),
        "trigger": "submit-message",
    }
    async with AsyncSession(impersonate='chrome', timeout=TIMEOUT) as session:
        x_is_human = await get_x_is_human(session)
        logger.debug(x_is_human)
        headers = {
            'User-Agent': FP.get("userAgent"),
            'Content-Type': 'application/json',
            'sec-ch-ua-platform': '"Windows"',
            'x-path': '/api/chat',
            'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
            'x-method': 'POST',
            'sec-ch-ua-bitness': '"64"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-arch': '"x86"',
            'x-is-human': x_is_human,
            'sec-ch-ua-platform-version': '"19.0.0"',
            'origin': 'https://cursor.com',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-mode': 'cors',
            'sec-fetch-dest': 'empty',
            'referer': 'https://cursor.com/en-US/learn/how-ai-models-work',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'priority': 'u=1, i',
        }
        async with session.stream("POST", 'https://cursor.com/api/chat', headers=headers, json=json_data,
                                  impersonate='chrome') as response:
            response: Response
            if response.status_code != 200:
                text = await response.atext()
                # Collapse the Cloudflare block page into a short message.
                if 'Attention Required! | Cloudflare' in text:
                    text = 'Cloudflare 403'
                raise CursorWebError(response.status_code, text)

            async for line in response.aiter_lines():
                data = parse_sse_line(line.decode("utf-8"))
                if not data:
                    continue
                # Only the decoding step is guarded, so that errors raised
                # while handling an event are never mistaken for bad JSON.
                try:
                    event_data = json.loads(data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(event_data, dict):
                    # Valid JSON that is not an object carries no event fields.
                    continue

                event_type = event_data.get('type')
                if event_type == 'error':
                    raise CursorWebError(response.status_code, event_data.get('errorText', 'errorText为空'))
                if event_type == 'finish':
                    # ``messageMetadata`` may be absent or null.
                    usage = (event_data.get('messageMetadata') or {}).get('usage')
                    if not usage:
                        continue
                    yield Usage(prompt_tokens=usage.get('inputTokens'),
                                completion_tokens=usage.get('outputTokens'),
                                total_tokens=usage.get('totalTokens'))
                    return

                delta = event_data.get('delta')
                if delta:
                    yield delta
async def get_x_is_human(session: AsyncSession):
    """Compute the value sent as the ``x-is-human`` request header.

    The script at ``SCRIPT_URL`` is downloaded, spliced together with the
    fingerprint values from ``FP`` and the local ``env.js`` code into the
    ``main.js`` template, and the resulting program is executed with ``runjs``.

    Args:
        session: The HTTP session used to download the script.

    Returns:
        The output of ``runjs`` for the assembled JavaScript program.

    Raises:
        CursorWebError: When the script download does not return status 200.
    """
    import asyncio
    import inspect

    headers = {
        'User-Agent': FP.get("userAgent"),
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform-version': '"19.0.0"',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-mode': 'no-cors',
        'sec-fetch-dest': 'script',
        'referer': 'https://cursor.com/en-US/learn/how-ai-models-work',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    response = await session.get(SCRIPT_URL,
                                 headers=headers,
                                 impersonate='chrome')
    if response.status_code != 200:
        # Fail early with the upstream status instead of feeding an error
        # page to Node.js as if it were the script.
        text = response.text
        if 'Attention Required! | Cloudflare' in text:
            text = 'Cloudflare 403'
        raise CursorWebError(response.status_code, text)
    cursor_js = response.text

    # Substitute the fingerprint placeholders.
    main = (main_code.replace("$$currentScriptSrc$$", SCRIPT_URL)
            .replace("$$UNMASKED_VENDOR_WEBGL$$", FP.get("UNMASKED_VENDOR_WEBGL"))
            .replace("$$UNMASKED_RENDERER_WEBGL$$", FP.get("UNMASKED_RENDERER_WEBGL"))
            .replace("$$userAgent$$", FP.get("userAgent")))
    # Substitute the code placeholders.
    main = main.replace('$$env_jscode$$', env_code)
    main = main.replace("$$cursor_jscode$$", cursor_js)

    # ``runjs`` is a plain blocking function: awaiting its ``str`` result
    # directly raises TypeError, and running Node.js inline would stall the
    # event loop. Run it in a worker thread instead. If ``runjs`` has been
    # wrapped into a coroutine function, await it directly.
    if inspect.iscoroutinefunction(runjs):
        return await runjs(main)
    return await asyncio.to_thread(runjs, main)
def runjs(jscode: str) -> str:
    """Run JavaScript source with Node.js and return what it printed.

    The source is written to ``script.js`` inside a fresh temporary directory,
    executed with the ``node`` executable, and the directory is removed again
    whether or not the run succeeds.

    Args:
        jscode: The JavaScript source to execute.

    Returns:
        The standard output of the Node.js process, stripped of surrounding
        whitespace.

    Raises:
        FileNotFoundError: Node.js is not installed or not on the system PATH.
        subprocess.CalledProcessError: The Node.js process exited with a
            non-zero code; the exception carries its stdout and stderr.
    """
    workdir = tempfile.mkdtemp()
    try:
        script_path = os.path.join(workdir, "script.js")
        with open(script_path, "w", encoding="utf-8") as script_file:
            script_file.write(jscode)

        command = ['node', script_path]
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8"
        )
        if result.returncode == 0:
            return result.stdout.strip()

        # Log the full output before raising so the failure can be diagnosed.
        error_msg = f"Node.js 执行失败 (退出码: {result.returncode})\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        logger.error(error_msg)
        raise subprocess.CalledProcessError(result.returncode, command, result.stdout, result.stderr)
    finally:
        shutil.rmtree(workdir)
if __name__ == "__main__":
    # Script entry point: serve the ``app`` object of module ``main`` with
    # uvicorn on all interfaces, port 8000, without auto-reload.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": False,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)