File size: 5,120 Bytes
09ed8ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""MCP server exposing SecureAgentRAG retrieval + query as tools.

Run with ``uv run python -m interfaces.mcp_server`` (stdio transport). Add
to your Claude Desktop / Claude Code / Cursor config under ``mcpServers``:

    {
      "secureagentrag": {
        "command": "uv",
        "args": ["run", "python", "-m", "interfaces.mcp_server"],
        "cwd": "F:/CV_project/secureagentrag"
      }
    }

Two tools are exposed:

- ``retrieve(query, user_id, org_id, roles, clearance_level, top_k)`` —
  RBAC-filtered hybrid search; returns ranked chunks with metadata.
- ``query(query, user_id, org_id, roles, clearance_level, prefer_cloud)`` —
  full multi-agent RAG pipeline; returns answer + citations + provenance.

The server is intentionally thin — it serialises ``QueryResponse`` (defined
in ``core/schemas.py``) so clients get the same shape FastAPI returns.
"""

from __future__ import annotations

import json
from typing import Any

from core.graph import run_rag_pipeline
from core.schemas import QueryResponse
from ingestion.metadata import UserContext
from utils.logging import get_logger

logger = get_logger(__name__)

try:
    from mcp.server.fastmcp import FastMCP  # type: ignore[import-not-found]

    _MCP_AVAILABLE = True
except ImportError:
    FastMCP = None  # type: ignore[assignment,misc]
    _MCP_AVAILABLE = False


def _build_user_context(
    user_id: str, org_id: str, roles: list[str], clearance_level: int
) -> UserContext:
    return UserContext(
        user_id=user_id,
        org_id=org_id,
        roles=roles or ["viewer"],
        clearance_level=clearance_level,
    )


async def _retrieve_impl(
    query: str,
    user_id: str,
    org_id: str = "",
    roles: list[str] | None = None,
    clearance_level: int = 1,
    top_k: int = 5,
) -> list[dict[str, Any]]:
    """Run RBAC-filtered hybrid search and return raw chunks (no synthesis)."""
    from core.agents.retriever import _get_hybrid_searcher

    user_ctx = _build_user_context(user_id, org_id, roles or ["viewer"], clearance_level)
    searcher = _get_hybrid_searcher()
    results = await searcher.search(query=query, user_context=user_ctx, top_k=top_k)
    return [
        {
            "doc_id": r.id,
            "text": r.text,
            "score": r.score,
            "metadata": r.metadata,
        }
        for r in results
    ]


async def _query_impl(
    query: str,
    user_id: str,
    org_id: str = "",
    roles: list[str] | None = None,
    clearance_level: int = 1,
    prefer_cloud: bool = False,
) -> dict[str, Any]:
    """Run the full multi-agent RAG pipeline and return a ``QueryResponse``."""
    user_ctx = _build_user_context(user_id, org_id, roles or ["viewer"], clearance_level)
    state = await run_rag_pipeline(
        query=query,
        user_context=user_ctx,
        thread_id=f"mcp-{user_id}",
        prefer_cloud=prefer_cloud,
    )
    return QueryResponse.from_state(state).model_dump()


def build_server() -> Any:
    """Build the FastMCP server with the two SecureAgentRAG tools registered."""
    if not _MCP_AVAILABLE:
        raise RuntimeError("mcp package not installed. Run: uv sync --extra mcp")

    mcp = FastMCP("secureagentrag")

    @mcp.tool()
    async def retrieve(
        query: str,
        user_id: str,
        org_id: str = "",
        roles: list[str] | None = None,
        clearance_level: int = 1,
        top_k: int = 5,
    ) -> str:
        """Search the SecureAgentRAG corpus with RBAC filters and return ranked chunks.

        Use this when you want the raw evidence rather than a synthesised
        answer. RBAC is enforced at the Qdrant payload level — only chunks
        the user's roles grant access to are returned.
        """
        results = await _retrieve_impl(
            query=query,
            user_id=user_id,
            org_id=org_id,
            roles=roles,
            clearance_level=clearance_level,
            top_k=top_k,
        )
        return json.dumps(results, ensure_ascii=False)

    @mcp.tool()
    async def query(
        query: str,
        user_id: str,
        org_id: str = "",
        roles: list[str] | None = None,
        clearance_level: int = 1,
        prefer_cloud: bool = False,
    ) -> str:
        """Run the full multi-agent RAG pipeline. Returns answer + citations + provenance.

        Routes through guardrails -> security -> retrieve -> grade -> synth ->
        eval. HIGH-sensitivity data is forced local regardless of
        ``prefer_cloud``.
        """
        response = await _query_impl(
            query=query,
            user_id=user_id,
            org_id=org_id,
            roles=roles,
            clearance_level=clearance_level,
            prefer_cloud=prefer_cloud,
        )
        return json.dumps(response, ensure_ascii=False)

    return mcp


def main() -> None:
    """Stdio entrypoint — invoked by Claude Desktop / Code via ``mcpServers``."""
    if not _MCP_AVAILABLE:
        raise SystemExit("mcp package not installed. Run: uv sync --extra mcp")
    server = build_server()
    server.run()


if __name__ == "__main__":
    main()