"""Sora API client module""" import base64 import io import time import random import string from typing import Optional, Dict, Any from curl_cffi.requests import AsyncSession from curl_cffi import CurlMime from .proxy_manager import ProxyManager from ..core.config import config from ..core.logger import debug_logger class SoraClient: """Sora API client with proxy support""" def __init__(self, proxy_manager: ProxyManager): self.proxy_manager = proxy_manager self.base_url = config.sora_base_url self.timeout = config.sora_timeout @staticmethod def _generate_sentinel_token() -> str: """ 生成 openai-sentinel-token 根据测试文件的逻辑,传入任意随机字符即可 生成10-20个字符的随机字符串(字母+数字) """ length = random.randint(10, 20) random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) return random_str async def _make_request(self, method: str, endpoint: str, token: str, json_data: Optional[Dict] = None, multipart: Optional[Dict] = None, add_sentinel_token: bool = False) -> Dict[str, Any]: """Make HTTP request with proxy support Args: method: HTTP method (GET/POST) endpoint: API endpoint token: Access token json_data: JSON request body multipart: Multipart form data (for file uploads) add_sentinel_token: Whether to add openai-sentinel-token header (only for generation requests) """ proxy_url = await self.proxy_manager.get_proxy_url() headers = { "Authorization": f"Bearer {token}" } # 只在生成请求时添加 sentinel token if add_sentinel_token: headers["openai-sentinel-token"] = self._generate_sentinel_token() if not multipart: headers["Content-Type"] = "application/json" async with AsyncSession() as session: url = f"{self.base_url}{endpoint}" kwargs = { "headers": headers, "timeout": self.timeout, "impersonate": "chrome" # 自动生成 User-Agent 和浏览器指纹 } if proxy_url: kwargs["proxy"] = proxy_url if json_data: kwargs["json"] = json_data if multipart: kwargs["multipart"] = multipart # Log request debug_logger.log_request( method=method, url=url, headers=headers, body=json_data, files=multipart, proxy=proxy_url ) # Record start time start_time = time.time() # Make request if method == "GET": response = await session.get(url, **kwargs) elif method == "POST": response = await session.post(url, **kwargs) else: raise ValueError(f"Unsupported method: {method}") # Calculate duration duration_ms = (time.time() - start_time) * 1000 # Parse response try: response_json = response.json() except: response_json = None # Log response debug_logger.log_response( status_code=response.status_code, headers=dict(response.headers), body=response_json if response_json else response.text, duration_ms=duration_ms ) # Check status if response.status_code not in [200, 201]: error_msg = f"API request failed: {response.status_code} - {response.text}" debug_logger.log_error( error_message=error_msg, status_code=response.status_code, response_text=response.text ) raise Exception(error_msg) return response_json if response_json else response.json() async def get_user_info(self, token: str) -> Dict[str, Any]: """Get user information""" return await self._make_request("GET", "/me", token) async def upload_image(self, image_data: bytes, token: str, filename: str = "image.png") -> str: """Upload image and return media_id 使用 CurlMime 对象上传文件(curl_cffi 的正确方式) 参考:https://curl-cffi.readthedocs.io/en/latest/quick_start.html#uploads """ # 检测图片类型 mime_type = "image/png" if filename.lower().endswith('.jpg') or filename.lower().endswith('.jpeg'): mime_type = "image/jpeg" elif filename.lower().endswith('.webp'): mime_type = "image/webp" # 创建 CurlMime 对象 mp = CurlMime() # 添加文件部分 mp.addpart( name="file", content_type=mime_type, filename=filename, data=image_data ) # 添加文件名字段 mp.addpart( name="file_name", data=filename.encode('utf-8') ) result = await self._make_request("POST", "/uploads", token, multipart=mp) return result["id"] async def generate_image(self, prompt: str, token: str, width: int = 360, height: int = 360, media_id: Optional[str] = None) -> str: """Generate image (text-to-image or image-to-image)""" operation = "remix" if media_id else "simple_compose" inpaint_items = [] if media_id: inpaint_items = [{ "type": "image", "frame_index": 0, "upload_media_id": media_id }] json_data = { "type": "image_gen", "operation": operation, "prompt": prompt, "width": width, "height": height, "n_variants": 1, "n_frames": 1, "inpaint_items": inpaint_items } # 生成请求需要添加 sentinel token result = await self._make_request("POST", "/video_gen", token, json_data=json_data, add_sentinel_token=True) return result["id"] async def generate_video(self, prompt: str, token: str, orientation: str = "landscape", media_id: Optional[str] = None, n_frames: int = 450) -> str: """Generate video (text-to-video or image-to-video)""" inpaint_items = [] if media_id: inpaint_items = [{ "kind": "upload", "upload_id": media_id }] json_data = { "kind": "video", "prompt": prompt, "orientation": orientation, "size": "small", "n_frames": n_frames, "model": "sy_8", "inpaint_items": inpaint_items } # 生成请求需要添加 sentinel token result = await self._make_request("POST", "/nf/create", token, json_data=json_data, add_sentinel_token=True) return result["id"] async def get_image_tasks(self, token: str, limit: int = 20) -> Dict[str, Any]: """Get recent image generation tasks""" return await self._make_request("GET", f"/v2/recent_tasks?limit={limit}", token) async def get_video_drafts(self, token: str, limit: int = 15) -> Dict[str, Any]: """Get recent video drafts""" return await self._make_request("GET", f"/project_y/profile/drafts?limit={limit}", token) async def get_pending_tasks(self, token: str) -> list: """Get pending video generation tasks Returns: List of pending tasks with progress information """ result = await self._make_request("GET", "/nf/pending", token) # The API returns a list directly return result if isinstance(result, list) else [] async def post_video_for_watermark_free(self, generation_id: str, prompt: str, token: str) -> str: """Post video to get watermark-free version Args: generation_id: The generation ID (e.g., gen_01k9btrqrnen792yvt703dp0tq) prompt: The original generation prompt token: Access token Returns: Post ID (e.g., s_690ce161c2488191a3476e9969911522) """ json_data = { "attachments_to_create": [ { "generation_id": generation_id, "kind": "sora" } ], "post_text": prompt } # 发布请求需要添加 sentinel token result = await self._make_request("POST", "/project_y/post", token, json_data=json_data, add_sentinel_token=True) # 返回 post.id return result.get("post", {}).get("id", "") async def delete_post(self, post_id: str, token: str) -> bool: """Delete a published post Args: post_id: The post ID (e.g., s_690ce161c2488191a3476e9969911522) token: Access token Returns: True if deletion was successful """ proxy_url = await self.proxy_manager.get_proxy_url() headers = { "Authorization": f"Bearer {token}" } async with AsyncSession() as session: url = f"{self.base_url}/project_y/post/{post_id}" kwargs = { "headers": headers, "timeout": self.timeout, "impersonate": "chrome" } if proxy_url: kwargs["proxy"] = proxy_url # Log request debug_logger.log_request( method="DELETE", url=url, headers=headers, body=None, files=None, proxy=proxy_url ) # Record start time start_time = time.time() # Make DELETE request response = await session.delete(url, **kwargs) # Calculate duration duration_ms = (time.time() - start_time) * 1000 # Log response debug_logger.log_response( status_code=response.status_code, headers=dict(response.headers), body=response.text if response.text else "No content", duration_ms=duration_ms ) # Check status (DELETE typically returns 204 No Content or 200 OK) if response.status_code not in [200, 204]: error_msg = f"Delete post failed: {response.status_code} - {response.text}" debug_logger.log_error( error_message=error_msg, status_code=response.status_code, response_text=response.text ) raise Exception(error_msg) return True async def get_watermark_free_url_custom(self, parse_url: str, parse_token: str, post_id: str) -> str: """Get watermark-free video URL from custom parse server Args: parse_url: Custom parse server URL (e.g., http://example.com) parse_token: Access token for custom parse server post_id: Post ID to parse (e.g., s_690c0f574c3881918c3bc5b682a7e9fd) Returns: Download link from custom parse server Raises: Exception: If parse fails or token is invalid """ proxy_url = await self.proxy_manager.get_proxy_url() # Construct the share URL share_url = f"https://sora.chatgpt.com/p/{post_id}" # Prepare request json_data = { "url": share_url, "token": parse_token } kwargs = { "json": json_data, "timeout": 30, "impersonate": "chrome" } if proxy_url: kwargs["proxy"] = proxy_url try: async with AsyncSession() as session: # Record start time start_time = time.time() # Make POST request to custom parse server response = await session.post(f"{parse_url}/get-sora-link", **kwargs) # Calculate duration duration_ms = (time.time() - start_time) * 1000 # Log response debug_logger.log_response( status_code=response.status_code, headers=dict(response.headers), body=response.text if response.text else "No content", duration_ms=duration_ms ) # Check status if response.status_code != 200: error_msg = f"Custom parse failed: {response.status_code} - {response.text}" debug_logger.log_error( error_message=error_msg, status_code=response.status_code, response_text=response.text ) raise Exception(error_msg) # Parse response result = response.json() # Check for error in response if "error" in result: error_msg = f"Custom parse error: {result['error']}" debug_logger.log_error( error_message=error_msg, status_code=401, response_text=str(result) ) raise Exception(error_msg) # Extract download link download_link = result.get("download_link") if not download_link: raise Exception("No download_link in custom parse response") debug_logger.log_info(f"Custom parse successful: {download_link}") return download_link except Exception as e: debug_logger.log_error( error_message=f"Custom parse request failed: {str(e)}", status_code=500, response_text=str(e) ) raise # ==================== Character Creation Methods ==================== async def upload_character_video(self, video_data: bytes, token: str) -> str: """Upload character video and return cameo_id Args: video_data: Video file bytes token: Access token Returns: cameo_id """ mp = CurlMime() mp.addpart( name="file", content_type="video/mp4", filename="video.mp4", data=video_data ) mp.addpart( name="timestamps", data=b"0,3" ) result = await self._make_request("POST", "/characters/upload", token, multipart=mp) return result.get("id") async def get_cameo_status(self, cameo_id: str, token: str) -> Dict[str, Any]: """Get character (cameo) processing status Args: cameo_id: The cameo ID returned from upload_character_video token: Access token Returns: Dictionary with status, display_name_hint, username_hint, profile_asset_url, instruction_set_hint """ return await self._make_request("GET", f"/project_y/cameos/in_progress/{cameo_id}", token) async def download_character_image(self, image_url: str) -> bytes: """Download character image from URL Args: image_url: The profile_asset_url from cameo status Returns: Image file bytes """ proxy_url = await self.proxy_manager.get_proxy_url() kwargs = { "timeout": self.timeout, "impersonate": "chrome" } if proxy_url: kwargs["proxy"] = proxy_url async with AsyncSession() as session: response = await session.get(image_url, **kwargs) if response.status_code != 200: raise Exception(f"Failed to download image: {response.status_code}") return response.content async def finalize_character(self, cameo_id: str, username: str, display_name: str, profile_asset_pointer: str, instruction_set, token: str) -> str: """Finalize character creation Args: cameo_id: The cameo ID username: Character username display_name: Character display name profile_asset_pointer: Asset pointer from upload_character_image instruction_set: Character instruction set (not used by API, always set to None) token: Access token Returns: character_id """ # Note: API always expects instruction_set to be null # The instruction_set parameter is kept for backward compatibility but not used _ = instruction_set # Suppress unused parameter warning json_data = { "cameo_id": cameo_id, "username": username, "display_name": display_name, "profile_asset_pointer": profile_asset_pointer, "instruction_set": None, "safety_instruction_set": None } result = await self._make_request("POST", "/characters/finalize", token, json_data=json_data) return result.get("character", {}).get("character_id") async def set_character_public(self, cameo_id: str, token: str) -> bool: """Set character as public Args: cameo_id: The cameo ID token: Access token Returns: True if successful """ json_data = {"visibility": "public"} await self._make_request("POST", f"/project_y/cameos/by_id/{cameo_id}/update_v2", token, json_data=json_data) return True async def upload_character_image(self, image_data: bytes, token: str) -> str: """Upload character image and return asset_pointer Args: image_data: Image file bytes token: Access token Returns: asset_pointer """ mp = CurlMime() mp.addpart( name="file", content_type="image/webp", filename="profile.webp", data=image_data ) mp.addpart( name="use_case", data=b"profile" ) result = await self._make_request("POST", "/project_y/file/upload", token, multipart=mp) return result.get("asset_pointer") async def delete_character(self, character_id: str, token: str) -> bool: """Delete a character Args: character_id: The character ID token: Access token Returns: True if successful """ proxy_url = await self.proxy_manager.get_proxy_url() headers = { "Authorization": f"Bearer {token}" } async with AsyncSession() as session: url = f"{self.base_url}/project_y/characters/{character_id}" kwargs = { "headers": headers, "timeout": self.timeout, "impersonate": "chrome" } if proxy_url: kwargs["proxy"] = proxy_url response = await session.delete(url, **kwargs) if response.status_code not in [200, 204]: raise Exception(f"Failed to delete character: {response.status_code}") return True async def remix_video(self, remix_target_id: str, prompt: str, token: str, orientation: str = "portrait", n_frames: int = 450) -> str: """Generate video using remix (based on existing video) Args: remix_target_id: The video ID from Sora share link (e.g., s_690d100857248191b679e6de12db840e) prompt: Generation prompt token: Access token orientation: Video orientation (portrait/landscape) n_frames: Number of frames Returns: task_id """ json_data = { "kind": "video", "prompt": prompt, "inpaint_items": [], "remix_target_id": remix_target_id, "cameo_ids": [], "cameo_replacements": {}, "model": "sy_8", "orientation": orientation, "n_frames": n_frames } result = await self._make_request("POST", "/nf/create", token, json_data=json_data, add_sentinel_token=True) return result.get("id")