copa_video / utils /kling_api.py
oKen38461's picture
動画生成タスクの進捗をポーリングするメソッドをKlingAPIクラスに追加し、動画ダウンロード時のエラーハンドリングを強化しました。また、generate_loop_video関数内で使用するモデルを指定するように変更しました。
2efb9d4
"""UseAPI.net Kling AI API通信処理モジュール"""
import os
import time
import base64
import json
import httpx
import asyncio
from datetime import datetime, timezone
from typing import Optional, Dict, Any, Tuple
from PIL import Image
import io
class KlingAPI:
"""UseAPI.net Kling AI APIクライアント"""
def __init__(self):
self.api_token = os.getenv('USEAPI_NET_TOKEN')
self.email = os.getenv('USEAPI_NET_EMAIL', '') # オプション:複数アカウント利用時
if not self.api_token:
raise ValueError("UseAPI.net認証情報が設定されていません。環境変数を確認してください。")
self.base_url = "https://api.useapi.net/v1/kling"
self.client = httpx.AsyncClient(timeout=120.0) # UseAPI.netは処理が長い場合があるため延長
def _get_headers(self) -> Dict[str, str]:
"""API通信用のヘッダーを取得"""
return {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
async def _upload_image(self, image_path: str, for_image_generation: bool = False, auto_resize: bool = True) -> Tuple[str, Dict[str, Any]]:
"""画像をUseAPI.netにアップロードし、URLを取得
Args:
image_path: アップロードする画像のパス
for_image_generation: 画像生成用かどうか
auto_resize: 自動リサイズを有効にするか
"""
# 一時ファイルパスを準備
resized_path = None
import os
try:
# まず画像情報を取得
with Image.open(image_path) as img:
# 画像検証
width, height = img.size
original_width, original_height = width, height
# 最小サイズチェック
if width < 300 or height < 300:
raise ValueError(f"画像サイズが小さすぎます。最小300x300ピクセル必要です。現在: {width}x{height}")
# アスペクト比チェック
aspect_ratio = width / height
if for_image_generation:
# 画像生成用の許可されるアスペクト比
allowed_ratios = {
"1:1": 1.0, # 1.0
"16:9": 16/9, # 1.7777...
"4:3": 4/3, # 1.3333...
"3:2": 3/2, # 1.5
"2:3": 2/3, # 0.6666...
"3:4": 3/4, # 0.75
"9:16": 9/16, # 0.5625
"21:9": 21/9, # 2.3333...
}
supported_ratios_text = "1:1, 16:9, 4:3, 3:2, 2:3, 3:4, 9:16, 21:9"
else:
# 動画生成用の許可されるアスペクト比(厳密なチェック)
allowed_ratios = {
"16:9": 16/9, # 1.7777...
"9:16": 9/16, # 0.5625
"1:1": 1.0 # 1.0
}
supported_ratios_text = "16:9 (1920x1080), 9:16 (1080x1920), 1:1 (1440x1440)"
# 最も近いアスペクト比を見つける
valid_ratio = False
ratio_name = ""
tolerance = 0.02 if for_image_generation else 0.001 # 画像生成はより寛容な誤差範囲
for name, ratio in allowed_ratios.items():
if abs(aspect_ratio - ratio) < tolerance:
valid_ratio = True
ratio_name = name
break
if not valid_ratio:
if auto_resize:
# 自動リサイズが有効な場合、最も近いアスペクト比にリサイズ
closest_ratio_name = None
min_diff = float('inf')
for name, ratio in allowed_ratios.items():
diff = abs(aspect_ratio - ratio)
if diff < min_diff:
min_diff = diff
closest_ratio_name = name
closest_ratio = ratio
# 新しいサイズを計算(元の面積を可能な限り保持)
current_area = width * height
if closest_ratio >= 1:
# 横長
new_height = int((current_area / closest_ratio) ** 0.5)
new_width = int(new_height * closest_ratio)
else:
# 縦長
new_width = int((current_area * closest_ratio) ** 0.5)
new_height = int(new_width / closest_ratio)
# 最小サイズを確保
if new_width < 300 or new_height < 300:
scale_factor = max(300 / new_width, 300 / new_height)
new_width = int(new_width * scale_factor)
new_height = int(new_height * scale_factor)
# リサイズ実行
print(f"画像を自動リサイズ: {width}x{height} -> {new_width}x{new_height} (アスペクト比: {closest_ratio_name})")
# 画像を再度開いてリサイズ
with Image.open(image_path) as img_to_resize:
# 一時ファイルにリサイズした画像を保存
resized_img = img_to_resize.resize((new_width, new_height), Image.Resampling.LANCZOS)
# ファイル拡張子を正しく取得
base_name, ext = os.path.splitext(image_path)
if not ext:
ext = '.png' # デフォルト
resized_path = f"{base_name}_resized{ext}"
# 画像形式を維持して保存
save_format = 'PNG' if ext.lower() in ['.png'] else 'JPEG'
resized_img.save(resized_path, format=save_format, quality=95)
# リサイズ後の値を更新
width, height = new_width, new_height
aspect_ratio = width / height
ratio_name = closest_ratio_name
else:
# 自動リサイズが無効な場合はエラー
current_ratio_str = f"{aspect_ratio:.2f}:1" if aspect_ratio >= 1 else f"1:{1/aspect_ratio:.2f}"
mode_text = "画像生成" if for_image_generation else "動画生成"
raise ValueError(f"{mode_text}:画像のアスペクト比が対応していません。\n"
f"現在: {current_ratio_str} ({width}x{height})\n"
f"対応画角: {supported_ratios_text}")
# ファイルサイズチェック(リサイズした場合はスキップ)
if not resized_path:
with Image.open(image_path) as img_for_check:
img_byte_arr = io.BytesIO()
# フォーマットを判定
img_format = img_for_check.format if img_for_check.format else 'PNG'
img_for_check.save(img_byte_arr, format=img_format)
img_byte_arr = img_byte_arr.getvalue()
if len(img_byte_arr) > 10 * 1024 * 1024: # 10MB
raise ValueError(f"画像ファイルサイズが大きすぎます。最大10MBまでです。")
# 画像をUseAPI.netにアップロード
# リサイズされた場合はそのファイルを使用
upload_path = resized_path if resized_path else image_path
with open(upload_path, 'rb') as f:
file_data = f.read()
# ファイル拡張子に基づいてContent-Typeを設定
content_type = "image/png" if upload_path.lower().endswith('.png') else "image/jpeg"
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": content_type
}
# URLにemailパラメータを追加
url = f"{self.base_url}/assets/"
if self.email:
url += f"?email={self.email}"
# バイナリデータを送信
response = await self.client.post(url, headers=headers, content=file_data)
if response.status_code == 200:
result = response.json()
asset_url = result.get('url')
if asset_url:
return asset_url, {
"width": width,
"height": height,
"aspect_ratio": aspect_ratio,
"size_mb": len(file_data) / (1024 * 1024),
"original_size": f"{original_width}x{original_height}" if resized_path else None,
"resized": bool(resized_path)
}
else:
raise Exception(f"アセットURLが取得できませんでした: {result}")
else:
raise Exception(f"画像アップロードエラー: {response.status_code} - {response.text}")
finally:
# 一時ファイルを削除
if resized_path and os.path.exists(resized_path):
os.remove(resized_path)
async def create_video_task(
self,
image_path: str,
prompt: str,
model: str = "kling-v2-1",
duration: int = 10,
tail_image_path: Optional[str] = None,
mode: str = "std"
) -> Optional[str]:
"""動画生成タスクを作成"""
try:
# 画像アップロード(動画生成でも自動リサイズを有効化)
image_url, image_info = await self._upload_image(image_path, for_image_generation=False, auto_resize=True)
# UseAPI.net用のペイロード構築
# モデル名の処理(image2video-framesエンドポイント用)
# v1系はそのまま、v2系もそのまま使用
useapi_model = model
payload = {
"model_name": useapi_model,
"prompt": prompt[:2500],
"image": image_url, # URLを使用
"duration": str(duration) # UseAPI.netでは文字列型
}
# modeパラメータはv1系モデルのみサポート(v2系では使用しない)
if mode and useapi_model.startswith("kling-v1"):
payload["mode"] = mode
# emailパラメータ(複数アカウント利用時)
if self.email:
payload["email"] = self.email
# tail_imageがある場合(v1-6のみサポート、v2系は非対応)
# UseAPI.netでは image_tail パラメータを使用
if tail_image_path:
if model == "kling-v1-6":
tail_image_url, _ = await self._upload_image(tail_image_path, for_image_generation=False, auto_resize=True)
payload["image_tail"] = tail_image_url # tail_image → image_tail
elif model.startswith("kling-v2"):
print(f"警告: {model}はimage_tailをサポートしていません。無視されます。")
# UseAPI.netではimage2video-framesエンドポイントを使用
url = f"{self.base_url}/videos/image2video-frames"
print(f"=== 動画生成リクエスト ===")
print(f"URL: {url}")
print(f"ペイロード: {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = await self.client.post(url, headers=self._get_headers(), json=payload)
if response.status_code == 200:
result = response.json()
print(f"=== 動画生成レスポンス ===")
print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...")
# UseAPI.netの成功レスポンスをチェック
if result.get("task") and isinstance(result["task"], dict) and "id" in result["task"]:
task_id = str(result["task"]["id"])
print(f"✅ タスクID取得成功: {task_id}")
# タスクログに記録
return task_id
elif result.get("code") == 0 and "data" in result and "task_id" in result["data"]:
task_id = result["data"]["task_id"]
# タスクログに記録
return task_id
else:
# エラーメッセージを取得
error_msg = result.get("message", result.get("error", ""))
status_name = result.get("status_name", "")
status = result.get("status", "")
if not error_msg:
error_msg = f"ステータス: {status} ({status_name})"
# Status 58 (UNKNOWN)の場合の詳細なエラーメッセージ
if status == 58:
error_msg += "\n考えられる原因:\n"
error_msg += " 1. 別のタスクが処理中(同時処理は1タスクまで)\n"
error_msg += " 2. アカウントの一時的な制限\n"
error_msg += " 3. APIサービスの問題\n"
error_msg += " → Klingのダッシュボードで処理中のタスクを確認してください"
if "balance" in error_msg.lower() or "credit" in error_msg.lower():
raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください")
raise Exception(f"タスク作成エラー: {error_msg}")
else:
# HTTPエラーの詳細を解析
error_text = response.text
if "balance" in error_text.lower() or "credit" in error_text.lower() or "insufficient" in error_text.lower():
raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください")
elif response.status_code == 402: # Payment Required
raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください")
else:
raise Exception(f"HTTPエラー {response.status_code}: {error_text}")
except Exception as e:
raise Exception(f"動画タスク作成エラー: {str(e)}")
async def poll_task_result(self, task_id: str, max_wait_minutes: int = 60) -> Optional[str]:
"""タスクの完了を待機し、結果を取得"""
max_attempts = max_wait_minutes * 12 # 5秒間隔
print(f"=== タスクポーリング開始 ===")
print(f"タスクID: {task_id}")
print(f"最大待機時間: {max_wait_minutes}分")
for attempt in range(max_attempts):
try:
# UseAPI.netではGETでタスクステータスを確認
# 正しいエンドポイント: /v1/kling/tasks/{task_id}
url = f"{self.base_url}/tasks/{task_id}"
params = {}
if self.email:
params["email"] = self.email
print(f"\n試行 {attempt + 1}/{max_attempts}: {url}")
if params:
print(f"パラメータ: {params}")
response = await self.client.get(url, headers=self._get_headers(), params=params)
if response.status_code == 200:
result = response.json()
# レスポンスの詳細をログ出力
print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}...")
# UseAPI.netのレスポンス形式に対応
task_data = None
if "task" in result:
task_data = result["task"]
elif "data" in result:
task_data = result["data"]
else:
task_data = result
# ステータスの確認
status = task_data.get("status")
status_name = task_data.get("status_name")
status_final = task_data.get("status_final", False)
print(f"ステータス: {status} ({status_name}), 最終: {status_final}")
# 処理中の場合も情報を表示
if status_name in ["processing", "running", "generating"]:
progress_percent = task_data.get("progress", 0)
print(f"処理中... 進捗: {progress_percent}%")
# 成功状態の判定(status 99 = succeed)
if status == 99 or (status_final and status_name in ["finished", "succeed"]):
# 動画URLを取得
video_url = None
# まずresult.videoをチェック(UseAPI.netの標準レスポンス)
if result.get("video"):
video_url = result["video"]
# taskInfo.outputsをチェック(フォールバック)
if not video_url and task_data.get("taskInfo", {}).get("outputs"):
outputs = task_data["taskInfo"]["outputs"]
for output in outputs:
if output.get("url"):
video_url = output["url"]
break
# worksをチェック(フォールバック)
if not video_url and "works" in result and len(result["works"]) > 0:
work = result["works"][0]
if "resource" in work and "resource" in work["resource"]:
video_url = work["resource"]["resource"]
if video_url:
print(f"✅ 動画生成完了: {video_url}")
# work_idを取得(ウォーターマークなしダウンロード用)
work_id = None
if "works" in result and len(result["works"]) > 0:
work_id = result["works"][0].get("workId")
if work_id:
print(f"📌 Work ID取得: {work_id}")
# ウォーターマークなしダウンロードを試みる
try:
watermark_free_url = await self.download_without_watermark(work_id)
if watermark_free_url:
print(f"✅ ウォーターマークなしURL取得成功")
return watermark_free_url
except Exception as e:
print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}")
print(f"通常のURLを使用します")
return video_url
else:
print("⚠️ 動画URLが見つかりません。レスポンス構造を確認中...")
print(f"result全体: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}")
# まだ完了していない
pass
elif status_name == "failed":
error_msg = task_data.get('message', 'Unknown error')
print(f"❌ タスク失敗: {error_msg}")
raise Exception(f"タスク失敗: {error_msg}")
elif response.status_code == 404:
# 404エラーの場合、タスクが見つからない
print(f"HTTPエラー 404: タスクが見つかりません")
print(f"タスクID: {task_id}")
print(f"エラー詳細: {response.text}")
# レスポンスをパースしてエラー詳細を確認
try:
error_data = response.json()
error_msg = error_data.get('message', '') or error_data.get('error', '')
if "balance" in error_msg.lower() or "credit" in error_msg.lower() or "insufficient" in error_msg.lower():
raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください")
except:
pass
# 最初の数回は待機してリトライ(タスクの作成に時間がかかる場合があるため)
if attempt < 5:
print("タスク作成待機中...")
await asyncio.sleep(10)
continue
else:
# タスクが見つからない場合、クレジット不足の可能性を示唆
raise Exception(f"タスクが見つかりません: {task_id} (クレジット不足の可能性があります)")
else:
print(f"HTTPエラー: {response.status_code} - {response.text}")
except Exception as e:
print(f"エラー発生: {str(e)}")
if "タスク失敗" in str(e) or "タスクが見つかりません" in str(e):
raise
await asyncio.sleep(5)
# タイムアウト時の詳細情報
print(f"\n❌ タスクがタイムアウトしました")
print(f"タスクID: {task_id}")
print(f"最大待機時間: {max_wait_minutes}分")
raise Exception(f"タスクタイムアウト: {max_wait_minutes}分待機しても完了しませんでした")
async def download_without_watermark(self, work_id: str) -> Optional[str]:
"""ウォーターマークなしでコンテンツをダウンロード(有料アカウント必須)"""
try:
# 正しいエンドポイント: /v1/kling/assets/download
url = f"{self.base_url}/assets/download"
params = {"workIds": work_id}
if self.email:
params["email"] = self.email
print(f"ウォーターマークなしダウンロードを試行中...")
print(f"URL: {url}")
print(f"パラメータ: {params}")
response = await self.client.get(url, headers=self._get_headers(), params=params)
if response.status_code == 200:
result = response.json()
print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...")
# cdnUrlを取得(ドキュメントに基づく正しいフィールド)
if result.get("cdnUrl"):
return result["cdnUrl"]
# フォールバック: 他の可能なフィールドをチェック
elif result.get("urls") and len(result["urls"]) > 0:
return result["urls"][0]
elif result.get("url"):
return result["url"]
elif result.get("video"):
return result["video"]
else:
print(f"⚠️ cdnUrlが見つかりません。レスポンス全体: {json.dumps(result, ensure_ascii=False)}")
else:
print(f"ウォーターマークなしダウンロード失敗: {response.status_code}")
print(f"レスポンス: {response.text[:500]}")
if response.status_code == 403:
print("⚠️ 403 Forbidden: 有料アカウントが必要です")
return None
except Exception as e:
print(f"ウォーターマークなしダウンロードエラー: {str(e)}")
return None
async def download_video(self, url: str, output_path: str) -> bool:
"""動画をダウンロード"""
try:
response = await self.client.get(url)
if response.status_code == 200:
with open(output_path, "wb") as f:
f.write(response.content)
return True
except Exception as e:
print(f"ダウンロードエラー: {e}")
return False
def _calculate_credits(self, model: str, duration: int) -> int:
"""使用クレジットを計算"""
# モデルごとのクレジット単価(仮定)
credit_rates = {
"kling-v2-1": 10, # V2.1 Standard
"kling-v1-6": 30, # V1.6 Professional
"kling-v1-5": 20, # V1.5
}
base_rate = credit_rates.get(model, 10)
return base_rate * (duration // 5) # 5秒あたりの単価
async def create_image_task(
self,
image_path: str,
prompt: str,
model: str = "kling-v1-5",
negative_prompt: str = "",
strength: float = 0.3,
reference_type: Optional[str] = "subject",
image_fidelity: Optional[float] = 0.3,
human_fidelity: Optional[float] = 0.9,
aspect_ratio: Optional[str] = None
) -> Optional[str]:
"""画像生成タスクを作成"""
try:
# 画像アップロード(画像生成用アスペクト比チェック、自動リサイズ有効)
image_url, image_info = await self._upload_image(image_path, for_image_generation=True, auto_resize=True)
# UseAPI.net kolorsエンドポイント用のペイロード構築
payload = {
"prompt": prompt[:2500],
}
# emailパラメータ(複数アカウント利用時)
if self.email:
payload["email"] = self.email
# 参照モードの設定
if reference_type in ["face", "subject"]:
payload["reference"] = reference_type
payload["imageReference"] = image_url # 参照画像URL
# アスペクト比を設定(face/subjectモードのみ)
if aspect_ratio:
payload["aspect_ratio"] = aspect_ratio
# 顔参照強度(0-100の整数値)
if reference_type == "face":
face_strength = int((image_fidelity or 0.5) * 100)
payload["faceStrength"] = face_strength
payload["faceNo"] = 1 # デフォルトで最初の顔を使用
else:
# 通常のrestyle/Image-to-Imageモード
payload["reference"] = "restyle"
payload["imageReference"] = image_url
# restyleモードではaspect_ratioはサポートされない
# 生成画像数(デフォルト1)
payload["imageCount"] = 1
# UseAPI.netのkolorsエンドポイントを使用
url = f"{self.base_url}/images/kolors"
# デバッグ用:送信するペイロードをログ出力
print(f"=== Kling API Image Request ===")
print(f"URL: {url}")
print(f"Model: {model}")
print(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
print(f"===============================")
response = await self.client.post(url, headers=self._get_headers(), json=payload)
if response.status_code == 200:
result = response.json()
print(f"=== Kling API Image Response ===")
print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}")
print(f"================================")
# UseAPI.netのレスポンス形式に対応
if "task" in result and "id" in result["task"]:
task_id = str(result["task"]["id"])
# タスクログに記録
return task_id
elif "data" in result and "task_id" in result["data"]:
task_id = result["data"]["task_id"]
# タスクログに記録
return task_id
elif "id" in result:
task_id = str(result["id"])
# タスクログに記録
return task_id
else:
# エラーチェック
if result.get("code") != 0:
error_msg = result.get("message", result.get("error", "Unknown error"))
if "balance" in error_msg.lower() or "credit" in error_msg.lower():
raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください")
raise Exception(f"画像タスク作成エラー: {error_msg}")
else:
raise Exception(f"タスクIDが見つかりません: {result}")
else:
raise Exception(f"HTTPエラー {response.status_code}: {response.text}")
except Exception as e:
raise Exception(f"画像タスク作成エラー: {str(e)}")
async def poll_image_result(self, task_id: str, max_wait_minutes: int = 10) -> Optional[str]:
"""画像タスクの完了を待機し、結果を取得"""
max_attempts = max_wait_minutes * 12 # 5秒間隔
for attempt in range(max_attempts):
try:
# UseAPI.netではGETでタスクステータスを確認
# 正しいエンドポイント: /v1/kling/tasks/{task_id}
url = f"{self.base_url}/tasks/{task_id}"
params = {}
if self.email:
params["email"] = self.email
response = await self.client.get(url, headers=self._get_headers(), params=params)
if response.status_code == 200:
result = response.json()
# タスクデータを取得
task_data = result.get("task", {})
status_name = task_data.get("status_name", "")
status_final = task_data.get("status_final", False)
# ステータスの確認
status = task_data.get("status")
if (status == 99) or (status_final and status_name == "succeed"):
# 画像URLを取得
image_url = None
# まずresult.imageをチェック(UseAPI.netの標準レスポンス)
if result.get("image"):
image_url = result["image"]
# taskInfo.outputsをチェック(フォールバック)
if not image_url and task_data.get("taskInfo", {}).get("outputs"):
outputs = task_data["taskInfo"]["outputs"]
for output in outputs:
if output.get("url"):
image_url = output["url"]
break
# worksをチェック(フォールバック)
if not image_url and "works" in result and len(result["works"]) > 0:
work = result["works"][0]
if "resource" in work and "resource" in work["resource"]:
image_url = work["resource"]["resource"]
if image_url:
print(f"✅ 画像生成完了: {image_url}")
# work_idを取得(ウォーターマークなしダウンロード用)
work_id = None
if "works" in result and len(result["works"]) > 0:
work_id = result["works"][0].get("workId")
if work_id:
print(f"📌 Work ID取得: {work_id}")
# ウォーターマークなしダウンロードを試みる
try:
watermark_free_url = await self.download_without_watermark(work_id)
if watermark_free_url:
print(f"✅ ウォーターマークなしURL取得成功")
return watermark_free_url
except Exception as e:
print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}")
print(f"通常のURLを使用します")
return image_url
elif status_name == "failed":
error_msg = result.get('message', 'Unknown error')
raise Exception(f"画像生成失敗: {error_msg}")
except Exception as e:
if "画像生成失敗" in str(e):
raise
await asyncio.sleep(5)
raise Exception("画像生成タスクタイムアウト")
async def download_image(self, url: str, output_path: str) -> bool:
"""画像をダウンロード"""
try:
response = await self.client.get(url)
if response.status_code == 200:
with open(output_path, "wb") as f:
f.write(response.content)
return True
except Exception as e:
print(f"画像ダウンロードエラー: {e}")
return False
async def close(self):
"""クライアントを閉じる"""
await self.client.aclose()
# === 追加: 動画タスク結果をポーリング(進捗バー連携) ===
async def poll_video_result(
self,
task_id: str,
progress_callback=None,
base_progress: float = 0.0,
progress_delta: float = 0.2,
max_wait_minutes: int = 60
) -> Optional[str]:
"""動画生成タスクの完了を待機してURLを返す
progress_callback には gr.Progress インスタンスを渡すことを想定
base_progress はポーリング開始時点の進捗値、
progress_delta は完了時に上乗せする進捗幅"""
try:
# ポーリング開始を通知
if progress_callback:
progress_callback(base_progress, "動画生成を待機中...")
video_url = await self.poll_task_result(task_id, max_wait_minutes=max_wait_minutes)
# 完了を通知(進捗を一気に上げる)
if progress_callback:
progress_callback(min(base_progress + progress_delta, 1.0), "動画生成完了")
return video_url
except Exception as e:
# エラー時にも進捗を更新
if progress_callback:
progress_callback(base_progress, "動画生成エラー")
raise