ds2api-browser / deepseek_api.py
nacho
fix: 全面代码审查修复
466f6e2
raw
history blame
8.6 kB
import asyncio
import hashlib
import json
import logging
import random
import time
import uuid
from typing import AsyncGenerator, Optional, Union
import httpx
logger = logging.getLogger(__name__)
class DeepSeekAPI:
LOGIN_URL = "https://chat.deepseek.com/api/v0/users/login"
CREATE_SESSION_URL = "https://chat.deepseek.com/api/v0/chat_session/create"
CREATE_POW_URL = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge"
COMPLETION_URL = "https://chat.deepseek.com/api/v0/chat/completion"
DELETE_SESSION_URL = "https://chat.deepseek.com/api/v0/chat_session/delete"
def __init__(self, email: str, password: str, proxy: Optional[str] = None):
self.email = email
self.password = password
self.proxy = proxy
self._token: Optional[str] = None
self._device_id = str(uuid.uuid4())
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
proxy=self.proxy,
timeout=60.0,
follow_redirects=True,
)
return self._client
async def login(self) -> str:
client = await self._get_client()
payload = {
"email": self.email,
"password": self.password,
"device_id": self._device_id,
"os": "android",
}
headers = self._base_headers()
resp = await client.post(self.LOGIN_URL, json=payload, headers=headers, timeout=30)
data = resp.json()
code = data.get("code", -1)
if code != 0:
raise Exception(f"Login failed: {data.get('msg', 'Unknown')}")
biz_data = data.get("data", {})
biz_code = biz_data.get("biz_code", -1)
if biz_code != 0:
raise Exception(f"Login failed: {biz_data.get('biz_msg', 'Unknown')}")
user = biz_data.get("biz_data", {}).get("user", {})
self._token = user.get("token", "")
if not self._token:
raise Exception("No token received")
return self._token
async def create_session(self) -> str:
if not self._token:
await self.login()
client = await self._get_client()
headers = self._auth_headers()
resp = await client.post(
self.CREATE_SESSION_URL,
json={"agent": "chat"},
headers=headers,
timeout=30,
)
data = resp.json()
if data.get("code") != 0:
raise Exception(f"Create session failed: {data.get('msg')}")
biz_data = data.get("data", {}).get("biz_data", {})
if "chat_session" in biz_data:
session_id = biz_data["chat_session"].get("id", "")
else:
session_id = biz_data.get("id", "")
if not session_id:
raise Exception("No session ID received")
return session_id
async def get_pow(self, target_path: str = "/api/v0/chat/completion") -> dict:
client = await self._get_client()
headers = self._auth_headers()
resp = await client.post(
self.CREATE_POW_URL,
json={"target_path": target_path},
headers=headers,
timeout=30,
)
data = resp.json()
if data.get("code") != 0:
raise Exception(f"Get PoW failed: {data.get('msg')}")
return data.get("data", {}).get("biz_data", {}).get("challenge", {})
def _compute_pow(self, challenge: dict) -> str:
prefix = challenge.get("prefix", "")
target = challenge.get("target", "")
expire_at = challenge.get("expire_at", 0)
nonce = 0
while nonce < 10000000:
test = f"{prefix}{nonce}"
hash_val = hashlib.sha256(test.encode()).hexdigest()
# Compare hash against target lexicographically — this handles
# both all-zero targets and mixed targets like "000abc".
if hash_val <= target:
break
nonce += 1
return json.dumps({
"prefix": prefix,
"nonce": nonce,
"expire_at": expire_at,
"target": target,
})
async def send_message(
self,
prompt: str,
model: str = "deepseek-chat",
stream: bool = False,
timeout: int = 120,
) -> Union[str, AsyncGenerator[str, None]]:
"""Send a message and return a response.
Returns:
str when stream=False, AsyncGenerator[str, None] when stream=True.
"""
if not self._token:
await self.login()
session_id = await self.create_session()
pow_challenge = await self.get_pow()
pow_header = self._compute_pow(pow_challenge)
client = await self._get_client()
headers = self._auth_headers()
headers["x-ds-pow-response"] = pow_header
payload = {
"chat_session_id": session_id,
"parent_message_id": None,
"prompt": prompt,
"ref_file_ids": [],
"thinking_enabled": True,
"search_enabled": "search" in model.lower(),
}
if stream:
return self._stream_completion(client, headers, payload, timeout)
else:
return await self._sync_completion(client, headers, payload, timeout)
async def _sync_completion(
self,
client: httpx.AsyncClient,
headers: dict,
payload: dict,
timeout: int,
) -> str:
resp = await client.post(
self.COMPLETION_URL,
json=payload,
headers=headers,
timeout=timeout,
)
# 处理 SSE 响应
full_text = ""
for line in resp.text.split("\n"):
if line.startswith("data:"):
data_str = line[5:].strip()
if data_str == "[DONE]":
continue
try:
data = json.loads(data_str)
msg = data.get("message", {})
content = msg.get("content", "")
if content:
full_text += content
except json.JSONDecodeError:
continue
return full_text
async def _stream_completion(
self,
client: httpx.AsyncClient,
headers: dict,
payload: dict,
timeout: int,
) -> AsyncGenerator[str, None]:
async with client.stream(
"POST",
self.COMPLETION_URL,
json=payload,
headers=headers,
timeout=timeout,
) as resp:
async for line in resp.aiter_lines():
if line.startswith("data:"):
data_str = line[5:].strip()
if data_str == "[DONE]":
return
try:
data = json.loads(data_str)
msg = data.get("message", {})
content = msg.get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
async def delete_session(self, session_id: str):
if not self._token:
return
client = await self._get_client()
headers = self._auth_headers()
await client.post(
self.DELETE_SESSION_URL,
json={"chat_session_id": session_id},
headers=headers,
timeout=30,
)
def _base_headers(self) -> dict:
return {
"Content-Type": "application/json",
"User-Agent": "DeepSeek/3.2.1 Android/35",
"x-client-platform": "android",
"x-client-version": "3.2.1",
"x-client-locale": "zh_CN",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def _auth_headers(self) -> dict:
headers = self._base_headers()
if self._token:
headers["authorization"] = f"Bearer {self._token}"
return headers
async def close(self):
if self._client:
await self._client.aclose()