Spaces:
Runtime error
Runtime error
| """UseAPI.net Kling AI API通信処理モジュール""" | |
| import os | |
| import time | |
| import base64 | |
| import json | |
| import httpx | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from typing import Optional, Dict, Any, Tuple | |
| from PIL import Image | |
| import io | |
| class KlingAPI: | |
| """UseAPI.net Kling AI APIクライアント""" | |
| def __init__(self): | |
| self.api_token = os.getenv('USEAPI_NET_TOKEN') | |
| self.email = os.getenv('USEAPI_NET_EMAIL', '') # オプション:複数アカウント利用時 | |
| if not self.api_token: | |
| raise ValueError("UseAPI.net認証情報が設定されていません。環境変数を確認してください。") | |
| self.base_url = "https://api.useapi.net/v1/kling" | |
| self.client = httpx.AsyncClient(timeout=120.0) # UseAPI.netは処理が長い場合があるため延長 | |
| def _get_headers(self) -> Dict[str, str]: | |
| """API通信用のヘッダーを取得""" | |
| return { | |
| "Authorization": f"Bearer {self.api_token}", | |
| "Content-Type": "application/json" | |
| } | |
| async def _upload_image(self, image_path: str, for_image_generation: bool = False, auto_resize: bool = True) -> Tuple[str, Dict[str, Any]]: | |
| """画像をUseAPI.netにアップロードし、URLを取得 | |
| Args: | |
| image_path: アップロードする画像のパス | |
| for_image_generation: 画像生成用かどうか | |
| auto_resize: 自動リサイズを有効にするか | |
| """ | |
| # 一時ファイルパスを準備 | |
| resized_path = None | |
| import os | |
| try: | |
| # まず画像情報を取得 | |
| with Image.open(image_path) as img: | |
| # 画像検証 | |
| width, height = img.size | |
| original_width, original_height = width, height | |
| # 最小サイズチェック | |
| if width < 300 or height < 300: | |
| raise ValueError(f"画像サイズが小さすぎます。最小300x300ピクセル必要です。現在: {width}x{height}") | |
| # アスペクト比チェック | |
| aspect_ratio = width / height | |
| if for_image_generation: | |
| # 画像生成用の許可されるアスペクト比 | |
| allowed_ratios = { | |
| "1:1": 1.0, # 1.0 | |
| "16:9": 16/9, # 1.7777... | |
| "4:3": 4/3, # 1.3333... | |
| "3:2": 3/2, # 1.5 | |
| "2:3": 2/3, # 0.6666... | |
| "3:4": 3/4, # 0.75 | |
| "9:16": 9/16, # 0.5625 | |
| "21:9": 21/9, # 2.3333... | |
| } | |
| supported_ratios_text = "1:1, 16:9, 4:3, 3:2, 2:3, 3:4, 9:16, 21:9" | |
| else: | |
| # 動画生成用の許可されるアスペクト比(厳密なチェック) | |
| allowed_ratios = { | |
| "16:9": 16/9, # 1.7777... | |
| "9:16": 9/16, # 0.5625 | |
| "1:1": 1.0 # 1.0 | |
| } | |
| supported_ratios_text = "16:9 (1920x1080), 9:16 (1080x1920), 1:1 (1440x1440)" | |
| # 最も近いアスペクト比を見つける | |
| valid_ratio = False | |
| ratio_name = "" | |
| tolerance = 0.02 if for_image_generation else 0.001 # 画像生成はより寛容な誤差範囲 | |
| for name, ratio in allowed_ratios.items(): | |
| if abs(aspect_ratio - ratio) < tolerance: | |
| valid_ratio = True | |
| ratio_name = name | |
| break | |
| if not valid_ratio: | |
| if auto_resize: | |
| # 自動リサイズが有効な場合、最も近いアスペクト比にリサイズ | |
| closest_ratio_name = None | |
| min_diff = float('inf') | |
| for name, ratio in allowed_ratios.items(): | |
| diff = abs(aspect_ratio - ratio) | |
| if diff < min_diff: | |
| min_diff = diff | |
| closest_ratio_name = name | |
| closest_ratio = ratio | |
| # 新しいサイズを計算(元の面積を可能な限り保持) | |
| current_area = width * height | |
| if closest_ratio >= 1: | |
| # 横長 | |
| new_height = int((current_area / closest_ratio) ** 0.5) | |
| new_width = int(new_height * closest_ratio) | |
| else: | |
| # 縦長 | |
| new_width = int((current_area * closest_ratio) ** 0.5) | |
| new_height = int(new_width / closest_ratio) | |
| # 最小サイズを確保 | |
| if new_width < 300 or new_height < 300: | |
| scale_factor = max(300 / new_width, 300 / new_height) | |
| new_width = int(new_width * scale_factor) | |
| new_height = int(new_height * scale_factor) | |
| # リサイズ実行 | |
| print(f"画像を自動リサイズ: {width}x{height} -> {new_width}x{new_height} (アスペクト比: {closest_ratio_name})") | |
| # 画像を再度開いてリサイズ | |
| with Image.open(image_path) as img_to_resize: | |
| # 一時ファイルにリサイズした画像を保存 | |
| resized_img = img_to_resize.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| # ファイル拡張子を正しく取得 | |
| base_name, ext = os.path.splitext(image_path) | |
| if not ext: | |
| ext = '.png' # デフォルト | |
| resized_path = f"{base_name}_resized{ext}" | |
| # 画像形式を維持して保存 | |
| save_format = 'PNG' if ext.lower() in ['.png'] else 'JPEG' | |
| resized_img.save(resized_path, format=save_format, quality=95) | |
| # リサイズ後の値を更新 | |
| width, height = new_width, new_height | |
| aspect_ratio = width / height | |
| ratio_name = closest_ratio_name | |
| else: | |
| # 自動リサイズが無効な場合はエラー | |
| current_ratio_str = f"{aspect_ratio:.2f}:1" if aspect_ratio >= 1 else f"1:{1/aspect_ratio:.2f}" | |
| mode_text = "画像生成" if for_image_generation else "動画生成" | |
| raise ValueError(f"{mode_text}:画像のアスペクト比が対応していません。\n" | |
| f"現在: {current_ratio_str} ({width}x{height})\n" | |
| f"対応画角: {supported_ratios_text}") | |
| # ファイルサイズチェック(リサイズした場合はスキップ) | |
| if not resized_path: | |
| with Image.open(image_path) as img_for_check: | |
| img_byte_arr = io.BytesIO() | |
| # フォーマットを判定 | |
| img_format = img_for_check.format if img_for_check.format else 'PNG' | |
| img_for_check.save(img_byte_arr, format=img_format) | |
| img_byte_arr = img_byte_arr.getvalue() | |
| if len(img_byte_arr) > 10 * 1024 * 1024: # 10MB | |
| raise ValueError(f"画像ファイルサイズが大きすぎます。最大10MBまでです。") | |
| # 画像をUseAPI.netにアップロード | |
| # リサイズされた場合はそのファイルを使用 | |
| upload_path = resized_path if resized_path else image_path | |
| with open(upload_path, 'rb') as f: | |
| file_data = f.read() | |
| # ファイル拡張子に基づいてContent-Typeを設定 | |
| content_type = "image/png" if upload_path.lower().endswith('.png') else "image/jpeg" | |
| headers = { | |
| "Authorization": f"Bearer {self.api_token}", | |
| "Content-Type": content_type | |
| } | |
| # URLにemailパラメータを追加 | |
| url = f"{self.base_url}/assets/" | |
| if self.email: | |
| url += f"?email={self.email}" | |
| # バイナリデータを送信 | |
| response = await self.client.post(url, headers=headers, content=file_data) | |
| if response.status_code == 200: | |
| result = response.json() | |
| asset_url = result.get('url') | |
| if asset_url: | |
| return asset_url, { | |
| "width": width, | |
| "height": height, | |
| "aspect_ratio": aspect_ratio, | |
| "size_mb": len(file_data) / (1024 * 1024), | |
| "original_size": f"{original_width}x{original_height}" if resized_path else None, | |
| "resized": bool(resized_path) | |
| } | |
| else: | |
| raise Exception(f"アセットURLが取得できませんでした: {result}") | |
| else: | |
| raise Exception(f"画像アップロードエラー: {response.status_code} - {response.text}") | |
| finally: | |
| # 一時ファイルを削除 | |
| if resized_path and os.path.exists(resized_path): | |
| os.remove(resized_path) | |
| async def create_video_task( | |
| self, | |
| image_path: str, | |
| prompt: str, | |
| model: str = "kling-v2-1", | |
| duration: int = 10, | |
| tail_image_path: Optional[str] = None, | |
| mode: str = "std" | |
| ) -> Optional[str]: | |
| """動画生成タスクを作成""" | |
| try: | |
| # 画像アップロード(動画生成でも自動リサイズを有効化) | |
| image_url, image_info = await self._upload_image(image_path, for_image_generation=False, auto_resize=True) | |
| # UseAPI.net用のペイロード構築 | |
| # モデル名の処理(image2video-framesエンドポイント用) | |
| # v1系はそのまま、v2系もそのまま使用 | |
| useapi_model = model | |
| payload = { | |
| "model_name": useapi_model, | |
| "prompt": prompt[:2500], | |
| "image": image_url, # URLを使用 | |
| "duration": str(duration) # UseAPI.netでは文字列型 | |
| } | |
| # modeパラメータはv1系モデルのみサポート(v2系では使用しない) | |
| if mode and useapi_model.startswith("kling-v1"): | |
| payload["mode"] = mode | |
| # emailパラメータ(複数アカウント利用時) | |
| if self.email: | |
| payload["email"] = self.email | |
| # tail_imageがある場合(v1-6のみサポート、v2系は非対応) | |
| # UseAPI.netでは image_tail パラメータを使用 | |
| if tail_image_path: | |
| if model == "kling-v1-6": | |
| tail_image_url, _ = await self._upload_image(tail_image_path, for_image_generation=False, auto_resize=True) | |
| payload["image_tail"] = tail_image_url # tail_image → image_tail | |
| elif model.startswith("kling-v2"): | |
| print(f"警告: {model}はimage_tailをサポートしていません。無視されます。") | |
| # UseAPI.netではimage2video-framesエンドポイントを使用 | |
| url = f"{self.base_url}/videos/image2video-frames" | |
| print(f"=== 動画生成リクエスト ===") | |
| print(f"URL: {url}") | |
| print(f"ペイロード: {json.dumps(payload, indent=2, ensure_ascii=False)}") | |
| response = await self.client.post(url, headers=self._get_headers(), json=payload) | |
| if response.status_code == 200: | |
| result = response.json() | |
| print(f"=== 動画生成レスポンス ===") | |
| print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...") | |
| # UseAPI.netの成功レスポンスをチェック | |
| if result.get("task") and isinstance(result["task"], dict) and "id" in result["task"]: | |
| task_id = str(result["task"]["id"]) | |
| print(f"✅ タスクID取得成功: {task_id}") | |
| # タスクログに記録 | |
| return task_id | |
| elif result.get("code") == 0 and "data" in result and "task_id" in result["data"]: | |
| task_id = result["data"]["task_id"] | |
| # タスクログに記録 | |
| return task_id | |
| else: | |
| # エラーメッセージを取得 | |
| error_msg = result.get("message", result.get("error", "")) | |
| status_name = result.get("status_name", "") | |
| status = result.get("status", "") | |
| if not error_msg: | |
| error_msg = f"ステータス: {status} ({status_name})" | |
| # Status 58 (UNKNOWN)の場合の詳細なエラーメッセージ | |
| if status == 58: | |
| error_msg += "\n考えられる原因:\n" | |
| error_msg += " 1. 別のタスクが処理中(同時処理は1タスクまで)\n" | |
| error_msg += " 2. アカウントの一時的な制限\n" | |
| error_msg += " 3. APIサービスの問題\n" | |
| error_msg += " → Klingのダッシュボードで処理中のタスクを確認してください" | |
| if "balance" in error_msg.lower() or "credit" in error_msg.lower(): | |
| raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") | |
| raise Exception(f"タスク作成エラー: {error_msg}") | |
| else: | |
| # HTTPエラーの詳細を解析 | |
| error_text = response.text | |
| if "balance" in error_text.lower() or "credit" in error_text.lower() or "insufficient" in error_text.lower(): | |
| raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") | |
| elif response.status_code == 402: # Payment Required | |
| raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") | |
| else: | |
| raise Exception(f"HTTPエラー {response.status_code}: {error_text}") | |
| except Exception as e: | |
| raise Exception(f"動画タスク作成エラー: {str(e)}") | |
| async def poll_task_result(self, task_id: str, max_wait_minutes: int = 60) -> Optional[str]: | |
| """タスクの完了を待機し、結果を取得""" | |
| max_attempts = max_wait_minutes * 12 # 5秒間隔 | |
| print(f"=== タスクポーリング開始 ===") | |
| print(f"タスクID: {task_id}") | |
| print(f"最大待機時間: {max_wait_minutes}分") | |
| for attempt in range(max_attempts): | |
| try: | |
| # UseAPI.netではGETでタスクステータスを確認 | |
| # 正しいエンドポイント: /v1/kling/tasks/{task_id} | |
| url = f"{self.base_url}/tasks/{task_id}" | |
| params = {} | |
| if self.email: | |
| params["email"] = self.email | |
| print(f"\n試行 {attempt + 1}/{max_attempts}: {url}") | |
| if params: | |
| print(f"パラメータ: {params}") | |
| response = await self.client.get(url, headers=self._get_headers(), params=params) | |
| if response.status_code == 200: | |
| result = response.json() | |
| # レスポンスの詳細をログ出力 | |
| print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}...") | |
| # UseAPI.netのレスポンス形式に対応 | |
| task_data = None | |
| if "task" in result: | |
| task_data = result["task"] | |
| elif "data" in result: | |
| task_data = result["data"] | |
| else: | |
| task_data = result | |
| # ステータスの確認 | |
| status = task_data.get("status") | |
| status_name = task_data.get("status_name") | |
| status_final = task_data.get("status_final", False) | |
| print(f"ステータス: {status} ({status_name}), 最終: {status_final}") | |
| # 処理中の場合も情報を表示 | |
| if status_name in ["processing", "running", "generating"]: | |
| progress_percent = task_data.get("progress", 0) | |
| print(f"処理中... 進捗: {progress_percent}%") | |
| # 成功状態の判定(status 99 = succeed) | |
| if status == 99 or (status_final and status_name in ["finished", "succeed"]): | |
| # 動画URLを取得 | |
| video_url = None | |
| # まずresult.videoをチェック(UseAPI.netの標準レスポンス) | |
| if result.get("video"): | |
| video_url = result["video"] | |
| # taskInfo.outputsをチェック(フォールバック) | |
| if not video_url and task_data.get("taskInfo", {}).get("outputs"): | |
| outputs = task_data["taskInfo"]["outputs"] | |
| for output in outputs: | |
| if output.get("url"): | |
| video_url = output["url"] | |
| break | |
| # worksをチェック(フォールバック) | |
| if not video_url and "works" in result and len(result["works"]) > 0: | |
| work = result["works"][0] | |
| if "resource" in work and "resource" in work["resource"]: | |
| video_url = work["resource"]["resource"] | |
| if video_url: | |
| print(f"✅ 動画生成完了: {video_url}") | |
| # work_idを取得(ウォーターマークなしダウンロード用) | |
| work_id = None | |
| if "works" in result and len(result["works"]) > 0: | |
| work_id = result["works"][0].get("workId") | |
| if work_id: | |
| print(f"📌 Work ID取得: {work_id}") | |
| # ウォーターマークなしダウンロードを試みる | |
| try: | |
| watermark_free_url = await self.download_without_watermark(work_id) | |
| if watermark_free_url: | |
| print(f"✅ ウォーターマークなしURL取得成功") | |
| return watermark_free_url | |
| except Exception as e: | |
| print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}") | |
| print(f"通常のURLを使用します") | |
| return video_url | |
| else: | |
| print("⚠️ 動画URLが見つかりません。レスポンス構造を確認中...") | |
| print(f"result全体: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}") | |
| # まだ完了していない | |
| pass | |
| elif status_name == "failed": | |
| error_msg = task_data.get('message', 'Unknown error') | |
| print(f"❌ タスク失敗: {error_msg}") | |
| raise Exception(f"タスク失敗: {error_msg}") | |
| elif response.status_code == 404: | |
| # 404エラーの場合、タスクが見つからない | |
| print(f"HTTPエラー 404: タスクが見つかりません") | |
| print(f"タスクID: {task_id}") | |
| print(f"エラー詳細: {response.text}") | |
| # レスポンスをパースしてエラー詳細を確認 | |
| try: | |
| error_data = response.json() | |
| error_msg = error_data.get('message', '') or error_data.get('error', '') | |
| if "balance" in error_msg.lower() or "credit" in error_msg.lower() or "insufficient" in error_msg.lower(): | |
| raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") | |
| except: | |
| pass | |
| # 最初の数回は待機してリトライ(タスクの作成に時間がかかる場合があるため) | |
| if attempt < 5: | |
| print("タスク作成待機中...") | |
| await asyncio.sleep(10) | |
| continue | |
| else: | |
| # タスクが見つからない場合、クレジット不足の可能性を示唆 | |
| raise Exception(f"タスクが見つかりません: {task_id} (クレジット不足の可能性があります)") | |
| else: | |
| print(f"HTTPエラー: {response.status_code} - {response.text}") | |
| except Exception as e: | |
| print(f"エラー発生: {str(e)}") | |
| if "タスク失敗" in str(e) or "タスクが見つかりません" in str(e): | |
| raise | |
| await asyncio.sleep(5) | |
| # タイムアウト時の詳細情報 | |
| print(f"\n❌ タスクがタイムアウトしました") | |
| print(f"タスクID: {task_id}") | |
| print(f"最大待機時間: {max_wait_minutes}分") | |
| raise Exception(f"タスクタイムアウト: {max_wait_minutes}分待機しても完了しませんでした") | |
| async def download_without_watermark(self, work_id: str) -> Optional[str]: | |
| """ウォーターマークなしでコンテンツをダウンロード(有料アカウント必須)""" | |
| try: | |
| # 正しいエンドポイント: /v1/kling/assets/download | |
| url = f"{self.base_url}/assets/download" | |
| params = {"workIds": work_id} | |
| if self.email: | |
| params["email"] = self.email | |
| print(f"ウォーターマークなしダウンロードを試行中...") | |
| print(f"URL: {url}") | |
| print(f"パラメータ: {params}") | |
| response = await self.client.get(url, headers=self._get_headers(), params=params) | |
| if response.status_code == 200: | |
| result = response.json() | |
| print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...") | |
| # cdnUrlを取得(ドキュメントに基づく正しいフィールド) | |
| if result.get("cdnUrl"): | |
| return result["cdnUrl"] | |
| # フォールバック: 他の可能なフィールドをチェック | |
| elif result.get("urls") and len(result["urls"]) > 0: | |
| return result["urls"][0] | |
| elif result.get("url"): | |
| return result["url"] | |
| elif result.get("video"): | |
| return result["video"] | |
| else: | |
| print(f"⚠️ cdnUrlが見つかりません。レスポンス全体: {json.dumps(result, ensure_ascii=False)}") | |
| else: | |
| print(f"ウォーターマークなしダウンロード失敗: {response.status_code}") | |
| print(f"レスポンス: {response.text[:500]}") | |
| if response.status_code == 403: | |
| print("⚠️ 403 Forbidden: 有料アカウントが必要です") | |
| return None | |
| except Exception as e: | |
| print(f"ウォーターマークなしダウンロードエラー: {str(e)}") | |
| return None | |
| async def download_video(self, url: str, output_path: str) -> bool: | |
| """動画をダウンロード""" | |
| try: | |
| response = await self.client.get(url) | |
| if response.status_code == 200: | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| return True | |
| except Exception as e: | |
| print(f"ダウンロードエラー: {e}") | |
| return False | |
| def _calculate_credits(self, model: str, duration: int) -> int: | |
| """使用クレジットを計算""" | |
| # モデルごとのクレジット単価(仮定) | |
| credit_rates = { | |
| "kling-v2-1": 10, # V2.1 Standard | |
| "kling-v1-6": 30, # V1.6 Professional | |
| "kling-v1-5": 20, # V1.5 | |
| } | |
| base_rate = credit_rates.get(model, 10) | |
| return base_rate * (duration // 5) # 5秒あたりの単価 | |
| async def create_image_task( | |
| self, | |
| image_path: str, | |
| prompt: str, | |
| model: str = "kling-v1-5", | |
| negative_prompt: str = "", | |
| strength: float = 0.3, | |
| reference_type: Optional[str] = "subject", | |
| image_fidelity: Optional[float] = 0.3, | |
| human_fidelity: Optional[float] = 0.9, | |
| aspect_ratio: Optional[str] = None | |
| ) -> Optional[str]: | |
| """画像生成タスクを作成""" | |
| try: | |
| # 画像アップロード(画像生成用アスペクト比チェック、自動リサイズ有効) | |
| image_url, image_info = await self._upload_image(image_path, for_image_generation=True, auto_resize=True) | |
| # UseAPI.net kolorsエンドポイント用のペイロード構築 | |
| payload = { | |
| "prompt": prompt[:2500], | |
| } | |
| # emailパラメータ(複数アカウント利用時) | |
| if self.email: | |
| payload["email"] = self.email | |
| # 参照モードの設定 | |
| if reference_type in ["face", "subject"]: | |
| payload["reference"] = reference_type | |
| payload["imageReference"] = image_url # 参照画像URL | |
| # アスペクト比を設定(face/subjectモードのみ) | |
| if aspect_ratio: | |
| payload["aspect_ratio"] = aspect_ratio | |
| # 顔参照強度(0-100の整数値) | |
| if reference_type == "face": | |
| face_strength = int((image_fidelity or 0.5) * 100) | |
| payload["faceStrength"] = face_strength | |
| payload["faceNo"] = 1 # デフォルトで最初の顔を使用 | |
| else: | |
| # 通常のrestyle/Image-to-Imageモード | |
| payload["reference"] = "restyle" | |
| payload["imageReference"] = image_url | |
| # restyleモードではaspect_ratioはサポートされない | |
| # 生成画像数(デフォルト1) | |
| payload["imageCount"] = 1 | |
| # UseAPI.netのkolorsエンドポイントを使用 | |
| url = f"{self.base_url}/images/kolors" | |
| # デバッグ用:送信するペイロードをログ出力 | |
| print(f"=== Kling API Image Request ===") | |
| print(f"URL: {url}") | |
| print(f"Model: {model}") | |
| print(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") | |
| print(f"===============================") | |
| response = await self.client.post(url, headers=self._get_headers(), json=payload) | |
| if response.status_code == 200: | |
| result = response.json() | |
| print(f"=== Kling API Image Response ===") | |
| print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}") | |
| print(f"================================") | |
| # UseAPI.netのレスポンス形式に対応 | |
| if "task" in result and "id" in result["task"]: | |
| task_id = str(result["task"]["id"]) | |
| # タスクログに記録 | |
| return task_id | |
| elif "data" in result and "task_id" in result["data"]: | |
| task_id = result["data"]["task_id"] | |
| # タスクログに記録 | |
| return task_id | |
| elif "id" in result: | |
| task_id = str(result["id"]) | |
| # タスクログに記録 | |
| return task_id | |
| else: | |
| # エラーチェック | |
| if result.get("code") != 0: | |
| error_msg = result.get("message", result.get("error", "Unknown error")) | |
| if "balance" in error_msg.lower() or "credit" in error_msg.lower(): | |
| raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") | |
| raise Exception(f"画像タスク作成エラー: {error_msg}") | |
| else: | |
| raise Exception(f"タスクIDが見つかりません: {result}") | |
| else: | |
| raise Exception(f"HTTPエラー {response.status_code}: {response.text}") | |
| except Exception as e: | |
| raise Exception(f"画像タスク作成エラー: {str(e)}") | |
| async def poll_image_result(self, task_id: str, max_wait_minutes: int = 10) -> Optional[str]: | |
| """画像タスクの完了を待機し、結果を取得""" | |
| max_attempts = max_wait_minutes * 12 # 5秒間隔 | |
| for attempt in range(max_attempts): | |
| try: | |
| # UseAPI.netではGETでタスクステータスを確認 | |
| # 正しいエンドポイント: /v1/kling/tasks/{task_id} | |
| url = f"{self.base_url}/tasks/{task_id}" | |
| params = {} | |
| if self.email: | |
| params["email"] = self.email | |
| response = await self.client.get(url, headers=self._get_headers(), params=params) | |
| if response.status_code == 200: | |
| result = response.json() | |
| # タスクデータを取得 | |
| task_data = result.get("task", {}) | |
| status_name = task_data.get("status_name", "") | |
| status_final = task_data.get("status_final", False) | |
| # ステータスの確認 | |
| status = task_data.get("status") | |
| if (status == 99) or (status_final and status_name == "succeed"): | |
| # 画像URLを取得 | |
| image_url = None | |
| # まずresult.imageをチェック(UseAPI.netの標準レスポンス) | |
| if result.get("image"): | |
| image_url = result["image"] | |
| # taskInfo.outputsをチェック(フォールバック) | |
| if not image_url and task_data.get("taskInfo", {}).get("outputs"): | |
| outputs = task_data["taskInfo"]["outputs"] | |
| for output in outputs: | |
| if output.get("url"): | |
| image_url = output["url"] | |
| break | |
| # worksをチェック(フォールバック) | |
| if not image_url and "works" in result and len(result["works"]) > 0: | |
| work = result["works"][0] | |
| if "resource" in work and "resource" in work["resource"]: | |
| image_url = work["resource"]["resource"] | |
| if image_url: | |
| print(f"✅ 画像生成完了: {image_url}") | |
| # work_idを取得(ウォーターマークなしダウンロード用) | |
| work_id = None | |
| if "works" in result and len(result["works"]) > 0: | |
| work_id = result["works"][0].get("workId") | |
| if work_id: | |
| print(f"📌 Work ID取得: {work_id}") | |
| # ウォーターマークなしダウンロードを試みる | |
| try: | |
| watermark_free_url = await self.download_without_watermark(work_id) | |
| if watermark_free_url: | |
| print(f"✅ ウォーターマークなしURL取得成功") | |
| return watermark_free_url | |
| except Exception as e: | |
| print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}") | |
| print(f"通常のURLを使用します") | |
| return image_url | |
| elif status_name == "failed": | |
| error_msg = result.get('message', 'Unknown error') | |
| raise Exception(f"画像生成失敗: {error_msg}") | |
| except Exception as e: | |
| if "画像生成失敗" in str(e): | |
| raise | |
| await asyncio.sleep(5) | |
| raise Exception("画像生成タスクタイムアウト") | |
| async def download_image(self, url: str, output_path: str) -> bool: | |
| """画像をダウンロード""" | |
| try: | |
| response = await self.client.get(url) | |
| if response.status_code == 200: | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| return True | |
| except Exception as e: | |
| print(f"画像ダウンロードエラー: {e}") | |
| return False | |
| async def close(self): | |
| """クライアントを閉じる""" | |
| await self.client.aclose() | |
| # === 追加: 動画タスク結果をポーリング(進捗バー連携) === | |
| async def poll_video_result( | |
| self, | |
| task_id: str, | |
| progress_callback=None, | |
| base_progress: float = 0.0, | |
| progress_delta: float = 0.2, | |
| max_wait_minutes: int = 60 | |
| ) -> Optional[str]: | |
| """動画生成タスクの完了を待機してURLを返す | |
| progress_callback には gr.Progress インスタンスを渡すことを想定 | |
| base_progress はポーリング開始時点の進捗値、 | |
| progress_delta は完了時に上乗せする進捗幅""" | |
| try: | |
| # ポーリング開始を通知 | |
| if progress_callback: | |
| progress_callback(base_progress, "動画生成を待機中...") | |
| video_url = await self.poll_task_result(task_id, max_wait_minutes=max_wait_minutes) | |
| # 完了を通知(進捗を一気に上げる) | |
| if progress_callback: | |
| progress_callback(min(base_progress + progress_delta, 1.0), "動画生成完了") | |
| return video_url | |
| except Exception as e: | |
| # エラー時にも進捗を更新 | |
| if progress_callback: | |
| progress_callback(base_progress, "動画生成エラー") | |
| raise |