cursor2api / main.py
kyyyx's picture
upload
7ce5341 verified
import json
import os
import shutil
import subprocess
import tempfile
import time
from typing import Optional
from curl_cffi import AsyncSession, Response
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from loguru import logger
from starlette.middleware.cors import CORSMiddleware
from app.config import SCRIPT_URL, FP, API_KEY, MODELS, SYSTEM_PROMPT_INJECT, TIMEOUT
from app.errors import CursorWebError
from app.models import ChatCompletionRequest, Message, ModelsResponse, Model, Usage
from app.utils import error_wrapper, to_async, generate_random_string, non_stream_chat_completion, \
stream_chat_completion, safe_stream_wrapper
main_code = open('./jscode/main.js', 'r', encoding='utf-8').read()
env_code = open('./jscode/env.js', 'r', encoding='utf-8').read()
app = FastAPI()
security = HTTPBearer()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/v1/chat/completions")
async def chat_completions(
request: ChatCompletionRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""处理聊天完成请求"""
if credentials.credentials != API_KEY:
raise HTTPException(401, 'api key 错误')
chat_generator = cursor_chat(request)
# async for c in chat_generator:
# logger.debug(c)
if request.stream:
return await error_wrapper(safe_stream_wrapper, stream_chat_completion, request, chat_generator)
else:
return await error_wrapper(non_stream_chat_completion, request, chat_generator)
@app.get("/v1/models")
async def list_models(credentials: HTTPAuthorizationCredentials = Depends(security)):
models = MODELS.split(',')
model_list = []
for model_id in models:
model_list.append(
Model(
id=model_id, # 使用model name作为对外的id
object="model",
created=int(time.time()),
owned_by='',
)
)
return ModelsResponse(object="list", data=model_list)
def to_cursor_messages(list_openai_message: list[Message]):
if list_openai_message is None:
list_openai_message = []
result = []
if len(list_openai_message) > 0:
if list_openai_message[0].role == 'system':
if isinstance(list_openai_message[0].content, str):
list_openai_message[0].content += f'\n{SYSTEM_PROMPT_INJECT}'
else:
list_openai_message.insert(0, Message(role='system', content=f'\n{SYSTEM_PROMPT_INJECT}',
tool_call_id=None,
tool_calls=None))
for m in list_openai_message:
if not m:
continue
text = ''
if isinstance(m.content, str):
text = m.content
else:
for content in m.content:
if not content.text:
continue
text = text + content.text
message = {
'role': m.role,
'parts': [{
'type': 'text',
'text': text
}]
}
result.append(message)
return result
def parse_sse_line(line: str) -> Optional[str]:
"""解析SSE数据行"""
line = line.strip()
if line.startswith("data: "):
return line[6:] # 去掉 'data: ' 前缀
return None
async def cursor_chat(request: ChatCompletionRequest):
json_data = {
"context": [
],
"model": request.model,
"id": generate_random_string(16),
"messages": to_cursor_messages(request.messages),
"trigger": "submit-message"
}
async with AsyncSession(impersonate='chrome', timeout=TIMEOUT) as session:
x_is_human = await get_x_is_human(session)
logger.debug(x_is_human)
headers = {
'User-Agent': FP.get("userAgent"),
# 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Content-Type': 'application/json',
'sec-ch-ua-platform': '"Windows"',
'x-path': '/api/chat',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
'x-method': 'POST',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-arch': '"x86"',
'x-is-human': x_is_human,
'sec-ch-ua-platform-version': '"19.0.0"',
'origin': 'https://cursor.com',
'sec-fetch-site': 'same-origin',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': 'https://cursor.com/en-US/learn/how-ai-models-work',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
# logger.debug(json_data)
async with session.stream("POST", 'https://cursor.com/api/chat', headers=headers, json=json_data,
impersonate='chrome') as response:
response: Response
if response.status_code != 200:
text = await response.atext()
if 'Attention Required! | Cloudflare' in text:
text = 'Cloudflare 403'
raise CursorWebError(response.status_code, text)
async for line in response.aiter_lines():
line = line.decode("utf-8")
data = parse_sse_line(line)
if not data:
continue
if data and data.strip():
try:
event_data = json.loads(data)
if event_data.get('type') == 'error':
raise CursorWebError(response.status_code, event_data.get('errorText', 'errorText为空'))
if event_data.get('type') == 'finish':
usage = event_data.get('messageMetadata', {}).get('usage')
if not usage:
continue
yield Usage(prompt_tokens=usage.get('inputTokens'),
completion_tokens=usage.get('outputTokens'),
total_tokens=usage.get('totalTokens'))
return
delta = event_data.get('delta')
# logger.debug(delta)
if not delta:
continue
yield delta
except json.JSONDecodeError:
continue
async def get_x_is_human(session: AsyncSession):
headers = {
'User-Agent': FP.get("userAgent"),
# 'Accept-Encoding': 'gzip, deflate, br, zstd',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform-version': '"19.0.0"',
'sec-fetch-site': 'same-origin',
'sec-fetch-mode': 'no-cors',
'sec-fetch-dest': 'script',
'referer': 'https://cursor.com/en-US/learn/how-ai-models-work',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
response = await session.get(SCRIPT_URL,
headers=headers,
impersonate='chrome')
cursor_js = response.text
# 替换指纹
main = (main_code.replace("$$currentScriptSrc$$", SCRIPT_URL)
.replace("$$UNMASKED_VENDOR_WEBGL$$", FP.get("UNMASKED_VENDOR_WEBGL"))
.replace("$$UNMASKED_RENDERER_WEBGL$$", FP.get("UNMASKED_RENDERER_WEBGL"))
.replace("$$userAgent$$", FP.get("userAgent")))
# 替换代码
main = main.replace('$$env_jscode$$', env_code)
main = main.replace("$$cursor_jscode$$", cursor_js)
return await runjs(main)
@to_async
def runjs(jscode: str) -> str:
"""
执行 JavaScript 代码并返回标准输出内容。
Args:
jscode: 要执行的 JavaScript 代码字符串
Returns:
Node.js 程序的标准输出内容
Raises:
FileNotFoundError: Node.js 未安装或不在系统 PATH 中
subprocess.CalledProcessError: Node.js 程序执行失败,异常信息包含 stdout 和 stderr
"""
temp_dir = tempfile.mkdtemp()
try:
js_file_path = os.path.join(temp_dir, "script.js")
with open(js_file_path, "w", encoding="utf-8") as f:
f.write(jscode)
result = subprocess.run(
['node', js_file_path],
capture_output=True,
text=True,
encoding="utf-8"
)
if result.returncode != 0:
error_msg = f"Node.js 执行失败 (退出码: {result.returncode})\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
logger.error(error_msg)
raise subprocess.CalledProcessError(result.returncode, ['node', js_file_path], result.stdout, result.stderr)
return result.stdout.strip()
finally:
shutil.rmtree(temp_dir)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=False,
log_level="info",
)