| """Generation handler for Flow2API""" |
| import asyncio |
| import base64 |
| import json |
| import time |
| from typing import Optional, AsyncGenerator, List, Dict, Any |
| from ..core.logger import debug_logger |
| from ..core.config import config |
| from ..core.models import Task, RequestLog |
| from .file_cache import FileCache |
|
|
|
|
| |
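
# Schema note: each MODEL_CONFIG entry is keyed by the public model id exposed
# by the API. Image entries carry the upstream model_name plus an aspect
# ratio; video entries carry the upstream model_key, the generation flow
# ("t2v", "i2v", or "r2v"), and the image-upload constraints enforced in
# GenerationHandler._handle_video_generation below.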
MODEL_CONFIG = {
    # ---- Image models ----
    "gemini-2.5-flash-image-landscape": {
        "type": "image",
        "model_name": "GEM_PIX",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_LANDSCAPE"
    },
    "gemini-2.5-flash-image-portrait": {
        "type": "image",
        "model_name": "GEM_PIX",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_PORTRAIT"
    },
    "gemini-3.0-pro-image-landscape": {
        "type": "image",
        "model_name": "GEM_PIX_2",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_LANDSCAPE"
    },
    "gemini-3.0-pro-image-portrait": {
        "type": "image",
        "model_name": "GEM_PIX_2",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_PORTRAIT"
    },
    "imagen-4.0-generate-preview-landscape": {
        "type": "image",
        "model_name": "IMAGEN_3_5",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_LANDSCAPE"
    },
    "imagen-4.0-generate-preview-portrait": {
        "type": "image",
        "model_name": "IMAGEN_3_5",
        "aspect_ratio": "IMAGE_ASPECT_RATIO_PORTRAIT"
    },

    # ---- Text-to-video (t2v) models ----
    "veo_3_1_t2v_fast_portrait": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_3_1_t2v_fast_portrait",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": False
    },
    "veo_3_1_t2v_fast_landscape": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_3_1_t2v_fast",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": False
    },
    "veo_2_1_fast_d_15_t2v_portrait": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_2_1_fast_d_15_t2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": False
    },
    "veo_2_1_fast_d_15_t2v_landscape": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_2_1_fast_d_15_t2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": False
    },
    "veo_2_0_t2v_portrait": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_2_0_t2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": False
    },
    "veo_2_0_t2v_landscape": {
        "type": "video",
        "video_type": "t2v",
        "model_key": "veo_2_0_t2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": False
    },

    # ---- Image-to-video (i2v, first/last frame) models ----
    "veo_3_1_i2v_s_fast_fl_portrait": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_3_1_i2v_s_fast_portrait_fl_ultra_relaxed",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },
    "veo_3_1_i2v_s_fast_fl_landscape": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_3_1_i2v_s_fast_fl_ultra_relaxed",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },
    "veo_2_1_fast_d_15_i2v_portrait": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_2_1_fast_d_15_i2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },
    "veo_2_1_fast_d_15_i2v_landscape": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_2_1_fast_d_15_i2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },
    "veo_2_0_i2v_portrait": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_2_0_i2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },
    "veo_2_0_i2v_landscape": {
        "type": "video",
        "video_type": "i2v",
        "model_key": "veo_2_0_i2v",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": True,
        "min_images": 1,
        "max_images": 2
    },

    # ---- Reference-to-video (r2v) models ----
    "veo_3_0_r2v_fast_portrait": {
        "type": "video",
        "video_type": "r2v",
        "model_key": "veo_3_0_r2v_fast_portrait_ultra_relaxed",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
        "supports_images": True,
        "min_images": 0,
        "max_images": None
    },
    "veo_3_0_r2v_fast_landscape": {
        "type": "video",
        "video_type": "r2v",
        "model_key": "veo_3_0_r2v_fast_ultra_relaxed",
        "aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
        "supports_images": True,
        "min_images": 0,
        "max_images": None
    }
}


class GenerationHandler:
    """Unified generation handler for image and video models."""

    def __init__(self, flow_client, token_manager, load_balancer, db, concurrency_manager, proxy_manager):
        self.flow_client = flow_client
        self.token_manager = token_manager
        self.load_balancer = load_balancer
        self.db = db
        self.concurrency_manager = concurrency_manager
        self.file_cache = FileCache(
            cache_dir="tmp",
            default_timeout=config.cache_timeout,
            proxy_manager=proxy_manager
        )

    async def check_token_availability(self, is_image: bool, is_video: bool) -> bool:
        """Check whether a usable token exists.

        Args:
            is_image: Check tokens for image generation.
            is_video: Check tokens for video generation.

        Returns:
            True if an available token exists, False otherwise.
        """
        token_obj = await self.load_balancer.select_token(
            for_image_generation=is_image,
            for_video_generation=is_video
        )
        return token_obj is not None
    async def handle_generation(
        self,
        model: str,
        prompt: str,
        images: Optional[List[bytes]] = None,
        stream: bool = False
    ) -> AsyncGenerator:
        """Unified generation entry point.

        Args:
            model: Model name.
            prompt: Prompt text.
            images: Optional list of images (bytes).
            stream: Whether to stream the output.
        """
        start_time = time.time()
        token = None

        if model not in MODEL_CONFIG:
            error_msg = f"Unsupported model: {model}"
            debug_logger.log_error(error_msg)
            yield self._create_error_response(error_msg)
            return

        model_config = MODEL_CONFIG[model]
        generation_type = model_config["type"]
        debug_logger.log_info(f"[GENERATION] Starting generation - model: {model}, type: {generation_type}, prompt: {prompt[:50]}...")

        # Non-stream requests only report token availability; generation
        # itself requires streaming mode.
        if not stream:
            is_image = (generation_type == "image")
            is_video = (generation_type == "video")
            available = await self.check_token_availability(is_image, is_video)

            if available:
                if is_image:
                    message = "Tokens are available for image generation. Enable streaming mode to generate."
                else:
                    message = "Tokens are available for video generation. Enable streaming mode to generate."
            else:
                if is_image:
                    message = "No token available for image generation"
                else:
                    message = "No token available for video generation"

            yield self._create_completion_response(message, is_availability_check=True)
            return

        if stream:
            yield self._create_stream_chunk(
                f"✨ {'Video' if generation_type == 'video' else 'Image'} generation task started\n",
                role="assistant"
            )

        debug_logger.log_info("[GENERATION] Selecting an available token...")

        if generation_type == "image":
            token = await self.load_balancer.select_token(for_image_generation=True, model=model)
        else:
            token = await self.load_balancer.select_token(for_video_generation=True, model=model)

        if not token:
            error_msg = self._get_no_token_error_message(generation_type)
            debug_logger.log_error(f"[GENERATION] {error_msg}")
            if stream:
                yield self._create_stream_chunk(f"❌ {error_msg}\n")
            yield self._create_error_response(error_msg)
            return

        debug_logger.log_info(f"[GENERATION] Selected token: {token.id} ({token.email})")

        try:
            debug_logger.log_info("[GENERATION] Checking token access-token (AT) validity...")
            if stream:
                yield self._create_stream_chunk("Initializing generation environment...\n")

            if not await self.token_manager.is_at_valid(token.id):
                error_msg = "Token AT is invalid or refresh failed"
                debug_logger.log_error(f"[GENERATION] {error_msg}")
                if stream:
                    yield self._create_stream_chunk(f"❌ {error_msg}\n")
                yield self._create_error_response(error_msg)
                return

            # Re-fetch the token in case the AT was refreshed.
            token = await self.token_manager.get_token(token.id)

            debug_logger.log_info("[GENERATION] Checking/creating project...")

            project_id = await self.token_manager.ensure_project_exists(token.id)
            debug_logger.log_info(f"[GENERATION] Project ID: {project_id}")

            if generation_type == "image":
                debug_logger.log_info("[GENERATION] Starting image generation flow...")
                async for chunk in self._handle_image_generation(
                    token, project_id, model_config, prompt, images, stream
                ):
                    yield chunk
            else:
                debug_logger.log_info("[GENERATION] Starting video generation flow...")
                async for chunk in self._handle_video_generation(
                    token, project_id, model_config, prompt, images, stream
                ):
                    yield chunk

            is_video = (generation_type == "video")
            await self.token_manager.record_usage(token.id, is_video=is_video)

            await self.token_manager.record_success(token.id)

            debug_logger.log_info("[GENERATION] ✅ Generation completed successfully")

            duration = time.time() - start_time
            await self._log_request(
                token.id,
                f"generate_{generation_type}",
                {"model": model, "prompt": prompt[:100], "has_images": bool(images)},
                {"status": "success"},
                200,
                duration
            )

        except Exception as e:
            error_msg = f"Generation failed: {str(e)}"
            debug_logger.log_error(f"[GENERATION] ❌ {error_msg}")
            if stream:
                yield self._create_stream_chunk(f"❌ {error_msg}\n")
            if token:
                # A 429 means the upstream rate limit was hit; ban the token
                # immediately instead of just recording an error.
                if "429" in str(e) or "HTTP Error 429" in str(e):
                    debug_logger.log_warning(f"[429_BAN] Token {token.id} hit a 429 error; disabling immediately")
                    await self.token_manager.ban_token_for_429(token.id)
                else:
                    await self.token_manager.record_error(token.id)
            yield self._create_error_response(error_msg)

            duration = time.time() - start_time
            await self._log_request(
                token.id if token else None,
                f"generate_{generation_type}",
                {"model": model, "prompt": prompt[:100], "has_images": bool(images)},
                {"error": error_msg},
                500,
                duration
            )
    def _get_no_token_error_message(self, generation_type: str) -> str:
        """Build a detailed error message for when no token is available."""
        if generation_type == "image":
            return "No token available for image generation. All tokens are disabled, cooling down, locked, or expired."
        else:
            return "No token available for video generation. All tokens are disabled, cooling down, out of quota, or expired."
    async def _handle_image_generation(
        self,
        token,
        project_id: str,
        model_config: dict,
        prompt: str,
        images: Optional[List[bytes]],
        stream: bool
    ) -> AsyncGenerator:
        """Handle image generation (synchronous result)."""

        # Respect the per-token image concurrency limit.
        if self.concurrency_manager:
            if not await self.concurrency_manager.acquire_image(token.id):
                yield self._create_error_response("Image concurrency limit reached")
                return

        try:
            # Upload any reference images first.
            image_inputs = []
            if images and len(images) > 0:
                if stream:
                    yield self._create_stream_chunk(f"Uploading {len(images)} reference image(s)...\n")

                for idx, image_bytes in enumerate(images):
                    media_id = await self.flow_client.upload_image(
                        token.at,
                        image_bytes,
                        model_config["aspect_ratio"]
                    )
                    image_inputs.append({
                        "name": media_id,
                        "imageInputType": "IMAGE_INPUT_TYPE_REFERENCE"
                    })
                    if stream:
                        yield self._create_stream_chunk(f"Uploaded image {idx + 1}/{len(images)}\n")

            if stream:
                yield self._create_stream_chunk("Generating image...\n")

            result = await self.flow_client.generate_image(
                at=token.at,
                project_id=project_id,
                prompt=prompt,
                model_name=model_config["model_name"],
                aspect_ratio=model_config["aspect_ratio"],
                image_inputs=image_inputs
            )

            media = result.get("media", [])
            if not media:
                yield self._create_error_response("Generation result is empty")
                return

            image_url = media[0]["image"]["generatedImage"]["fifeUrl"]

            # Cache the image locally when caching is enabled; fall back to
            # the source URL on any caching failure.
            local_url = image_url
            if config.cache_enabled:
                try:
                    if stream:
                        yield self._create_stream_chunk("Caching image...\n")
                    cached_filename = await self.file_cache.download_and_cache(image_url, "image")
                    local_url = f"{self._get_base_url()}/tmp/{cached_filename}"
                    if stream:
                        yield self._create_stream_chunk("✅ Image cached; returning the cached URL...\n")
                except Exception as e:
                    debug_logger.log_error(f"Failed to cache image: {str(e)}")
                    local_url = image_url
                    if stream:
                        yield self._create_stream_chunk(f"⚠️ Caching failed: {str(e)}\nReturning the source URL...\n")
            else:
                if stream:
                    yield self._create_stream_chunk("Caching disabled; returning the source URL...\n")

            # Emit the final result as a Markdown image link (reconstructed;
            # the original content string was lost, but it mirrors the HTML
            # tag used for videos).
            if stream:
                yield self._create_stream_chunk(
                    f"![image]({local_url})",
                    finish_reason="stop"
                )
            else:
                yield self._create_completion_response(
                    local_url,
                    media_type="image"
                )

        finally:
            if self.concurrency_manager:
                await self.concurrency_manager.release_image(token.id)
    async def _handle_video_generation(
        self,
        token,
        project_id: str,
        model_config: dict,
        prompt: str,
        images: Optional[List[bytes]],
        stream: bool
    ) -> AsyncGenerator:
        """Handle video generation (asynchronous polling)."""

        # Respect the per-token video concurrency limit.
        if self.concurrency_manager:
            if not await self.concurrency_manager.acquire_video(token.id):
                yield self._create_error_response("Video concurrency limit reached")
                return

        try:
            video_type = model_config.get("video_type")
            supports_images = model_config.get("supports_images", False)
            min_images = model_config.get("min_images", 0)
            max_images = model_config.get("max_images", 0)

            image_count = len(images) if images else 0

            # Validate the image inputs against the model's constraints.
            if video_type == "t2v":
                if image_count > 0:
                    if stream:
                        yield self._create_stream_chunk("⚠️ Text-to-video models do not accept images; ignoring them and generating from the text prompt only\n")
                    debug_logger.log_warning(f"[T2V] Model {model_config['model_key']} does not support images; ignored {image_count} image(s)")
                    images = None
                    image_count = 0

            elif video_type == "i2v":
                if image_count < min_images or image_count > max_images:
                    error_msg = f"❌ First/last-frame models require {min_images}-{max_images} image(s); {image_count} provided"
                    if stream:
                        yield self._create_stream_chunk(f"{error_msg}\n")
                    yield self._create_error_response(error_msg)
                    return

            elif video_type == "r2v":
                # Reference images are optional for r2v models.
                pass

            start_media_id = None
            end_media_id = None
            reference_images = []

            # i2v: upload the first frame, and the last frame if provided.
            if video_type == "i2v" and images:
                if image_count == 1:
                    if stream:
                        yield self._create_stream_chunk("Uploading first-frame image...\n")
                    start_media_id = await self.flow_client.upload_image(
                        token.at, images[0], model_config["aspect_ratio"]
                    )
                    debug_logger.log_info(f"[I2V] Uploaded first frame only: {start_media_id}")

                elif image_count == 2:
                    if stream:
                        yield self._create_stream_chunk("Uploading first- and last-frame images...\n")
                    start_media_id = await self.flow_client.upload_image(
                        token.at, images[0], model_config["aspect_ratio"]
                    )
                    end_media_id = await self.flow_client.upload_image(
                        token.at, images[1], model_config["aspect_ratio"]
                    )
                    debug_logger.log_info(f"[I2V] Uploaded first and last frames: {start_media_id}, {end_media_id}")

            # r2v: upload all reference images.
            elif video_type == "r2v" and images:
                if stream:
                    yield self._create_stream_chunk(f"Uploading {image_count} reference image(s)...\n")

                for idx, img in enumerate(images):
                    media_id = await self.flow_client.upload_image(
                        token.at, img, model_config["aspect_ratio"]
                    )
                    reference_images.append({
                        "imageUsageType": "IMAGE_USAGE_TYPE_ASSET",
                        "mediaId": media_id
                    })
                debug_logger.log_info(f"[R2V] Uploaded {len(reference_images)} reference image(s)")

            if stream:
                yield self._create_stream_chunk("Submitting video generation task...\n")

            # Dispatch to the matching upstream generation call.
            if video_type == "i2v" and start_media_id:
                if end_media_id:
                    # Both first and last frames provided.
                    result = await self.flow_client.generate_video_start_end(
                        at=token.at,
                        project_id=project_id,
                        prompt=prompt,
                        model_key=model_config["model_key"],
                        aspect_ratio=model_config["aspect_ratio"],
                        start_media_id=start_media_id,
                        end_media_id=end_media_id,
                        user_paygate_tier=token.user_paygate_tier or "PAYGATE_TIER_ONE"
                    )
                else:
                    # First frame only: the single-frame endpoint uses a model
                    # key without the "_fl_" (first/last) marker.
                    actual_model_key = model_config["model_key"].replace("_fl_", "_")
                    debug_logger.log_info(f"[I2V] Single-frame mode, model_key: {model_config['model_key']} -> {actual_model_key}")
                    result = await self.flow_client.generate_video_start_image(
                        at=token.at,
                        project_id=project_id,
                        prompt=prompt,
                        model_key=actual_model_key,
                        aspect_ratio=model_config["aspect_ratio"],
                        start_media_id=start_media_id,
                        user_paygate_tier=token.user_paygate_tier or "PAYGATE_TIER_ONE"
                    )

            elif video_type == "r2v" and reference_images:
                result = await self.flow_client.generate_video_reference_images(
                    at=token.at,
                    project_id=project_id,
                    prompt=prompt,
                    model_key=model_config["model_key"],
                    aspect_ratio=model_config["aspect_ratio"],
                    reference_images=reference_images,
                    user_paygate_tier=token.user_paygate_tier or "PAYGATE_TIER_ONE"
                )

            else:
                # Plain text-to-video.
                result = await self.flow_client.generate_video_text(
                    at=token.at,
                    project_id=project_id,
                    prompt=prompt,
                    model_key=model_config["model_key"],
                    aspect_ratio=model_config["aspect_ratio"],
                    user_paygate_tier=token.user_paygate_tier or "PAYGATE_TIER_ONE"
                )

            operations = result.get("operations", [])
            if not operations:
                yield self._create_error_response("Failed to create generation task")
                return

            operation = operations[0]
            task_id = operation["operation"]["name"]
            scene_id = operation.get("sceneId")

            # Persist the task so its status can be tracked.
            task = Task(
                task_id=task_id,
                token_id=token.id,
                model=model_config["model_key"],
                prompt=prompt,
                status="processing",
                scene_id=scene_id
            )
            await self.db.create_task(task)

            if stream:
                yield self._create_stream_chunk("Generating video...\n")

            async for chunk in self._poll_video_result(token, operations, stream):
                yield chunk

        finally:
            if self.concurrency_manager:
                await self.concurrency_manager.release_video(token.id)
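
    # Shape of the upstream operation payload consumed below (inferred from
    # the fields this module reads; other fields omitted):
    #
    #   {"operations": [{"operation": {"name": "<task id>",
    #                                  "metadata": {"video": {"fifeUrl": "..."}},
    #                                  "error": {"code": ..., "message": "..."}},
    #                    "sceneId": "...",
    #                    "status": "MEDIA_GENERATION_STATUS_..."}]}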
    async def _poll_video_result(
        self,
        token,
        operations: List[Dict],
        stream: bool
    ) -> AsyncGenerator:
        """Poll for the video generation result."""

        max_attempts = config.max_poll_attempts
        poll_interval = config.poll_interval

        for attempt in range(max_attempts):
            await asyncio.sleep(poll_interval)

            try:
                result = await self.flow_client.check_video_status(token.at, operations)
                checked_operations = result.get("operations", [])

                if not checked_operations:
                    continue

                operation = checked_operations[0]
                status = operation.get("status")

                # Emit a rough progress estimate every few polls, capped at
                # 95% until the upstream reports success.
                progress_update_interval = 7
                if stream and attempt % progress_update_interval == 0:
                    progress = min(int((attempt / max_attempts) * 100), 95)
                    yield self._create_stream_chunk(f"Generation progress: {progress}%\n")

                if status == "MEDIA_GENERATION_STATUS_SUCCESSFUL":
                    metadata = operation["operation"].get("metadata", {})
                    video_info = metadata.get("video", {})
                    video_url = video_info.get("fifeUrl")

                    if not video_url:
                        yield self._create_error_response("Video URL is empty")
                        return

                    # Cache the video locally when caching is enabled; fall
                    # back to the source URL on any caching failure.
                    local_url = video_url
                    if config.cache_enabled:
                        try:
                            if stream:
                                yield self._create_stream_chunk("Caching video file...\n")
                            cached_filename = await self.file_cache.download_and_cache(video_url, "video")
                            local_url = f"{self._get_base_url()}/tmp/{cached_filename}"
                            if stream:
                                yield self._create_stream_chunk("✅ Video cached; returning the cached URL...\n")
                        except Exception as e:
                            debug_logger.log_error(f"Failed to cache video: {str(e)}")
                            local_url = video_url
                            if stream:
                                yield self._create_stream_chunk(f"⚠️ Caching failed: {str(e)}\nReturning the source URL...\n")
                    else:
                        if stream:
                            yield self._create_stream_chunk("Caching disabled; returning the source URL...\n")

                    task_id = operation["operation"]["name"]
                    await self.db.update_task(
                        task_id,
                        status="completed",
                        progress=100,
                        result_urls=[local_url],
                        completed_at=time.time()
                    )

                    if stream:
                        yield self._create_stream_chunk(
                            f"<video src='{local_url}' controls style='max-width:100%'></video>",
                            finish_reason="stop"
                        )
                    else:
                        yield self._create_completion_response(
                            local_url,
                            media_type="video"
                        )
                    return

                elif status == "MEDIA_GENERATION_STATUS_FAILED":
                    error_info = operation.get("operation", {}).get("error", {})
                    error_code = error_info.get("code", "unknown")
                    error_message = error_info.get("message", "unknown error")

                    task_id = operation["operation"]["name"]
                    await self.db.update_task(
                        task_id,
                        status="failed",
                        error_message=f"{error_message} (code: {error_code})",
                        completed_at=time.time()
                    )

                    friendly_error = f"Video generation failed: {error_message}. Please retry."
                    if stream:
                        yield self._create_stream_chunk(f"❌ {friendly_error}\n")
                    yield self._create_error_response(friendly_error)
                    return

                elif status and status.startswith("MEDIA_GENERATION_STATUS_ERROR"):
                    yield self._create_error_response(f"Video generation failed: {status}")
                    return

            except Exception as e:
                debug_logger.log_error(f"Poll error: {str(e)}")
                continue

        yield self._create_error_response(f"Video generation timed out (polled {max_attempts} times)")

    # ---- OpenAI-compatible response helpers ----
    def _create_stream_chunk(self, content: str, role: str = None, finish_reason: str = None) -> str:
        """Create a streaming (SSE) response chunk."""
        chunk = {
            "id": f"chatcmpl-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "flow2api",
            "choices": [{
                "index": 0,
                "delta": {},
                "finish_reason": finish_reason
            }]
        }

        if role:
            chunk["choices"][0]["delta"]["role"] = role

        # Progress updates are emitted as reasoning_content; only the final
        # chunk (with finish_reason set) carries the actual content.
        if finish_reason:
            chunk["choices"][0]["delta"]["content"] = content
        else:
            chunk["choices"][0]["delta"]["reasoning_content"] = content

        return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
    def _create_completion_response(self, content: str, media_type: str = "image", is_availability_check: bool = False) -> str:
        """Create a non-stream response.

        Args:
            content: Media URL or plain-text message.
            media_type: Media type ("image" or "video").
            is_availability_check: Whether this is an availability-check
                response (plain-text message).

        Returns:
            The response serialized as JSON.
        """
        if is_availability_check:
            formatted_content = content
        else:
            # Wrap the media URL so chat clients render it.
            if media_type == "video":
                formatted_content = f"```html\n<video src='{content}' controls></video>\n```"
            else:
                # Markdown image link (reconstructed; the original format
                # string was lost).
                formatted_content = f"![image]({content})"

        response = {
            "id": f"chatcmpl-{int(time.time())}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": "flow2api",
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": formatted_content
                },
                "finish_reason": "stop"
            }]
        }

        return json.dumps(response, ensure_ascii=False)
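
    # Example non-stream response for a video result (illustrative; id and
    # created vary per call):
    #
    #   {"id": "chatcmpl-1700000000", "object": "chat.completion",
    #    "created": 1700000000, "model": "flow2api",
    #    "choices": [{"index": 0,
    #                 "message": {"role": "assistant",
    #                             "content": "```html\n<video src='...' controls></video>\n```"},
    #                 "finish_reason": "stop"}]}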
    def _create_error_response(self, error_message: str) -> str:
        """Create an error response."""
        error = {
            "error": {
                "message": error_message,
                "type": "invalid_request_error",
                "code": "generation_failed"
            }
        }

        return json.dumps(error, ensure_ascii=False)
    def _get_base_url(self) -> str:
        """Get the base URL used for cached-file access."""
        # Prefer an explicitly configured base URL; otherwise fall back to
        # the server's own host and port.
        if config.cache_base_url:
            return config.cache_base_url
        return f"http://{config.server_host}:{config.server_port}"
    async def _log_request(
        self,
        token_id: Optional[int],
        operation: str,
        request_data: Dict[str, Any],
        response_data: Dict[str, Any],
        status_code: int,
        duration: float
    ):
        """Log the request to the database."""
        try:
            log = RequestLog(
                token_id=token_id,
                operation=operation,
                request_body=json.dumps(request_data, ensure_ascii=False),
                response_body=json.dumps(response_data, ensure_ascii=False),
                status_code=status_code,
                duration=duration
            )
            await self.db.add_request_log(log)
        except Exception as e:
            # Logging failures must never break the generation flow.
            debug_logger.log_error(f"Failed to log request: {e}")
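

# Usage sketch (illustrative; assumes the dependencies are wired up by the
# application, e.g. in the server startup code):
#
#   handler = GenerationHandler(flow_client, token_manager, load_balancer,
#                               db, concurrency_manager, proxy_manager)
#   async for chunk in handler.handle_generation(
#       model="veo_3_1_t2v_fast_landscape",
#       prompt="A timelapse of clouds rolling over mountains",
#       stream=True,
#   ):
#       ...  # forward each SSE chunk to the client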