"""UseAPI.net Kling AI API通信処理モジュール""" import os import time import base64 import json import httpx import asyncio from datetime import datetime, timezone from typing import Optional, Dict, Any, Tuple from PIL import Image import io class KlingAPI: """UseAPI.net Kling AI APIクライアント""" def __init__(self): self.api_token = os.getenv('USEAPI_NET_TOKEN') self.email = os.getenv('USEAPI_NET_EMAIL', '') # オプション:複数アカウント利用時 if not self.api_token: raise ValueError("UseAPI.net認証情報が設定されていません。環境変数を確認してください。") self.base_url = "https://api.useapi.net/v1/kling" self.client = httpx.AsyncClient(timeout=120.0) # UseAPI.netは処理が長い場合があるため延長 def _get_headers(self) -> Dict[str, str]: """API通信用のヘッダーを取得""" return { "Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json" } async def _upload_image(self, image_path: str, for_image_generation: bool = False, auto_resize: bool = True) -> Tuple[str, Dict[str, Any]]: """画像をUseAPI.netにアップロードし、URLを取得 Args: image_path: アップロードする画像のパス for_image_generation: 画像生成用かどうか auto_resize: 自動リサイズを有効にするか """ # 一時ファイルパスを準備 resized_path = None import os try: # まず画像情報を取得 with Image.open(image_path) as img: # 画像検証 width, height = img.size original_width, original_height = width, height # 最小サイズチェック if width < 300 or height < 300: raise ValueError(f"画像サイズが小さすぎます。最小300x300ピクセル必要です。現在: {width}x{height}") # アスペクト比チェック aspect_ratio = width / height if for_image_generation: # 画像生成用の許可されるアスペクト比 allowed_ratios = { "1:1": 1.0, # 1.0 "16:9": 16/9, # 1.7777... "4:3": 4/3, # 1.3333... "3:2": 3/2, # 1.5 "2:3": 2/3, # 0.6666... "3:4": 3/4, # 0.75 "9:16": 9/16, # 0.5625 "21:9": 21/9, # 2.3333... } supported_ratios_text = "1:1, 16:9, 4:3, 3:2, 2:3, 3:4, 9:16, 21:9" else: # 動画生成用の許可されるアスペクト比(厳密なチェック) allowed_ratios = { "16:9": 16/9, # 1.7777... "9:16": 9/16, # 0.5625 "1:1": 1.0 # 1.0 } supported_ratios_text = "16:9 (1920x1080), 9:16 (1080x1920), 1:1 (1440x1440)" # 最も近いアスペクト比を見つける valid_ratio = False ratio_name = "" tolerance = 0.02 if for_image_generation else 0.001 # 画像生成はより寛容な誤差範囲 for name, ratio in allowed_ratios.items(): if abs(aspect_ratio - ratio) < tolerance: valid_ratio = True ratio_name = name break if not valid_ratio: if auto_resize: # 自動リサイズが有効な場合、最も近いアスペクト比にリサイズ closest_ratio_name = None min_diff = float('inf') for name, ratio in allowed_ratios.items(): diff = abs(aspect_ratio - ratio) if diff < min_diff: min_diff = diff closest_ratio_name = name closest_ratio = ratio # 新しいサイズを計算(元の面積を可能な限り保持) current_area = width * height if closest_ratio >= 1: # 横長 new_height = int((current_area / closest_ratio) ** 0.5) new_width = int(new_height * closest_ratio) else: # 縦長 new_width = int((current_area * closest_ratio) ** 0.5) new_height = int(new_width / closest_ratio) # 最小サイズを確保 if new_width < 300 or new_height < 300: scale_factor = max(300 / new_width, 300 / new_height) new_width = int(new_width * scale_factor) new_height = int(new_height * scale_factor) # リサイズ実行 print(f"画像を自動リサイズ: {width}x{height} -> {new_width}x{new_height} (アスペクト比: {closest_ratio_name})") # 画像を再度開いてリサイズ with Image.open(image_path) as img_to_resize: # 一時ファイルにリサイズした画像を保存 resized_img = img_to_resize.resize((new_width, new_height), Image.Resampling.LANCZOS) # ファイル拡張子を正しく取得 base_name, ext = os.path.splitext(image_path) if not ext: ext = '.png' # デフォルト resized_path = f"{base_name}_resized{ext}" # 画像形式を維持して保存 save_format = 'PNG' if ext.lower() in ['.png'] else 'JPEG' resized_img.save(resized_path, format=save_format, quality=95) # リサイズ後の値を更新 width, height = new_width, new_height aspect_ratio = width / height ratio_name = closest_ratio_name else: # 自動リサイズが無効な場合はエラー current_ratio_str = f"{aspect_ratio:.2f}:1" if aspect_ratio >= 1 else f"1:{1/aspect_ratio:.2f}" mode_text = "画像生成" if for_image_generation else "動画生成" raise ValueError(f"{mode_text}:画像のアスペクト比が対応していません。\n" f"現在: {current_ratio_str} ({width}x{height})\n" f"対応画角: {supported_ratios_text}") # ファイルサイズチェック(リサイズした場合はスキップ) if not resized_path: with Image.open(image_path) as img_for_check: img_byte_arr = io.BytesIO() # フォーマットを判定 img_format = img_for_check.format if img_for_check.format else 'PNG' img_for_check.save(img_byte_arr, format=img_format) img_byte_arr = img_byte_arr.getvalue() if len(img_byte_arr) > 10 * 1024 * 1024: # 10MB raise ValueError(f"画像ファイルサイズが大きすぎます。最大10MBまでです。") # 画像をUseAPI.netにアップロード # リサイズされた場合はそのファイルを使用 upload_path = resized_path if resized_path else image_path with open(upload_path, 'rb') as f: file_data = f.read() # ファイル拡張子に基づいてContent-Typeを設定 content_type = "image/png" if upload_path.lower().endswith('.png') else "image/jpeg" headers = { "Authorization": f"Bearer {self.api_token}", "Content-Type": content_type } # URLにemailパラメータを追加 url = f"{self.base_url}/assets/" if self.email: url += f"?email={self.email}" # バイナリデータを送信 response = await self.client.post(url, headers=headers, content=file_data) if response.status_code == 200: result = response.json() asset_url = result.get('url') if asset_url: return asset_url, { "width": width, "height": height, "aspect_ratio": aspect_ratio, "size_mb": len(file_data) / (1024 * 1024), "original_size": f"{original_width}x{original_height}" if resized_path else None, "resized": bool(resized_path) } else: raise Exception(f"アセットURLが取得できませんでした: {result}") else: raise Exception(f"画像アップロードエラー: {response.status_code} - {response.text}") finally: # 一時ファイルを削除 if resized_path and os.path.exists(resized_path): os.remove(resized_path) async def create_video_task( self, image_path: str, prompt: str, model: str = "kling-v2-1", duration: int = 10, tail_image_path: Optional[str] = None, mode: str = "std" ) -> Optional[str]: """動画生成タスクを作成""" try: # 画像アップロード(動画生成でも自動リサイズを有効化) image_url, image_info = await self._upload_image(image_path, for_image_generation=False, auto_resize=True) # UseAPI.net用のペイロード構築 # モデル名の処理(image2video-framesエンドポイント用) # v1系はそのまま、v2系もそのまま使用 useapi_model = model payload = { "model_name": useapi_model, "prompt": prompt[:2500], "image": image_url, # URLを使用 "duration": str(duration) # UseAPI.netでは文字列型 } # modeパラメータはv1系モデルのみサポート(v2系では使用しない) if mode and useapi_model.startswith("kling-v1"): payload["mode"] = mode # emailパラメータ(複数アカウント利用時) if self.email: payload["email"] = self.email # tail_imageがある場合(v1-6のみサポート、v2系は非対応) # UseAPI.netでは image_tail パラメータを使用 if tail_image_path: if model == "kling-v1-6": tail_image_url, _ = await self._upload_image(tail_image_path, for_image_generation=False, auto_resize=True) payload["image_tail"] = tail_image_url # tail_image → image_tail elif model.startswith("kling-v2"): print(f"警告: {model}はimage_tailをサポートしていません。無視されます。") # UseAPI.netではimage2video-framesエンドポイントを使用 url = f"{self.base_url}/videos/image2video-frames" print(f"=== 動画生成リクエスト ===") print(f"URL: {url}") print(f"ペイロード: {json.dumps(payload, indent=2, ensure_ascii=False)}") response = await self.client.post(url, headers=self._get_headers(), json=payload) if response.status_code == 200: result = response.json() print(f"=== 動画生成レスポンス ===") print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...") # UseAPI.netの成功レスポンスをチェック if result.get("task") and isinstance(result["task"], dict) and "id" in result["task"]: task_id = str(result["task"]["id"]) print(f"✅ タスクID取得成功: {task_id}") # タスクログに記録 return task_id elif result.get("code") == 0 and "data" in result and "task_id" in result["data"]: task_id = result["data"]["task_id"] # タスクログに記録 return task_id else: # エラーメッセージを取得 error_msg = result.get("message", result.get("error", "")) status_name = result.get("status_name", "") status = result.get("status", "") if not error_msg: error_msg = f"ステータス: {status} ({status_name})" # Status 58 (UNKNOWN)の場合の詳細なエラーメッセージ if status == 58: error_msg += "\n考えられる原因:\n" error_msg += " 1. 別のタスクが処理中(同時処理は1タスクまで)\n" error_msg += " 2. アカウントの一時的な制限\n" error_msg += " 3. APIサービスの問題\n" error_msg += " → Klingのダッシュボードで処理中のタスクを確認してください" if "balance" in error_msg.lower() or "credit" in error_msg.lower(): raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") raise Exception(f"タスク作成エラー: {error_msg}") else: # HTTPエラーの詳細を解析 error_text = response.text if "balance" in error_text.lower() or "credit" in error_text.lower() or "insufficient" in error_text.lower(): raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") elif response.status_code == 402: # Payment Required raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") else: raise Exception(f"HTTPエラー {response.status_code}: {error_text}") except Exception as e: raise Exception(f"動画タスク作成エラー: {str(e)}") async def poll_task_result(self, task_id: str, max_wait_minutes: int = 60) -> Optional[str]: """タスクの完了を待機し、結果を取得""" max_attempts = max_wait_minutes * 12 # 5秒間隔 print(f"=== タスクポーリング開始 ===") print(f"タスクID: {task_id}") print(f"最大待機時間: {max_wait_minutes}分") for attempt in range(max_attempts): try: # UseAPI.netではGETでタスクステータスを確認 # 正しいエンドポイント: /v1/kling/tasks/{task_id} url = f"{self.base_url}/tasks/{task_id}" params = {} if self.email: params["email"] = self.email print(f"\n試行 {attempt + 1}/{max_attempts}: {url}") if params: print(f"パラメータ: {params}") response = await self.client.get(url, headers=self._get_headers(), params=params) if response.status_code == 200: result = response.json() # レスポンスの詳細をログ出力 print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}...") # UseAPI.netのレスポンス形式に対応 task_data = None if "task" in result: task_data = result["task"] elif "data" in result: task_data = result["data"] else: task_data = result # ステータスの確認 status = task_data.get("status") status_name = task_data.get("status_name") status_final = task_data.get("status_final", False) print(f"ステータス: {status} ({status_name}), 最終: {status_final}") # 処理中の場合も情報を表示 if status_name in ["processing", "running", "generating"]: progress_percent = task_data.get("progress", 0) print(f"処理中... 進捗: {progress_percent}%") # 成功状態の判定(status 99 = succeed) if status == 99 or (status_final and status_name in ["finished", "succeed"]): # 動画URLを取得 video_url = None # まずresult.videoをチェック(UseAPI.netの標準レスポンス) if result.get("video"): video_url = result["video"] # taskInfo.outputsをチェック(フォールバック) if not video_url and task_data.get("taskInfo", {}).get("outputs"): outputs = task_data["taskInfo"]["outputs"] for output in outputs: if output.get("url"): video_url = output["url"] break # worksをチェック(フォールバック) if not video_url and "works" in result and len(result["works"]) > 0: work = result["works"][0] if "resource" in work and "resource" in work["resource"]: video_url = work["resource"]["resource"] if video_url: print(f"✅ 動画生成完了: {video_url}") # work_idを取得(ウォーターマークなしダウンロード用) work_id = None if "works" in result and len(result["works"]) > 0: work_id = result["works"][0].get("workId") if work_id: print(f"📌 Work ID取得: {work_id}") # ウォーターマークなしダウンロードを試みる try: watermark_free_url = await self.download_without_watermark(work_id) if watermark_free_url: print(f"✅ ウォーターマークなしURL取得成功") return watermark_free_url except Exception as e: print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}") print(f"通常のURLを使用します") return video_url else: print("⚠️ 動画URLが見つかりません。レスポンス構造を確認中...") print(f"result全体: {json.dumps(result, indent=2, ensure_ascii=False)[:1000]}") # まだ完了していない pass elif status_name == "failed": error_msg = task_data.get('message', 'Unknown error') print(f"❌ タスク失敗: {error_msg}") raise Exception(f"タスク失敗: {error_msg}") elif response.status_code == 404: # 404エラーの場合、タスクが見つからない print(f"HTTPエラー 404: タスクが見つかりません") print(f"タスクID: {task_id}") print(f"エラー詳細: {response.text}") # レスポンスをパースしてエラー詳細を確認 try: error_data = response.json() error_msg = error_data.get('message', '') or error_data.get('error', '') if "balance" in error_msg.lower() or "credit" in error_msg.lower() or "insufficient" in error_msg.lower(): raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") except: pass # 最初の数回は待機してリトライ(タスクの作成に時間がかかる場合があるため) if attempt < 5: print("タスク作成待機中...") await asyncio.sleep(10) continue else: # タスクが見つからない場合、クレジット不足の可能性を示唆 raise Exception(f"タスクが見つかりません: {task_id} (クレジット不足の可能性があります)") else: print(f"HTTPエラー: {response.status_code} - {response.text}") except Exception as e: print(f"エラー発生: {str(e)}") if "タスク失敗" in str(e) or "タスクが見つかりません" in str(e): raise await asyncio.sleep(5) # タイムアウト時の詳細情報 print(f"\n❌ タスクがタイムアウトしました") print(f"タスクID: {task_id}") print(f"最大待機時間: {max_wait_minutes}分") raise Exception(f"タスクタイムアウト: {max_wait_minutes}分待機しても完了しませんでした") async def download_without_watermark(self, work_id: str) -> Optional[str]: """ウォーターマークなしでコンテンツをダウンロード(有料アカウント必須)""" try: # 正しいエンドポイント: /v1/kling/assets/download url = f"{self.base_url}/assets/download" params = {"workIds": work_id} if self.email: params["email"] = self.email print(f"ウォーターマークなしダウンロードを試行中...") print(f"URL: {url}") print(f"パラメータ: {params}") response = await self.client.get(url, headers=self._get_headers(), params=params) if response.status_code == 200: result = response.json() print(f"レスポンス: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}...") # cdnUrlを取得(ドキュメントに基づく正しいフィールド) if result.get("cdnUrl"): return result["cdnUrl"] # フォールバック: 他の可能なフィールドをチェック elif result.get("urls") and len(result["urls"]) > 0: return result["urls"][0] elif result.get("url"): return result["url"] elif result.get("video"): return result["video"] else: print(f"⚠️ cdnUrlが見つかりません。レスポンス全体: {json.dumps(result, ensure_ascii=False)}") else: print(f"ウォーターマークなしダウンロード失敗: {response.status_code}") print(f"レスポンス: {response.text[:500]}") if response.status_code == 403: print("⚠️ 403 Forbidden: 有料アカウントが必要です") return None except Exception as e: print(f"ウォーターマークなしダウンロードエラー: {str(e)}") return None async def download_video(self, url: str, output_path: str) -> bool: """動画をダウンロード""" try: response = await self.client.get(url) if response.status_code == 200: with open(output_path, "wb") as f: f.write(response.content) return True except Exception as e: print(f"ダウンロードエラー: {e}") return False def _calculate_credits(self, model: str, duration: int) -> int: """使用クレジットを計算""" # モデルごとのクレジット単価(仮定) credit_rates = { "kling-v2-1": 10, # V2.1 Standard "kling-v1-6": 30, # V1.6 Professional "kling-v1-5": 20, # V1.5 } base_rate = credit_rates.get(model, 10) return base_rate * (duration // 5) # 5秒あたりの単価 async def create_image_task( self, image_path: str, prompt: str, model: str = "kling-v1-5", negative_prompt: str = "", strength: float = 0.3, reference_type: Optional[str] = "subject", image_fidelity: Optional[float] = 0.3, human_fidelity: Optional[float] = 0.9, aspect_ratio: Optional[str] = None ) -> Optional[str]: """画像生成タスクを作成""" try: # 画像アップロード(画像生成用アスペクト比チェック、自動リサイズ有効) image_url, image_info = await self._upload_image(image_path, for_image_generation=True, auto_resize=True) # UseAPI.net kolorsエンドポイント用のペイロード構築 payload = { "prompt": prompt[:2500], } # emailパラメータ(複数アカウント利用時) if self.email: payload["email"] = self.email # 参照モードの設定 if reference_type in ["face", "subject"]: payload["reference"] = reference_type payload["imageReference"] = image_url # 参照画像URL # アスペクト比を設定(face/subjectモードのみ) if aspect_ratio: payload["aspect_ratio"] = aspect_ratio # 顔参照強度(0-100の整数値) if reference_type == "face": face_strength = int((image_fidelity or 0.5) * 100) payload["faceStrength"] = face_strength payload["faceNo"] = 1 # デフォルトで最初の顔を使用 else: # 通常のrestyle/Image-to-Imageモード payload["reference"] = "restyle" payload["imageReference"] = image_url # restyleモードではaspect_ratioはサポートされない # 生成画像数(デフォルト1) payload["imageCount"] = 1 # UseAPI.netのkolorsエンドポイントを使用 url = f"{self.base_url}/images/kolors" # デバッグ用:送信するペイロードをログ出力 print(f"=== Kling API Image Request ===") print(f"URL: {url}") print(f"Model: {model}") print(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") print(f"===============================") response = await self.client.post(url, headers=self._get_headers(), json=payload) if response.status_code == 200: result = response.json() print(f"=== Kling API Image Response ===") print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}") print(f"================================") # UseAPI.netのレスポンス形式に対応 if "task" in result and "id" in result["task"]: task_id = str(result["task"]["id"]) # タスクログに記録 return task_id elif "data" in result and "task_id" in result["data"]: task_id = result["data"]["task_id"] # タスクログに記録 return task_id elif "id" in result: task_id = str(result["id"]) # タスクログに記録 return task_id else: # エラーチェック if result.get("code") != 0: error_msg = result.get("message", result.get("error", "Unknown error")) if "balance" in error_msg.lower() or "credit" in error_msg.lower(): raise Exception("クレジット不足: Kling AIダッシュボードでクレジットを購入してください") raise Exception(f"画像タスク作成エラー: {error_msg}") else: raise Exception(f"タスクIDが見つかりません: {result}") else: raise Exception(f"HTTPエラー {response.status_code}: {response.text}") except Exception as e: raise Exception(f"画像タスク作成エラー: {str(e)}") async def poll_image_result(self, task_id: str, max_wait_minutes: int = 10) -> Optional[str]: """画像タスクの完了を待機し、結果を取得""" max_attempts = max_wait_minutes * 12 # 5秒間隔 for attempt in range(max_attempts): try: # UseAPI.netではGETでタスクステータスを確認 # 正しいエンドポイント: /v1/kling/tasks/{task_id} url = f"{self.base_url}/tasks/{task_id}" params = {} if self.email: params["email"] = self.email response = await self.client.get(url, headers=self._get_headers(), params=params) if response.status_code == 200: result = response.json() # タスクデータを取得 task_data = result.get("task", {}) status_name = task_data.get("status_name", "") status_final = task_data.get("status_final", False) # ステータスの確認 status = task_data.get("status") if (status == 99) or (status_final and status_name == "succeed"): # 画像URLを取得 image_url = None # まずresult.imageをチェック(UseAPI.netの標準レスポンス) if result.get("image"): image_url = result["image"] # taskInfo.outputsをチェック(フォールバック) if not image_url and task_data.get("taskInfo", {}).get("outputs"): outputs = task_data["taskInfo"]["outputs"] for output in outputs: if output.get("url"): image_url = output["url"] break # worksをチェック(フォールバック) if not image_url and "works" in result and len(result["works"]) > 0: work = result["works"][0] if "resource" in work and "resource" in work["resource"]: image_url = work["resource"]["resource"] if image_url: print(f"✅ 画像生成完了: {image_url}") # work_idを取得(ウォーターマークなしダウンロード用) work_id = None if "works" in result and len(result["works"]) > 0: work_id = result["works"][0].get("workId") if work_id: print(f"📌 Work ID取得: {work_id}") # ウォーターマークなしダウンロードを試みる try: watermark_free_url = await self.download_without_watermark(work_id) if watermark_free_url: print(f"✅ ウォーターマークなしURL取得成功") return watermark_free_url except Exception as e: print(f"⚠️ ウォーターマークなしダウンロード失敗: {e}") print(f"通常のURLを使用します") return image_url elif status_name == "failed": error_msg = result.get('message', 'Unknown error') raise Exception(f"画像生成失敗: {error_msg}") except Exception as e: if "画像生成失敗" in str(e): raise await asyncio.sleep(5) raise Exception("画像生成タスクタイムアウト") async def download_image(self, url: str, output_path: str) -> bool: """画像をダウンロード""" try: response = await self.client.get(url) if response.status_code == 200: with open(output_path, "wb") as f: f.write(response.content) return True except Exception as e: print(f"画像ダウンロードエラー: {e}") return False async def close(self): """クライアントを閉じる""" await self.client.aclose() # === 追加: 動画タスク結果をポーリング(進捗バー連携) === async def poll_video_result( self, task_id: str, progress_callback=None, base_progress: float = 0.0, progress_delta: float = 0.2, max_wait_minutes: int = 60 ) -> Optional[str]: """動画生成タスクの完了を待機してURLを返す progress_callback には gr.Progress インスタンスを渡すことを想定 base_progress はポーリング開始時点の進捗値、 progress_delta は完了時に上乗せする進捗幅""" try: # ポーリング開始を通知 if progress_callback: progress_callback(base_progress, "動画生成を待機中...") video_url = await self.poll_task_result(task_id, max_wait_minutes=max_wait_minutes) # 完了を通知(進捗を一気に上げる) if progress_callback: progress_callback(min(base_progress + progress_delta, 1.0), "動画生成完了") return video_url except Exception as e: # エラー時にも進捗を更新 if progress_callback: progress_callback(base_progress, "動画生成エラー") raise