"""Flow API Client for VideoFX (Veo)""" import time import uuid import random import base64 from typing import Dict, Any, Optional, List from curl_cffi.requests import AsyncSession from ..core.logger import debug_logger from ..core.config import config class FlowClient: """VideoFX API客户端""" def __init__(self, proxy_manager, db=None): self.proxy_manager = proxy_manager self.db = db # Database instance for captcha config self.labs_base_url = config.flow_labs_base_url # https://labs.google/fx/api self.api_base_url = config.flow_api_base_url # https://aisandbox-pa.googleapis.com/v1 self.timeout = config.flow_timeout # 缓存每个账号的 User-Agent self._user_agent_cache = {} def _generate_user_agent(self, account_id: str = None) -> str: """基于账号ID生成固定的 User-Agent Args: account_id: 账号标识(如 email 或 token_id),相同账号返回相同 UA Returns: User-Agent 字符串 """ # 如果没有提供账号ID,生成随机UA if not account_id: account_id = f"random_{random.randint(1, 999999)}" # 如果已缓存,直接返回 if account_id in self._user_agent_cache: return self._user_agent_cache[account_id] # 使用账号ID作为随机种子,确保同一账号生成相同的UA import hashlib seed = int(hashlib.md5(account_id.encode()).hexdigest()[:8], 16) rng = random.Random(seed) # Chrome 版本池 chrome_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0", "129.0.0.0"] # Firefox 版本池 firefox_versions = ["133.0", "132.0", "131.0", "134.0"] # Safari 版本池 safari_versions = ["18.2", "18.1", "18.0", "17.6"] # Edge 版本池 edge_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0"] # 操作系统配置 os_configs = [ # Windows { "platform": "Windows NT 10.0; Win64; x64", "browsers": [ lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36 Edg/{r.choice(edge_versions)}", ] }, # macOS { "platform": "Macintosh; Intel Mac OS X 10_15_7", "browsers": [ lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/{r.choice(safari_versions)} Safari/605.1.15", lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 14.{r.randint(0, 7)}; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", ] }, # Linux { "platform": "X11; Linux x86_64", "browsers": [ lambda r: f"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", lambda r: f"Mozilla/5.0 (X11; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", lambda r: f"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", ] } ] # 使用固定种子随机选择操作系统和浏览器 os_config = rng.choice(os_configs) browser_generator = rng.choice(os_config["browsers"]) user_agent = browser_generator(rng) # 缓存结果 self._user_agent_cache[account_id] = user_agent return user_agent async def _make_request( self, method: str, url: str, headers: Optional[Dict] = None, json_data: Optional[Dict] = None, use_st: bool = False, st_token: Optional[str] = None, use_at: bool = False, at_token: Optional[str] = None ) -> Dict[str, Any]: """统一HTTP请求处理 Args: method: HTTP方法 (GET/POST) url: 完整URL headers: 请求头 json_data: JSON请求体 use_st: 是否使用ST认证 (Cookie方式) st_token: Session Token use_at: 是否使用AT认证 (Bearer方式) at_token: Access Token """ proxy_url = await self.proxy_manager.get_proxy_url() if headers is None: headers = {} # ST认证 - 使用Cookie if use_st and st_token: headers["Cookie"] = f"__Secure-next-auth.session-token={st_token}" # AT认证 - 使用Bearer if use_at and at_token: headers["authorization"] = f"Bearer {at_token}" # 确定账号标识(优先使用 token 的前16个字符作为标识) account_id = None if st_token: account_id = st_token[:16] # 使用 ST 的前16个字符 elif at_token: account_id = at_token[:16] # 使用 AT 的前16个字符 # 通用请求头 - 基于账号生成固定的 User-Agent headers.update({ "Content-Type": "application/json", "User-Agent": self._generate_user_agent(account_id) }) # Log request if config.debug_enabled: debug_logger.log_request( method=method, url=url, headers=headers, body=json_data, proxy=proxy_url ) start_time = time.time() try: async with AsyncSession() as session: if method.upper() == "GET": response = await session.get( url, headers=headers, proxy=proxy_url, timeout=self.timeout, impersonate="chrome110" ) else: # POST response = await session.post( url, headers=headers, json=json_data, proxy=proxy_url, timeout=self.timeout, impersonate="chrome110" ) duration_ms = (time.time() - start_time) * 1000 # Log response if config.debug_enabled: debug_logger.log_response( status_code=response.status_code, headers=dict(response.headers), body=response.text, duration_ms=duration_ms ) response.raise_for_status() return response.json() except Exception as e: duration_ms = (time.time() - start_time) * 1000 error_msg = str(e) if config.debug_enabled: debug_logger.log_error( error_message=error_msg, status_code=getattr(e, 'status_code', None), response_text=getattr(e, 'response_text', None) ) raise Exception(f"Flow API request failed: {error_msg}") # ========== 认证相关 (使用ST) ========== async def st_to_at(self, st: str) -> dict: """ST转AT Args: st: Session Token Returns: { "access_token": "AT", "expires": "2025-11-15T04:46:04.000Z", "user": {...} } """ url = f"{self.labs_base_url}/auth/session" result = await self._make_request( method="GET", url=url, use_st=True, st_token=st ) return result # ========== 项目管理 (使用ST) ========== async def create_project(self, st: str, title: str) -> str: """创建项目,返回project_id Args: st: Session Token title: 项目标题 Returns: project_id (UUID) """ url = f"{self.labs_base_url}/trpc/project.createProject" json_data = { "json": { "projectTitle": title, "toolName": "PINHOLE" } } result = await self._make_request( method="POST", url=url, json_data=json_data, use_st=True, st_token=st ) # 解析返回的project_id project_id = result["result"]["data"]["json"]["result"]["projectId"] return project_id async def delete_project(self, st: str, project_id: str): """删除项目 Args: st: Session Token project_id: 项目ID """ url = f"{self.labs_base_url}/trpc/project.deleteProject" json_data = { "json": { "projectToDeleteId": project_id } } await self._make_request( method="POST", url=url, json_data=json_data, use_st=True, st_token=st ) # ========== 余额查询 (使用AT) ========== async def get_credits(self, at: str) -> dict: """查询余额 Args: at: Access Token Returns: { "credits": 920, "userPaygateTier": "PAYGATE_TIER_ONE" } """ url = f"{self.api_base_url}/credits" result = await self._make_request( method="GET", url=url, use_at=True, at_token=at ) return result # ========== 图片上传 (使用AT) ========== async def upload_image( self, at: str, image_bytes: bytes, aspect_ratio: str = "IMAGE_ASPECT_RATIO_LANDSCAPE" ) -> str: """上传图片,返回mediaGenerationId Args: at: Access Token image_bytes: 图片字节数据 aspect_ratio: 图片或视频宽高比(会自动转换为图片格式) Returns: mediaGenerationId (CAM...) """ # 转换视频aspect_ratio为图片aspect_ratio # VIDEO_ASPECT_RATIO_LANDSCAPE -> IMAGE_ASPECT_RATIO_LANDSCAPE # VIDEO_ASPECT_RATIO_PORTRAIT -> IMAGE_ASPECT_RATIO_PORTRAIT if aspect_ratio.startswith("VIDEO_"): aspect_ratio = aspect_ratio.replace("VIDEO_", "IMAGE_") # 编码为base64 (去掉前缀) image_base64 = base64.b64encode(image_bytes).decode('utf-8') url = f"{self.api_base_url}:uploadUserImage" json_data = { "imageInput": { "rawImageBytes": image_base64, "mimeType": "image/jpeg", "isUserUploaded": True, "aspectRatio": aspect_ratio }, "clientContext": { "sessionId": self._generate_session_id(), "tool": "ASSET_MANAGER" } } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) # 返回mediaGenerationId media_id = result["mediaGenerationId"]["mediaGenerationId"] return media_id # ========== 图片生成 (使用AT) - 同步返回 ========== async def generate_image( self, at: str, project_id: str, prompt: str, model_name: str, aspect_ratio: str, image_inputs: Optional[List[Dict]] = None ) -> dict: """生成图片(同步返回) Args: at: Access Token project_id: 项目ID prompt: 提示词 model_name: GEM_PIX, GEM_PIX_2 或 IMAGEN_3_5 aspect_ratio: 图片宽高比 image_inputs: 参考图片列表(图生图时使用) Returns: { "media": [{ "image": { "generatedImage": { "fifeUrl": "图片URL", ... } } }] } """ url = f"{self.api_base_url}/projects/{project_id}/flowMedia:batchGenerateImages" # 获取 reCAPTCHA token recaptcha_token = await self._get_recaptcha_token(project_id) or "" session_id = self._generate_session_id() # 构建请求 request_data = { "clientContext": { "recaptchaToken": recaptcha_token, "projectId": project_id, "sessionId": session_id, "tool": "PINHOLE" }, "seed": random.randint(1, 99999), "imageModelName": model_name, "imageAspectRatio": aspect_ratio, "prompt": prompt, "imageInputs": image_inputs or [] } json_data = { "clientContext": { "recaptchaToken": recaptcha_token, "sessionId": session_id }, "requests": [request_data] } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result # ========== 视频生成 (使用AT) - 异步返回 ========== async def generate_video_text( self, at: str, project_id: str, prompt: str, model_key: str, aspect_ratio: str, user_paygate_tier: str = "PAYGATE_TIER_ONE" ) -> dict: """文生视频,返回task_id Args: at: Access Token project_id: 项目ID prompt: 提示词 model_key: veo_3_1_t2v_fast 等 aspect_ratio: 视频宽高比 user_paygate_tier: 用户等级 Returns: { "operations": [{ "operation": {"name": "task_id"}, "sceneId": "uuid", "status": "MEDIA_GENERATION_STATUS_PENDING" }], "remainingCredits": 900 } """ url = f"{self.api_base_url}/video:batchAsyncGenerateVideoText" # 获取 reCAPTCHA token recaptcha_token = await self._get_recaptcha_token(project_id) or "" session_id = self._generate_session_id() scene_id = str(uuid.uuid4()) json_data = { "clientContext": { "recaptchaToken": recaptcha_token, "sessionId": session_id, "projectId": project_id, "tool": "PINHOLE", "userPaygateTier": user_paygate_tier }, "requests": [{ "aspectRatio": aspect_ratio, "seed": random.randint(1, 99999), "textInput": { "prompt": prompt }, "videoModelKey": model_key, "metadata": { "sceneId": scene_id } }] } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result async def generate_video_reference_images( self, at: str, project_id: str, prompt: str, model_key: str, aspect_ratio: str, reference_images: List[Dict], user_paygate_tier: str = "PAYGATE_TIER_ONE" ) -> dict: """图生视频,返回task_id Args: at: Access Token project_id: 项目ID prompt: 提示词 model_key: veo_3_0_r2v_fast aspect_ratio: 视频宽高比 reference_images: 参考图片列表 [{"imageUsageType": "IMAGE_USAGE_TYPE_ASSET", "mediaId": "..."}] user_paygate_tier: 用户等级 Returns: 同 generate_video_text """ url = f"{self.api_base_url}/video:batchAsyncGenerateVideoReferenceImages" # 获取 reCAPTCHA token recaptcha_token = await self._get_recaptcha_token(project_id) or "" session_id = self._generate_session_id() scene_id = str(uuid.uuid4()) json_data = { "clientContext": { "recaptchaToken": recaptcha_token, "sessionId": session_id, "projectId": project_id, "tool": "PINHOLE", "userPaygateTier": user_paygate_tier }, "requests": [{ "aspectRatio": aspect_ratio, "seed": random.randint(1, 99999), "textInput": { "prompt": prompt }, "videoModelKey": model_key, "referenceImages": reference_images, "metadata": { "sceneId": scene_id } }] } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result async def generate_video_start_end( self, at: str, project_id: str, prompt: str, model_key: str, aspect_ratio: str, start_media_id: str, end_media_id: str, user_paygate_tier: str = "PAYGATE_TIER_ONE" ) -> dict: """收尾帧生成视频,返回task_id Args: at: Access Token project_id: 项目ID prompt: 提示词 model_key: veo_3_1_i2v_s_fast_fl aspect_ratio: 视频宽高比 start_media_id: 起始帧mediaId end_media_id: 结束帧mediaId user_paygate_tier: 用户等级 Returns: 同 generate_video_text """ url = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartAndEndImage" # 获取 reCAPTCHA token recaptcha_token = await self._get_recaptcha_token(project_id) or "" session_id = self._generate_session_id() scene_id = str(uuid.uuid4()) json_data = { "clientContext": { "recaptchaToken": recaptcha_token, "sessionId": session_id, "projectId": project_id, "tool": "PINHOLE", "userPaygateTier": user_paygate_tier }, "requests": [{ "aspectRatio": aspect_ratio, "seed": random.randint(1, 99999), "textInput": { "prompt": prompt }, "videoModelKey": model_key, "startImage": { "mediaId": start_media_id }, "endImage": { "mediaId": end_media_id }, "metadata": { "sceneId": scene_id } }] } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result async def generate_video_start_image( self, at: str, project_id: str, prompt: str, model_key: str, aspect_ratio: str, start_media_id: str, user_paygate_tier: str = "PAYGATE_TIER_ONE" ) -> dict: """仅首帧生成视频,返回task_id Args: at: Access Token project_id: 项目ID prompt: 提示词 model_key: veo_3_1_i2v_s_fast_fl等 aspect_ratio: 视频宽高比 start_media_id: 起始帧mediaId user_paygate_tier: 用户等级 Returns: 同 generate_video_text """ url = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartImage" # 获取 reCAPTCHA token recaptcha_token = await self._get_recaptcha_token(project_id) or "" session_id = self._generate_session_id() scene_id = str(uuid.uuid4()) json_data = { "clientContext": { "recaptchaToken": recaptcha_token, "sessionId": session_id, "projectId": project_id, "tool": "PINHOLE", "userPaygateTier": user_paygate_tier }, "requests": [{ "aspectRatio": aspect_ratio, "seed": random.randint(1, 99999), "textInput": { "prompt": prompt }, "videoModelKey": model_key, "startImage": { "mediaId": start_media_id }, # 注意: 没有endImage字段,只用首帧 "metadata": { "sceneId": scene_id } }] } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result # ========== 任务轮询 (使用AT) ========== async def check_video_status(self, at: str, operations: List[Dict]) -> dict: """查询视频生成状态 Args: at: Access Token operations: 操作列表 [{"operation": {"name": "task_id"}, "sceneId": "...", "status": "..."}] Returns: { "operations": [{ "operation": { "name": "task_id", "metadata": {...} # 完成时包含视频信息 }, "status": "MEDIA_GENERATION_STATUS_SUCCESSFUL" }] } """ url = f"{self.api_base_url}/video:batchCheckAsyncVideoGenerationStatus" json_data = { "operations": operations } result = await self._make_request( method="POST", url=url, json_data=json_data, use_at=True, at_token=at ) return result # ========== 媒体删除 (使用ST) ========== async def delete_media(self, st: str, media_names: List[str]): """删除媒体 Args: st: Session Token media_names: 媒体ID列表 """ url = f"{self.labs_base_url}/trpc/media.deleteMedia" json_data = { "json": { "names": media_names } } await self._make_request( method="POST", url=url, json_data=json_data, use_st=True, st_token=st ) # ========== 辅助方法 ========== def _generate_session_id(self) -> str: """生成sessionId: ;timestamp""" return f";{int(time.time() * 1000)}" def _generate_scene_id(self) -> str: """生成sceneId: UUID""" return str(uuid.uuid4()) async def _get_recaptcha_token(self, project_id: str) -> Optional[str]: """获取reCAPTCHA token - 支持多种打码方式""" captcha_method = config.captcha_method # 恒定浏览器打码 if captcha_method == "personal": try: from .browser_captcha_personal import BrowserCaptchaService service = await BrowserCaptchaService.get_instance(self.db) return await service.get_token(project_id) except Exception as e: debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}") return None # 无头浏览器打码 elif captcha_method == "browser": try: from .browser_captcha import BrowserCaptchaService service = await BrowserCaptchaService.get_instance(self.db) return await service.get_token(project_id) except Exception as e: debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}") return None # API打码服务 elif captcha_method in ["yescaptcha", "capmonster", "ezcaptcha", "capsolver"]: return await self._get_api_captcha_token(captcha_method, project_id) else: debug_logger.log_error(f"[reCAPTCHA] Unknown captcha method: {captcha_method}") return None async def _get_api_captcha_token(self, method: str, project_id: str) -> Optional[str]: """通用API打码服务""" # 获取配置 if method == "yescaptcha": client_key = config.yescaptcha_api_key base_url = config.yescaptcha_base_url task_type = "RecaptchaV3TaskProxylessM1" elif method == "capmonster": client_key = config.capmonster_api_key base_url = config.capmonster_base_url task_type = "RecaptchaV3EnterpriseTask" elif method == "ezcaptcha": client_key = config.ezcaptcha_api_key base_url = config.ezcaptcha_base_url task_type = "ReCaptchaV3EnterpriseTaskProxyless" elif method == "capsolver": client_key = config.capsolver_api_key base_url = config.capsolver_base_url task_type = "ReCaptchaV3EnterpriseTaskProxyLess" else: debug_logger.log_error(f"[reCAPTCHA] Unknown API method: {method}") return None if not client_key: debug_logger.log_info(f"[reCAPTCHA] {method} API key not configured, skipping") return None website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV" website_url = f"https://labs.google/fx/tools/flow/project/{project_id}" page_action = "FLOW_GENERATION" try: async with AsyncSession() as session: create_url = f"{base_url}/createTask" create_data = { "clientKey": client_key, "softID": "42780", "task": { "websiteURL": website_url, "websiteKey": website_key, "type": task_type, "pageAction": page_action } } result = await session.post(create_url, json=create_data, impersonate="chrome110") result_json = result.json() task_id = result_json.get('taskId') debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}") if not task_id: error_desc = result_json.get('errorDescription', 'Unknown error') debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}") return None get_url = f"{base_url}/getTaskResult" for i in range(40): get_data = { "clientKey": client_key, "taskId": task_id } result = await session.post(get_url, json=get_data, impersonate="chrome110") result_json = result.json() debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}") status = result_json.get('status') if status == 'ready': solution = result_json.get('solution', {}) response = solution.get('gRecaptchaResponse') if response: debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功") return response time.sleep(3) debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token") return None except Exception as e: debug_logger.log_error(f"[reCAPTCHA {method}] error: {str(e)}") return None