Spaces:
Sleeping
Sleeping
| """OMEGA HTTP Server — Streamable HTTP transport for the MCP server. | |
| Wraps the existing stdio-based MCP server in a Starlette ASGI app using | |
| the MCP SDK's StreamableHTTPSessionManager. This enables: | |
| - Remote access (Docker, multi-IDE, mobile via Tailscale) | |
| - Smithery.ai marketplace listing | |
| - Native HTTP without external mcp-proxy dependency | |
| Dependencies (starlette, uvicorn) are already transitive deps of mcp>=1.0.0. | |
| """ | |
| import contextlib | |
| import os | |
| import secrets | |
| from collections.abc import AsyncIterator | |
| from pathlib import Path | |
| from starlette.applications import Starlette | |
| from starlette.requests import Request | |
| from starlette.responses import JSONResponse | |
| from starlette.routing import Mount, Route | |
| from starlette.types import Receive, Scope, Send | |
| from mcp.server.streamable_http_manager import StreamableHTTPSessionManager | |
| API_KEY_PATH = Path.home() / ".omega" / "api_key" | |
| def get_or_create_api_key() -> str: | |
| """Load API key from ~/.omega/api_key, or generate one.""" | |
| if API_KEY_PATH.exists(): | |
| return API_KEY_PATH.read_text().strip() | |
| key = secrets.token_urlsafe(32) | |
| API_KEY_PATH.parent.mkdir(parents=True, exist_ok=True) | |
| API_KEY_PATH.write_text(key + "\n") | |
| API_KEY_PATH.chmod(0o600) | |
| return key | |
| def create_http_app(server, api_key: str | None = None) -> Starlette: | |
| """Create a Starlette ASGI app wrapping the MCP server. | |
| Args: | |
| server: The MCP Server instance from mcp_server.py. | |
| api_key: Optional API key for authentication. None disables auth. | |
| """ | |
| session_manager = StreamableHTTPSessionManager( | |
| app=server, | |
| json_response=True, | |
| stateless=True, | |
| ) | |
| async def mcp_asgi_app(scope: Scope, receive: Receive, send: Send) -> None: | |
| """ASGI app for the /mcp endpoint — delegates to StreamableHTTPSessionManager.""" | |
| if api_key: | |
| request = Request(scope, receive) | |
| provided = request.headers.get("x-api-key") or request.query_params.get("api_key") | |
| if provided != api_key: | |
| response = JSONResponse({"error": "Unauthorized"}, status_code=401) | |
| await response(scope, receive, send) | |
| return | |
| await session_manager.handle_request(scope, receive, send) | |
| async def health(request: Request): | |
| return JSONResponse({"status": "ok", "server": "omega-memory"}) | |
| async def server_card(request: Request): | |
| from omega import __version__ | |
| from omega.server.mcp_server import TOOL_SCHEMAS | |
| return JSONResponse({ | |
| "name": "omega-memory", | |
| "version": __version__, | |
| "description": ( | |
| "OMEGA gives AI coding agents persistent memory that survives across sessions. " | |
| "Store decisions, lessons learned, error patterns, and task checkpoints — then " | |
| "retrieve them instantly via semantic search or relationship graph traversal. " | |
| "Features include automatic session briefings, memory consolidation, " | |
| "time-based reminders, encrypted user profiles, and knowledge compaction. " | |
| "Works with Claude Code, Cursor, Windsurf, VS Code, and any MCP-compatible client. " | |
| "Install locally with: pip install omega-memory && omega setup" | |
| ), | |
| "homepage": "https://omegamax.co", | |
| "repository": "https://github.com/omega-memory/core", | |
| "icon": "https://omegamax.co/icon.png", | |
| "transports": [ | |
| {"type": "streamable-http", "url": "/mcp"}, | |
| {"type": "stdio", "command": "python3 -m omega.server.mcp_server"}, | |
| ], | |
| "tools_count": len(TOOL_SCHEMAS), | |
| }) | |
| async def lifespan(app: Starlette) -> AsyncIterator[None]: | |
| async with session_manager.run(): | |
| yield | |
| app = Starlette( | |
| routes=[ | |
| Mount("/mcp", app=mcp_asgi_app), | |
| Route("/health", endpoint=health), | |
| Route("/.well-known/mcp.json", endpoint=server_card), | |
| Route("/.well-known/mcp/server-card.json", endpoint=server_card), | |
| ], | |
| lifespan=lifespan, | |
| ) | |
| return app | |
| async def run_http(host: str, port: int, api_key: str | None) -> None: | |
| """Import existing server, create HTTP app, run uvicorn.""" | |
| import uvicorn | |
| from omega.server.mcp_server import server, _wire_plugin_retrieval | |
| from omega.server.hook_server import start_hook_server | |
| await start_hook_server() | |
| _wire_plugin_retrieval() | |
| app = create_http_app(server, api_key=api_key) | |
| config = uvicorn.Config(app, host=host, port=port, log_level="info") | |
| srv = uvicorn.Server(config) | |
| await srv.serve() | |