# coding: utf-8 import gradio as gr from PIL import Image import numpy as np import io import base64 import zipfile import json import time import asyncio import os import cv2 import httpx # --- 定数とAPI設定 (Constants and API Configuration) --- # Nano Banana API (gemini-2.5-flash-image-preview) の設定 # 環境変数からAPIキーを安全に読み込む API_KEY = os.environ.get("NANO_BANANA_API", "") # Secret名 "NANO_BANANA_API" を使用 API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-image-preview:generateContent?key=" # API送信用の最大画像サイズを定義 (ほとんどのAIモデルで推奨される最大値に近い) MAX_API_SIZE = 1024 # --- ヘルパー関数 (Helper Functions) --- def pil_to_base64(img: Image.Image) -> str: """PIL画像をBase64エンコードされたPNGデータに変換します。""" buffered = io.BytesIo() # アルファチャンネル(PNG)を維持 if img.mode != 'RGBA': img = img.convert('RGBA') img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode("utf-8") async def nano_banana_completion_api(base_image: Image.Image, prompt: str) -> Image.Image: """ Nano Banana (gemini-2.5-flash-image-preview) APIを呼び出し、画像補完を実行します。 """ if not API_KEY: # APIキーが空の場合はエラーを発生させて、ユーザーに設定を促す raise gr.Error("エラー: APIキーが設定されていません。Hugging Face Secretsで 'NANO_BANANA_API' を設定してください。") # --- API送信前に画像サイズを調整 --- resized_image = base_image.copy() w, h = resized_image.size if max(w, h) > MAX_API_SIZE: if w > h: new_w = MAX_API_SIZE new_h = int(h * (MAX_API_SIZE / w)) else: new_h = MAX_API_SIZE new_w = int(w * (MAX_API_SIZE / h)) # アスペクト比を維持しつつリサイズ resized_image = resized_image.resize((new_w, new_h), Image.Resampling.LANCZOS) print(f"--- 画像をAPI送信のためリサイズしました: {w}x{h} -> {new_w}x{new_h} ---") # Base64エンコード base64_image = pil_to_base64(resized_image) # プロンプトと画像を含むペイロードの構築(Image-to-Image用) payload = { "contents": [ { "parts": [ { "text": f"画像を編集・補完してください。この画像はLive2D素材の一部です。マスクされた(透明な)領域にプロンプトに従った内容を生成し、画像を自然に完成させてください。プロンプト: {prompt}" }, { "inlineData": { "mimeType": "image/png", "data": base64_image } } ] } ], "generationConfig": { "responseModalities": ["TEXT", "IMAGE"] }, } print(f"--- API呼び出しペイロードを構築しました。プロンプト: {prompt} ---") # --- 実際のAPI呼び出しロジック (httpxを使用) --- final_api_url = API_URL + API_KEY # 指数バックオフ付きリトライロジック max_retries = 5 delay = 2 for attempt in range(max_retries): try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(final_api_url, json=payload) response.raise_for_status() result = response.json() candidate = result.get('candidates', [{}])[0] # Base64画像データの抽出 base64_data = None for part in candidate.get('content', {}).get('parts', []): if 'inlineData' in part and part['inlineData']['mimeType'].startswith('image/'): base64_data = part['inlineData']['data'] break if base64_data: # Base64データをデコードしてPIL Imageに変換 image_data = base64.b64decode(base64_data) completed_image = Image.open(io.BytesIO(image_data)).convert('RGBA') print("--- AI補完処理を完了しました (実API) ---") return completed_image else: error_detail = result.get('candidates', [{}])[0].get('finishReason', 'UNKNOWN') raise ValueError(f"APIレスポンスから画像データが抽出できませんでした。Finish Reason: {error_detail}") except httpx.HTTPStatusError as e: if e.response.status_code in [429, 503] and attempt < max_retries - 1: print(f"APIレート制限/サーバーエラー ({e.response.status_code})。{delay}秒待機後にリトライします...") await asyncio.sleep(delay) delay *= 2 # 指数バックオフ else: # ユーザーへの表示用にgr.Errorを再raise raise gr.Error(f"API呼び出しエラー: Client error '{e.response.status_code} {e.response.reason_phrase}' for url '{e.request.url}'") except httpx.RequestError as e: raise gr.Error(f"APIリクエストエラー: ネットワークまたはタイムアウトエラーが発生しました。詳細: {e}") except Exception as e: # その他の予期せぬエラー raise gr.Error(f"予期せぬエラーが発生しました: {e}") raise gr.Error("APIリクエストが最大リトライ回数を超えて失敗しました。") # --- メイン処理ロジック (Main Processing Logic) --- async def segment_and_inpaint(original_image_np: np.ndarray, sketchpad_image_np: np.ndarray, inpaint_prompt: str): """ アップロードされた一枚絵とブラシで描かれたマスクを受け取り、 マスクされた領域を分割し、欠損部分をAI補完する関数。 (この関数はasync generatorとして動作し、yieldで結果を返します) """ # 失敗時の統一リターン用タプルを生成 def fail(message): # 画像出力は全てNone, JSONとZIPは空文字列/None, ステータスにエラーメッセージ return (None, None, None, "{}", None, f"❌ 処理失敗: {message}") if original_image_np is None: # 修正: return -> yield yield fail("元の画像がアップロードされていません。") return # Generatorを終了 # 元画像の形状を取得(SketchpadがNoneだった場合の代替用) h, w, _ = original_image_np.shape # ★★★ Sketchpad入力の型チェックとデータ形式の強制 ★★★ if not isinstance(sketchpad_image_np, np.ndarray) or sketchpad_image_np is None: # Sketchpadの入力がNumPy配列ではないか、Noneであった場合、 # 全て透明なダミーのNumPy配列を作成し、「描画なし」として処理を続行 print("--- Sketchpad入力が不正またはNoneでした。全て透明なダミーデータを作成します。 ---") sketchpad_image_np = np.zeros((h, w, 4), dtype=np.uint8) # RGBA if sketchpad_image_np.ndim == 3 and sketchpad_image_np.shape[2] == 3: # 3次元配列(RGB)の場合、透明なアルファチャンネルを追加して4次元(RGBA)に変換 print("--- SketchpadデータがRGB (3次元) で渡されました。RGBA (4次元) に変換します。 ---") # 全て透明(0)のアルファチャンネルを作成 alpha_channel = np.zeros((h, w, 1), dtype=np.uint8) # RGB配列とアルファチャンネルを結合 sketchpad_image_np = np.concatenate((sketchpad_image_np, alpha_channel), axis=2) elif sketchpad_image_np.ndim != 4 or sketchpad_image_np.shape[2] != 4: # 4次元配列ではない、または4チャンネルではない場合は不正と判断 # 修正: return -> yield yield fail(f"描画データ形式が不正です。(次元: {sketchpad_image_np.ndim}, チャンネル: {sketchpad_image_np.shape[2] if sketchpad_image_np.ndim >= 3 else 'N/A'})。") return # Generatorを終了 # PIL Image (元の画像) を取得 original_image = Image.fromarray(original_image_np).convert("RGBA") # Sketchpadからの入力は、元の画像の上にブラシで描画されたRGBA画像 # アルファチャンネルからマスクを抽出 sketch_mask_alpha = sketchpad_image_np[:, :, 3] # ユーザーが何も描画しなかった場合のチェック (アルファチャンネルが全てゼロ) if np.all(sketch_mask_alpha == 0): # SketchpadがNoneの場合もここに来る # 修正: return -> yield yield fail("分割したいパーツをブラシで描画してください。(描画が検出されません)") return # Generatorを終了 # NumPy配列からPIL Imageのマスクに変換 (Lモード) hair_mask = Image.fromarray(sketch_mask_alpha, mode='L') # --- Part A: 分割パーツを作成 (ユーザーが描いた部分) --- hair_part = original_image.copy().convert('RGBA') hair_part.putalpha(hair_mask) # --- Part B: 欠損穴のある体 (Body with Hole) を作成 (AI補完用入力) --- body_with_hole = original_image.copy().convert("RGBA") # マスクを反転 (描画した部分を透明な穴にするため) inverted_mask_np = cv2.bitwise_not(sketch_mask_alpha) body_with_hole.putalpha(Image.fromarray(inverted_mask_np, mode='L')) print("--- 1. 手動パーツ分割 完了 ---") # 2. 欠損領域のAI補完 (AI Inpainting) try: # 処理ステータスを一時的に更新してフィードバックを提供 yield (None, None, None, "{}", None, "⏳ AI補完処理を開始しました... (完了まで最大1分程度かかる場合があります)") completed_body_part = await nano_banana_completion_api( body_with_hole, inpaint_prompt or "マスクされた領域のキャラクターの顔と身体の肌、及び下に着ている服を、元のイラストのテイストに合わせて自然に補完してください。" ) except gr.Error as e: # 修正: return -> yield yield fail(f"AI補完APIエラー: {e}") return # Generatorを終了 except Exception as e: # 修正: return -> yield yield fail(f"AI補完処理中に予期せぬエラー: {e}") return # Generatorを終了 # 3. 出力ファイルの準備とZIPファイルの作成 output_parts = { "body_completed": completed_body_part, "hair_front": hair_part, } # JSON構造の作成 output_json_data = { "parts": { "body_completed": "body_completed.png", "hair_front": "hair_front.png" }, "inpaint_prompt_used": inpaint_prompt, "note": "hair_frontはユーザーがブラシで描画したマスクに基づいて分割されました。", "timestamp": time.strftime("%Y%m%d_%H%M%S") } json_data_str = json.dumps(output_json_data, indent=2, ensure_ascii=False) zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf: for name, img in output_parts.items(): img_buffer = io.BytesIO() if img.mode != 'RGBA': img = img.convert('RGBA') img.save(img_buffer, format="PNG") zipf.writestr(f"live2d_parts/{name}.png", img_buffer.getvalue()) zipf.writestr("live2d_parts/parts_structure.json", json_data_str.encode('utf-8')) zip_buffer.seek(0) zip_file_path = f"live2d_parts_{output_json_data['timestamp']}.zip" with open(zip_file_path, "wb") as f: f.write(zip_buffer.read()) print(f"--- 4. 全パーツZIPファイル {zip_file_path} の作成完了 ---") # 成功時の最終結果をyield (6つの出力) yield ( completed_body_part, hair_part, body_with_hole, json_data_str, # JSON文字列をそのまま渡す zip_file_path, "✅ 処理完了!結果を確認し、ZIPファイルをダウンロードしてください。" # ステータス ) # --- Gradioインターフェース定義 (Gradio Interface Definition) --- # Gradioテーマ定義 theme = gr.themes.Soft( primary_hue="blue", secondary_hue="blue", neutral_hue="gray", ) with gr.Blocks(theme=theme, title="Live2D素材自動分割・補完アプリ") as demo: gr.Markdown( """

🎨 Live2D 素材 自動分割・補完アプリ 🤖

一枚絵をアップロードし、**ブラシでパーツを指定**することで、AIによる欠損部分の自動補完(Nano Banana API利用)を行います。

💡 **操作方法:** ①に元画像をアップロードし、②のキャンバスに**描画ブラシ**を使って、前髪など**分割したいパーツ**を塗りつぶしてください。塗った部分がパーツとして分離され、AIがその下の欠損を補完します。

⚠ 注意: 現在、描画キャンバスの背景にアップロード画像を動的に表示できません。アップロード画像を確認しながら、**描画キャンバス(②)の白いエリアに**ブラシでパーツの輪郭を塗ってください。

""" ) status_message = gr.Textbox(label="処理ステータス", interactive=False, value="処理を開始するには、画像アップロードと描画を行い、実行ボタンを押してください。", elem_classes="status-box") with gr.Row(): with gr.Column(scale=1): # 元画像をアップロードするためのコンポーネント original_image_upload = gr.Image( label="① 元画像のアップロード", type="numpy", height=200, ) # 描画専用のSketchpadコンポーネント (type="numpy") input_sketchpad = gr.Sketchpad( label="② 分割したいパーツをブラシで塗る (例: 前髪)", height=400, brush={"color": "#FF0000", "size": 20}, type="numpy" ) inpaint_prompt = gr.Textbox( label="③ AI補完プロンプト(オプション)", value="マスクされた領域のキャラクターの顔と身体の肌、及び下に着ている服を、元のイラストのテイストに合わせて自然に補完してください。", placeholder="例: マスクされた領域を元の絵柄で自然に描き足す" ) process_button = gr.Button("④ 手動分割・AI補完を実行", variant="primary", scale=0) gr.Markdown( """ ### 📌 開発のポイント * **エラー解消:** `async def` 関数内で `yield` を使用する場合の `SyntaxError` を解消しました。これにより、処理ステータスが正しくフィードバックされるようになります。 * **安定性の再強化:** Sketchpadの入力データ型チェックを引き続き強化しています。 """ ) with gr.Column(scale=2): # --- 出力エリア --- gr.Markdown("## 💡 処理結果 (手動分割・補完済パーツ)") with gr.Tabs(): with gr.TabItem("メインパーツ (AI補完結果)"): completed_body_output = gr.Image(label="補完済みメインボディパーツ (AI Inpainting)", height=300) with gr.TabItem("分割パーツ例 (ブラシ指定部分)"): hair_part_output = gr.Image(label="ユーザーがブラシで指定したパーツ", height=300) with gr.TabItem("欠損体 (補完AI入力)"): body_with_hole_output = gr.Image(label="補完前の欠損体 (AIが補完する穴)", height=300) with gr.Row(): download_zip = gr.File(label="全パーツZIPダウンロード", file_count="single") gr.Markdown("## 📋 出力レイヤー構造 (JSON)") # JSONDecodeErrorを避けるため、JSONコンポーネントではなくテキストボックスを使用 output_json_text = gr.Textbox(label="Live2Dパーツ構造 JSON (確認用)", lines=10, interactive=False) # --- イベントリスナー --- process_button.click( fn=segment_and_inpaint, inputs=[original_image_upload, input_sketchpad, inpaint_prompt], outputs=[ completed_body_output, hair_part_output, body_with_hole_output, output_json_text, download_zip, status_message ], # AI補完処理中はUIをロック api_name="process_inpaint" ) # デモの起動 (Hugging Face Spacesでの実行を想定) if __name__ == "__main__": demo.launch(share=False)