Spaces:
Paused
Paused
| import gradio as gr | |
| import os | |
| import base64 | |
| import io | |
| from PIL import Image | |
| from google import genai | |
| from google.genai import types | |
| import json | |
| import logging | |
| import traceback | |
| from typing import Optional | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse, RedirectResponse, FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| import tempfile | |
| import uuid | |
| from datetime import datetime, timedelta | |
| import shutil | |
| # ログ設定 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Gradio内部ログも取得 | |
| gradio_logger = logging.getLogger("gradio") | |
| gradio_logger.setLevel(logging.INFO) | |
| # FastAPIアプリケーション初期化 | |
| app = FastAPI( | |
| title="ImageGenMCP Server", | |
| version="1.0.0", | |
| description="Gemini 2.0 Flash画像生成MCPサーバー" | |
| ) | |
| # CORS設定の最適化 | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # 画像ファイル保存用ディレクトリ | |
| IMAGES_DIR = tempfile.mkdtemp(prefix="mcp_images_") | |
| os.makedirs(IMAGES_DIR, exist_ok=True) | |
| # 静的ファイルのマウント | |
| app.mount("/images", StaticFiles(directory=IMAGES_DIR), name="images") | |
| # 画像ファイルのクリーンアップ(古いファイルを削除) | |
| def cleanup_old_images(): | |
| """24時間以上経過した画像ファイルを削除""" | |
| try: | |
| now = datetime.now() | |
| for filename in os.listdir(IMAGES_DIR): | |
| filepath = os.path.join(IMAGES_DIR, filename) | |
| if os.path.isfile(filepath): | |
| file_time = datetime.fromtimestamp(os.path.getmtime(filepath)) | |
| if now - file_time > timedelta(hours=24): | |
| os.remove(filepath) | |
| logger.info(f"古い画像を削除: {filename}") | |
| except Exception as e: | |
| logger.error(f"画像クリーンアップエラー: {str(e)}") | |
| # Gemini APIクライアントの初期化 | |
| def get_gemini_client(): | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if not api_key: | |
| raise ValueError("GEMINI_API_KEY環境変数が設定されていません") | |
| return genai.Client(api_key=api_key) | |
| # 画像生成関数 | |
| def generate_image(prompt: str, previous_image: Optional[Image.Image] = None) -> tuple[Optional[Image.Image], str]: | |
| """ | |
| Gemini 2.0 Flashを使用して画像を生成する | |
| Args: | |
| prompt: 生成したい画像の説明 | |
| previous_image: 参考にする前の画像(オプション) | |
| Returns: | |
| 生成された画像とステータスメッセージ | |
| """ | |
| try: | |
| logger.info(f"画像生成開始: プロンプト='{prompt[:50]}...', 参考画像={'あり' if previous_image else 'なし'}") | |
| client = get_gemini_client() | |
| model = "gemini-2.0-flash-preview-image-generation" | |
| # コンテンツの準備 | |
| contents = [] | |
| # システムインストラクションを追加 | |
| system_instruction = """あなたは画像生成アシスタントです。 | |
| 必ず以下のルールに従ってください: | |
| 1. ユーザーのリクエストに基づいて、必ず画像を生成してください。 | |
| 2. 画像とともに、生成した画像の簡単な説明文も提供してください。 | |
| 3. コンテンツポリシーに違反しない限り、常に画像を生成してください。 | |
| 4. もし元のリクエストが不明確な場合は、創造的に解釈して画像を生成してください。 | |
| 5. 画像生成を拒否しないでください。""" | |
| contents.append( | |
| types.Content( | |
| role="user", | |
| parts=[types.Part.from_text(text=system_instruction)] | |
| ) | |
| ) | |
| contents.append( | |
| types.Content( | |
| role="model", | |
| parts=[types.Part.from_text(text="了解しました。ユーザーのリクエストに基づいて必ず画像を生成し、説明文も提供します。")] | |
| ) | |
| ) | |
| # ユーザープロンプトの追加 | |
| contents.append( | |
| types.Content( | |
| role="user", | |
| parts=[types.Part.from_text(text=f"次のリクエストに基づいて画像を生成してください: {prompt}")] | |
| ) | |
| ) | |
| # 前の画像がある場合は追加 | |
| if previous_image: | |
| buffered = io.BytesIO() | |
| previous_image.save(buffered, format="PNG") | |
| img_data = base64.b64encode(buffered.getvalue()).decode() | |
| contents.append( | |
| types.Content( | |
| role="model", | |
| parts=[ | |
| types.Part.from_bytes( | |
| mime_type="image/png", | |
| data=base64.b64decode(img_data) | |
| ) | |
| ] | |
| ) | |
| ) | |
| contents.append( | |
| types.Content( | |
| role="user", | |
| parts=[types.Part.from_text(text=f"上記の画像を参考にしながら、次のリクエストに基づいて新しい画像を必ず生成してください: {prompt}")] | |
| ) | |
| ) | |
| # 生成設定 | |
| generate_content_config = types.GenerateContentConfig( | |
| response_modalities=["IMAGE", "TEXT"], # モデルはIMAGEとTEXTの両方を要求 | |
| ) | |
| # 画像生成 | |
| logger.info("Gemini APIを呼び出し中...") | |
| response = client.models.generate_content( | |
| model=model, | |
| contents=contents, | |
| config=generate_content_config | |
| ) | |
| logger.info("Gemini APIの呼び出し完了") | |
| # レスポンスから画像を取得 | |
| if response.candidates and response.candidates[0].content and response.candidates[0].content.parts: | |
| # パーツを検査して画像を探す | |
| image_found = False | |
| text_content = "" | |
| for part in response.candidates[0].content.parts: | |
| if hasattr(part, 'inline_data') and part.inline_data: | |
| # 画像データが見つかった | |
| image_data = part.inline_data.data | |
| image = Image.open(io.BytesIO(image_data)) | |
| logger.info(f"画像生成成功: サイズ={image.size}") | |
| image_found = True | |
| elif hasattr(part, 'text') and part.text: | |
| # テキストコンテンツを収集 | |
| text_content += part.text | |
| if image_found: | |
| return image, "画像生成に成功しました!" | |
| else: | |
| # 画像が含まれていない場合の詳細ログ | |
| logger.warning(f"レスポンスに画像データが含まれていません。テキスト: {text_content[:200]}") | |
| if "I cannot" in text_content or "I can't" in text_content or "unable to" in text_content: | |
| return None, f"画像生成が拒否されました: {text_content}" | |
| else: | |
| return None, "画像の生成に失敗しました。レスポンスに画像データが含まれていません。" | |
| else: | |
| logger.warning("レスポンスに有効なコンテンツが含まれていません") | |
| return None, "画像の生成に失敗しました。レスポンスが空です。" | |
| except Exception as e: | |
| logger.error(f"画像生成エラー: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return None, f"エラーが発生しました: {str(e)}" | |
| # MCPエンドポイント | |
| async def mcp_list_tools(): | |
| """MCPツールのリストを返す""" | |
| logger.info("MCP list_tools 呼び出し") | |
| return { | |
| "tools": [ | |
| { | |
| "name": "generate_image", | |
| "description": "Gemini 2.0 Flash Previewを使用して画像を生成します", | |
| "inputSchema": { | |
| "type": "object", | |
| "properties": { | |
| "prompt": { | |
| "type": "string", | |
| "description": "生成したい画像の説明" | |
| } | |
| }, | |
| "required": ["prompt"] | |
| } | |
| } | |
| ] | |
| } | |
| async def mcp_call_tool(request: Request): | |
| """MCPツールを実行する""" | |
| try: | |
| data = await request.json() | |
| tool_name = data.get("name") | |
| arguments = data.get("arguments", {}) | |
| logger.info(f"MCP call_tool 呼び出し: {tool_name}, args: {arguments}") | |
| if tool_name == "generate_image": | |
| prompt = arguments.get("prompt", "") | |
| # 古い画像をクリーンアップ | |
| cleanup_old_images() | |
| # 画像生成を実行 | |
| image, message = generate_image(prompt) | |
| if image: | |
| # ユニークなファイル名を生成 | |
| filename = f"generated_{uuid.uuid4()}.png" | |
| filepath = os.path.join(IMAGES_DIR, filename) | |
| # 画像を保存 | |
| image.save(filepath, format="PNG") | |
| logger.info(f"画像を保存: {filepath}") | |
| # Hugging Face SpacesのベースURLを取得 | |
| base_url = str(request.base_url).rstrip('/') | |
| image_url = f"{base_url}/images/{filename}" | |
| logger.info(f"MCP画像生成成功: {image_url}") | |
| return JSONResponse({ | |
| "success": True, | |
| "message": message, | |
| "image_url": image_url, | |
| "image_path": filepath | |
| }) | |
| else: | |
| logger.warning(f"MCP画像生成失敗: {message}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": message | |
| }) | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Unknown tool: {tool_name}" | |
| }, status_code=400) | |
| except Exception as e: | |
| logger.error(f"MCPエラー: {str(e)}") | |
| return JSONResponse({ | |
| "success": False, | |
| "message": f"Error: {str(e)}" | |
| }, status_code=500) | |
| # ヘルスチェックエンドポイント | |
| async def health_check(): | |
| """ヘルスチェックエンドポイント""" | |
| return { | |
| "status": "OK", | |
| "version": "5.31.0", | |
| "service": "image-gen-mcp", | |
| "endpoints": ["/mcp/list_tools", "/mcp/call_tool", "/health"] | |
| } | |
| # プロキシエンドポイント(外部からのアクセス用) | |
| async def proxy_list_tools(): | |
| """Gradio API経由でのlist_tools呼び出し""" | |
| return await mcp_list_tools() | |
| async def proxy_call_tool(request: Request): | |
| """Gradio API経由でのcall_tool呼び出し""" | |
| return await mcp_call_tool(request) | |
| # Gradioインターフェースの作成 | |
| def create_gradio_interface(): | |
| with gr.Blocks(title="画像生成MCP - Gemini 2.0 Flash") as demo: | |
| gr.Markdown(""" | |
| # 画像生成MCPサーバー | |
| このアプリケーションは主にClaudeCodeから利用するためのMCPサーバーです。 | |
| Gemini 2.0 Flash Previewを使用して画像を生成します。 | |
| ## APIエンドポイント | |
| - `POST /mcp/list_tools` - ツール一覧 | |
| - `POST /mcp/call_tool` - 画像生成実行 | |
| - `GET /health` - ヘルスチェック | |
| """) | |
| with gr.Tab("画像生成テスト"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt_input = gr.Textbox( | |
| label="プロンプト", | |
| placeholder="生成したい画像の説明を入力してください...", | |
| lines=3 | |
| ) | |
| reference_image = gr.Image( | |
| label="参考画像(オプション)", | |
| type="pil" | |
| ) | |
| generate_btn = gr.Button("画像を生成", variant="primary") | |
| with gr.Column(): | |
| output_image = gr.Image( | |
| label="生成された画像", | |
| type="pil" | |
| ) | |
| status_output = gr.Textbox( | |
| label="ステータス", | |
| interactive=False | |
| ) | |
| with gr.Tab("MCP API情報"): | |
| gr.Markdown(""" | |
| ### ClaudeCode設定例 | |
| ```json | |
| { | |
| "mcpServers": { | |
| "image-gen": { | |
| "url": "https://tomo2chin2-imagegenmcp.hf.space/mcp/" | |
| } | |
| } | |
| } | |
| ``` | |
| ### プロキシエンドポイント | |
| - `POST /gradio_api/mcp/list_tools` | |
| - `POST /gradio_api/mcp/call_tool` | |
| ### 直接エンドポイント | |
| - `POST /mcp/list_tools` | |
| - `POST /mcp/call_tool` | |
| - `GET /health` | |
| """) | |
| # イベントハンドラ | |
| generate_btn.click( | |
| fn=generate_image, | |
| inputs=[prompt_input, reference_image], | |
| outputs=[output_image, status_output] | |
| ) | |
| return demo | |
| # メイン実行部分 | |
| if __name__ == "__main__": | |
| # APIキーチェック | |
| if not os.environ.get("GEMINI_API_KEY"): | |
| logger.error("GEMINI_API_KEY環境変数が設定されていません") | |
| with gr.Blocks(title="エラー - 画像生成MCP") as demo: | |
| gr.Markdown(""" | |
| # ⚠️ 設定エラー | |
| GEMINI_API_KEY環境変数が設定されていません。 | |
| Hugging Face SpacesのSettings > Repository secretsで以下を設定してください: | |
| - Name: `GEMINI_API_KEY` | |
| - Value: あなたのGemini APIキー | |
| """) | |
| demo.launch() | |
| else: | |
| # Gradioインターフェース作成 | |
| logger.info("画像生成MCPサーバーを起動中...") | |
| demo = create_gradio_interface() | |
| # GradioをFastAPIにマウント | |
| app = gr.mount_gradio_app(app, demo, path="/") | |
| # サーバー起動設定 | |
| config = uvicorn.Config( | |
| app=app, | |
| host="0.0.0.0", | |
| port=7860, | |
| log_level="info" | |
| ) | |
| logger.info("FastAPI + Gradio統合サーバー起動") | |
| server = uvicorn.Server(config) | |
| server.run() |