|
|
import json |
|
|
import os |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional |
|
|
|
|
|
from fastapi import Body, FastAPI, HTTPException, WebSocket, WebSocketDisconnect |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import Response |
|
|
|
|
|
from avatar_store import avatar_generated_path |
|
|
from generate_image_with_nano import build_prompt, run_edit |
|
|
from mcp_tools import ( |
|
|
create_avatar, |
|
|
delete_avatar_memory, |
|
|
delete_avatar_portrait, |
|
|
delete_generated_images, |
|
|
ensure_public_avatar, |
|
|
generate_as_avatar, |
|
|
get_avatar, |
|
|
get_avatar_context, |
|
|
record_generated_image, |
|
|
retrieve_avatar_snippets, |
|
|
set_avatar_portrait, |
|
|
store_avatar_memory, |
|
|
summarize_avatar_context, |
|
|
) |
|
|
|
|
|
fastapi_app = FastAPI() |
|
|
fastapi_app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
JSONRPC_VERSION = "2.0" |
|
|
MCP_PROTOCOL_VERSION = os.getenv("MCP_PROTOCOL_VERSION", "2025-06-18") |
|
|
SERVER_INFO = { |
|
|
"name": "avatar-mcp", |
|
|
"version": os.getenv("AVATAR_MCP_VERSION", "0.1.0"), |
|
|
} |
|
|
SERVER_INSTRUCTIONS = ( |
|
|
"Use generate_as_avatar for dialog, retrieve_snippets/summarize_avatar for " |
|
|
"context, and generate_image for Nano/Banana edits. Memory + portrait tools " |
|
|
"manage persistent state under avatars/<id>." |
|
|
) |
|
|
SERVER_CAPABILITIES = { |
|
|
"tools": {"listChanged": False}, |
|
|
} |
|
|
JSONRPC_PARSE_ERROR = -32700 |
|
|
JSONRPC_INVALID_REQUEST = -32600 |
|
|
JSONRPC_METHOD_NOT_FOUND = -32601 |
|
|
JSONRPC_INVALID_PARAMS = -32602 |
|
|
JSONRPC_INTERNAL_ERROR = -32603 |
|
|
|
|
|
|
|
|
def _schema(properties: Dict[str, Dict[str, Any]], required: Optional[List[str]] = None): |
|
|
schema = {"type": "object", "properties": properties} |
|
|
if required: |
|
|
schema["required"] = required |
|
|
return schema |
|
|
|
|
|
|
|
|
MCP_TOOLS_METADATA: List[Dict[str, Any]] = [ |
|
|
{ |
|
|
"name": "create_avatar", |
|
|
"description": "Create a brand-new avatar persona using a short description.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"description": { |
|
|
"type": "string", |
|
|
"description": "Persona description used to seed prompts.", |
|
|
} |
|
|
}, |
|
|
["description"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "get_avatar", |
|
|
"description": "Fetch avatar metadata (public view unless a matching admin_id is supplied).", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": { |
|
|
"type": "string", |
|
|
"description": "Avatar identifier.", |
|
|
}, |
|
|
"admin_id": { |
|
|
"type": "string", |
|
|
"description": "Admin credential to unlock private fields.", |
|
|
}, |
|
|
}, |
|
|
["avatar_id"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "generate_as_avatar", |
|
|
"description": "Generate a chat reply using the avatar's persona + history.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"message": {"type": "string", "description": "User message text."}, |
|
|
"history": { |
|
|
"type": "array", |
|
|
"description": "Optional backscroll of prior dialog turns.", |
|
|
"items": {"type": "object"}, |
|
|
}, |
|
|
}, |
|
|
["avatar_id", "message"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "generate_image", |
|
|
"description": "Create a Nano/Banana visual using the current portrait plus message/reply context.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"message": { |
|
|
"type": "string", |
|
|
"description": "User request driving the render.", |
|
|
}, |
|
|
"reply": { |
|
|
"type": "string", |
|
|
"description": "Assistant reply text to seed the scene.", |
|
|
}, |
|
|
}, |
|
|
["avatar_id"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "store_avatar_memory", |
|
|
"description": "Append a memory/context entry for the avatar.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential."}, |
|
|
"entry": {"type": "string", "description": "Memory text to store."}, |
|
|
"private": {"type": "boolean", "description": "Hide memory from public calls."}, |
|
|
}, |
|
|
["avatar_id", "admin_id", "entry"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "delete_avatar_memory", |
|
|
"description": "Remove a specific memory row by index.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential."}, |
|
|
"index": {"type": "integer", "description": "Zero-based memory index."}, |
|
|
}, |
|
|
["avatar_id", "admin_id", "index"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "get_avatar_context", |
|
|
"description": "Retrieve persona, description, and memory (public or admin).", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential."}, |
|
|
"mode": { |
|
|
"type": "string", |
|
|
"description": "Use 'admin' for private view; defaults to public.", |
|
|
}, |
|
|
}, |
|
|
["avatar_id"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "delete_generated_images", |
|
|
"description": "Remove on-disk generated image files for an avatar.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential."}, |
|
|
}, |
|
|
["avatar_id", "admin_id"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "retrieve_snippets", |
|
|
"description": "Return top memories/persona snippets that match a lightweight query.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential (optional)."}, |
|
|
"query": { |
|
|
"type": "string", |
|
|
"description": "Free-form search text to score memories.", |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum snippet count to return.", |
|
|
}, |
|
|
}, |
|
|
["avatar_id"], |
|
|
), |
|
|
}, |
|
|
{ |
|
|
"name": "summarize_avatar", |
|
|
"description": "Summarize persona plus the most recent memories.", |
|
|
"inputSchema": _schema( |
|
|
{ |
|
|
"avatar_id": {"type": "string", "description": "Avatar identifier."}, |
|
|
"admin_id": {"type": "string", "description": "Admin credential (optional)."}, |
|
|
"max_mem": { |
|
|
"type": "integer", |
|
|
"description": "How many of the newest memories to include.", |
|
|
}, |
|
|
}, |
|
|
["avatar_id"], |
|
|
), |
|
|
}, |
|
|
] |
|
|
MCP_TOOLS_BY_NAME = {tool["name"]: tool for tool in MCP_TOOLS_METADATA} |
|
|
|
|
|
|
|
|
def _handle(func, payload): |
|
|
try: |
|
|
return func(payload or {}) |
|
|
except ValueError as exc: |
|
|
raise HTTPException(status_code=400, detail=str(exc)) |
|
|
|
|
|
|
|
|
def _resolve_path(path_str): |
|
|
if not path_str: |
|
|
return None |
|
|
path = Path(path_str) |
|
|
if not path.is_absolute(): |
|
|
path = Path(__file__).parent / path |
|
|
return str(path) |
|
|
|
|
|
|
|
|
def _jsonrpc_success_response(request_id: Any, result: Dict[str, Any] | None = None) -> Dict[str, Any]: |
|
|
return { |
|
|
"jsonrpc": JSONRPC_VERSION, |
|
|
"id": request_id, |
|
|
"result": result or {}, |
|
|
} |
|
|
|
|
|
|
|
|
def _jsonrpc_error_response( |
|
|
request_id: Any, |
|
|
code: int, |
|
|
message: str, |
|
|
data: Dict[str, Any] | None = None, |
|
|
) -> Dict[str, Any]: |
|
|
error = {"code": code, "message": message} |
|
|
if data is not None: |
|
|
error["data"] = data |
|
|
return { |
|
|
"jsonrpc": JSONRPC_VERSION, |
|
|
"id": request_id, |
|
|
"error": error, |
|
|
} |
|
|
|
|
|
|
|
|
def _tool_call_payload(data: Any, is_error: bool = False) -> Dict[str, Any]: |
|
|
if isinstance(data, dict): |
|
|
content_text = json.dumps(data, ensure_ascii=False, indent=2) |
|
|
structured = data |
|
|
elif isinstance(data, (list, tuple)): |
|
|
structured = {"data": data} |
|
|
content_text = json.dumps(data, ensure_ascii=False, indent=2) |
|
|
else: |
|
|
structured = None |
|
|
content_text = str(data) |
|
|
if not content_text: |
|
|
content_text = "ok" if not is_error else "error" |
|
|
payload: Dict[str, Any] = { |
|
|
"content": [ |
|
|
{ |
|
|
"type": "text", |
|
|
"text": content_text, |
|
|
} |
|
|
] |
|
|
} |
|
|
if structured is not None: |
|
|
payload["structuredContent"] = structured |
|
|
if is_error: |
|
|
payload["isError"] = True |
|
|
return payload |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/create_avatar") |
|
|
def api_create_avatar(payload: dict): |
|
|
return _handle(create_avatar, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/get_avatar") |
|
|
def api_get_avatar(payload: dict): |
|
|
return _handle(get_avatar, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/generate_as_avatar") |
|
|
def api_generate_as_avatar(payload: dict): |
|
|
return _handle(generate_as_avatar, payload) |
|
|
|
|
|
|
|
|
def _generate_image(payload: dict): |
|
|
payload = payload or {} |
|
|
avatar_id = payload.get("avatar_id") |
|
|
if not avatar_id: |
|
|
raise ValueError("avatar_id required") |
|
|
message = (payload.get("message") or "").strip() |
|
|
reply = (payload.get("reply") or "").strip() |
|
|
context = message or reply or "scene with the avatar" |
|
|
try: |
|
|
avatar = ensure_public_avatar(avatar_id) |
|
|
except ValueError as exc: |
|
|
raise ValueError(str(exc)) |
|
|
portrait_path = _resolve_path(avatar.get("portrait")) |
|
|
if not portrait_path or not Path(portrait_path).exists(): |
|
|
raise ValueError("portrait not found for avatar") |
|
|
prompt = build_prompt(avatar.get("persona", "Avatar"), context) |
|
|
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") |
|
|
out_path = avatar_generated_path(avatar_id, timestamp) |
|
|
out_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
try: |
|
|
rc = run_edit(Path(portrait_path), prompt, out_path) |
|
|
except SystemExit: |
|
|
rc = 1 |
|
|
if rc != 0: |
|
|
raise RuntimeError("image generation failed") |
|
|
record_generated_image(avatar_id, out_path, prompt, timestamp) |
|
|
return { |
|
|
"status": "generated", |
|
|
"path": str(out_path), |
|
|
"prompt": prompt, |
|
|
"timestamp": timestamp, |
|
|
} |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/generate_image") |
|
|
def api_generate_image(payload: dict): |
|
|
try: |
|
|
return _generate_image(payload) |
|
|
except ValueError as exc: |
|
|
raise HTTPException(status_code=400, detail=str(exc)) |
|
|
except RuntimeError as exc: |
|
|
raise HTTPException(status_code=500, detail=str(exc)) |
|
|
|
|
|
|
|
|
def _handle_jsonrpc_tool_call(params: Dict[str, Any] | None, request_id: Any): |
|
|
params = params or {} |
|
|
name = params.get("name") |
|
|
if not name or not isinstance(name, str): |
|
|
return _jsonrpc_error_response(request_id, JSONRPC_INVALID_PARAMS, "tool name required") |
|
|
arguments = params.get("arguments") or {} |
|
|
if not isinstance(arguments, dict): |
|
|
return _jsonrpc_error_response( |
|
|
request_id, |
|
|
JSONRPC_INVALID_PARAMS, |
|
|
"tool arguments must be an object", |
|
|
) |
|
|
handler = WEBSOCKET_TOOLS.get(name) |
|
|
if not handler: |
|
|
return _jsonrpc_error_response(request_id, JSONRPC_METHOD_NOT_FOUND, f"unknown tool: {name}") |
|
|
try: |
|
|
result = handler(arguments) |
|
|
return _jsonrpc_success_response(request_id, _tool_call_payload(result)) |
|
|
except (ValueError, RuntimeError) as exc: |
|
|
return _jsonrpc_success_response( |
|
|
request_id, |
|
|
_tool_call_payload({"error": str(exc)}, is_error=True), |
|
|
) |
|
|
except Exception as exc: |
|
|
return _jsonrpc_error_response( |
|
|
request_id, |
|
|
JSONRPC_INTERNAL_ERROR, |
|
|
"internal error during tool call", |
|
|
{"details": str(exc), "tool": name}, |
|
|
) |
|
|
|
|
|
|
|
|
def _handle_single_jsonrpc_request(payload: Any): |
|
|
if not isinstance(payload, dict): |
|
|
return _jsonrpc_error_response(None, JSONRPC_INVALID_REQUEST, "request must be an object") |
|
|
request_id = payload.get("id") |
|
|
method = payload.get("method") |
|
|
if not method: |
|
|
return _jsonrpc_error_response(request_id, JSONRPC_INVALID_REQUEST, "method is required") |
|
|
params = payload.get("params") |
|
|
if params is not None and not isinstance(params, dict): |
|
|
|
|
|
return _jsonrpc_error_response(request_id, JSONRPC_INVALID_PARAMS, "params must be an object") |
|
|
|
|
|
if method == "initialize": |
|
|
return _jsonrpc_success_response( |
|
|
request_id, |
|
|
{ |
|
|
"protocolVersion": MCP_PROTOCOL_VERSION, |
|
|
"capabilities": SERVER_CAPABILITIES, |
|
|
"serverInfo": SERVER_INFO, |
|
|
"instructions": SERVER_INSTRUCTIONS, |
|
|
}, |
|
|
) |
|
|
if method == "notifications/initialized": |
|
|
return None |
|
|
if method == "notifications/cancelled": |
|
|
return None |
|
|
if method == "ping": |
|
|
return _jsonrpc_success_response(request_id, {}) |
|
|
if method == "tools/list": |
|
|
result: Dict[str, Any] = {"tools": MCP_TOOLS_METADATA} |
|
|
if params and "cursor" in params and params["cursor"]: |
|
|
result["nextCursor"] = None |
|
|
return _jsonrpc_success_response(request_id, result) |
|
|
if method == "tools/call": |
|
|
return _handle_jsonrpc_tool_call(params, request_id) |
|
|
if method == "resources/list": |
|
|
return _jsonrpc_success_response(request_id, {"resources": []}) |
|
|
if method == "resources/templates/list": |
|
|
return _jsonrpc_success_response(request_id, {"resourceTemplates": []}) |
|
|
if method == "resources/read": |
|
|
return _jsonrpc_error_response( |
|
|
request_id, |
|
|
JSONRPC_METHOD_NOT_FOUND, |
|
|
"resources/read not supported", |
|
|
) |
|
|
if method == "prompts/list": |
|
|
return _jsonrpc_success_response(request_id, {"prompts": []}) |
|
|
if method == "prompts/get": |
|
|
return _jsonrpc_error_response( |
|
|
request_id, |
|
|
JSONRPC_METHOD_NOT_FOUND, |
|
|
"prompts/get not supported", |
|
|
) |
|
|
return _jsonrpc_error_response(request_id, JSONRPC_METHOD_NOT_FOUND, f"unknown method: {method}") |
|
|
|
|
|
|
|
|
def _handle_jsonrpc_payload(payload: Any): |
|
|
if isinstance(payload, list): |
|
|
responses = [] |
|
|
for entry in payload: |
|
|
resp = _handle_single_jsonrpc_request(entry) |
|
|
if resp is not None: |
|
|
responses.append(resp) |
|
|
if not responses: |
|
|
|
|
|
return Response(status_code=204) |
|
|
return responses |
|
|
response = _handle_single_jsonrpc_request(payload) |
|
|
if response is None: |
|
|
return Response(status_code=204) |
|
|
return response |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/store_avatar_memory") |
|
|
def api_store_avatar_memory(payload: dict): |
|
|
return _handle(store_avatar_memory, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/delete_avatar_memory") |
|
|
def api_delete_avatar_memory(payload: dict): |
|
|
return _handle(delete_avatar_memory, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/get_avatar_context") |
|
|
def api_get_avatar_context(payload: dict): |
|
|
return _handle(get_avatar_context, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/delete_generated_images") |
|
|
def api_delete_generated_images(payload: dict): |
|
|
return _handle(delete_generated_images, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/retrieve_snippets") |
|
|
def api_retrieve_snippets(payload: dict): |
|
|
return _handle(retrieve_avatar_snippets, payload) |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/summarize_avatar") |
|
|
def api_summarize_avatar(payload: dict): |
|
|
return _handle(summarize_avatar_context, payload) |
|
|
|
|
|
|
|
|
WEBSOCKET_TOOLS = {} |
|
|
|
|
|
|
|
|
@fastapi_app.websocket("/mcp/ws") |
|
|
async def websocket_mcp(websocket: WebSocket): |
|
|
await websocket.accept() |
|
|
await websocket.send_json({"type": "welcome", "tools": list(WEBSOCKET_TOOLS.keys())}) |
|
|
while True: |
|
|
try: |
|
|
data = await websocket.receive_json() |
|
|
except WebSocketDisconnect: |
|
|
break |
|
|
except Exception: |
|
|
await websocket.close(code=1003) |
|
|
break |
|
|
req_id = data.get("id") |
|
|
tool = data.get("tool") |
|
|
payload = data.get("payload") or {} |
|
|
handler = WEBSOCKET_TOOLS.get(tool) |
|
|
if not handler: |
|
|
await websocket.send_json( |
|
|
{"id": req_id, "ok": False, "error": f"unknown tool: {tool}"} |
|
|
) |
|
|
continue |
|
|
try: |
|
|
result = handler(payload) |
|
|
await websocket.send_json({"id": req_id, "ok": True, "result": result}) |
|
|
except Exception as exc: |
|
|
await websocket.send_json({"id": req_id, "ok": False, "error": str(exc)}) |
|
|
|
|
|
|
|
|
@fastapi_app.get("/mcp/http") |
|
|
def http_list_tools(): |
|
|
return {"tools": list(WEBSOCKET_TOOLS.keys())} |
|
|
|
|
|
|
|
|
@fastapi_app.post("/mcp/http") |
|
|
def http_invoke_tool(payload: Any = Body(...)): |
|
|
if isinstance(payload, list): |
|
|
return _handle_jsonrpc_payload(payload) |
|
|
if isinstance(payload, dict): |
|
|
if "jsonrpc" in payload or "method" in payload: |
|
|
return _handle_jsonrpc_payload(payload) |
|
|
payload = payload or {} |
|
|
tool_name = payload.get("tool") |
|
|
handler = WEBSOCKET_TOOLS.get(tool_name) |
|
|
if not handler: |
|
|
raise HTTPException(status_code=404, detail="unknown tool") |
|
|
tool_payload = payload.get("payload") or {} |
|
|
try: |
|
|
result = handler(tool_payload) |
|
|
return {"ok": True, "result": result} |
|
|
except ValueError as exc: |
|
|
raise HTTPException(status_code=400, detail=str(exc)) |
|
|
except RuntimeError as exc: |
|
|
raise HTTPException(status_code=500, detail=str(exc)) |
|
|
except Exception as exc: |
|
|
raise HTTPException(status_code=500, detail=str(exc)) |
|
|
raise HTTPException(status_code=400, detail="invalid payload") |
|
|
|
|
|
|
|
|
WEBSOCKET_TOOLS.update( |
|
|
{ |
|
|
"create_avatar": create_avatar, |
|
|
"get_avatar": get_avatar, |
|
|
"generate_as_avatar": generate_as_avatar, |
|
|
"generate_image": _generate_image, |
|
|
"store_avatar_memory": store_avatar_memory, |
|
|
"delete_avatar_memory": delete_avatar_memory, |
|
|
"get_avatar_context": get_avatar_context, |
|
|
"delete_generated_images": delete_generated_images, |
|
|
"retrieve_snippets": retrieve_avatar_snippets, |
|
|
"summarize_avatar": summarize_avatar_context, |
|
|
} |
|
|
) |
|
|
|