Spaces:
Running
Running
File size: 5,120 Bytes
09ed8ca | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | """MCP server exposing SecureAgentRAG retrieval + query as tools.
Run with ``uv run python -m interfaces.mcp_server`` (stdio transport). Add
to your Claude Desktop / Claude Code / Cursor config under ``mcpServers``:
{
"secureagentrag": {
"command": "uv",
"args": ["run", "python", "-m", "interfaces.mcp_server"],
"cwd": "F:/CV_project/secureagentrag"
}
}
Two tools are exposed:
- ``retrieve(query, user_id, org_id, roles, clearance_level, top_k)`` —
RBAC-filtered hybrid search; returns ranked chunks with metadata.
- ``query(query, user_id, org_id, roles, clearance_level, prefer_cloud)`` —
full multi-agent RAG pipeline; returns answer + citations + provenance.
The server is intentionally thin — it serialises ``QueryResponse`` (defined
in ``core/schemas.py``) so clients get the same shape FastAPI returns.
"""
from __future__ import annotations
import json
from typing import Any
from core.graph import run_rag_pipeline
from core.schemas import QueryResponse
from ingestion.metadata import UserContext
from utils.logging import get_logger
logger = get_logger(__name__)
try:
from mcp.server.fastmcp import FastMCP # type: ignore[import-not-found]
_MCP_AVAILABLE = True
except ImportError:
FastMCP = None # type: ignore[assignment,misc]
_MCP_AVAILABLE = False
def _build_user_context(
user_id: str, org_id: str, roles: list[str], clearance_level: int
) -> UserContext:
return UserContext(
user_id=user_id,
org_id=org_id,
roles=roles or ["viewer"],
clearance_level=clearance_level,
)
async def _retrieve_impl(
query: str,
user_id: str,
org_id: str = "",
roles: list[str] | None = None,
clearance_level: int = 1,
top_k: int = 5,
) -> list[dict[str, Any]]:
"""Run RBAC-filtered hybrid search and return raw chunks (no synthesis)."""
from core.agents.retriever import _get_hybrid_searcher
user_ctx = _build_user_context(user_id, org_id, roles or ["viewer"], clearance_level)
searcher = _get_hybrid_searcher()
results = await searcher.search(query=query, user_context=user_ctx, top_k=top_k)
return [
{
"doc_id": r.id,
"text": r.text,
"score": r.score,
"metadata": r.metadata,
}
for r in results
]
async def _query_impl(
query: str,
user_id: str,
org_id: str = "",
roles: list[str] | None = None,
clearance_level: int = 1,
prefer_cloud: bool = False,
) -> dict[str, Any]:
"""Run the full multi-agent RAG pipeline and return a ``QueryResponse``."""
user_ctx = _build_user_context(user_id, org_id, roles or ["viewer"], clearance_level)
state = await run_rag_pipeline(
query=query,
user_context=user_ctx,
thread_id=f"mcp-{user_id}",
prefer_cloud=prefer_cloud,
)
return QueryResponse.from_state(state).model_dump()
def build_server() -> Any:
"""Build the FastMCP server with the two SecureAgentRAG tools registered."""
if not _MCP_AVAILABLE:
raise RuntimeError("mcp package not installed. Run: uv sync --extra mcp")
mcp = FastMCP("secureagentrag")
@mcp.tool()
async def retrieve(
query: str,
user_id: str,
org_id: str = "",
roles: list[str] | None = None,
clearance_level: int = 1,
top_k: int = 5,
) -> str:
"""Search the SecureAgentRAG corpus with RBAC filters and return ranked chunks.
Use this when you want the raw evidence rather than a synthesised
answer. RBAC is enforced at the Qdrant payload level — only chunks
the user's roles grant access to are returned.
"""
results = await _retrieve_impl(
query=query,
user_id=user_id,
org_id=org_id,
roles=roles,
clearance_level=clearance_level,
top_k=top_k,
)
return json.dumps(results, ensure_ascii=False)
@mcp.tool()
async def query(
query: str,
user_id: str,
org_id: str = "",
roles: list[str] | None = None,
clearance_level: int = 1,
prefer_cloud: bool = False,
) -> str:
"""Run the full multi-agent RAG pipeline. Returns answer + citations + provenance.
Routes through guardrails -> security -> retrieve -> grade -> synth ->
eval. HIGH-sensitivity data is forced local regardless of
``prefer_cloud``.
"""
response = await _query_impl(
query=query,
user_id=user_id,
org_id=org_id,
roles=roles,
clearance_level=clearance_level,
prefer_cloud=prefer_cloud,
)
return json.dumps(response, ensure_ascii=False)
return mcp
def main() -> None:
"""Stdio entrypoint — invoked by Claude Desktop / Code via ``mcpServers``."""
if not _MCP_AVAILABLE:
raise SystemExit("mcp package not installed. Run: uv sync --extra mcp")
server = build_server()
server.run()
if __name__ == "__main__":
main()
|