Spaces:
Running
Running
| import asyncio | |
| import hashlib | |
| import json | |
| import logging | |
| import random | |
| import time | |
| import uuid | |
| from typing import AsyncGenerator, Optional, Union | |
| import httpx | |
| logger = logging.getLogger(__name__) | |
| class DeepSeekAPI: | |
| LOGIN_URL = "https://chat.deepseek.com/api/v0/users/login" | |
| CREATE_SESSION_URL = "https://chat.deepseek.com/api/v0/chat_session/create" | |
| CREATE_POW_URL = "https://chat.deepseek.com/api/v0/chat/create_pow_challenge" | |
| COMPLETION_URL = "https://chat.deepseek.com/api/v0/chat/completion" | |
| DELETE_SESSION_URL = "https://chat.deepseek.com/api/v0/chat_session/delete" | |
| def __init__(self, email: str, password: str, proxy: Optional[str] = None): | |
| self.email = email | |
| self.password = password | |
| self.proxy = proxy | |
| self._token: Optional[str] = None | |
| self._device_id = str(uuid.uuid4()) | |
| self._client: Optional[httpx.AsyncClient] = None | |
| async def _get_client(self) -> httpx.AsyncClient: | |
| if self._client is None or self._client.is_closed: | |
| self._client = httpx.AsyncClient( | |
| proxy=self.proxy, | |
| timeout=60.0, | |
| follow_redirects=True, | |
| ) | |
| return self._client | |
| async def login(self) -> str: | |
| client = await self._get_client() | |
| payload = { | |
| "email": self.email, | |
| "password": self.password, | |
| "device_id": self._device_id, | |
| "os": "android", | |
| } | |
| headers = self._base_headers() | |
| resp = await client.post(self.LOGIN_URL, json=payload, headers=headers, timeout=30) | |
| data = resp.json() | |
| code = data.get("code", -1) | |
| if code != 0: | |
| raise Exception(f"Login failed: {data.get('msg', 'Unknown')}") | |
| biz_data = data.get("data", {}) | |
| biz_code = biz_data.get("biz_code", -1) | |
| if biz_code != 0: | |
| raise Exception(f"Login failed: {biz_data.get('biz_msg', 'Unknown')}") | |
| user = biz_data.get("biz_data", {}).get("user", {}) | |
| self._token = user.get("token", "") | |
| if not self._token: | |
| raise Exception("No token received") | |
| return self._token | |
| async def create_session(self) -> str: | |
| if not self._token: | |
| await self.login() | |
| client = await self._get_client() | |
| headers = self._auth_headers() | |
| resp = await client.post( | |
| self.CREATE_SESSION_URL, | |
| json={"agent": "chat"}, | |
| headers=headers, | |
| timeout=30, | |
| ) | |
| data = resp.json() | |
| if data.get("code") != 0: | |
| raise Exception(f"Create session failed: {data.get('msg')}") | |
| biz_data = data.get("data", {}).get("biz_data", {}) | |
| if "chat_session" in biz_data: | |
| session_id = biz_data["chat_session"].get("id", "") | |
| else: | |
| session_id = biz_data.get("id", "") | |
| if not session_id: | |
| raise Exception("No session ID received") | |
| return session_id | |
| async def get_pow(self, target_path: str = "/api/v0/chat/completion") -> dict: | |
| client = await self._get_client() | |
| headers = self._auth_headers() | |
| resp = await client.post( | |
| self.CREATE_POW_URL, | |
| json={"target_path": target_path}, | |
| headers=headers, | |
| timeout=30, | |
| ) | |
| data = resp.json() | |
| if data.get("code") != 0: | |
| raise Exception(f"Get PoW failed: {data.get('msg')}") | |
| return data.get("data", {}).get("biz_data", {}).get("challenge", {}) | |
| def _compute_pow(self, challenge: dict) -> str: | |
| prefix = challenge.get("prefix", "") | |
| target = challenge.get("target", "") | |
| expire_at = challenge.get("expire_at", 0) | |
| nonce = 0 | |
| while nonce < 10000000: | |
| test = f"{prefix}{nonce}" | |
| hash_val = hashlib.sha256(test.encode()).hexdigest() | |
| # Compare hash against target lexicographically — this handles | |
| # both all-zero targets and mixed targets like "000abc". | |
| if hash_val <= target: | |
| break | |
| nonce += 1 | |
| return json.dumps({ | |
| "prefix": prefix, | |
| "nonce": nonce, | |
| "expire_at": expire_at, | |
| "target": target, | |
| }) | |
| async def send_message( | |
| self, | |
| prompt: str, | |
| model: str = "deepseek-chat", | |
| stream: bool = False, | |
| timeout: int = 120, | |
| ) -> Union[str, AsyncGenerator[str, None]]: | |
| """Send a message and return a response. | |
| Returns: | |
| str when stream=False, AsyncGenerator[str, None] when stream=True. | |
| """ | |
| if not self._token: | |
| await self.login() | |
| session_id = await self.create_session() | |
| pow_challenge = await self.get_pow() | |
| pow_header = self._compute_pow(pow_challenge) | |
| client = await self._get_client() | |
| headers = self._auth_headers() | |
| headers["x-ds-pow-response"] = pow_header | |
| payload = { | |
| "chat_session_id": session_id, | |
| "parent_message_id": None, | |
| "prompt": prompt, | |
| "ref_file_ids": [], | |
| "thinking_enabled": True, | |
| "search_enabled": "search" in model.lower(), | |
| } | |
| if stream: | |
| return self._stream_completion(client, headers, payload, timeout) | |
| else: | |
| return await self._sync_completion(client, headers, payload, timeout) | |
| async def _sync_completion( | |
| self, | |
| client: httpx.AsyncClient, | |
| headers: dict, | |
| payload: dict, | |
| timeout: int, | |
| ) -> str: | |
| resp = await client.post( | |
| self.COMPLETION_URL, | |
| json=payload, | |
| headers=headers, | |
| timeout=timeout, | |
| ) | |
| # 处理 SSE 响应 | |
| full_text = "" | |
| for line in resp.text.split("\n"): | |
| if line.startswith("data:"): | |
| data_str = line[5:].strip() | |
| if data_str == "[DONE]": | |
| continue | |
| try: | |
| data = json.loads(data_str) | |
| msg = data.get("message", {}) | |
| content = msg.get("content", "") | |
| if content: | |
| full_text += content | |
| except json.JSONDecodeError: | |
| continue | |
| return full_text | |
| async def _stream_completion( | |
| self, | |
| client: httpx.AsyncClient, | |
| headers: dict, | |
| payload: dict, | |
| timeout: int, | |
| ) -> AsyncGenerator[str, None]: | |
| async with client.stream( | |
| "POST", | |
| self.COMPLETION_URL, | |
| json=payload, | |
| headers=headers, | |
| timeout=timeout, | |
| ) as resp: | |
| async for line in resp.aiter_lines(): | |
| if line.startswith("data:"): | |
| data_str = line[5:].strip() | |
| if data_str == "[DONE]": | |
| return | |
| try: | |
| data = json.loads(data_str) | |
| msg = data.get("message", {}) | |
| content = msg.get("content", "") | |
| if content: | |
| yield content | |
| except json.JSONDecodeError: | |
| continue | |
| async def delete_session(self, session_id: str): | |
| if not self._token: | |
| return | |
| client = await self._get_client() | |
| headers = self._auth_headers() | |
| await client.post( | |
| self.DELETE_SESSION_URL, | |
| json={"chat_session_id": session_id}, | |
| headers=headers, | |
| timeout=30, | |
| ) | |
| def _base_headers(self) -> dict: | |
| return { | |
| "Content-Type": "application/json", | |
| "User-Agent": "DeepSeek/3.2.1 Android/35", | |
| "x-client-platform": "android", | |
| "x-client-version": "3.2.1", | |
| "x-client-locale": "zh_CN", | |
| "Accept-Encoding": "gzip, deflate, br", | |
| "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", | |
| } | |
| def _auth_headers(self) -> dict: | |
| headers = self._base_headers() | |
| if self._token: | |
| headers["authorization"] = f"Bearer {self._token}" | |
| return headers | |
| async def close(self): | |
| if self._client: | |
| await self._client.aclose() | |