import os
import sys
import json
import argparse
from typing import Dict, Any, List
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
# Put the project root (the parent of this file's directory) on sys.path so
# the project-local imports that follow can be resolved when this file is run
# directly as a script.
_here = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(_here)
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
from mcp_server.registry import ToolRegistry
from mcp_server.discovery import scan_local_tools
from reasoning.tools.qulab import QuLabBridge
# Application object that the route decorators in this file register their
# handlers on; main() passes it to uvicorn.
app = FastAPI(
    title="ECH0-PRIME MCP Server",
)
class ToolCallRequest(BaseModel):
    """Request body for ``POST /tools/call``."""

    # Name of the tool to invoke; handed to ToolRegistry.call_tool as-is.
    tool: str
    # Arguments for the tool, forwarded to ToolRegistry.call_tool unchanged.
    args: Dict[str, Any]
@app.get("/health")
async def health():
    """Liveness probe: always answers with a fixed ``status: ok`` payload."""
    payload = {"status": "ok"}
    return payload
@app.get("/tools")
async def list_tools():
    """Return whatever ``ToolRegistry.get_schemas()`` currently reports."""
    schemas = ToolRegistry.get_schemas()
    return schemas
@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
    """Run the requested tool through ``ToolRegistry`` and wrap its result.

    Args:
        request: Tool name and arguments taken from the JSON request body.

    Returns:
        A dict with the tool's result under the ``"output"`` key.

    Raises:
        HTTPException: Status 400 when the registry hands back a non-empty
            string that starts with ``"Error:"``; the string becomes the
            error detail.
    """
    outcome = ToolRegistry.call_tool(request.tool, request.args)
    # The registry signals failure in-band, as a string prefixed "Error:".
    if outcome and isinstance(outcome, str):
        if outcome.startswith("Error:"):
            raise HTTPException(status_code=400, detail=outcome)
    return {"output": outcome}
@app.post("/shutdown")
async def shutdown():
    """Terminate the server process immediately with exit status 0.

    The process is ended from inside the handler via ``os._exit``, so this
    function never returns and no HTTP response body is produced for the
    caller.
    """
    # os._exit() bypasses normal interpreter shutdown, including the flush of
    # stdio buffers. Flush explicitly so the message is not lost when stdout
    # is buffered (e.g. redirected to a file or a pipe).
    print("๐ Shutdown requested...", flush=True)
    os._exit(0)  # Hard exit: simple way to stop uvicorn from inside an endpoint
def main():
    """Parse CLI options, populate the tool registry, and start the server.

    Command-line options:
        --config: Optional path to a JSON file. It is parsed, but its
            contents are not acted on yet.
        --host: Interface to bind to. Defaults to ``0.0.0.0``, the value
            that used to be hard-coded.
        --port: TCP port to listen on. Defaults to 8000.

    Raises:
        json.JSONDecodeError: If the config file exists but is not valid JSON.
    """
    parser = argparse.ArgumentParser(description="Start ECH0-PRIME MCP Server")
    parser.add_argument("--config", type=str, help="Path to server list config")
    # NOTE(review): no authentication is visible on the endpoints in this
    # file; binding to all interfaces exposes /tools/call and /shutdown to
    # the network. Pass --host 127.0.0.1 for local-only use.
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Interface to bind to",
    )
    parser.add_argument("--port", type=int, default=8000, help="Port to run on")
    args = parser.parse_args()

    # NOTE(review): the leading glyphs in the status messages below look like
    # mis-decoded emoji; they are left untouched here.
    print("๐ Initializing ECH0-PRIME Tool Registry...")

    # 1. Scan for tools in standard locations
    tool_dirs = [
        os.path.join(PROJECT_ROOT, "reasoning", "tools"),
        os.path.join(PROJECT_ROOT, "core"),
        os.path.join(PROJECT_ROOT, "ech0_governance"),
    ]
    for tool_dir in tool_dirs:
        if os.path.exists(tool_dir):
            print(f"๐ Scanning {tool_dir} for tools...")
            scan_local_tools(tool_dir)

    # 2. Specifically initialize QuLabBridge which registers methods manually
    print("๐งช Connecting QuLabBridge...")
    _qulab = QuLabBridge()

    # 3. Load config if provided
    if args.config:
        if os.path.exists(args.config):
            # JSON is UTF-8 by specification; do not depend on the locale's
            # default encoding.
            with open(args.config, "r", encoding="utf-8") as f:
                config = json.load(f)  # parsed only; not consumed yet
            print(f"๐ Loaded server config from {args.config}")
            # Here we could initialize remote tools or other MCP servers
            # listed in the config
        else:
            # A path was given explicitly, so say that it is being skipped
            # instead of ignoring it silently.
            print(f"Config file not found, ignoring: {args.config}")

    print(f"โจ MCP Server ready! Serving {len(ToolRegistry.get_schemas())} tools.")
    uvicorn.run(app, host=args.host, port=args.port)
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()