flow2api / src /services /flow_client.py
genz27
refactor: 将有头浏览器打码从 patchright 改为 playwright
260483d
"""Flow API Client for VideoFX (Veo)"""
import asyncio
import time
import uuid
import random
import base64
from typing import Dict, Any, Optional, List
from curl_cffi.requests import AsyncSession
from ..core.logger import debug_logger
from ..core.config import config
class FlowClient:
    """Async client for the Google Labs Flow (VideoFX / Veo) HTTP APIs.

    Session-token (ST) calls go to ``labs_base_url`` and access-token (AT)
    calls go to ``api_base_url``. Generation calls also obtain a reCAPTCHA
    token through the solver selected by ``config.captcha_method``.
    """
def __init__(self, proxy_manager, db=None):
    """Bind the client to a proxy manager and an optional database.

    Args:
        proxy_manager: Object whose awaitable ``get_proxy_url()`` supplies the
            proxy used for each request.
        db: Optional database handle, handed to the browser captcha services.
    """
    self.proxy_manager = proxy_manager
    self.db = db

    # Endpoint roots and the default timeout are read once from global config.
    self.labs_base_url = config.flow_labs_base_url  # https://labs.google/fx/api
    self.api_base_url = config.flow_api_base_url  # https://aisandbox-pa.googleapis.com/v1
    self.timeout = config.flow_timeout

    # account id -> User-Agent string (filled by _generate_user_agent)
    self._user_agent_cache = {}

    # Browser-style headers merged into every _make_request call without
    # overriding caller-supplied values.
    # NOTE(review): these advertise an Android mobile client, while
    # _generate_user_agent only produces desktop UAs. Confirm upstream
    # tolerates the mismatch.
    self._default_client_headers = dict(
        [
            ("sec-ch-ua-mobile", "?1"),
            ("sec-ch-ua-platform", "\"Android\""),
            ("sec-fetch-dest", "empty"),
            ("sec-fetch-mode", "cors"),
            ("sec-fetch-site", "cross-site"),
            ("x-browser-channel", "stable"),
            ("x-browser-copyright", "Copyright 2026 Google LLC. All Rights reserved."),
            ("x-browser-validation", "UujAs0GAwdnCJ9nvrswZ+O+oco0="),
            ("x-browser-year", "2026"),
            ("x-client-data", "CJS2yQEIpLbJAQipncoBCNj9ygEIlKHLAQiFoM0BGP6lzwE="),
        ]
    )
def _generate_user_agent(self, account_id: Optional[str] = None) -> str:
    """Return a desktop browser User-Agent that is stable per account.

    The MD5 of ``account_id`` seeds a private ``random.Random``, so the same
    id always yields the same UA, also across restarts. Results are memoised
    in ``self._user_agent_cache``.

    Version tokens inside each UA are kept coherent:
    - Firefox's ``rv:`` matches its ``Firefox/`` version.
    - Edge's ``Edg/`` matches its Chromium version.

    Args:
        account_id: Account identifier (e.g. email or token prefix). When
            empty or ``None``, a random UA is returned and nothing is cached.

    Returns:
        A User-Agent string.
    """
    import hashlib

    if account_id:
        cached = self._user_agent_cache.get(account_id)
        if cached is not None:
            return cached
        # MD5 is used only to turn the id into a stable integer seed.
        seed = int(hashlib.md5(account_id.encode()).hexdigest()[:8], 16)
        rng = random.Random(seed)
    else:
        # Anonymous callers get a one-off UA. Caching it under a random key
        # never produced a hit and grew the cache without bound.
        rng = random.Random()

    chrome_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0", "129.0.0.0"]
    firefox_versions = ["133.0", "132.0", "131.0", "134.0"]
    safari_versions = ["18.2", "18.1", "18.0", "17.6"]

    def chrome(platform):
        def build(r):
            return (f"Mozilla/5.0 ({platform}) AppleWebKit/537.36 (KHTML, like Gecko) "
                    f"Chrome/{r.choice(chrome_versions)} Safari/537.36")
        return build

    def edge(platform):
        def build(r):
            # Edge reports the same version in its Chrome/ and Edg/ tokens.
            version = r.choice(chrome_versions)
            return (f"Mozilla/5.0 ({platform}) AppleWebKit/537.36 (KHTML, like Gecko) "
                    f"Chrome/{version} Safari/537.36 Edg/{version}")
        return build

    def firefox(platform):
        def build(r):
            # Draw the version once so rv: and Firefox/ agree.
            version = r.choice(firefox_versions)
            return (f"Mozilla/5.0 ({platform}; rv:{version.split('.')[0]}.0) "
                    f"Gecko/20100101 Firefox/{version}")
        return build

    def safari(r):
        return ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 "
                f"(KHTML, like Gecko) Version/{r.choice(safari_versions)} Safari/605.1.15")

    windows = "Windows NT 10.0; Win64; x64"
    mac = "Macintosh; Intel Mac OS X 10_15_7"
    linux = "X11; Linux x86_64"
    # OS and browser order (and therefore the sequence of RNG draws) matches
    # the previous implementation, so unchanged variants keep the same UA.
    browsers_by_os = [
        [chrome(windows), firefox(windows), edge(windows)],
        # Firefox on macOS reports a frozen "10.15" OS version.
        [chrome(mac), safari, firefox("Macintosh; Intel Mac OS X 10.15")],
        [chrome(linux), firefox(linux), firefox("X11; Ubuntu; Linux x86_64")],
    ]
    build_ua = rng.choice(rng.choice(browsers_by_os))
    user_agent = build_ua(rng)

    if account_id:
        self._user_agent_cache[account_id] = user_agent
    return user_agent
async def _make_request(
    self,
    method: str,
    url: str,
    headers: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
    use_st: bool = False,
    st_token: Optional[str] = None,
    use_at: bool = False,
    at_token: Optional[str] = None,
    timeout: Optional[int] = None
) -> Dict[str, Any]:
    """Send one request to a Flow endpoint and return the decoded JSON body.

    This method adds auth, a per-account User-Agent, a JSON content type and
    the default browser-style headers. Any ``method`` other than ``GET``
    (case-insensitive) is sent as a POST with ``json_data`` as the body.

    Args:
        method: HTTP method, ``GET`` or ``POST``.
        url: Absolute request URL.
        headers: Extra request headers. The dict is copied, never mutated.
        json_data: JSON body for POST requests.
        use_st: Authenticate with the session token sent as a cookie.
        st_token: Session Token; its prefix also keys the User-Agent.
        use_at: Authenticate with the access token as a Bearer header.
        at_token: Access Token; keys the User-Agent when no ``st_token``.
        timeout: Request timeout in seconds. Falsy values fall back to
            ``self.timeout``.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: message prefixed ``"Flow API request failed: "``. Raised
            on transport errors, undecodable bodies, or HTTP status >= 400.
            For HTTP errors the suffix is the Google API ``reason: message``
            when the body carries one, otherwise ``HTTP Error <status>...``.
    """
    proxy_url = await self.proxy_manager.get_proxy_url()
    request_timeout = timeout or self.timeout
    # Work on a copy so auth/UA headers never leak into a caller-owned dict.
    headers = dict(headers) if headers else {}

    # ST auth: session cookie
    if use_st and st_token:
        headers["Cookie"] = f"__Secure-next-auth.session-token={st_token}"
    # AT auth: Bearer token
    if use_at and at_token:
        headers["authorization"] = f"Bearer {at_token}"

    # Key the User-Agent on a 16-char token prefix so each account keeps a stable UA.
    account_id = None
    if st_token:
        account_id = st_token[:16]
    elif at_token:
        account_id = at_token[:16]

    headers.update({
        "Content-Type": "application/json",
        "User-Agent": self._generate_user_agent(account_id)
    })
    # Default client headers never override explicitly provided values.
    for key, value in self._default_client_headers.items():
        headers.setdefault(key, value)

    if config.debug_enabled:
        debug_logger.log_request(
            method=method,
            url=url,
            headers=headers,
            body=json_data,
            proxy=proxy_url
        )

    start_time = time.time()
    # Set once an HTTP error has been logged below, so the outer handler
    # does not log the same failure a second time.
    failure_logged = False
    try:
        async with AsyncSession() as session:
            if method.upper() == "GET":
                response = await session.get(
                    url,
                    headers=headers,
                    proxy=proxy_url,
                    timeout=request_timeout,
                    impersonate="chrome110"
                )
            else:  # POST
                response = await session.post(
                    url,
                    headers=headers,
                    json=json_data,
                    proxy=proxy_url,
                    timeout=request_timeout,
                    impersonate="chrome110"
                )

            duration_ms = (time.time() - start_time) * 1000
            if config.debug_enabled:
                debug_logger.log_response(
                    status_code=response.status_code,
                    headers=dict(response.headers),
                    body=response.text,
                    duration_ms=duration_ms
                )

            if response.status_code >= 400:
                error_reason = f"HTTP Error {response.status_code}"
                try:
                    error_body = response.json()
                    # Google API error format: {"error": {"message": ..., "details": [{"reason": ...}]}}
                    if "error" in error_body:
                        error_info = error_body["error"]
                        error_message = error_info.get("message", "")
                        for detail in error_info.get("details", []):
                            if detail.get("reason"):
                                error_reason = detail.get("reason")
                                break
                        if error_message:
                            error_reason = f"{error_reason}: {error_message}"
                except Exception:
                    # Non-JSON or unexpectedly shaped body: fall back to a raw snippet.
                    error_reason = f"HTTP Error {response.status_code}: {response.text[:200]}"

                debug_logger.log_error(f"[API FAILED] URL: {url}")
                debug_logger.log_error(f"[API FAILED] Request Body: {json_data}")
                debug_logger.log_error(f"[API FAILED] Response: {response.text}")
                failure_logged = True
                raise Exception(error_reason)

            return response.json()
    except Exception as e:
        error_msg = str(e)
        if not failure_logged:
            debug_logger.log_error(f"[API FAILED] URL: {url}")
            debug_logger.log_error(f"[API FAILED] Request Body: {json_data}")
            debug_logger.log_error(f"[API FAILED] Exception: {error_msg}")
        raise Exception(f"Flow API request failed: {error_msg}") from e
# ========== 认证相关 (使用ST) ==========
async def st_to_at(self, st: str) -> dict:
    """Exchange a Session Token (ST) for an Access Token (AT).

    Args:
        st: Session Token, sent as the next-auth session cookie.

    Returns:
        The decoded ``/auth/session`` response, for example
        ``{"access_token": "AT", "expires": "2025-11-15T04:46:04.000Z", "user": {...}}``.
    """
    session_endpoint = self.labs_base_url + "/auth/session"
    return await self._make_request(
        method="GET",
        url=session_endpoint,
        use_st=True,
        st_token=st,
    )
# ========== 项目管理 (使用ST) ==========
async def create_project(self, st: str, title: str) -> str:
    """Create a Flow project and return its ID.

    Args:
        st: Session Token.
        title: Project title.

    Returns:
        The new project's ``projectId`` (a UUID string).
    """
    payload = {"json": {"projectTitle": title, "toolName": "PINHOLE"}}
    response = await self._make_request(
        method="POST",
        url=f"{self.labs_base_url}/trpc/project.createProject",
        json_data=payload,
        use_st=True,
        st_token=st,
    )
    # The project is nested under result.data.json.result in the response.
    created = response["result"]["data"]["json"]["result"]
    return created["projectId"]
async def delete_project(self, st: str, project_id: str):
    """Delete a Flow project.

    Args:
        st: Session Token.
        project_id: ID of the project to delete.
    """
    await self._make_request(
        method="POST",
        url=self.labs_base_url + "/trpc/project.deleteProject",
        json_data={"json": {"projectToDeleteId": project_id}},
        use_st=True,
        st_token=st,
    )
# ========== 余额查询 (使用AT) ==========
async def get_credits(self, at: str) -> dict:
    """Fetch the account's credit balance.

    Args:
        at: Access Token.

    Returns:
        The decoded response, for example
        ``{"credits": 920, "userPaygateTier": "PAYGATE_TIER_ONE"}``.
    """
    credits_endpoint = "/".join((self.api_base_url, "credits"))
    return await self._make_request(
        method="GET",
        url=credits_endpoint,
        use_at=True,
        at_token=at,
    )
# ========== 图片上传 (使用AT) ==========
def _detect_image_mime_type(self, image_bytes: bytes) -> str:
    """Guess an image's MIME type from its leading magic bytes.

    Args:
        image_bytes: Raw image data.

    Returns:
        One of image/webp, image/png, image/jpeg, image/gif, image/bmp or
        image/jp2. Falls back to ``"image/jpeg"`` for unknown signatures and
        for inputs shorter than 12 bytes.
    """
    fallback = "image/jpeg"
    if len(image_bytes) < 12:
        return fallback

    head = bytes(image_bytes[:12])
    # WebP carries its tag at offset 8, after the RIFF size field.
    if head.startswith(b"RIFF") and head[8:12] == b"WEBP":
        return "image/webp"

    signatures = (
        (b"\x89PNG", "image/png"),
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
        (b"BM", "image/bmp"),
        (b"\x00\x00\x00\x0cjP", "image/jp2"),
    )
    for magic, mime in signatures:
        if head.startswith(magic):
            return mime
    return fallback
def _convert_to_jpeg(self, image_bytes: bytes) -> bytes:
    """Re-encode arbitrary image bytes as a JPEG at quality 95.

    Mode handling:
    - Modes JPEG can store directly (1, L, RGB, CMYK) are saved unchanged.
    - Images with an alpha band or palette transparency are flattened onto
      a white background.
    - Every other mode (e.g. P, I, F) is converted to RGB.

    Args:
        image_bytes: Encoded source image in any format Pillow can open.

    Returns:
        JPEG-encoded image bytes.

    Raises:
        Whatever Pillow raises for undecodable input
        (e.g. ``PIL.UnidentifiedImageError``).
    """
    from io import BytesIO
    from PIL import Image

    output = BytesIO()
    with Image.open(BytesIO(image_bytes)) as img:
        if img.mode in ("1", "L", "RGB", "CMYK"):
            prepared = img
        elif "A" in img.getbands() or "transparency" in img.info:
            # Plain convert("RGB") drops alpha and exposes whatever colour sits
            # under transparent pixels (often black); composite onto white.
            rgba = img.convert("RGBA")
            prepared = Image.new("RGB", rgba.size, (255, 255, 255))
            prepared.paste(rgba, mask=rgba.getchannel("A"))
        else:
            # Modes such as PA-less palettes, I, F cannot be written as JPEG.
            prepared = img.convert("RGB")
        prepared.save(output, format="JPEG", quality=95)
    return output.getvalue()
async def upload_image(
    self,
    at: str,
    image_bytes: bytes,
    aspect_ratio: str = "IMAGE_ASPECT_RATIO_LANDSCAPE"
) -> str:
    """Upload a user image and return its mediaGenerationId.

    Args:
        at: Access Token.
        image_bytes: Raw image data. The MIME type is sniffed from its header.
        aspect_ratio: Image or video aspect ratio. A ``VIDEO_*`` value is
            rewritten to the matching ``IMAGE_*`` value before upload.

    Returns:
        The ``mediaGenerationId`` string (``CAM...``).
    """
    # VIDEO_ASPECT_RATIO_X -> IMAGE_ASPECT_RATIO_X
    image_ratio = (
        aspect_ratio.replace("VIDEO_", "IMAGE_")
        if aspect_ratio.startswith("VIDEO_")
        else aspect_ratio
    )
    mime = self._detect_image_mime_type(image_bytes)
    # Plain base64 (no data: URL prefix)
    encoded = base64.b64encode(image_bytes).decode('utf-8')

    payload = {
        "imageInput": {
            "rawImageBytes": encoded,
            "mimeType": mime,
            "isUserUploaded": True,
            "aspectRatio": image_ratio,
        },
        "clientContext": {
            "sessionId": self._generate_session_id(),
            "tool": "ASSET_MANAGER",
        },
    }
    response = await self._make_request(
        method="POST",
        url=self.api_base_url + ":uploadUserImage",
        json_data=payload,
        use_at=True,
        at_token=at,
    )
    return response["mediaGenerationId"]["mediaGenerationId"]
# ========== 图片生成 (使用AT) - 同步返回 ==========
async def generate_image(
    self,
    at: str,
    project_id: str,
    prompt: str,
    model_name: str,
    aspect_ratio: str,
    image_inputs: Optional[List[Dict]] = None
) -> dict:
    """Generate images synchronously via ``flowMedia:batchGenerateImages``.

    Each attempt fetches a fresh reCAPTCHA token (action ``IMAGE_GENERATION``).
    If a request fails with an error that ``_get_retry_reason`` classifies as
    retryable, the browser solver is notified and the call is retried. There
    are at most 3 attempts, one second apart.

    Args:
        at: Access Token.
        project_id: Project ID.
        prompt: Text prompt.
        model_name: Image model, e.g. GEM_PIX, GEM_PIX_2 or IMAGEN_3_5.
        aspect_ratio: Image aspect ratio.
        image_inputs: Reference images for image-to-image. ``None`` sends an
            empty list.

    Returns:
        The upstream response, e.g.
        ``{"media": [{"image": {"generatedImage": {"fifeUrl": ..., ...}}}]}``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/projects/{project_id}/flowMedia:batchGenerateImages"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="IMAGE_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        # clientContext is sent once at the top level, not repeated per request.
        payload = {
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
            },
            "requests": [{
                "seed": random.randint(1, 99999),
                "imageModelName": model_name,
                "imageAspectRatio": aspect_ratio,
                "prompt": prompt,
                "imageInputs": image_inputs or [],
            }],
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[IMAGE] 生成遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
async def upsample_image(
    self,
    at: str,
    project_id: str,
    media_id: str,
    target_resolution: str = "UPSAMPLE_IMAGE_RESOLUTION_4K"
) -> str:
    """Upscale a generated image to 2K or 4K.

    Args:
        at: Access Token.
        project_id: Project ID.
        media_id: The image's mediaId (``media[0]["name"]`` from
            batchGenerateImages).
        target_resolution: ``UPSAMPLE_IMAGE_RESOLUTION_2K`` or
            ``UPSAMPLE_IMAGE_RESOLUTION_4K``.

    Returns:
        The base64-encoded upscaled image, or ``""`` if the response has no
        ``encodedImage``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available, or any error raised by ``_make_request``.
    """
    # Upscaling uses the same reCAPTCHA action as image generation.
    token, _unused_browser_id = await self._get_recaptcha_token(project_id, action="IMAGE_GENERATION")
    if not token:
        raise Exception("Failed to obtain reCAPTCHA token")

    client_context = {
        "recaptchaContext": {
            "token": token,
            "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
        },
        "sessionId": self._generate_session_id(),
        "projectId": project_id,
        "tool": "PINHOLE",
    }
    payload = {
        "mediaId": media_id,
        "targetResolution": target_resolution,
        "clientContext": client_context,
    }
    # The 2K/4K response carries a large base64 payload, hence the dedicated timeout.
    response = await self._make_request(
        method="POST",
        url=self.api_base_url + "/flow/upsampleImage",
        json_data=payload,
        use_at=True,
        at_token=at,
        timeout=config.upsample_timeout,
    )
    return response.get("encodedImage", "")
# ========== 视频生成 (使用AT) - 异步返回 ==========
async def generate_video_text(
    self,
    at: str,
    project_id: str,
    prompt: str,
    model_key: str,
    aspect_ratio: str,
    user_paygate_tier: str = "PAYGATE_TIER_ONE"
) -> dict:
    """Submit a text-to-video job and return the upstream operations.

    Each attempt fetches a fresh reCAPTCHA token (action ``VIDEO_GENERATION``).
    Retryable failures (per ``_get_retry_reason``) are retried after
    notifying the browser solver. There are at most 3 attempts, one second
    apart.

    Args:
        at: Access Token.
        project_id: Project ID.
        prompt: Text prompt.
        model_key: Video model key, e.g. veo_3_1_t2v_fast.
        aspect_ratio: Video aspect ratio.
        user_paygate_tier: User tier sent in the client context.

    Returns:
        The upstream response, e.g. ``{"operations": [{"operation": {"name":
        task_id}, "sceneId": ..., "status": "MEDIA_GENERATION_STATUS_PENDING"}],
        "remainingCredits": 900}``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/video:batchAsyncGenerateVideoText"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="VIDEO_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        scene_id = str(uuid.uuid4())
        video_request = {
            "aspectRatio": aspect_ratio,
            "seed": random.randint(1, 99999),
            "textInput": {"prompt": prompt},
            "videoModelKey": model_key,
            "metadata": {"sceneId": scene_id},
        }
        payload = {
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
                "userPaygateTier": user_paygate_tier,
            },
            "requests": [video_request],
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[VIDEO T2V] 生成遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
async def generate_video_reference_images(
    self,
    at: str,
    project_id: str,
    prompt: str,
    model_key: str,
    aspect_ratio: str,
    reference_images: List[Dict],
    user_paygate_tier: str = "PAYGATE_TIER_ONE"
) -> dict:
    """Submit a reference-images-to-video job and return the upstream operations.

    Retry behaviour matches ``generate_video_text``:
    - A fresh ``VIDEO_GENERATION`` reCAPTCHA token is fetched per attempt.
    - There are at most 3 attempts, one second apart.

    Args:
        at: Access Token.
        project_id: Project ID.
        prompt: Text prompt.
        model_key: Video model key, e.g. veo_3_0_r2v_fast.
        aspect_ratio: Video aspect ratio.
        reference_images: Reference list, e.g.
            ``[{"imageUsageType": "IMAGE_USAGE_TYPE_ASSET", "mediaId": "..."}]``.
        user_paygate_tier: User tier sent in the client context.

    Returns:
        Same shape as ``generate_video_text``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/video:batchAsyncGenerateVideoReferenceImages"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="VIDEO_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        scene_id = str(uuid.uuid4())
        video_request = {
            "aspectRatio": aspect_ratio,
            "seed": random.randint(1, 99999),
            "textInput": {"prompt": prompt},
            "videoModelKey": model_key,
            "referenceImages": reference_images,
            "metadata": {"sceneId": scene_id},
        }
        payload = {
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
                "userPaygateTier": user_paygate_tier,
            },
            "requests": [video_request],
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[VIDEO R2V] 生成遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
async def generate_video_start_end(
    self,
    at: str,
    project_id: str,
    prompt: str,
    model_key: str,
    aspect_ratio: str,
    start_media_id: str,
    end_media_id: str,
    user_paygate_tier: str = "PAYGATE_TIER_ONE"
) -> dict:
    """Submit a first-and-last-frame video job and return the upstream operations.

    Retry behaviour matches ``generate_video_text``:
    - A fresh ``VIDEO_GENERATION`` reCAPTCHA token is fetched per attempt.
    - There are at most 3 attempts, one second apart.

    Args:
        at: Access Token.
        project_id: Project ID.
        prompt: Text prompt.
        model_key: Video model key, e.g. veo_3_1_i2v_s_fast_fl.
        aspect_ratio: Video aspect ratio.
        start_media_id: mediaId of the first frame.
        end_media_id: mediaId of the last frame.
        user_paygate_tier: User tier sent in the client context.

    Returns:
        Same shape as ``generate_video_text``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartAndEndImage"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="VIDEO_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        scene_id = str(uuid.uuid4())
        video_request = {
            "aspectRatio": aspect_ratio,
            "seed": random.randint(1, 99999),
            "textInput": {"prompt": prompt},
            "videoModelKey": model_key,
            "startImage": {"mediaId": start_media_id},
            "endImage": {"mediaId": end_media_id},
            "metadata": {"sceneId": scene_id},
        }
        payload = {
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
                "userPaygateTier": user_paygate_tier,
            },
            "requests": [video_request],
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[VIDEO I2V] 首尾帧生成遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
async def generate_video_start_image(
    self,
    at: str,
    project_id: str,
    prompt: str,
    model_key: str,
    aspect_ratio: str,
    start_media_id: str,
    user_paygate_tier: str = "PAYGATE_TIER_ONE"
) -> dict:
    """Submit a first-frame-only video job and return the upstream operations.

    Retry behaviour matches ``generate_video_text``:
    - A fresh ``VIDEO_GENERATION`` reCAPTCHA token is fetched per attempt.
    - There are at most 3 attempts, one second apart.

    Args:
        at: Access Token.
        project_id: Project ID.
        prompt: Text prompt.
        model_key: Video model key, e.g. veo_3_1_i2v_s_fast_fl.
        aspect_ratio: Video aspect ratio.
        start_media_id: mediaId of the first frame.
        user_paygate_tier: User tier sent in the client context.

    Returns:
        Same shape as ``generate_video_text``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartImage"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="VIDEO_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        scene_id = str(uuid.uuid4())
        # Only a start frame: this request deliberately has no endImage.
        video_request = {
            "aspectRatio": aspect_ratio,
            "seed": random.randint(1, 99999),
            "textInput": {"prompt": prompt},
            "videoModelKey": model_key,
            "startImage": {"mediaId": start_media_id},
            "metadata": {"sceneId": scene_id},
        }
        payload = {
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
                "userPaygateTier": user_paygate_tier,
            },
            "requests": [video_request],
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[VIDEO I2V] 首帧生成遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
# ========== 视频放大 (Video Upsampler) ==========
async def upsample_video(
    self,
    at: str,
    project_id: str,
    video_media_id: str,
    aspect_ratio: str,
    resolution: str,
    model_key: str
) -> dict:
    """Submit a video upscaling job (4K/1080P) and return the upstream operations.

    A fresh ``VIDEO_GENERATION`` reCAPTCHA token is fetched per attempt, and
    retryable failures are retried (at most 3 attempts, one second apart).
    Unlike the generation calls, the client context here carries only the
    reCAPTCHA context and session id.

    Args:
        at: Access Token.
        project_id: Project ID (used to obtain the reCAPTCHA token).
        video_media_id: mediaId of the video to upscale.
        aspect_ratio: VIDEO_ASPECT_RATIO_PORTRAIT or VIDEO_ASPECT_RATIO_LANDSCAPE.
        resolution: VIDEO_RESOLUTION_4K or VIDEO_RESOLUTION_1080P.
        model_key: veo_3_1_upsampler_4k or veo_3_1_upsampler_1080p.

    Returns:
        Same shape as ``generate_video_text``.

    Raises:
        Exception: ``"Failed to obtain reCAPTCHA token"`` when no token is
            available. Otherwise, the first non-retryable request error or
            the error from the final attempt.
    """
    endpoint = f"{self.api_base_url}/video:batchAsyncGenerateVideoUpsampleVideo"
    max_attempts = 3
    attempt = 0
    while True:
        attempt += 1
        token, browser_id = await self._get_recaptcha_token(project_id, action="VIDEO_GENERATION")
        if not token:
            raise Exception("Failed to obtain reCAPTCHA token")

        session_id = self._generate_session_id()
        scene_id = str(uuid.uuid4())
        upsample_request = {
            "aspectRatio": aspect_ratio,
            "resolution": resolution,
            "seed": random.randint(1, 99999),
            "videoInput": {"mediaId": video_media_id},
            "videoModelKey": model_key,
            "metadata": {"sceneId": scene_id},
        }
        payload = {
            "requests": [upsample_request],
            "clientContext": {
                "recaptchaContext": {
                    "token": token,
                    "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB",
                },
                "sessionId": session_id,
            },
        }

        try:
            return await self._make_request(
                method="POST",
                url=endpoint,
                json_data=payload,
                use_at=True,
                at_token=at,
            )
        except Exception as exc:
            reason = self._get_retry_reason(str(exc))
            if not reason or attempt >= max_attempts:
                raise
            debug_logger.log_warning(f"[VIDEO UPSAMPLE] 放大遇到{reason},正在重新获取验证码重试 ({attempt + 1}/{max_attempts})...")
            await self._notify_browser_captcha_error(browser_id)
            await asyncio.sleep(1)
# ========== 任务轮询 (使用AT) ==========
async def check_video_status(self, at: str, operations: List[Dict]) -> dict:
    """Poll the status of previously submitted video operations.

    Args:
        at: Access Token.
        operations: Operations as returned at submission, e.g.
            ``[{"operation": {"name": "task_id"}, "sceneId": "...", "status": "..."}]``.

    Returns:
        The upstream response. Each entry carries ``operation`` (with
        ``metadata`` once finished) and a ``status`` such as
        ``MEDIA_GENERATION_STATUS_SUCCESSFUL``.
    """
    status_endpoint = self.api_base_url + "/video:batchCheckAsyncVideoGenerationStatus"
    return await self._make_request(
        method="POST",
        url=status_endpoint,
        json_data={"operations": operations},
        use_at=True,
        at_token=at,
    )
# ========== 媒体删除 (使用ST) ==========
async def delete_media(self, st: str, media_names: List[str]):
    """Delete media items.

    Args:
        st: Session Token.
        media_names: Names/IDs of the media to delete.
    """
    payload = {"json": {"names": media_names}}
    await self._make_request(
        method="POST",
        url=self.labs_base_url + "/trpc/media.deleteMedia",
        json_data=payload,
        use_st=True,
        st_token=st,
    )
# ========== 辅助方法 ==========
def _get_retry_reason(self, error_str: str) -> Optional[str]:
    """Classify an error message as retryable.

    Matching is a case-insensitive substring search. Rules are checked in
    order, so "403" wins first and the specific reCAPTCHA failure wins over
    the generic reCAPTCHA match.

    Args:
        error_str: Text of the exception raised by a request.

    Returns:
        A short reason used in retry log lines, or ``None`` when the error
        should not be retried.
    """
    haystack = error_str.lower()
    rules = (
        ("403", "403错误"),
        ("recaptcha evaluation failed", "reCAPTCHA 验证失败"),
        ("recaptcha", "reCAPTCHA 错误"),
    )
    return next((label for needle, label in rules if needle in haystack), None)
async def _notify_browser_captcha_error(self, browser_id: Optional[int] = None):
    """Tell the headed-browser captcha solver that its token was rejected.

    This only acts when ``config.captcha_method`` is ``"browser"``. It calls
    the service's ``report_error`` with ``browser_id``, so that browser is
    marked as bad and the solver switches fingerprints.

    The call is best-effort: any failure is logged as a warning and never
    propagated, so it cannot mask the request error that triggered the retry.

    Args:
        browser_id: ID of the browser that produced the rejected token, as
            returned alongside the token by ``_get_recaptcha_token``.
    """
    if config.captcha_method != "browser":
        return
    try:
        from .browser_captcha import BrowserCaptchaService
        service = await BrowserCaptchaService.get_instance(self.db)
        await service.report_error(browser_id)
    except Exception as e:
        # Previously swallowed silently; keep it non-fatal but visible.
        debug_logger.log_warning(f"[reCAPTCHA Browser] report_error failed: {e}")
def _generate_session_id(self) -> str:
    """Build a client sessionId: ``";"`` followed by the Unix time in milliseconds."""
    millis = int(time.time() * 1000)
    return ";" + str(millis)
def _generate_scene_id(self) -> str:
    """Return a new random sceneId in canonical UUID4 string form."""
    scene_uuid = uuid.uuid4()
    return f"{scene_uuid}"
async def _get_recaptcha_token(self, project_id: str, action: str = "IMAGE_GENERATION") -> tuple[Optional[str], Optional[int]]:
    """Obtain a reCAPTCHA token using the solver chosen by ``config.captcha_method``.

    Supported methods:
    - ``personal``: built-in nodriver browser.
    - ``browser``: headed playwright browser.
    - API solvers: ``yescaptcha``, ``capmonster``, ``ezcaptcha``, ``capsolver``.

    Browser-solver errors are logged and reported as ``(None, None)``. An
    unknown method is logged and also yields ``(None, None)``.

    Args:
        project_id: Flow project ID the token is requested for.
        action: reCAPTCHA action. ``IMAGE_GENERATION`` (the default) covers
            image generation and 2K/4K image upscaling. ``VIDEO_GENERATION``
            covers video generation and video upscaling.

    Returns:
        ``(token, browser_id)``. ``browser_id`` is what to pass to
        ``report_error`` on failure; it is ``None`` for every method other
        than ``browser``.
    """
    captcha_method = config.captcha_method

    if captcha_method == "personal":
        # Built-in browser solving (nodriver)
        try:
            from .browser_captcha_personal import BrowserCaptchaService
            service = await BrowserCaptchaService.get_instance(self.db)
            token = await service.get_token(project_id, action)
            return token, None
        except RuntimeError as e:
            # Explicit errors, e.g. Docker environment or missing dependencies
            error_msg = str(e)
            debug_logger.log_error(f"[reCAPTCHA Personal] {error_msg}")
            print(f"[reCAPTCHA] ❌ 内置浏览器打码失败: {error_msg}")
        except ImportError as e:
            debug_logger.log_error(f"[reCAPTCHA Personal] 导入失败: {str(e)}")
            print("[reCAPTCHA] ❌ nodriver 未安装,请运行: pip install nodriver")
        except Exception as e:
            debug_logger.log_error(f"[reCAPTCHA Personal] 错误: {str(e)}")
        return None, None

    if captcha_method == "browser":
        # Headed browser solving (playwright); the service returns (token, browser_id)
        try:
            from .browser_captcha import BrowserCaptchaService
            service = await BrowserCaptchaService.get_instance(self.db)
            return await service.get_token(project_id, action)
        except RuntimeError as e:
            # Explicit errors, e.g. Docker environment or missing dependencies
            error_msg = str(e)
            debug_logger.log_error(f"[reCAPTCHA Browser] {error_msg}")
            print(f"[reCAPTCHA] ❌ 有头浏览器打码失败: {error_msg}")
        except ImportError as e:
            debug_logger.log_error(f"[reCAPTCHA Browser] 导入失败: {str(e)}")
            print("[reCAPTCHA] ❌ playwright 未安装,请运行: pip install playwright && python -m playwright install chromium")
        except Exception as e:
            debug_logger.log_error(f"[reCAPTCHA Browser] 错误: {str(e)}")
        return None, None

    if captcha_method in ("yescaptcha", "capmonster", "ezcaptcha", "capsolver"):
        # Third-party solving API
        token = await self._get_api_captcha_token(captcha_method, project_id, action)
        return token, None

    debug_logger.log_info(f"[reCAPTCHA] 未知的打码方式: {captcha_method}")
    return None, None
async def _get_api_captcha_token(
    self,
    method: str,
    project_id: str,
    action: str = "IMAGE_GENERATION",
    max_polls: int = 40,
    poll_interval: float = 3.0,
) -> Optional[str]:
    """Solve reCAPTCHA through a third-party solving API.

    Creates a task via ``createTask``, then polls ``getTaskResult``. Polling
    stops when:
    - a token is returned,
    - the service reports a task error,
    - the task is ready without a token, or
    - ``max_polls`` polls have been made.

    Args:
        method: Solver service: ``yescaptcha``, ``capmonster``, ``ezcaptcha``
            or ``capsolver``.
        project_id: Flow project ID; forms the page URL given to the solver.
        action: reCAPTCHA page action (``IMAGE_GENERATION`` or
            ``VIDEO_GENERATION``).
        max_polls: Maximum number of ``getTaskResult`` polls (default 40).
        poll_interval: Seconds to wait between polls (default 3).

    Returns:
        The ``gRecaptchaResponse`` token, or ``None`` on any failure (unknown
        method, missing API key, task error, timeout, transport error).
    """
    # Per-service configuration
    if method == "yescaptcha":
        client_key = config.yescaptcha_api_key
        base_url = config.yescaptcha_base_url
        task_type = "RecaptchaV3TaskProxylessM1"
    elif method == "capmonster":
        client_key = config.capmonster_api_key
        base_url = config.capmonster_base_url
        task_type = "RecaptchaV3TaskProxyless"
    elif method == "ezcaptcha":
        client_key = config.ezcaptcha_api_key
        base_url = config.ezcaptcha_base_url
        task_type = "ReCaptchaV3TaskProxylessS9"
    elif method == "capsolver":
        client_key = config.capsolver_api_key
        base_url = config.capsolver_base_url
        task_type = "ReCaptchaV3EnterpriseTaskProxyLess"
    else:
        debug_logger.log_error(f"[reCAPTCHA] Unknown API method: {method}")
        return None

    if not client_key:
        debug_logger.log_info(f"[reCAPTCHA] {method} API key not configured, skipping")
        return None

    website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
    website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"

    try:
        async with AsyncSession() as session:
            create_data = {
                "clientKey": client_key,
                "task": {
                    "websiteURL": website_url,
                    "websiteKey": website_key,
                    "type": task_type,
                    "pageAction": action
                }
            }
            result = await session.post(f"{base_url}/createTask", json=create_data, impersonate="chrome110")
            result_json = result.json()
            task_id = result_json.get('taskId')
            debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}")
            if not task_id:
                error_desc = result_json.get('errorDescription', 'Unknown error')
                debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}")
                return None

            get_url = f"{base_url}/getTaskResult"
            get_data = {
                "clientKey": client_key,
                "taskId": task_id
            }
            for i in range(max_polls):
                result = await session.post(get_url, json=get_data, impersonate="chrome110")
                result_json = result.json()
                debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}")

                # In these services' getTaskResult protocol, a non-zero errorId
                # marks a failed task; further polling cannot succeed.
                error_id = result_json.get('errorId')
                if error_id and str(error_id) != "0":
                    error_desc = (result_json.get('errorDescription')
                                  or result_json.get('errorCode')
                                  or 'Unknown error')
                    debug_logger.log_error(f"[reCAPTCHA {method}] Task failed: {error_desc}")
                    return None

                if result_json.get('status') == 'ready':
                    solution = result_json.get('solution') or {}
                    response = solution.get('gRecaptchaResponse')
                    if response:
                        debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功")
                        return response
                    debug_logger.log_error(f"[reCAPTCHA {method}] Task ready but solution has no gRecaptchaResponse")
                    return None

                if i < max_polls - 1:
                    # Must not block: time.sleep here would stall the whole event loop.
                    await asyncio.sleep(poll_interval)

            debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token")
            return None
    except Exception as e:
        debug_logger.log_error(f"[reCAPTCHA {method}] error: {str(e)}")
        return None