"""OMEGA HTTP Server — Streamable HTTP transport for the MCP server. Wraps the existing stdio-based MCP server in a Starlette ASGI app using the MCP SDK's StreamableHTTPSessionManager. This enables: - Remote access (Docker, multi-IDE, mobile via Tailscale) - Smithery.ai marketplace listing - Native HTTP without external mcp-proxy dependency Dependencies (starlette, uvicorn) are already transitive deps of mcp>=1.0.0. """ import contextlib import os import secrets from collections.abc import AsyncIterator from pathlib import Path from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Mount, Route from starlette.types import Receive, Scope, Send from mcp.server.streamable_http_manager import StreamableHTTPSessionManager API_KEY_PATH = Path.home() / ".omega" / "api_key" def get_or_create_api_key() -> str: """Load API key from ~/.omega/api_key, or generate one.""" if API_KEY_PATH.exists(): return API_KEY_PATH.read_text().strip() key = secrets.token_urlsafe(32) API_KEY_PATH.parent.mkdir(parents=True, exist_ok=True) API_KEY_PATH.write_text(key + "\n") API_KEY_PATH.chmod(0o600) return key def create_http_app(server, api_key: str | None = None) -> Starlette: """Create a Starlette ASGI app wrapping the MCP server. Args: server: The MCP Server instance from mcp_server.py. api_key: Optional API key for authentication. None disables auth. """ session_manager = StreamableHTTPSessionManager( app=server, json_response=True, stateless=True, ) async def mcp_asgi_app(scope: Scope, receive: Receive, send: Send) -> None: """ASGI app for the /mcp endpoint — delegates to StreamableHTTPSessionManager.""" if api_key: request = Request(scope, receive) provided = request.headers.get("x-api-key") or request.query_params.get("api_key") if provided != api_key: response = JSONResponse({"error": "Unauthorized"}, status_code=401) await response(scope, receive, send) return await session_manager.handle_request(scope, receive, send) async def health(request: Request): return JSONResponse({"status": "ok", "server": "omega-memory"}) async def server_card(request: Request): from omega import __version__ from omega.server.mcp_server import TOOL_SCHEMAS return JSONResponse({ "name": "omega-memory", "version": __version__, "description": ( "OMEGA gives AI coding agents persistent memory that survives across sessions. " "Store decisions, lessons learned, error patterns, and task checkpoints — then " "retrieve them instantly via semantic search or relationship graph traversal. " "Features include automatic session briefings, memory consolidation, " "time-based reminders, encrypted user profiles, and knowledge compaction. " "Works with Claude Code, Cursor, Windsurf, VS Code, and any MCP-compatible client. " "Install locally with: pip install omega-memory && omega setup" ), "homepage": "https://omegamax.co", "repository": "https://github.com/omega-memory/core", "icon": "https://omegamax.co/icon.png", "transports": [ {"type": "streamable-http", "url": "/mcp"}, {"type": "stdio", "command": "python3 -m omega.server.mcp_server"}, ], "tools_count": len(TOOL_SCHEMAS), }) @contextlib.asynccontextmanager async def lifespan(app: Starlette) -> AsyncIterator[None]: async with session_manager.run(): yield app = Starlette( routes=[ Mount("/mcp", app=mcp_asgi_app), Route("/health", endpoint=health), Route("/.well-known/mcp.json", endpoint=server_card), Route("/.well-known/mcp/server-card.json", endpoint=server_card), ], lifespan=lifespan, ) return app async def run_http(host: str, port: int, api_key: str | None) -> None: """Import existing server, create HTTP app, run uvicorn.""" import uvicorn from omega.server.mcp_server import server, _wire_plugin_retrieval from omega.server.hook_server import start_hook_server await start_hook_server() _wire_plugin_retrieval() app = create_http_app(server, api_key=api_key) config = uvicorn.Config(app, host=host, port=port, log_level="info") srv = uvicorn.Server(config) await srv.serve()