import json import os from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from fastapi import Body, FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import Response from avatar_store import avatar_generated_path from generate_image_with_nano import build_prompt, run_edit from mcp_tools import ( create_avatar, delete_avatar_memory, delete_avatar_portrait, delete_generated_images, ensure_public_avatar, generate_as_avatar, get_avatar, get_avatar_context, record_generated_image, retrieve_avatar_snippets, set_avatar_portrait, store_avatar_memory, summarize_avatar_context, ) fastapi_app = FastAPI() fastapi_app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) JSONRPC_VERSION = "2.0" MCP_PROTOCOL_VERSION = os.getenv("MCP_PROTOCOL_VERSION", "2025-06-18") SERVER_INFO = { "name": "avatar-mcp", "version": os.getenv("AVATAR_MCP_VERSION", "0.1.0"), } SERVER_INSTRUCTIONS = ( "Use generate_as_avatar for dialog, retrieve_snippets/summarize_avatar for " "context, and generate_image for Nano/Banana edits. Memory + portrait tools " "manage persistent state under avatars/." ) SERVER_CAPABILITIES = { "tools": {"listChanged": False}, } JSONRPC_PARSE_ERROR = -32700 JSONRPC_INVALID_REQUEST = -32600 JSONRPC_METHOD_NOT_FOUND = -32601 JSONRPC_INVALID_PARAMS = -32602 JSONRPC_INTERNAL_ERROR = -32603 def _schema(properties: Dict[str, Dict[str, Any]], required: Optional[List[str]] = None): schema = {"type": "object", "properties": properties} if required: schema["required"] = required return schema MCP_TOOLS_METADATA: List[Dict[str, Any]] = [ { "name": "create_avatar", "description": "Create a brand-new avatar persona using a short description.", "inputSchema": _schema( { "description": { "type": "string", "description": "Persona description used to seed prompts.", } }, ["description"], ), }, { "name": "get_avatar", "description": "Fetch avatar metadata (public view unless a matching admin_id is supplied).", "inputSchema": _schema( { "avatar_id": { "type": "string", "description": "Avatar identifier.", }, "admin_id": { "type": "string", "description": "Admin credential to unlock private fields.", }, }, ["avatar_id"], ), }, { "name": "generate_as_avatar", "description": "Generate a chat reply using the avatar's persona + history.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "message": {"type": "string", "description": "User message text."}, "history": { "type": "array", "description": "Optional backscroll of prior dialog turns.", "items": {"type": "object"}, }, }, ["avatar_id", "message"], ), }, { "name": "generate_image", "description": "Create a Nano/Banana visual using the current portrait plus message/reply context.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "message": { "type": "string", "description": "User request driving the render.", }, "reply": { "type": "string", "description": "Assistant reply text to seed the scene.", }, }, ["avatar_id"], ), }, { "name": "store_avatar_memory", "description": "Append a memory/context entry for the avatar.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential."}, "entry": {"type": "string", "description": "Memory text to store."}, "private": {"type": "boolean", "description": "Hide memory from public calls."}, }, ["avatar_id", "admin_id", "entry"], ), }, { "name": "delete_avatar_memory", "description": "Remove a specific memory row by index.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential."}, "index": {"type": "integer", "description": "Zero-based memory index."}, }, ["avatar_id", "admin_id", "index"], ), }, { "name": "get_avatar_context", "description": "Retrieve persona, description, and memory (public or admin).", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential."}, "mode": { "type": "string", "description": "Use 'admin' for private view; defaults to public.", }, }, ["avatar_id"], ), }, { "name": "delete_generated_images", "description": "Remove on-disk generated image files for an avatar.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential."}, }, ["avatar_id", "admin_id"], ), }, { "name": "retrieve_snippets", "description": "Return top memories/persona snippets that match a lightweight query.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential (optional)."}, "query": { "type": "string", "description": "Free-form search text to score memories.", }, "limit": { "type": "integer", "description": "Maximum snippet count to return.", }, }, ["avatar_id"], ), }, { "name": "summarize_avatar", "description": "Summarize persona plus the most recent memories.", "inputSchema": _schema( { "avatar_id": {"type": "string", "description": "Avatar identifier."}, "admin_id": {"type": "string", "description": "Admin credential (optional)."}, "max_mem": { "type": "integer", "description": "How many of the newest memories to include.", }, }, ["avatar_id"], ), }, ] MCP_TOOLS_BY_NAME = {tool["name"]: tool for tool in MCP_TOOLS_METADATA} def _handle(func, payload): try: return func(payload or {}) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) def _resolve_path(path_str): if not path_str: return None path = Path(path_str) if not path.is_absolute(): path = Path(__file__).parent / path return str(path) def _jsonrpc_success_response(request_id: Any, result: Dict[str, Any] | None = None) -> Dict[str, Any]: return { "jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result or {}, } def _jsonrpc_error_response( request_id: Any, code: int, message: str, data: Dict[str, Any] | None = None, ) -> Dict[str, Any]: error = {"code": code, "message": message} if data is not None: error["data"] = data return { "jsonrpc": JSONRPC_VERSION, "id": request_id, "error": error, } def _tool_call_payload(data: Any, is_error: bool = False) -> Dict[str, Any]: if isinstance(data, dict): content_text = json.dumps(data, ensure_ascii=False, indent=2) structured = data elif isinstance(data, (list, tuple)): structured = {"data": data} content_text = json.dumps(data, ensure_ascii=False, indent=2) else: structured = None content_text = str(data) if not content_text: content_text = "ok" if not is_error else "error" payload: Dict[str, Any] = { "content": [ { "type": "text", "text": content_text, } ] } if structured is not None: payload["structuredContent"] = structured if is_error: payload["isError"] = True return payload @fastapi_app.post("/mcp/create_avatar") def api_create_avatar(payload: dict): return _handle(create_avatar, payload) @fastapi_app.post("/mcp/get_avatar") def api_get_avatar(payload: dict): return _handle(get_avatar, payload) @fastapi_app.post("/mcp/generate_as_avatar") def api_generate_as_avatar(payload: dict): return _handle(generate_as_avatar, payload) def _generate_image(payload: dict): payload = payload or {} avatar_id = payload.get("avatar_id") if not avatar_id: raise ValueError("avatar_id required") message = (payload.get("message") or "").strip() reply = (payload.get("reply") or "").strip() context = message or reply or "scene with the avatar" try: avatar = ensure_public_avatar(avatar_id) except ValueError as exc: raise ValueError(str(exc)) portrait_path = _resolve_path(avatar.get("portrait")) if not portrait_path or not Path(portrait_path).exists(): raise ValueError("portrait not found for avatar") prompt = build_prompt(avatar.get("persona", "Avatar"), context) timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") out_path = avatar_generated_path(avatar_id, timestamp) out_path.parent.mkdir(parents=True, exist_ok=True) try: rc = run_edit(Path(portrait_path), prompt, out_path) except SystemExit: rc = 1 if rc != 0: raise RuntimeError("image generation failed") record_generated_image(avatar_id, out_path, prompt, timestamp) return { "status": "generated", "path": str(out_path), "prompt": prompt, "timestamp": timestamp, } @fastapi_app.post("/mcp/generate_image") def api_generate_image(payload: dict): try: return _generate_image(payload) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) def _handle_jsonrpc_tool_call(params: Dict[str, Any] | None, request_id: Any): params = params or {} name = params.get("name") if not name or not isinstance(name, str): return _jsonrpc_error_response(request_id, JSONRPC_INVALID_PARAMS, "tool name required") arguments = params.get("arguments") or {} if not isinstance(arguments, dict): return _jsonrpc_error_response( request_id, JSONRPC_INVALID_PARAMS, "tool arguments must be an object", ) handler = WEBSOCKET_TOOLS.get(name) if not handler: return _jsonrpc_error_response(request_id, JSONRPC_METHOD_NOT_FOUND, f"unknown tool: {name}") try: result = handler(arguments) return _jsonrpc_success_response(request_id, _tool_call_payload(result)) except (ValueError, RuntimeError) as exc: return _jsonrpc_success_response( request_id, _tool_call_payload({"error": str(exc)}, is_error=True), ) except Exception as exc: return _jsonrpc_error_response( request_id, JSONRPC_INTERNAL_ERROR, "internal error during tool call", {"details": str(exc), "tool": name}, ) def _handle_single_jsonrpc_request(payload: Any): if not isinstance(payload, dict): return _jsonrpc_error_response(None, JSONRPC_INVALID_REQUEST, "request must be an object") request_id = payload.get("id") method = payload.get("method") if not method: return _jsonrpc_error_response(request_id, JSONRPC_INVALID_REQUEST, "method is required") params = payload.get("params") if params is not None and not isinstance(params, dict): # Notifications may omit params entirely; when supplied, enforce object return _jsonrpc_error_response(request_id, JSONRPC_INVALID_PARAMS, "params must be an object") if method == "initialize": return _jsonrpc_success_response( request_id, { "protocolVersion": MCP_PROTOCOL_VERSION, "capabilities": SERVER_CAPABILITIES, "serverInfo": SERVER_INFO, "instructions": SERVER_INSTRUCTIONS, }, ) if method == "notifications/initialized": return None if method == "notifications/cancelled": return None if method == "ping": return _jsonrpc_success_response(request_id, {}) if method == "tools/list": result: Dict[str, Any] = {"tools": MCP_TOOLS_METADATA} if params and "cursor" in params and params["cursor"]: result["nextCursor"] = None return _jsonrpc_success_response(request_id, result) if method == "tools/call": return _handle_jsonrpc_tool_call(params, request_id) if method == "resources/list": return _jsonrpc_success_response(request_id, {"resources": []}) if method == "resources/templates/list": return _jsonrpc_success_response(request_id, {"resourceTemplates": []}) if method == "resources/read": return _jsonrpc_error_response( request_id, JSONRPC_METHOD_NOT_FOUND, "resources/read not supported", ) if method == "prompts/list": return _jsonrpc_success_response(request_id, {"prompts": []}) if method == "prompts/get": return _jsonrpc_error_response( request_id, JSONRPC_METHOD_NOT_FOUND, "prompts/get not supported", ) return _jsonrpc_error_response(request_id, JSONRPC_METHOD_NOT_FOUND, f"unknown method: {method}") def _handle_jsonrpc_payload(payload: Any): if isinstance(payload, list): responses = [] for entry in payload: resp = _handle_single_jsonrpc_request(entry) if resp is not None: responses.append(resp) if not responses: # Pure notification batches must not emit a body; return HTTP 204 to signal no content. return Response(status_code=204) return responses response = _handle_single_jsonrpc_request(payload) if response is None: return Response(status_code=204) return response @fastapi_app.post("/mcp/store_avatar_memory") def api_store_avatar_memory(payload: dict): return _handle(store_avatar_memory, payload) @fastapi_app.post("/mcp/delete_avatar_memory") def api_delete_avatar_memory(payload: dict): return _handle(delete_avatar_memory, payload) @fastapi_app.post("/mcp/get_avatar_context") def api_get_avatar_context(payload: dict): return _handle(get_avatar_context, payload) @fastapi_app.post("/mcp/delete_generated_images") def api_delete_generated_images(payload: dict): return _handle(delete_generated_images, payload) @fastapi_app.post("/mcp/retrieve_snippets") def api_retrieve_snippets(payload: dict): return _handle(retrieve_avatar_snippets, payload) @fastapi_app.post("/mcp/summarize_avatar") def api_summarize_avatar(payload: dict): return _handle(summarize_avatar_context, payload) WEBSOCKET_TOOLS = {} # filled later after helper definitions @fastapi_app.websocket("/mcp/ws") async def websocket_mcp(websocket: WebSocket): await websocket.accept() await websocket.send_json({"type": "welcome", "tools": list(WEBSOCKET_TOOLS.keys())}) while True: try: data = await websocket.receive_json() except WebSocketDisconnect: break except Exception: await websocket.close(code=1003) break req_id = data.get("id") tool = data.get("tool") payload = data.get("payload") or {} handler = WEBSOCKET_TOOLS.get(tool) if not handler: await websocket.send_json( {"id": req_id, "ok": False, "error": f"unknown tool: {tool}"} ) continue try: result = handler(payload) await websocket.send_json({"id": req_id, "ok": True, "result": result}) except Exception as exc: await websocket.send_json({"id": req_id, "ok": False, "error": str(exc)}) @fastapi_app.get("/mcp/http") def http_list_tools(): return {"tools": list(WEBSOCKET_TOOLS.keys())} @fastapi_app.post("/mcp/http") def http_invoke_tool(payload: Any = Body(...)): if isinstance(payload, list): return _handle_jsonrpc_payload(payload) if isinstance(payload, dict): if "jsonrpc" in payload or "method" in payload: return _handle_jsonrpc_payload(payload) payload = payload or {} tool_name = payload.get("tool") handler = WEBSOCKET_TOOLS.get(tool_name) if not handler: raise HTTPException(status_code=404, detail="unknown tool") tool_payload = payload.get("payload") or {} try: result = handler(tool_payload) return {"ok": True, "result": result} except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) raise HTTPException(status_code=400, detail="invalid payload") WEBSOCKET_TOOLS.update( { "create_avatar": create_avatar, "get_avatar": get_avatar, "generate_as_avatar": generate_as_avatar, "generate_image": _generate_image, "store_avatar_memory": store_avatar_memory, "delete_avatar_memory": delete_avatar_memory, "get_avatar_context": get_avatar_context, "delete_generated_images": delete_generated_images, "retrieve_snippets": retrieve_avatar_snippets, "summarize_avatar": summarize_avatar_context, } )