Spaces:
Paused
Paused
| from curl_cffi import requests | |
| from typing import Optional, Dict, Any, Generator, Literal | |
| import json | |
| from .pow import DeepSeekPOW | |
| import pkg_resources | |
| import sys | |
| from pathlib import Path | |
| import subprocess | |
| import time | |
| ThinkingMode = Literal['detailed', 'simple', 'disabled'] | |
| SearchMode = Literal['enabled', 'disabled'] | |
| class DeepSeekError(Exception): | |
| """Base exception for all DeepSeek API errors""" | |
| pass | |
| class AuthenticationError(DeepSeekError): | |
| """Raised when authentication fails""" | |
| pass | |
| class RateLimitError(DeepSeekError): | |
| """Raised when API rate limit is exceeded""" | |
| pass | |
| class NetworkError(DeepSeekError): | |
| """Raised when network communication fails""" | |
| pass | |
| class CloudflareError(DeepSeekError): | |
| """Raised when Cloudflare blocks the request""" | |
| pass | |
| class APIError(DeepSeekError): | |
| """Raised when API returns an error response""" | |
| def __init__(self, message: str, status_code: Optional[int] = None): | |
| super().__init__(message) | |
| self.status_code = status_code | |
| class DeepSeekAPI: | |
| BASE_URL = "https://chat.deepseek.com/api/v0" | |
| def __init__(self, auth_token: str): | |
| if not auth_token or not isinstance(auth_token, str): | |
| raise AuthenticationError("Invalid auth token provided") | |
| try: | |
| curl_cffi_version = pkg_resources.get_distribution('curl-cffi').version | |
| if curl_cffi_version != '0.8.1b9': | |
| print("\033[93mWarning: DeepSeek API requires curl-cffi version 0.8.1b9", file=sys.stderr) | |
| print("Please install the correct version using: pip install curl-cffi==0.8.1b9\033[0m", file=sys.stderr) | |
| except pkg_resources.DistributionNotFound: | |
| print("\033[93mWarning: curl-cffi not found. Please install version 0.8.1b9:", file=sys.stderr) | |
| print("pip install curl-cffi==0.8.1b9\033[0m", file=sys.stderr) | |
| self.auth_token = auth_token | |
| self.pow_solver = DeepSeekPOW() | |
| # Load cookies from JSON file | |
| cookies_path = Path(__file__).parent / 'cookies.json' | |
| try: | |
| with open(cookies_path, 'r') as f: | |
| cookie_data = json.load(f) | |
| self.cookies = cookie_data.get('cookies', {}) | |
| except (FileNotFoundError, json.JSONDecodeError) as e: | |
| print(f"\033[93mWarning: Could not load cookies from {cookies_path}: {e}\033[0m", file=sys.stderr) | |
| self.cookies = {} | |
| def _get_headers(self, pow_response: Optional[str] = None) -> Dict[str, str]: | |
| headers = { | |
| 'accept': '*/*', | |
| 'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3', | |
| 'authorization': f'Bearer {self.auth_token}', | |
| 'content-type': 'application/json', | |
| 'origin': 'https://chat.deepseek.com', | |
| 'referer': 'https://chat.deepseek.com/', | |
| 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36', | |
| 'x-app-version': '20241129.1', | |
| 'x-client-locale': 'en_US', | |
| 'x-client-platform': 'web', | |
| 'x-client-version': '1.0.0-always', | |
| } | |
| if pow_response: | |
| headers['x-ds-pow-response'] = pow_response | |
| return headers | |
| def _refresh_cookies(self) -> None: | |
| """Run the cookie refresh script and reload cookies""" | |
| try: | |
| # Get path to bypass.py | |
| script_path = Path(__file__).parent / 'bypass.py' | |
| # Run the script | |
| subprocess.run([sys.executable, script_path], check=True) | |
| # Wait briefly for cookies file to be written | |
| time.sleep(2) | |
| # Reload cookies | |
| cookies_path = Path(__file__).parent / 'cookies.json' | |
| with open(cookies_path, 'r') as f: | |
| cookie_data = json.load(f) | |
| self.cookies = cookie_data.get('cookies', {}) | |
| except Exception as e: | |
| print(f"\033[93mWarning: Failed to refresh cookies: {e}\033[0m", file=sys.stderr) | |
| def _make_request(self, method: str, endpoint: str, json_data: Dict[str, Any], pow_required: bool = False) -> Any: | |
| url = f"{self.BASE_URL}{endpoint}" | |
| retry_count = 0 | |
| max_retries = 2 | |
| while retry_count < max_retries: | |
| try: | |
| headers = self._get_headers() | |
| if pow_required: | |
| challenge = self._get_pow_challenge() | |
| pow_response = self.pow_solver.solve_challenge(challenge) | |
| headers = self._get_headers(pow_response) | |
| response = requests.request( | |
| method=method, | |
| url=url, | |
| headers=headers, | |
| json=json_data, | |
| cookies=self.cookies, | |
| impersonate='chrome120', | |
| timeout=None | |
| ) | |
| # Check if we hit Cloudflare protection | |
| if "<!DOCTYPE html>" in response.text and "Just a moment" in response.text: | |
| print("\033[93mWarning: Cloudflare protection detected. Bypassing...\033[0m", file=sys.stderr) | |
| if retry_count < max_retries - 1: | |
| self._refresh_cookies() # Refresh cookies | |
| retry_count += 1 | |
| continue | |
| # Handle other response codes | |
| if response.status_code == 401: | |
| raise AuthenticationError("Invalid or expired authentication token") | |
| elif response.status_code == 429: | |
| raise RateLimitError("API rate limit exceeded") | |
| elif response.status_code >= 500: | |
| raise APIError(f"Server error occurred: {response.text}", response.status_code) | |
| elif response.status_code != 200: | |
| raise APIError(f"API request failed: {response.text}", response.status_code) | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| raise NetworkError(f"Network error occurred: {str(e)}") | |
| except json.JSONDecodeError: | |
| raise APIError("Invalid JSON response from server") | |
| raise APIError("Failed to bypass Cloudflare protection after multiple attempts") | |
| def _get_pow_challenge(self) -> Dict[str, Any]: | |
| try: | |
| response = self._make_request( | |
| 'POST', | |
| '/chat/create_pow_challenge', | |
| {'target_path': '/api/v0/chat/completion'} | |
| ) | |
| return response['data']['biz_data']['challenge'] | |
| except KeyError: | |
| raise APIError("Invalid challenge response format from server") | |
| def create_chat_session(self) -> str: | |
| """Creates a new chat session and returns the session ID""" | |
| try: | |
| response = self._make_request( | |
| 'POST', | |
| '/chat_session/create', | |
| {'character_id': None} | |
| ) | |
| return response['data']['biz_data']['id'] | |
| except KeyError: | |
| raise APIError("Invalid session creation response format from server") | |
| def chat_completion(self, | |
| chat_session_id: str, | |
| prompt: str, | |
| parent_message_id: Optional[str] = None, | |
| thinking_enabled: bool = True, | |
| search_enabled: bool = False) -> Generator[Dict[str, Any], None, None]: | |
| """ | |
| Send a message and get streaming response | |
| Args: | |
| chat_session_id (str): The ID of the chat session | |
| prompt (str): The message to send | |
| parent_message_id (Optional[str]): ID of the parent message for threading | |
| thinking_enabled (bool): Whether to show the thinking process | |
| search_enabled (bool): Whether to enable web search for up-to-date information | |
| Returns: | |
| Generator[Dict[str, Any], None, None]: Yields message chunks with content and type | |
| Raises: | |
| AuthenticationError: If the authentication token is invalid | |
| RateLimitError: If the API rate limit is exceeded | |
| NetworkError: If a network error occurs | |
| APIError: If any other API error occurs | |
| """ | |
| if not prompt or not isinstance(prompt, str): | |
| raise ValueError("Prompt must be a non-empty string") | |
| if not chat_session_id or not isinstance(chat_session_id, str): | |
| raise ValueError("Chat session ID must be a non-empty string") | |
| json_data = { | |
| 'chat_session_id': chat_session_id, | |
| 'parent_message_id': parent_message_id, | |
| 'prompt': prompt, | |
| 'ref_file_ids': [], | |
| 'thinking_enabled': thinking_enabled, | |
| 'search_enabled': search_enabled, | |
| } | |
| try: | |
| headers = self._get_headers( | |
| pow_response=self.pow_solver.solve_challenge( | |
| self._get_pow_challenge() | |
| ) | |
| ) | |
| response = requests.post( | |
| f"{self.BASE_URL}/chat/completion", | |
| headers=headers, | |
| json=json_data, | |
| cookies=self.cookies, # Add cookies | |
| impersonate='chrome120', | |
| stream=True, | |
| timeout=None | |
| ) | |
| if response.status_code != 200: | |
| error_text = next(response.iter_lines(), b'').decode('utf-8', 'ignore') | |
| if response.status_code == 401: | |
| raise AuthenticationError("Invalid or expired authentication token") | |
| elif response.status_code == 429: | |
| raise RateLimitError("API rate limit exceeded") | |
| else: | |
| raise APIError(f"API request failed: {error_text}", response.status_code) | |
| for chunk in response.iter_lines(): | |
| try: | |
| parsed = self._parse_chunk(chunk) | |
| if parsed: | |
| yield parsed | |
| if parsed.get('finish_reason') == 'stop': | |
| break | |
| except Exception as e: | |
| raise APIError(f"Error parsing response chunk: {str(e)}") | |
| except requests.exceptions.RequestException as e: | |
| raise NetworkError(f"Network error occurred during streaming: {str(e)}") | |
| def _parse_chunk(self, chunk: bytes) -> Optional[Dict[str, Any]]: | |
| """Parse a SSE chunk from the API response""" | |
| if not chunk: | |
| return None | |
| try: | |
| if chunk.startswith(b'data: '): | |
| data = json.loads(chunk[6:]) | |
| if 'choices' in data and data['choices']: | |
| choice = data['choices'][0] | |
| if 'delta' in choice: | |
| delta = choice['delta'] | |
| return { | |
| 'content': delta.get('content', ''), | |
| 'type': delta.get('type', ''), | |
| 'finish_reason': choice.get('finish_reason') | |
| } | |
| except json.JSONDecodeError: | |
| raise APIError("Invalid JSON in response chunk") | |
| except Exception as e: | |
| raise APIError(f"Error parsing chunk: {str(e)}") | |
| return None | |