nothingworry committed on
Commit
e44e5dd
·
1 Parent(s): 4749f94

Update the backend

backend/README.md CHANGED
@@ -5,10 +5,7 @@ This folder contains the production-ready FastAPI stack plus the companion MCP s
5
  ## Directory Overview
6
 
7
  - `api/` – FastAPI application (routes, services, storage helpers, MCP clients)
8
- - `mcp_servers/` – Stand-alone MCP servers:
9
- - `rag_server.py` / `main.py` – pgvector-backed retrieval over tenant documents
10
- - `web_server.py` – DuckDuckGo-powered search with English bias
11
- - `admin_server.py` – Governance utilities (regex rules, violation logging, tenant registry)
12
  - `workers/` – Celery workers and schedulers for async ingestion + analytics maintenance
13
 
14
  ## Prerequisites
@@ -32,17 +29,14 @@ cp env.example .env # update MCP URLs + LLM settings
32
  uvicorn backend.api.main:app --port 8000 --reload
33
  ```
34
 
35
- 2. **MCP servers** (each in its own shell):
36
  ```bash
37
- # RAG / knowledge base
38
- python -m backend.mcp_servers.main # or python -m backend.mcp_servers.rag_server
39
-
40
- # Web search
41
- python -m backend.mcp_servers.web_server
42
-
43
- # Admin / governance
44
- python -m backend.mcp_servers.admin_server
45
  ```
46
 
47
  3. **Optional workers** (if running Celery-based ingestion/analytics jobs):
48
  ```bash
@@ -76,7 +70,7 @@ Use the helper scripts in the repo root when validating backend changes:
76
  - `python check_rag_database.py` – Talks directly to the pgvector database to list tenant IDs, preview stored chunks, and run safeguarded searches via `search_vectors()`. Helpful when troubleshooting suspected cross-tenant leakage.
77
  - `python test_manual.py` – Legacy manual smoke test harness (analytics store, admin rules, API surface).
78
 
79
- > **Troubleshooting tip:** If the isolation script reports a failure, first run `check_rag_database.py` to confirm documents are tagged with the correct `tenant_id`, then restart the RAG MCP server so it reloads the updated SQL filtering logic.
80
 
81
  ## Environment Variables (excerpt)
82
 
@@ -89,3 +83,20 @@ Defined in `env.example`:
89
 
90
  Update these before starting the servers to ensure the agent can reach every MCP endpoint and LLM runtime.
91
5
  ## Directory Overview
6
 
7
  - `api/` – FastAPI application (routes, services, storage helpers, MCP clients)
8
+ - `mcp_server/` – Unified MCP server exposing rag/web/admin tools via namespaces
9
  - `workers/` – Celery workers and schedulers for async ingestion + analytics maintenance
10
 
11
  ## Prerequisites
 
29
  uvicorn backend.api.main:app --port 8000 --reload
30
  ```
31
 
32
+ 2. **Unified MCP server (rag/web/admin)**
33
  ```bash
34
+ python backend/mcp_server/server.py
35
  ```
36
+ This single endpoint exposes the following namespaced tools:
37
+ - `rag.search`, `rag.ingest`, `rag.delete`
38
+ - `web.search`
39
+ - `admin.getRules`, `admin.addRule`, `admin.deleteRule`, `admin.logViolation`
40
 
41
  3. **Optional workers** (if running Celery-based ingestion/analytics jobs):
42
  ```bash
 
70
  - `python check_rag_database.py` – Talks directly to the pgvector database to list tenant IDs, preview stored chunks, and run safeguarded searches via `search_vectors()`. Helpful when troubleshooting suspected cross-tenant leakage.
71
  - `python test_manual.py` – Legacy manual smoke test harness (analytics store, admin rules, API surface).
72
 
73
+ > **Troubleshooting tip:** If the isolation script reports a failure, first run `check_rag_database.py` to confirm documents are tagged with the correct `tenant_id`, then restart the unified MCP server so it reloads the updated SQL filtering logic.
74
 
75
  ## Environment Variables (excerpt)
76
 
 
83
 
84
  Update these before starting the servers to ensure the agent can reach every MCP endpoint and LLM runtime.
85
 
86
+ ## Unified MCP Tool Instructions
87
+
88
+ Agents that speak the Model Context Protocol should connect to the `integrachat` server id defined in `backend/mcp_server/server.py` and call the namespaced tools directly:
89
+
90
+ | Namespace | Tool | Purpose |
91
+ | --- | --- | --- |
92
+ | `rag` | `search` | Retrieve tenant-scoped document chunks |
93
+ | `rag` | `ingest` | Chunk + store new knowledge |
94
+ | `rag` | `delete` | Remove one/all stored documents |
95
+ | `web` | `search` | DuckDuckGo English-biased search |
96
+ | `admin` | `getRules` | Fetch tenant governance rules (list or detailed) |
97
+ | `admin` | `addRule` | Insert or update a rule |
98
+ | `admin` | `deleteRule` | Remove a rule by text |
99
+ | `admin` | `logViolation` | Persist a red-flag event into analytics |
100
+
101
+ Always send `tenant_id`, and optionally `user_id`, in the payload so the shared middleware can enforce isolation and log analytics.
102
+
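For reference, a minimal sketch of calling one of these namespaced tools from Python over plain HTTP. It assumes the unified server was started with `python backend/mcp_server/server.py` on the default `MCP_PORT` of 8001, and that `tenant-123`, `user-42`, and the query are placeholder values:

```python
# Minimal sketch: POST to a namespaced tool on the unified MCP server.
# Assumes the server runs locally on the default port 8001; the tenant id,
# user id, and query are example values.
import httpx

BASE_URL = "http://localhost:8001"

payload = {
    "tenant_id": "tenant-123",   # required on every tool call
    "user_id": "user-42",        # optional, used for analytics logging
    "query": "What is the refund policy?",
    "limit": 5,
}

response = httpx.post(f"{BASE_URL}/rag/search", json=payload, timeout=30.0)
envelope = response.json()

# Every tool reply is wrapped in an envelope: status, tool, tenant_id,
# latency_ms, metadata, and the tool-specific result under "data".
if envelope.get("status") == "ok":
    for hit in envelope["data"]["results"]:
        print(round(hit["relevance"], 3), hit["text"][:80])
else:
    print("tool error:", envelope.get("message"))
```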
backend/api/mcp_clients/rag_client.py CHANGED
@@ -44,6 +44,7 @@ class RAGClient:
44
  async def ingest(self, content: str, tenant_id: str):
45
  """
46
  Sends content to the RAG server for ingestion.
 
47
  """
48
 
49
  try:
@@ -60,6 +61,17 @@ class RAGClient:
60
  return {"error": f"HTTP {response.status_code}"}
61
 
62
  data = response.json()
63
  return data
64
 
65
  except Exception as e:
@@ -69,6 +81,7 @@ class RAGClient:
69
  async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
70
  """
71
  List all documents for a tenant.
 
72
  """
73
 
74
  try:
@@ -86,6 +99,13 @@ class RAGClient:
86
  return {"documents": [], "total": 0, "limit": limit, "offset": offset}
87
 
88
  data = response.json()
89
  return data
90
 
91
  except Exception as e:
@@ -95,6 +115,7 @@ class RAGClient:
95
  async def delete_document(self, tenant_id: str, document_id: int):
96
  """
97
  Delete a specific document by ID for a tenant.
 
98
  """
99
  try:
100
  async with httpx.AsyncClient(timeout=30.0) as client:
@@ -115,6 +136,18 @@ class RAGClient:
115
  return {"error": f"HTTP {response.status_code}: {error_text}"}
116
 
117
  data = response.json()
118
  return data
119
 
120
  except httpx.ConnectError as e:
@@ -127,6 +160,7 @@ class RAGClient:
127
  async def delete_all_documents(self, tenant_id: str):
128
  """
129
  Delete all documents for a tenant.
 
130
  """
131
  try:
132
  async with httpx.AsyncClient(timeout=30.0) as client:
@@ -145,6 +179,18 @@ class RAGClient:
145
  return {"error": f"HTTP {response.status_code}: {error_text}"}
146
 
147
  data = response.json()
148
  return data
149
 
150
  except httpx.ConnectError as e:
 
44
  async def ingest(self, content: str, tenant_id: str):
45
  """
46
  Sends content to the RAG server for ingestion.
47
+ Returns the unwrapped data from the MCP server response.
48
  """
49
 
50
  try:
 
61
  return {"error": f"HTTP {response.status_code}"}
62
 
63
  data = response.json()
64
+
65
+ # MCP server wraps response in a 'data' field
66
+ # Extract the actual result data
67
+ if isinstance(data, dict) and "data" in data:
68
+ result = data["data"]
69
+ # Map chunks_ingested to chunks_stored for consistency
70
+ if "chunks_ingested" in result:
71
+ result["chunks_stored"] = result.pop("chunks_ingested")
72
+ return result
73
+
74
+ # If not wrapped, return as-is (backward compatibility)
75
  return data
76
 
77
  except Exception as e:
 
81
  async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
82
  """
83
  List all documents for a tenant.
84
+ Returns the unwrapped data from the MCP server response.
85
  """
86
 
87
  try:
 
99
  return {"documents": [], "total": 0, "limit": limit, "offset": offset}
100
 
101
  data = response.json()
102
+
103
+ # MCP server wraps response in a 'data' field
104
+ # Extract the actual result data
105
+ if isinstance(data, dict) and "data" in data:
106
+ return data["data"]
107
+
108
+ # If not wrapped, return as-is (backward compatibility)
109
  return data
110
 
111
  except Exception as e:
 
115
  async def delete_document(self, tenant_id: str, document_id: int):
116
  """
117
  Delete a specific document by ID for a tenant.
118
+ Returns the unwrapped data from the MCP server response.
119
  """
120
  try:
121
  async with httpx.AsyncClient(timeout=30.0) as client:
 
136
  return {"error": f"HTTP {response.status_code}: {error_text}"}
137
 
138
  data = response.json()
139
+
140
+ # Check if MCP server returned an error response
141
+ if isinstance(data, dict) and data.get("status") == "error":
142
+ error_msg = data.get("message", "Unknown error")
143
+ return {"error": error_msg}
144
+
145
+ # MCP server wraps response in a 'data' field
146
+ # Extract the actual result data
147
+ if isinstance(data, dict) and "data" in data:
148
+ return data["data"]
149
+
150
+ # If not wrapped, return as-is (backward compatibility)
151
  return data
152
 
153
  except httpx.ConnectError as e:
 
160
  async def delete_all_documents(self, tenant_id: str):
161
  """
162
  Delete all documents for a tenant.
163
+ Returns the unwrapped data from the MCP server response.
164
  """
165
  try:
166
  async with httpx.AsyncClient(timeout=30.0) as client:
 
179
  return {"error": f"HTTP {response.status_code}: {error_text}"}
180
 
181
  data = response.json()
182
+
183
+ # Check if MCP server returned an error response
184
+ if isinstance(data, dict) and data.get("status") == "error":
185
+ error_msg = data.get("message", "Unknown error")
186
+ return {"error": error_msg}
187
+
188
+ # MCP server wraps response in a 'data' field
189
+ # Extract the actual result data
190
+ if isinstance(data, dict) and "data" in data:
191
+ return data["data"]
192
+
193
+ # If not wrapped, return as-is (backward compatibility)
194
  return data
195
 
196
  except httpx.ConnectError as e:
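The four client methods above repeat the same envelope-unwrapping and error-check logic. A hedged refactoring sketch that would centralize it; the helper name `_unwrap_mcp_response` is hypothetical and not part of this commit:

```python
# Hypothetical helper consolidating the repeated unwrap/error-check logic above.
from typing import Any, Dict


def _unwrap_mcp_response(data: Any) -> Dict[str, Any]:
    """Return the 'data' payload from an MCP envelope, or an error dict."""
    # Surface MCP-level errors in the {"error": ...} shape the routes expect.
    if isinstance(data, dict) and data.get("status") == "error":
        return {"error": data.get("message", "Unknown error")}
    # The MCP server wraps results in a 'data' field; extract it when present.
    if isinstance(data, dict) and "data" in data:
        return data["data"]
    # Not wrapped: return as-is for backward compatibility.
    return data
```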
backend/api/routes/rag.py CHANGED
@@ -264,7 +264,12 @@ async def rag_delete_all(
264
  try:
265
  result = await rag_client.delete_all_documents(x_tenant_id)
266
  if "error" in result:
267
- raise HTTPException(status_code=500, detail=result["error"])
268
  return result
269
  except HTTPException:
270
  raise
 
264
  try:
265
  result = await rag_client.delete_all_documents(x_tenant_id)
266
  if "error" in result:
267
+ error_msg = result["error"]
268
+ # Check if it's a connection error (503) or other error
269
+ if "Cannot connect" in error_msg:
270
+ raise HTTPException(status_code=503, detail=error_msg)
271
+ else:
272
+ raise HTTPException(status_code=500, detail=error_msg)
273
  return result
274
  except HTTPException:
275
  raise
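The status-code decision in this route (connection failures become 503, everything else 500) is simple string matching on the client's error text. If other RAG routes adopt the same behavior, it could live in one shared helper; a hedged sketch, with an illustrative function name only:

```python
# Illustrative helper: translate client-reported error strings into HTTP errors.
from fastapi import HTTPException


def raise_for_client_error(result: dict) -> None:
    """Raise 503 for connection failures, 500 for other client-reported errors."""
    if "error" not in result:
        return
    error_msg = result["error"]
    status_code = 503 if "Cannot connect" in error_msg else 500
    raise HTTPException(status_code=status_code, detail=error_msg)
```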
backend/mcp_server/__init__.py ADDED
@@ -0,0 +1,6 @@
1
+ """
2
+ Unified MCP server package for IntegraChat.
3
+ """
4
+
5
+ __all__ = ["common", "rag", "web", "admin"]
6
+
backend/mcp_server/admin/__init__.py ADDED
@@ -0,0 +1,6 @@
1
+ """
2
+ Admin governance tooling namespace.
3
+ """
4
+
5
+ __all__ = ["rules", "violations"]
6
+
backend/mcp_server/admin/rules.py ADDED
@@ -0,0 +1,86 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Mapping
4
+
5
+ from backend.api.storage.rules_store import RulesStore
6
+ from backend.mcp_server.common.tenant import TenantContext
7
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
8
+
9
+ _rules_store = RulesStore()
10
+
11
+
12
+ @tool_handler("admin.getRules")
13
+ async def admin_get_rules(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
14
+ """
15
+ Return the active admin rules for the tenant.
16
+ """
17
+
18
+ detailed = bool(payload.get("detailed", False))
19
+ rules = (
20
+ _rules_store.get_rules_detailed(context.tenant_id)
21
+ if detailed
22
+ else _rules_store.get_rules(context.tenant_id)
23
+ )
24
+ return {"tenant_id": context.tenant_id, "rules": rules, "detailed": detailed}
25
+
26
+
27
+ @tool_handler("admin.addRule")
28
+ async def admin_add_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
29
+ """
30
+ Add a new governance rule.
31
+ """
32
+
33
+ rule_text = payload.get("rule")
34
+ if not isinstance(rule_text, str) or not rule_text.strip():
35
+ raise ToolValidationError("rule must be a non-empty string")
36
+
37
+ pattern = payload.get("pattern")
38
+ if pattern is not None and not isinstance(pattern, str):
39
+ raise ToolValidationError("pattern must be a string if provided")
40
+
41
+ severity = payload.get("severity", "medium")
42
+ if not isinstance(severity, str):
43
+ raise ToolValidationError("severity must be a string")
44
+
45
+ description = payload.get("description")
46
+ if description is not None and not isinstance(description, str):
47
+ raise ToolValidationError("description must be a string if provided")
48
+
49
+ enabled = bool(payload.get("enabled", True))
50
+
51
+ success = _rules_store.add_rule(
52
+ tenant_id=context.tenant_id,
53
+ rule=rule_text.strip(),
54
+ pattern=pattern.strip() if isinstance(pattern, str) and pattern.strip() else None,
55
+ severity=severity.strip(),
56
+ description=description.strip() if isinstance(description, str) and description.strip() else None,
57
+ enabled=enabled,
58
+ )
59
+
60
+ if not success:
61
+ raise ToolValidationError("rule already exists or could not be saved")
62
+
63
+ return {
64
+ "tenant_id": context.tenant_id,
65
+ "rule": rule_text.strip(),
66
+ "enabled": enabled,
67
+ }
68
+
69
+
70
+ @tool_handler("admin.deleteRule")
71
+ async def admin_delete_rule(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
72
+ """
73
+ Delete an existing rule by its text value.
74
+ """
75
+
76
+ rule_text = payload.get("rule")
77
+ if not isinstance(rule_text, str) or not rule_text.strip():
78
+ raise ToolValidationError("rule must be provided for deletion")
79
+
80
+ deleted = _rules_store.delete_rule(context.tenant_id, rule_text.strip())
81
+ return {
82
+ "tenant_id": context.tenant_id,
83
+ "rule": rule_text.strip(),
84
+ "deleted": deleted,
85
+ }
86
+
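Because `@tool_handler` reduces each handler to a coroutine that takes a single payload dict, these rules tools can be exercised directly without the HTTP layer. A minimal sketch, assuming the rules store is reachable and using example tenant and rule values:

```python
# Minimal sketch: calling the decorated admin rules handlers directly.
# tenant-123 and the rule text are example values; the rules store must be
# reachable for the calls to succeed.
import asyncio

from backend.mcp_server.admin.rules import admin_add_rule, admin_get_rules


async def demo() -> None:
    added = await admin_add_rule({
        "tenant_id": "tenant-123",
        "rule": "Never share customer phone numbers",
        "severity": "high",
    })
    print(added["status"], added.get("data") or added.get("message"))

    rules = await admin_get_rules({"tenant_id": "tenant-123", "detailed": True})
    print(rules["status"], rules.get("data"))


asyncio.run(demo())
```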
backend/mcp_server/admin/violations.py ADDED
@@ -0,0 +1,62 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Mapping, Optional
4
+
5
+ from backend.mcp_server.common.logging import log_redflag_violation
6
+ from backend.mcp_server.common.tenant import TenantContext
7
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
8
+
9
+
10
+ @tool_handler("admin.logViolation")
11
+ async def log_violation(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
12
+ """
13
+ Persist a red-flag violation for analytics and auditing.
14
+ """
15
+
16
+ rule_id = payload.get("rule_id") or payload.get("ruleId")
17
+ if not isinstance(rule_id, str) or not rule_id.strip():
18
+ raise ToolValidationError("rule_id must be provided")
19
+
20
+ rule_pattern = payload.get("rule_pattern") or payload.get("rulePattern") or rule_id
21
+ if not isinstance(rule_pattern, str):
22
+ raise ToolValidationError("rule_pattern must be a string")
23
+
24
+ severity = payload.get("severity", "medium")
25
+ if not isinstance(severity, str):
26
+ raise ToolValidationError("severity must be a string")
27
+
28
+ matched_text = payload.get("matched_text") or payload.get("matchedText")
29
+ if not isinstance(matched_text, str) or not matched_text.strip():
30
+ raise ToolValidationError("matched_text is required")
31
+
32
+ confidence = payload.get("confidence")
33
+ if confidence is not None:
34
+ try:
35
+ confidence_value: Optional[float] = float(confidence)
36
+ except (TypeError, ValueError):
37
+ raise ToolValidationError("confidence must be numeric")
38
+ else:
39
+ confidence_value = None
40
+
41
+ message_preview = payload.get("message_preview") or payload.get("messagePreview")
42
+ if message_preview is not None and not isinstance(message_preview, str):
43
+ raise ToolValidationError("message_preview must be a string if provided")
44
+
45
+ log_redflag_violation(
46
+ tenant_id=context.tenant_id,
47
+ rule_id=rule_id.strip(),
48
+ rule_pattern=rule_pattern.strip(),
49
+ severity=severity.strip(),
50
+ matched_text=matched_text.strip(),
51
+ confidence=confidence_value,
52
+ message_preview=message_preview.strip() if isinstance(message_preview, str) else None,
53
+ user_id=context.user_id,
54
+ )
55
+
56
+ return {
57
+ "tenant_id": context.tenant_id,
58
+ "rule_id": rule_id.strip(),
59
+ "severity": severity.strip(),
60
+ "logged": True,
61
+ }
62
+
backend/mcp_server/common/__init__.py ADDED
@@ -0,0 +1,8 @@
1
+ """
2
+ Shared utilities for the unified MCP server.
3
+ """
4
+
5
+ from .tenant import TenantContext, TenantValidationError, build_tenant_context
6
+
7
+ __all__ = ["TenantContext", "TenantValidationError", "build_tenant_context"]
8
+
backend/{mcp_servers → mcp_server/common}/database.py RENAMED
@@ -1,17 +1,20 @@
1
  """
2
- Supabase database connection and utilities for MCP servers.
3
 
4
- This module provides both:
5
  1. Direct PostgreSQL connections (via psycopg2) for pgvector operations
6
- 2. Supabase client for REST API operations
7
  """
8
 
 
 
9
  import os
10
  from typing import Optional, List, Dict, Any
 
11
  import psycopg2
12
  import psycopg2.extras
13
- from supabase import create_client, Client
14
  from dotenv import load_dotenv
 
15
 
16
  # Load environment variables
17
  load_dotenv()
@@ -20,9 +23,9 @@ load_dotenv()
20
  # Environment variables
21
  # -----------------------------------
22
 
23
- DATABASE_URL = os.getenv("POSTGRESQL_URL") # Direct PostgreSQL connection
24
  SUPABASE_URL = os.getenv("SUPABASE_URL")
25
- SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_KEY") # MUST be service role key
26
 
27
  # Global Supabase client instance
28
  _supabase_client: Optional[Client] = None
@@ -64,7 +67,8 @@ def initialize_database():
64
  print("✅ pgvector extension enabled")
65
 
66
  # Create documents table
67
- cur.execute("""
 
68
  CREATE TABLE IF NOT EXISTS documents (
69
  id BIGSERIAL PRIMARY KEY,
70
  tenant_id TEXT NOT NULL,
@@ -72,23 +76,28 @@ def initialize_database():
72
  embedding vector(384) NOT NULL,
73
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
74
  );
75
- """)
 
76
  print("✅ documents table created")
77
 
78
  # Create index for vector similarity search
79
- cur.execute("""
 
80
  CREATE INDEX IF NOT EXISTS documents_embedding_idx
81
  ON documents
82
  USING ivfflat (embedding vector_cosine_ops)
83
  WITH (lists = 100);
84
- """)
 
85
  print("✅ vector index created")
86
 
87
  # Create index for tenant_id for faster filtering
88
- cur.execute("""
 
89
  CREATE INDEX IF NOT EXISTS documents_tenant_id_idx
90
  ON documents (tenant_id);
91
- """)
 
92
  print("✅ tenant_id index created")
93
 
94
  conn.commit()
@@ -112,6 +121,9 @@ def insert_document_chunks(tenant_id: str, text: str, embedding: list):
112
  Insert document chunk + embedding.
113
  """
114
  try:
 
 
 
115
  conn = get_connection()
116
  cur = conn.cursor()
117
 
@@ -120,7 +132,7 @@ def insert_document_chunks(tenant_id: str, text: str, embedding: list):
120
  INSERT INTO documents (tenant_id, chunk_text, embedding)
121
  VALUES (%s, %s, %s);
122
  """,
123
- (tenant_id, text, embedding)
124
  )
125
 
126
  conn.commit()
@@ -142,7 +154,7 @@ def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[st
142
  if not tenant_id or not tenant_id.strip():
143
  print("DB SEARCH ERROR: tenant_id is empty")
144
  return []
145
-
146
  tenant_id = tenant_id.strip()
147
  conn = get_connection()
148
  cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
@@ -159,7 +171,7 @@ def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[st
159
  ORDER BY embedding <=> %s::vector(384)
160
  LIMIT %s;
161
  """,
162
- (vector, tenant_id, vector, limit)
163
  )
164
 
165
  rows = cur.fetchall()
@@ -169,60 +181,88 @@ def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[st
169
  for row in rows:
170
  row_tenant_id = row.get("tenant_id", "")
171
  if row_tenant_id != tenant_id:
172
- print(f"WARNING: Found document with tenant_id '{row_tenant_id}' when searching for '{tenant_id}' - skipping")
 
 
173
  continue
174
-
175
  results.append(
176
  {
177
  "text": row["chunk_text"],
178
  "similarity": float(row.get("similarity", 0.0)),
179
  }
180
  )
181
-
182
  cur.close()
183
  conn.close()
184
-
185
  return results
186
 
187
  except Exception as e:
188
  print(f"DB SEARCH ERROR (tenant_id={tenant_id}): {e}")
189
  import traceback
 
190
  traceback.print_exc()
191
  return []
192
 
193
 
194
- def list_all_documents(tenant_id: str, limit: int = 1000, offset: int = 0) -> Dict[str, Any]:
 
 
195
  """
196
  List all documents for a tenant with pagination.
 
197
  """
198
  try:
 
 
 
199
  conn = get_connection()
200
  cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
201
 
202
  cur.execute(
203
- """
204
  SELECT
205
  id,
206
  chunk_text,
207
  created_at
208
  FROM documents
209
- WHERE tenant_id = %s
210
  ORDER BY created_at DESC
211
  LIMIT %s OFFSET %s;
212
  """,
213
- (tenant_id, limit, offset)
214
  )
215
 
216
  rows = cur.fetchall()
217
 
218
- # Get total count
 
219
  cur.execute(
220
- """
221
  SELECT COUNT(*) as total
222
  FROM documents
223
- WHERE tenant_id = %s;
224
  """,
225
- (tenant_id,)
226
  )
227
  total_row = cur.fetchone()
228
  total = total_row["total"] if total_row else 0
@@ -236,10 +276,17 @@ def list_all_documents(tenant_id: str, limit: int = 1000, offset: int = 0) -> Di
236
  {
237
  "id": row["id"],
238
  "text": row["chunk_text"],
239
- "created_at": row["created_at"].isoformat() if row["created_at"] else None,
 
 
240
  }
241
  )
242
- return {"documents": results, "total": total, "limit": limit, "offset": offset}
243
 
244
  except Exception as e:
245
  print("DB LIST ERROR:", e)
@@ -252,19 +299,56 @@ def delete_document(tenant_id: str, document_id: int) -> bool:
252
  Returns True if document was deleted, False otherwise.
253
  """
254
  try:
 
 
 
255
  conn = get_connection()
256
  cur = conn.cursor()
257
 
258
- # Delete the document (tenant_id check ensures tenant isolation)
259
  cur.execute(
260
  """
261
- DELETE FROM documents
262
- WHERE id = %s AND tenant_id = %s;
263
  """,
264
- (document_id, tenant_id)
265
  )
266
-
267
- deleted = cur.rowcount > 0
268
  conn.commit()
269
  cur.close()
270
  conn.close()
@@ -272,7 +356,9 @@ def delete_document(tenant_id: str, document_id: int) -> bool:
272
  return deleted
273
 
274
  except Exception as e:
275
- print("DB DELETE ERROR:", e)
 
 
276
  return False
277
 
278
 
@@ -280,20 +366,50 @@ def delete_all_documents(tenant_id: str) -> int:
280
  """
281
  Delete all documents for a tenant.
282
  Returns the number of documents deleted.
 
283
  """
284
  try:
 
 
 
285
  conn = get_connection()
286
  cur = conn.cursor()
287
 
 
288
  cur.execute(
289
  """
290
- DELETE FROM documents
291
- WHERE tenant_id = %s;
292
- """,
293
- (tenant_id,)
294
  )
295
-
296
- deleted_count = cur.rowcount
297
  conn.commit()
298
  cur.close()
299
  conn.close()
@@ -301,7 +417,9 @@ def delete_all_documents(tenant_id: str) -> int:
301
  return deleted_count
302
 
303
  except Exception as e:
304
- print("DB DELETE ALL ERROR:", e)
 
 
305
  return 0
306
 
307
 
@@ -341,3 +459,4 @@ TABLES = {
341
  "analytics": "analytics_events",
342
  "tool_usage": "tool_usage_stats",
343
  }
 
 
1
  """
2
+ Supabase/PostgreSQL database utilities shared by all MCP tools.
3
 
4
+ This module provides:
5
  1. Direct PostgreSQL connections (via psycopg2) for pgvector operations
6
+ 2. A Supabase client for REST-style administrative needs
7
  """
8
 
9
+ from __future__ import annotations
10
+
11
  import os
12
  from typing import Optional, List, Dict, Any
13
+
14
  import psycopg2
15
  import psycopg2.extras
 
16
  from dotenv import load_dotenv
17
+ from supabase import Client, create_client
18
 
19
  # Load environment variables
20
  load_dotenv()
 
23
  # Environment variables
24
  # -----------------------------------
25
 
26
+ DATABASE_URL = os.getenv("POSTGRESQL_URL") # Direct PostgreSQL connection
27
  SUPABASE_URL = os.getenv("SUPABASE_URL")
28
+ SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_KEY") # MUST be service role key
29
 
30
  # Global Supabase client instance
31
  _supabase_client: Optional[Client] = None
 
67
  print("✅ pgvector extension enabled")
68
 
69
  # Create documents table
70
+ cur.execute(
71
+ """
72
  CREATE TABLE IF NOT EXISTS documents (
73
  id BIGSERIAL PRIMARY KEY,
74
  tenant_id TEXT NOT NULL,
 
76
  embedding vector(384) NOT NULL,
77
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
78
  );
79
+ """
80
+ )
81
  print("✅ documents table created")
82
 
83
  # Create index for vector similarity search
84
+ cur.execute(
85
+ """
86
  CREATE INDEX IF NOT EXISTS documents_embedding_idx
87
  ON documents
88
  USING ivfflat (embedding vector_cosine_ops)
89
  WITH (lists = 100);
90
+ """
91
+ )
92
  print("✅ vector index created")
93
 
94
  # Create index for tenant_id for faster filtering
95
+ cur.execute(
96
+ """
97
  CREATE INDEX IF NOT EXISTS documents_tenant_id_idx
98
  ON documents (tenant_id);
99
+ """
100
+ )
101
  print("✅ tenant_id index created")
102
 
103
  conn.commit()
 
121
  Insert document chunk + embedding.
122
  """
123
  try:
124
+ # Normalize tenant_id to ensure consistency
125
+ tenant_id = tenant_id.strip()
126
+
127
  conn = get_connection()
128
  cur = conn.cursor()
129
 
 
132
  INSERT INTO documents (tenant_id, chunk_text, embedding)
133
  VALUES (%s, %s, %s);
134
  """,
135
+ (tenant_id, text, embedding),
136
  )
137
 
138
  conn.commit()
 
154
  if not tenant_id or not tenant_id.strip():
155
  print("DB SEARCH ERROR: tenant_id is empty")
156
  return []
157
+
158
  tenant_id = tenant_id.strip()
159
  conn = get_connection()
160
  cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
 
171
  ORDER BY embedding <=> %s::vector(384)
172
  LIMIT %s;
173
  """,
174
+ (vector, tenant_id, vector, limit),
175
  )
176
 
177
  rows = cur.fetchall()
 
181
  for row in rows:
182
  row_tenant_id = row.get("tenant_id", "")
183
  if row_tenant_id != tenant_id:
184
+ print(
185
+ f"WARNING: Found document with tenant_id '{row_tenant_id}' when searching for '{tenant_id}' - skipping"
186
+ )
187
  continue
188
+
189
  results.append(
190
  {
191
  "text": row["chunk_text"],
192
  "similarity": float(row.get("similarity", 0.0)),
193
  }
194
  )
195
+
196
  cur.close()
197
  conn.close()
198
+
199
  return results
200
 
201
  except Exception as e:
202
  print(f"DB SEARCH ERROR (tenant_id={tenant_id}): {e}")
203
  import traceback
204
+
205
  traceback.print_exc()
206
  return []
207
 
208
 
209
+ def list_all_documents(
210
+ tenant_id: str, limit: int = 1000, offset: int = 0
211
+ ) -> Dict[str, Any]:
212
  """
213
  List all documents for a tenant with pagination.
214
+ Handles tenant_id normalization to match documents stored with different formatting.
215
  """
216
  try:
217
+ # Normalize tenant_id to ensure consistency
218
+ tenant_id_normalized = tenant_id.strip()
219
+
220
  conn = get_connection()
221
  cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
222
 
223
+ # Get all unique tenant_ids that match when normalized
224
+ cur.execute("SELECT DISTINCT tenant_id FROM documents;")
225
+ all_tenant_ids = [row[0] for row in cur.fetchall()]
226
+
227
+ # Find tenant_ids that match when normalized
228
+ matching_tenant_ids = []
229
+ for stored_tenant_id in all_tenant_ids:
230
+ if stored_tenant_id and stored_tenant_id.strip() == tenant_id_normalized:
231
+ matching_tenant_ids.append(stored_tenant_id)
232
+
233
+ if not matching_tenant_ids:
234
+ # No matching tenant_ids found
235
+ cur.close()
236
+ conn.close()
237
+ return {"documents": [], "total": 0, "limit": limit, "offset": offset}
238
+
239
+ # Build query to match any of the normalized tenant_ids
240
+ placeholders = ','.join(['%s'] * len(matching_tenant_ids))
241
  cur.execute(
242
+ f"""
243
  SELECT
244
  id,
245
  chunk_text,
246
  created_at
247
  FROM documents
248
+ WHERE tenant_id IN ({placeholders})
249
  ORDER BY created_at DESC
250
  LIMIT %s OFFSET %s;
251
  """,
252
+ tuple(matching_tenant_ids) + (limit, offset),
253
  )
254
 
255
  rows = cur.fetchall()
256
 
257
+ # Get total count for all matching tenant_ids
258
+ placeholders = ','.join(['%s'] * len(matching_tenant_ids))
259
  cur.execute(
260
+ f"""
261
  SELECT COUNT(*) as total
262
  FROM documents
263
+ WHERE tenant_id IN ({placeholders});
264
  """,
265
+ tuple(matching_tenant_ids),
266
  )
267
  total_row = cur.fetchone()
268
  total = total_row["total"] if total_row else 0
 
276
  {
277
  "id": row["id"],
278
  "text": row["chunk_text"],
279
+ "created_at": row["created_at"].isoformat()
280
+ if row["created_at"]
281
+ else None,
282
  }
283
  )
284
+ return {
285
+ "documents": results,
286
+ "total": total,
287
+ "limit": limit,
288
+ "offset": offset,
289
+ }
290
 
291
  except Exception as e:
292
  print("DB LIST ERROR:", e)
 
299
  Returns True if document was deleted, False otherwise.
300
  """
301
  try:
302
+ # Normalize tenant_id to ensure consistency
303
+ tenant_id = tenant_id.strip()
304
+
305
  conn = get_connection()
306
  cur = conn.cursor()
307
 
308
+ # First, verify the document exists
309
  cur.execute(
310
  """
311
+ SELECT id, tenant_id FROM documents
312
+ WHERE id = %s;
313
  """,
314
+ (document_id,),
315
  )
316
+ doc_row = cur.fetchone()
317
+
318
+ if doc_row is None:
319
+ print(f"DB DELETE: Document {document_id} does not exist")
320
+ cur.close()
321
+ conn.close()
322
+ return False
323
+
324
+ doc_tenant_id = doc_row[1] if len(doc_row) > 1 else None
325
+ # Normalize both tenant_ids for comparison (handle existing data with whitespace)
326
+ doc_tenant_id_normalized = doc_tenant_id.strip() if doc_tenant_id else None
327
+ tenant_id_normalized = tenant_id.strip()
328
+
329
+ # Compare after normalization; if they match, delete using the stored tenant_id value
330
+ if doc_tenant_id_normalized == tenant_id_normalized:
331
+ # Tenant IDs match after normalization - proceed with delete using stored tenant_id
332
+ cur.execute(
333
+ """
334
+ DELETE FROM documents
335
+ WHERE id = %s AND tenant_id = %s;
336
+ """,
337
+ (document_id, doc_tenant_id),
338
+ )
339
+ deleted = cur.rowcount > 0
340
+ else:
341
+ # Tenant IDs don't match - log the mismatch
342
+ print(f"DB DELETE: Document {document_id} belongs to tenant '{doc_tenant_id}' (normalized: '{doc_tenant_id_normalized}'), not '{tenant_id}' (normalized: '{tenant_id_normalized}')")
343
+ print(f"DB DELETE: Tenant ID lengths - stored: {len(doc_tenant_id) if doc_tenant_id else 0}, requested: {len(tenant_id)}")
344
+ print(f"DB DELETE: Tenant ID repr - stored: {repr(doc_tenant_id)}, requested: {repr(tenant_id)}")
345
+ deleted = False
346
+
347
+ if deleted:
348
+ print(f"DB DELETE: Successfully deleted document {document_id} for tenant '{tenant_id}'")
349
+ else:
350
+ print(f"DB DELETE: Failed to delete document {document_id} for tenant '{tenant_id}' (rowcount: {cur.rowcount})")
351
+
352
  conn.commit()
353
  cur.close()
354
  conn.close()
 
356
  return deleted
357
 
358
  except Exception as e:
359
+ print(f"DB DELETE ERROR (document_id={document_id}, tenant_id={tenant_id}): {e}")
360
+ import traceback
361
+ traceback.print_exc()
362
  return False
363
 
364
 
 
366
  """
367
  Delete all documents for a tenant.
368
  Returns the number of documents deleted.
369
+ Handles tenant_id normalization to match documents stored with different formatting.
370
  """
371
  try:
372
+ # Normalize tenant_id
373
+ tenant_id = tenant_id.strip()
374
+
375
  conn = get_connection()
376
  cur = conn.cursor()
377
 
378
+ # First, get all unique tenant_ids that match when normalized
379
  cur.execute(
380
  """
381
+ SELECT DISTINCT tenant_id FROM documents;
382
+ """
 
 
383
  )
384
+ all_tenant_ids = [row[0] for row in cur.fetchall()]
385
+
386
+ # Find tenant_ids that match when normalized
387
+ matching_tenant_ids = []
388
+ tenant_id_normalized = tenant_id.strip()
389
+ for stored_tenant_id in all_tenant_ids:
390
+ if stored_tenant_id and stored_tenant_id.strip() == tenant_id_normalized:
391
+ matching_tenant_ids.append(stored_tenant_id)
392
+
393
+ if not matching_tenant_ids:
394
+ print(f"DB DELETE ALL: No documents found for tenant '{tenant_id}' (normalized: '{tenant_id_normalized}')")
395
+ cur.close()
396
+ conn.close()
397
+ return 0
398
+
399
+ # Delete documents matching any of the normalized tenant_ids
400
+ deleted_count = 0
401
+ for matching_tenant_id in matching_tenant_ids:
402
+ cur.execute(
403
+ """
404
+ DELETE FROM documents
405
+ WHERE tenant_id = %s;
406
+ """,
407
+ (matching_tenant_id,),
408
+ )
409
+ deleted_count += cur.rowcount
410
+
411
+ print(f"DB DELETE ALL: Deleted {deleted_count} document(s) for tenant '{tenant_id}' (matched {len(matching_tenant_ids)} tenant_id variant(s))")
412
+
413
  conn.commit()
414
  cur.close()
415
  conn.close()
 
417
  return deleted_count
418
 
419
  except Exception as e:
420
+ print(f"DB DELETE ALL ERROR (tenant_id={tenant_id}): {e}")
421
+ import traceback
422
+ traceback.print_exc()
423
  return 0
424
 
425
 
 
459
  "analytics": "analytics_events",
460
  "tool_usage": "tool_usage_stats",
461
  }
462
+
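The normalization logic in `list_all_documents` and `delete_all_documents` can be illustrated without a live database: stored `tenant_id` variants are matched on their stripped form, and the `IN` clause is built with one parameter placeholder per variant. A standalone sketch with made-up stored ids:

```python
# Standalone illustration (no database) of the normalized tenant_id matching
# used above. The stored ids are made-up examples.
stored_tenant_ids = ["acme ", " acme", "acme", "globex"]
requested_tenant_id = "acme"

tenant_id_normalized = requested_tenant_id.strip()
matching = [t for t in stored_tenant_ids if t and t.strip() == tenant_id_normalized]
print(matching)  # ['acme ', ' acme', 'acme']

# One %s placeholder per matching variant keeps the values parameterized
# even though the number of variants is only known at runtime.
placeholders = ",".join(["%s"] * len(matching))
query = f"SELECT id FROM documents WHERE tenant_id IN ({placeholders});"
params = tuple(matching)
print(query)
print(params)
```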
backend/{mcp_servers → mcp_server/common}/embeddings.py RENAMED
@@ -1,8 +1,13 @@
1
  from sentence_transformers import SentenceTransformer
2
 
3
  # Load MiniLM model (384-dimensional embeddings)
4
  model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
5
 
 
6
  def embed_text(text: str):
7
  """
8
  Generate sentence embedding for use with pgvector.
@@ -15,3 +20,4 @@ def embed_text(text: str):
15
  """
16
  vector = model.encode(text)
17
  return vector.tolist()
 
 
1
+ """
2
+ Sentence-transformer embeddings shared across all MCP tools.
3
+ """
4
+
5
  from sentence_transformers import SentenceTransformer
6
 
7
  # Load MiniLM model (384-dimensional embeddings)
8
  model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
9
 
10
+
11
  def embed_text(text: str):
12
  """
13
  Generate sentence embedding for use with pgvector.
 
20
  """
21
  vector = model.encode(text)
22
  return vector.tolist()
23
+
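A quick sanity check of the shared helper (requires `sentence-transformers`; the first call downloads the MiniLM model). The sample sentence is arbitrary:

```python
# Sanity check: embed_text returns a 384-dimensional list, matching the
# vector(384) column used by the documents table.
from backend.mcp_server.common.embeddings import embed_text

vector = embed_text("Refund policy for enterprise customers")
print(len(vector))   # 384
print(vector[:3])    # first few floats of the embedding
```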
backend/mcp_server/common/logging.py ADDED
@@ -0,0 +1,116 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import os
5
+ from typing import Any, Dict, Optional
6
+
7
+ logger = logging.getLogger("integrachat.mcp")
8
+ if not logger.handlers:
9
+ handler = logging.StreamHandler()
10
+ formatter = logging.Formatter(
11
+ "[%(asctime)s] %(levelname)s %(name)s - %(message)s",
12
+ datefmt="%Y-%m-%d %H:%M:%S",
13
+ )
14
+ handler.setFormatter(formatter)
15
+ logger.addHandler(handler)
16
+
17
+ logger.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())
18
+
19
+ try:
20
+ from backend.api.storage.analytics_store import AnalyticsStore
21
+ except Exception: # pragma: no cover - analytics storage is optional during tests
22
+ AnalyticsStore = None # type: ignore
23
+ _analytics_store = None
24
+ else:
25
+ _analytics_store = AnalyticsStore()
26
+
27
+
28
+ def log_tool_usage(
29
+ tool_name: str,
30
+ tenant_id: Optional[str],
31
+ *,
32
+ success: bool,
33
+ latency_ms: Optional[int] = None,
34
+ metadata: Optional[Dict[str, Any]] = None,
35
+ error_message: Optional[str] = None,
36
+ user_id: Optional[str] = None,
37
+ ):
38
+ log_data = {
39
+ "tool": tool_name,
40
+ "tenant_id": tenant_id,
41
+ "success": success,
42
+ "latency_ms": latency_ms,
43
+ "user_id": user_id,
44
+ "metadata": metadata or {},
45
+ }
46
+ if error_message:
47
+ log_data["error"] = error_message
48
+
49
+ if success:
50
+ logger.info("tool_completed %s", log_data)
51
+ else:
52
+ logger.warning("tool_failed %s", log_data)
53
+
54
+ if _analytics_store and tenant_id:
55
+ try:
56
+ _analytics_store.log_tool_usage(
57
+ tenant_id=tenant_id,
58
+ tool_name=tool_name,
59
+ latency_ms=latency_ms,
60
+ success=success,
61
+ error_message=error_message,
62
+ metadata=metadata,
63
+ user_id=user_id,
64
+ )
65
+ except Exception as exc: # pragma: no cover - analytics failures shouldn't crash tools
66
+ logger.debug("analytics logging failed: %s", exc)
67
+
68
+
69
+ def log_rag_search_metrics(
70
+ tenant_id: str,
71
+ query: str,
72
+ hits_count: int,
73
+ avg_score: Optional[float],
74
+ top_score: Optional[float],
75
+ latency_ms: Optional[int] = None,
76
+ ):
77
+ if _analytics_store:
78
+ try:
79
+ _analytics_store.log_rag_search(
80
+ tenant_id=tenant_id,
81
+ query=query,
82
+ hits_count=hits_count,
83
+ avg_score=avg_score,
84
+ top_score=top_score,
85
+ latency_ms=latency_ms,
86
+ )
87
+ except Exception as exc: # pragma: no cover
88
+ logger.debug("rag analytics logging failed: %s", exc)
89
+
90
+
91
+ def log_redflag_violation(
92
+ tenant_id: str,
93
+ rule_id: str,
94
+ rule_pattern: str,
95
+ severity: str,
96
+ matched_text: str,
97
+ *,
98
+ confidence: Optional[float] = None,
99
+ message_preview: Optional[str] = None,
100
+ user_id: Optional[str] = None,
101
+ ):
102
+ if _analytics_store:
103
+ try:
104
+ _analytics_store.log_redflag_violation(
105
+ tenant_id=tenant_id,
106
+ rule_id=rule_id,
107
+ rule_pattern=rule_pattern,
108
+ severity=severity,
109
+ matched_text=matched_text,
110
+ confidence=confidence,
111
+ message_preview=message_preview,
112
+ user_id=user_id,
113
+ )
114
+ except Exception as exc: # pragma: no cover
115
+ logger.debug("redflag logging failed: %s", exc)
116
+
backend/mcp_server/common/tenant.py ADDED
@@ -0,0 +1,57 @@
1
+ from __future__ import annotations
2
+
3
+ import re
4
+ from dataclasses import dataclass
5
+ from typing import Any, Mapping, Optional
6
+
7
+
8
+ class TenantValidationError(ValueError):
9
+ """Raised when tenant metadata is missing or malformed."""
10
+
11
+
12
+ TENANT_ID_PATTERN = re.compile(r"^[A-Za-z0-9_\-.:/]{3,128}$")
13
+
14
+
15
+ @dataclass(slots=True)
16
+ class TenantContext:
17
+ tenant_id: str
18
+ user_id: Optional[str] = None
19
+ metadata: Optional[dict[str, Any]] = None
20
+
21
+
22
+ def _extract_tenant_id(payload: Mapping[str, Any]) -> str:
23
+ for key in ("tenant_id", "tenantId", "tenant"):
24
+ if key in payload:
25
+ value = payload[key]
26
+ if isinstance(value, str):
27
+ return value.strip()
28
+ raise TenantValidationError("tenant_id is required for every MCP tool call")
29
+
30
+
31
+ def _normalize_tenant_id(raw_value: str) -> str:
32
+ normalized = raw_value.strip()
33
+ if not normalized:
34
+ raise TenantValidationError("tenant_id cannot be empty")
35
+ if not TENANT_ID_PATTERN.match(normalized):
36
+ raise TenantValidationError(
37
+ "tenant_id must be 3-128 chars and may only contain letters, numbers, '.', '-', '_', or ':'"
38
+ )
39
+ return normalized
40
+
41
+
42
+ def build_tenant_context(payload: Mapping[str, Any]) -> TenantContext:
43
+ tenant_id = _normalize_tenant_id(_extract_tenant_id(payload))
44
+ user_id: Optional[str] = None
45
+ metadata: Optional[dict[str, Any]] = None
46
+
47
+ for key in ("user_id", "userId"):
48
+ if key in payload and isinstance(payload[key], str):
49
+ user_id = payload[key].strip() or None
50
+ break
51
+
52
+ meta_candidate = payload.get("metadata")
53
+ if isinstance(meta_candidate, dict):
54
+ metadata = meta_candidate
55
+
56
+ return TenantContext(tenant_id=tenant_id, user_id=user_id, metadata=metadata)
57
+
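A short sketch of how the context builder behaves, using placeholder tenant and user ids; malformed or missing tenant ids raise `TenantValidationError`, which the `tool_handler` wrapper converts into a `validation_error` envelope:

```python
# Sketch of tenant context extraction; the ids are example values.
from backend.mcp_server.common.tenant import (
    TenantValidationError,
    build_tenant_context,
)

ctx = build_tenant_context({"tenantId": " acme-prod ", "user_id": "user-42"})
print(ctx.tenant_id, ctx.user_id)  # "acme-prod" "user-42" (whitespace stripped)

try:
    build_tenant_context({"query": "no tenant here"})
except TenantValidationError as exc:
    print("rejected:", exc)
```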
backend/mcp_server/common/utils.py ADDED
@@ -0,0 +1,146 @@
1
+ from __future__ import annotations
2
+ import inspect
3
+ import time
4
+ from typing import Any, Awaitable, Callable, Mapping, Optional
5
+
6
+ from .logging import log_tool_usage
7
+ from .tenant import TenantContext, TenantValidationError, build_tenant_context
8
+
9
+
10
+ class ToolValidationError(ValueError):
11
+ """Raised when the caller request payload is invalid."""
12
+
13
+
14
+ class ToolExecutionError(RuntimeError):
15
+ """Raised for unexpected runtime failures."""
16
+
17
+
18
+ Payload = Mapping[str, Any]
19
+ ToolHandler = Callable[[TenantContext, Payload], Awaitable[dict[str, Any]] | dict[str, Any]]
20
+
21
+
22
+ def success_response(
23
+ tool_name: str,
24
+ context: TenantContext,
25
+ data: Any,
26
+ latency_ms: int,
27
+ metadata: Optional[dict[str, Any]] = None,
28
+ ) -> dict[str, Any]:
29
+ return {
30
+ "status": "ok",
31
+ "tool": tool_name,
32
+ "tenant_id": context.tenant_id,
33
+ "latency_ms": latency_ms,
34
+ "metadata": metadata or {},
35
+ "data": data,
36
+ }
37
+
38
+
39
+ def error_response(
40
+ tool_name: str,
41
+ context: Optional[TenantContext],
42
+ error: Exception,
43
+ latency_ms: int,
44
+ error_type: str = "runtime_error",
45
+ ) -> dict[str, Any]:
46
+ return {
47
+ "status": "error",
48
+ "tool": tool_name,
49
+ "tenant_id": context.tenant_id if context else None,
50
+ "latency_ms": latency_ms,
51
+ "error_type": error_type,
52
+ "message": str(error),
53
+ }
54
+
55
+
56
+ async def maybe_await(result: Any) -> Any:
57
+ if inspect.isawaitable(result):
58
+ return await result
59
+ return result
60
+
61
+
62
+ def _truncate(value: Any, max_length: int = 200) -> Any:
63
+ if isinstance(value, str) and len(value) > max_length:
64
+ return value[: max_length - 3] + "..."
65
+ return value
66
+
67
+
68
+ def _trim_payload(payload: Payload) -> dict[str, Any]:
69
+ trimmed: dict[str, Any] = {}
70
+ for key, value in payload.items():
71
+ if key in {"content", "query"} and isinstance(value, str):
72
+ trimmed[key] = _truncate(value)
73
+ elif isinstance(value, (str, int, float, bool)) or value is None:
74
+ trimmed[key] = value
75
+ else:
76
+ trimmed[key] = "<complex>"
77
+ return trimmed
78
+
79
+
80
+ async def execute_tool(
81
+ tool_name: str,
82
+ payload: Payload,
83
+ handler: ToolHandler,
84
+ ) -> dict[str, Any]:
85
+ start = time.perf_counter()
86
+ context: Optional[TenantContext] = None
87
+ try:
88
+ context = build_tenant_context(payload)
89
+ result = await maybe_await(handler(context, payload))
90
+ latency_ms = int((time.perf_counter() - start) * 1000)
91
+
92
+ log_tool_usage(
93
+ tool_name,
94
+ context.tenant_id,
95
+ success=True,
96
+ latency_ms=latency_ms,
97
+ metadata={"payload": _trim_payload(payload)},
98
+ user_id=context.user_id,
99
+ )
100
+ return success_response(
101
+ tool_name,
102
+ context,
103
+ result,
104
+ latency_ms,
105
+ )
106
+ except (TenantValidationError, ToolValidationError) as exc:
107
+ latency_ms = int((time.perf_counter() - start) * 1000)
108
+ log_tool_usage(
109
+ tool_name,
110
+ context.tenant_id if context else None,
111
+ success=False,
112
+ latency_ms=latency_ms,
113
+ error_message=str(exc),
114
+ metadata={"payload": _trim_payload(payload)},
115
+ user_id=context.user_id if context else None,
116
+ )
117
+ return error_response(tool_name, context, exc, latency_ms, "validation_error")
118
+ except Exception as exc: # pragma: no cover - safety net
119
+ latency_ms = int((time.perf_counter() - start) * 1000)
120
+ log_tool_usage(
121
+ tool_name,
122
+ context.tenant_id if context else None,
123
+ success=False,
124
+ latency_ms=latency_ms,
125
+ error_message=str(exc),
126
+ metadata={"payload": _trim_payload(payload)},
127
+ user_id=context.user_id if context else None,
128
+ )
129
+ return error_response(tool_name, context, exc, latency_ms)
130
+
131
+
132
+ def tool_handler(tool_name: str):
133
+ """
134
+ Decorator that wires tenant validation, analytics logging, and error handling.
135
+ """
136
+
137
+ def decorator(func: ToolHandler):
138
+ async def wrapper(payload: Payload) -> dict[str, Any]:
139
+ return await execute_tool(tool_name, payload, func)
140
+
141
+ wrapper.__name__ = func.__name__
142
+ wrapper.__doc__ = func.__doc__
143
+ return wrapper
144
+
145
+ return decorator
146
+
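To see the decorator end to end, here is a hedged sketch with a hypothetical `demo.echo` tool; it shows the single-payload calling convention and the success/error envelopes produced by `execute_tool`:

```python
# Hypothetical demo tool illustrating the tool_handler contract.
from __future__ import annotations

import asyncio
from typing import Any, Mapping

from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler


@tool_handler("demo.echo")
async def demo_echo(context: TenantContext, payload: Mapping[str, Any]) -> dict[str, Any]:
    message = payload.get("message")
    if not isinstance(message, str) or not message.strip():
        raise ToolValidationError("message must be a non-empty string")
    return {"tenant_id": context.tenant_id, "echo": message.strip()}


async def main() -> None:
    ok = await demo_echo({"tenant_id": "tenant-123", "message": "hello"})
    print(ok["status"], ok["data"])           # ok {'tenant_id': 'tenant-123', 'echo': 'hello'}

    bad = await demo_echo({"message": "missing tenant"})
    print(bad["status"], bad["error_type"])   # error validation_error


asyncio.run(main())
```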
backend/mcp_server/rag/__init__.py ADDED
@@ -0,0 +1,6 @@
1
+ """
2
+ Retriever tooling for the unified MCP server.
3
+ """
4
+
5
+ __all__ = ["search", "ingest", "delete"]
6
+
backend/mcp_server/rag/delete.py ADDED
@@ -0,0 +1,48 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Mapping
4
+
5
+ from backend.mcp_server.common.database import delete_all_documents, delete_document
6
+ from backend.mcp_server.common.tenant import TenantContext
7
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
8
+
9
+
10
+ @tool_handler("rag.delete")
11
+ async def rag_delete(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
12
+ """
13
+ Delete one document by ID or purge all documents for the tenant.
14
+ """
15
+
16
+ document_id = payload.get("document_id")
17
+ delete_all = bool(payload.get("delete_all", False))
18
+
19
+ if delete_all:
20
+ deleted = delete_all_documents(context.tenant_id)
21
+ return {
22
+ "tenant_id": context.tenant_id,
23
+ "deleted_count": deleted,
24
+ "mode": "all",
25
+ }
26
+
27
+ if document_id is None:
28
+ raise ToolValidationError("document_id is required unless delete_all=true")
29
+
30
+ try:
31
+ doc_id_value = int(document_id)
32
+ except (TypeError, ValueError):
33
+ raise ToolValidationError("document_id must be an integer")
34
+
35
+ if doc_id_value <= 0:
36
+ raise ToolValidationError("document_id must be positive")
37
+
38
+ deleted = delete_document(context.tenant_id, doc_id_value)
39
+ if not deleted:
40
+ raise ToolValidationError(f"Document {doc_id_value} not found or access denied")
41
+
42
+ return {
43
+ "tenant_id": context.tenant_id,
44
+ "document_id": doc_id_value,
45
+ "deleted": True,
46
+ "mode": "single",
47
+ }
48
+
backend/mcp_server/rag/ingest.py ADDED
@@ -0,0 +1,43 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Mapping
4
+
5
+ from backend.api.utils.text_extractor import extract_text
6
+ from backend.mcp_server.common.database import insert_document_chunks
7
+ from backend.mcp_server.common.embeddings import embed_text
8
+ from backend.mcp_server.common.tenant import TenantContext
9
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
10
+
11
+
12
+ @tool_handler("rag.ingest")
13
+ async def rag_ingest(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
14
+ """
15
+ Ingest raw text into the tenant's knowledge base.
16
+ """
17
+
18
+ content = payload.get("content")
19
+ if not isinstance(content, str) or not content.strip():
20
+ raise ToolValidationError("content must be a non-empty string")
21
+
22
+ max_words = payload.get("chunk_words", 300)
23
+ try:
24
+ max_words_value = max(50, min(int(max_words), 800))
25
+ except (TypeError, ValueError):
26
+ raise ToolValidationError("chunk_words must be an integer between 50 and 800")
27
+
28
+ chunks = extract_text(content, max_words=max_words_value)
29
+ if not chunks:
30
+ raise ToolValidationError("no text detected after preprocessing")
31
+
32
+ stored = 0
33
+ for chunk in chunks:
34
+ vector = embed_text(chunk)
35
+ insert_document_chunks(context.tenant_id, chunk, vector)
36
+ stored += 1
37
+
38
+ return {
39
+ "tenant_id": context.tenant_id,
40
+ "chunks_ingested": stored,
41
+ "metadata": {"chunk_words": max_words_value},
42
+ }
43
+
backend/mcp_server/rag/list.py ADDED
@@ -0,0 +1,30 @@
1
+ from __future__ import annotations
2
+
3
+ from typing import Mapping
4
+
5
+ from backend.mcp_server.common.database import list_all_documents
6
+ from backend.mcp_server.common.tenant import TenantContext
7
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
8
+
9
+
10
+ @tool_handler("rag.list")
11
+ async def rag_list(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
12
+ """
13
+ List stored RAG documents for the tenant with pagination.
14
+ """
15
+
16
+ limit = payload.get("limit", 1000)
17
+ offset = payload.get("offset", 0)
18
+
19
+ try:
20
+ limit_value = max(1, min(int(limit), 5000))
21
+ except (TypeError, ValueError):
22
+ raise ToolValidationError("limit must be an integer between 1 and 5000")
23
+
24
+ try:
25
+ offset_value = max(0, int(offset))
26
+ except (TypeError, ValueError):
27
+ raise ToolValidationError("offset must be a non-negative integer")
28
+
29
+ return list_all_documents(context.tenant_id, limit=limit_value, offset=offset_value)
30
+
backend/mcp_server/rag/search.py ADDED
@@ -0,0 +1,64 @@
1
+ from __future__ import annotations
2
+
3
+ from statistics import mean
4
+ from typing import Any, Mapping
5
+
6
+ from backend.mcp_server.common.database import search_vectors
7
+ from backend.mcp_server.common.embeddings import embed_text
8
+ from backend.mcp_server.common.logging import log_rag_search_metrics
9
+ from backend.mcp_server.common.tenant import TenantContext
10
+ from backend.mcp_server.common.utils import ToolValidationError, tool_handler
11
+
12
+
13
+ @tool_handler("rag.search")
14
+ async def rag_search(context: TenantContext, payload: Mapping[str, Any]) -> dict[str, Any]:
15
+ """
16
+ Perform semantic search across the tenant's knowledge base.
17
+ """
18
+
19
+ query = payload.get("query")
20
+ if not isinstance(query, str) or not query.strip():
21
+ raise ToolValidationError("query must be a non-empty string")
22
+
23
+ limit = payload.get("limit", 10)
24
+ try:
25
+ limit_value = max(1, min(int(limit), 25))
26
+ except (TypeError, ValueError):
27
+ raise ToolValidationError("limit must be an integer between 1 and 25")
28
+
29
+ threshold = payload.get("threshold", 0.55)
30
+ try:
31
+ threshold_value = max(0.0, min(float(threshold), 1.0))
32
+ except (TypeError, ValueError):
33
+ raise ToolValidationError("threshold must be a float between 0.0 and 1.0")
34
+
35
+ embedding = embed_text(query)
36
+ raw_results = search_vectors(context.tenant_id, embedding, limit=limit_value)
37
+ filtered = [
38
+ {"text": chunk.get("text", ""), "relevance": chunk.get("similarity", 0.0)}
39
+ for chunk in raw_results
40
+ if chunk.get("similarity", 0.0) >= threshold_value
41
+ ][:3]
42
+
43
+ hits = len(raw_results)
44
+ avg_score = mean([item.get("similarity", 0.0) for item in raw_results]) if raw_results else None
45
+ top_score = raw_results[0].get("similarity") if raw_results else None
46
+
47
+ log_rag_search_metrics(
48
+ tenant_id=context.tenant_id,
49
+ query=query,
50
+ hits_count=hits,
51
+ avg_score=avg_score,
52
+ top_score=top_score,
53
+ )
54
+
55
+ return {
56
+ "query": query,
57
+ "results": filtered,
58
+ "metadata": {
59
+ "limit": limit_value,
60
+ "threshold": threshold_value,
61
+ "hits_before_filter": hits,
62
+ },
63
+ }
64
+
backend/mcp_server/server.py ADDED
@@ -0,0 +1,200 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import os
5
+ from contextlib import asynccontextmanager
6
+ from typing import Awaitable, Callable, Dict, Optional
7
+
8
+ from fastapi import FastAPI, Query
9
+ import uvicorn
10
+
11
+ from backend.mcp_server.admin.rules import admin_add_rule, admin_delete_rule, admin_get_rules
12
+ from backend.mcp_server.admin.violations import log_violation as admin_log_violation
13
+ from backend.mcp_server.rag.delete import rag_delete
14
+ from backend.mcp_server.rag.ingest import rag_ingest
15
+ from backend.mcp_server.rag.list import rag_list
16
+ from backend.mcp_server.rag.search import rag_search
17
+ from backend.mcp_server.web.search import web_search
18
+
19
+ ToolHandler = Callable[[Dict], Awaitable[Dict] | Dict]
20
+
21
+ logger = logging.getLogger("integrachat.mcp.server")
22
+ if not logger.handlers:
23
+ handler = logging.StreamHandler()
24
+ formatter = logging.Formatter(
25
+ "[%(asctime)s] %(levelname)s %(name)s - %(message)s",
26
+ datefmt="%Y-%m-%d %H:%M:%S",
27
+ )
28
+ handler.setFormatter(formatter)
29
+ logger.addHandler(handler)
30
+ logger.setLevel(logging.INFO)
31
+
32
+
33
+ @asynccontextmanager
34
+ async def lifespan(app: FastAPI):
35
+ """Lifespan context manager for startup and shutdown events."""
36
+ # Startup
37
+ try:
38
+ routes = []
39
+ for route in app.routes:
40
+ if hasattr(route, "path") and hasattr(route, "methods"):
41
+ routes.append(f"{', '.join(route.methods)} {route.path}")
42
+ logger.info("Registered routes: %s", ", ".join(sorted(routes)))
43
+ except Exception as e:
44
+ logger.warning("Could not log routes during startup: %s", e)
45
+ yield
46
+ # Shutdown (if needed in the future)
47
+
48
+
49
+ app = FastAPI(title="IntegraChat MCP", version="1.0.0", lifespan=lifespan)
50
+
51
+
52
+ def _register_tool(tool_name: str, handler: ToolHandler) -> None:
53
+ """
54
+ Register the given tool handler under both a namespaced route
55
+ (/rag/search) and an optional root route (/search) so the server works
56
+ whether clients call the namespaced path or the bare action path.
57
+ """
58
+ namespace, action = tool_name.split(".", 1)
59
+ namespaced_path = f"/{namespace}/{action}"
60
+ root_path = f"/{action}"
61
+
62
+ @app.post(namespaced_path)
63
+ async def namespaced_endpoint(payload: Dict) -> Dict:
64
+ return await handler(payload) # type: ignore[arg-type]
65
+
66
+ @app.post(root_path)
67
+ async def root_endpoint(payload: Dict) -> Dict:
68
+ return await handler(payload) # type: ignore[arg-type]
69
+
70
+
71
+ # Add GET endpoint support for /rag/list (register BEFORE POST to avoid conflicts)
72
+ @app.get("/rag/list")
73
+ async def rag_list_get(
74
+ tenant_id: str = Query(..., description="Tenant ID"),
75
+ limit: Optional[int] = Query(1000, description="Maximum number of documents to return"),
76
+ offset: Optional[int] = Query(0, description="Number of documents to skip")
77
+ ) -> Dict:
78
+ """GET endpoint for listing RAG documents."""
79
+ logger.info("GET /rag/list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset)
80
+ payload = {
81
+ "tenant_id": tenant_id,
82
+ "limit": limit,
83
+ "offset": offset
84
+ }
85
+ result = await rag_list(payload) # type: ignore[arg-type]
86
+ return result
87
+
88
+ @app.get("/list")
89
+ async def rag_list_get_root(
90
+ tenant_id: str = Query(..., description="Tenant ID"),
91
+ limit: Optional[int] = Query(1000, description="Maximum number of documents to return"),
92
+ offset: Optional[int] = Query(0, description="Number of documents to skip")
93
+ ) -> Dict:
94
+ """GET endpoint for listing RAG documents (root path)."""
95
+ logger.info("GET /list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset)
96
+ payload = {
97
+ "tenant_id": tenant_id,
98
+ "limit": limit,
99
+ "offset": offset
100
+ }
101
+ result = await rag_list(payload) # type: ignore[arg-type]
102
+ return result
103
+
104
+ # Add DELETE endpoint support for /rag/delete/{document_id}
105
+ @app.delete("/rag/delete/{document_id}")
106
+ async def rag_delete_document(
107
+ document_id: int,
108
+ tenant_id: str = Query(..., description="Tenant ID")
109
+ ) -> Dict:
110
+ """DELETE endpoint for deleting a specific document."""
111
+ try:
112
+ logger.info("DELETE /rag/delete/%s called with tenant_id=%s", document_id, tenant_id)
113
+ payload = {
114
+ "tenant_id": tenant_id,
115
+ "document_id": document_id
116
+ }
117
+ result = await rag_delete(payload) # type: ignore[arg-type]
118
+ logger.info("DELETE /rag/delete/%s result: %s", document_id, result)
119
+ return result
120
+ except Exception as e:
121
+ logger.error("Error in DELETE /rag/delete/%s: %s", document_id, e, exc_info=True)
122
+ raise
123
+
124
+ @app.delete("/delete/{document_id}")
125
+ async def rag_delete_document_root(
126
+ document_id: int,
127
+ tenant_id: str = Query(..., description="Tenant ID")
128
+ ) -> Dict:
129
+ """DELETE endpoint for deleting a specific document (root path)."""
130
+ logger.info("DELETE /delete/%s called with tenant_id=%s", document_id, tenant_id)
131
+ payload = {
132
+ "tenant_id": tenant_id,
133
+ "document_id": document_id
134
+ }
135
+ result = await rag_delete(payload) # type: ignore[arg-type]
136
+ return result
137
+
138
+ # Add DELETE endpoint support for /rag/delete-all
139
+ @app.delete("/rag/delete-all")
140
+ async def rag_delete_all(
141
+ tenant_id: str = Query(..., description="Tenant ID")
142
+ ) -> Dict:
143
+ """DELETE endpoint for deleting all documents."""
144
+ try:
145
+ logger.info("DELETE /rag/delete-all called with tenant_id=%s", tenant_id)
146
+ payload = {
147
+ "tenant_id": tenant_id,
148
+ "delete_all": True
149
+ }
150
+ result = await rag_delete(payload) # type: ignore[arg-type]
151
+ return result
152
+ except Exception as e:
153
+ logger.error("Error in DELETE /rag/delete-all: %s", e, exc_info=True)
154
+ raise
155
+
156
+ @app.delete("/delete-all")
157
+ async def rag_delete_all_root(
158
+ tenant_id: str = Query(..., description="Tenant ID")
159
+ ) -> Dict:
160
+ """DELETE endpoint for deleting all documents (root path)."""
161
+ try:
162
+ logger.info("DELETE /delete-all called with tenant_id=%s", tenant_id)
163
+ payload = {
164
+ "tenant_id": tenant_id,
165
+ "delete_all": True
166
+ }
167
+ result = await rag_delete(payload) # type: ignore[arg-type]
168
+ return result
169
+ except Exception as e:
170
+ logger.error("Error in DELETE /delete-all: %s", e, exc_info=True)
171
+ raise
172
+
173
+ _register_tool("rag.search", rag_search)
174
+ _register_tool("rag.ingest", rag_ingest)
175
+ _register_tool("rag.delete", rag_delete)
176
+ _register_tool("rag.list", rag_list)
177
+
178
+ _register_tool("web.search", web_search)
179
+
180
+ _register_tool("admin.getRules", admin_get_rules)
181
+ _register_tool("admin.addRule", admin_add_rule)
182
+ _register_tool("admin.deleteRule", admin_delete_rule)
183
+ _register_tool("admin.logViolation", admin_log_violation)
184
+
185
+
186
+ @app.get("/health")
187
+ async def health() -> Dict[str, str]:
188
+ return {"status": "ok", "service": "mcp"}
189
+
190
+
191
+ def main():
192
+ host = os.getenv("MCP_HOST", "0.0.0.0")
193
+ port = int(os.getenv("MCP_PORT", "8001"))
194
+ logger.info("Starting IntegraChat MCP HTTP server on %s:%s", host, port)
195
+ uvicorn.run("backend.mcp_server.server:app", host=host, port=port)
196
+
197
+
198
+ if __name__ == "__main__":
199
+ main()
200
+
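For reference, a minimal sketch of how a client might exercise the routes registered above once the unified server is running on its default `MCP_HOST`/`MCP_PORT` (`0.0.0.0:8001`). The payload fields for `rag.search` are an assumption inferred from the GET wrappers above (the real schema lives in the tool handlers, which this diff does not show), and `requests` is assumed to be installed. Note that the namespaced paths are the safer choice: both `rag.search` and `web.search` also register a root `/search` alias.

```python
# Hedged client sketch for the unified MCP server (assumes localhost:8001 and
# that rag.search accepts tenant_id/query -- defined by the handler, not here).
import requests

BASE = "http://localhost:8001"

# Namespaced tool route created by _register_tool("rag.search", rag_search)
search = requests.post(
    f"{BASE}/rag/search",
    json={"tenant_id": "tenant-a", "query": "refund policy"},
)
print(search.json())

# Convenience GET route for listing a tenant's documents
docs = requests.get(
    f"{BASE}/rag/list",
    params={"tenant_id": "tenant-a", "limit": 10, "offset": 0},
)
print(docs.json())

# Liveness probe
print(requests.get(f"{BASE}/health").json())
```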
backend/mcp_server/web/__init__.py ADDED
@@ -0,0 +1,6 @@
+ """
+ Web search tooling namespace.
+ """
+
+ __all__ = ["search"]
+
backend/mcp_server/web/search.py ADDED
@@ -0,0 +1,56 @@
+ from __future__ import annotations
+
+ from typing import Mapping
+
+ from duckduckgo_search import DDGS
+
+ from backend.mcp_server.common.tenant import TenantContext
+ from backend.mcp_server.common.utils import ToolExecutionError, ToolValidationError, tool_handler
+
+
+ @tool_handler("web.search")
+ async def web_search(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
+     """
+     Perform a DuckDuckGo web search with an English-results bias.
+     """
+
+     query = payload.get("query")
+     if not isinstance(query, str) or not query.strip():
+         raise ToolValidationError("query must be a non-empty string")
+
+     max_results = payload.get("max_results", 5)
+     try:
+         max_results_value = max(1, min(int(max_results), 10))
+     except (TypeError, ValueError):
+         raise ToolValidationError("max_results must be an integer between 1 and 10")
+
+     region = str(payload.get("region", "us-en"))
+
+     try:
+         ddg = DDGS()
+         query_string = query
+         if "lang:en" not in query_string.lower():
+             query_string = f"{query_string} lang:en"
+
+         try:
+             results = ddg.text(query_string, max_results=max_results_value, region=region)
+         except TypeError:
+             results = ddg.text(query_string, max_results=max_results_value)
+
+         formatted = [
+             {
+                 "title": item.get("title"),
+                 "snippet": item.get("body"),
+                 "url": item.get("href"),
+             }
+             for item in results
+         ]
+
+         return {
+             "query": query,
+             "results": formatted,
+             "metadata": {"max_results": max_results_value, "region": region},
+         }
+     except Exception as exc:
+         raise ToolExecutionError(f"web search failed: {exc}") from exc
+
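As a quick illustration, a hedged example of calling this tool through the unified server's `/web/search` route. The `tenant_id` field is an assumption about what the `tool_handler`/`TenantContext` wrapper expects, since that code is not part of this diff.

```python
# Hedged usage sketch for web.search (assumes the unified server is running on
# localhost:8001; tenant_id handling depends on the tool_handler wrapper).
import requests

payload = {
    "tenant_id": "tenant-a",           # assumed to be consumed by tool_handler
    "query": "fastapi lifespan events",
    "max_results": 3,                  # handler clamps this to the 1-10 range
    "region": "us-en",                 # DuckDuckGo region code, default us-en
}
data = requests.post("http://localhost:8001/web/search", json=payload).json()

# On success the handler returns {"query": ..., "results": [...], "metadata": {...}}
for hit in data.get("results", []):
    print(hit["title"], "->", hit["url"])
```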
backend/mcp_servers/admin_server.py DELETED
@@ -1,51 +0,0 @@
- # =============================================================
- # File: backend/mcp_servers/admin_server.py
- # =============================================================
-
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- import logging
- import sys
- import os
-
- # Fix Python module paths
- current_dir = os.path.dirname(__file__)
- sys.path.insert(0, current_dir)
-
- from models.admin import EvalRequest, AlertPayload
-
-
- admin_app = FastAPI(title="Admin MCP Server")
-
- # Enable CORS
- admin_app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_credentials=True,
-     allow_methods=["*"],
-     allow_headers=["*"],
- )
-
- log = logging.getLogger("admin_mcp")
- logging.basicConfig(level=logging.INFO)
-
-
- @admin_app.post("/eval")
- async def eval_query(req: EvalRequest):
-     danger = ["delete all data", "export users", "password", "token"]
-     q = req.query.lower()
-     for d in danger:
-         if d in q:
-             return {"action": "block", "reason": d}
-     return {"action": "allow"}
-
-
- @admin_app.post("/alert")
- async def alert(payload: AlertPayload):
-     log.warning(f"Alert received for tenant {payload.tenant_id}: {payload.violations}")
-     return {"status": "ok"}
-
-
- if __name__ == "__main__":
-     import uvicorn
-     uvicorn.run(admin_app, host="0.0.0.0", port=8003)
backend/mcp_servers/main.py DELETED
@@ -1,243 +0,0 @@
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
- from fastapi.middleware.cors import CORSMiddleware
- from dotenv import load_dotenv
- import sys
- import os
-
- # --------------------------------------------------------
- # Fix Python module paths
- # --------------------------------------------------------
-
- current_dir = os.path.dirname(__file__)
- parent_dir = os.path.dirname(current_dir)
-
- sys.path.insert(0, current_dir) # For embeddings + database
- sys.path.insert(0, os.path.join(parent_dir, "api")) # For utils
-
-
- # --------------------------------------------------------
- # Imports AFTER adjusting paths
- # --------------------------------------------------------
-
- from embeddings import embed_text
- from database import insert_document_chunks, search_vectors, list_all_documents, initialize_database, delete_document, delete_all_documents
- from utils.text_extractor import extract_text
-
-
- # --------------------------------------------------------
- # Load environment variables
- # --------------------------------------------------------
-
- load_dotenv()
-
-
- # --------------------------------------------------------
- # FastAPI App
- # --------------------------------------------------------
-
- app = FastAPI(
-     title="RAG MCP Server",
-     description="Provides semantic search + ingestion for tenant knowledge bases",
-     version="1.0.0"
- )
-
-
- # --------------------------------------------------------
- # Enable CORS
- # --------------------------------------------------------
-
- app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_credentials=True,
-     allow_methods=["*"],
-     allow_headers=["*"],
- )
-
-
- # --------------------------------------------------------
- # Startup Event - Initialize Database
- # --------------------------------------------------------
-
- @app.on_event("startup")
- async def startup_event():
-     """Initialize database schema on server startup."""
-     try:
-         print("Initializing database schema...")
-         initialize_database()
-     except Exception as e:
-         print(f"Warning: Database initialization failed: {e}")
-         print("Server will continue, but database operations may fail.")
-
-
- # --------------------------------------------------------
- # Request Models
- # --------------------------------------------------------
-
- class IngestPayload(BaseModel):
-     tenant_id: str
-     content: str
-
-
- class SearchPayload(BaseModel):
-     query: str
-     tenant_id: str
-
-
- # --------------------------------------------------------
- # Health Check
- # --------------------------------------------------------
-
- @app.get("/")
- def root():
-     return {"status": "RAG MCP SERVER RUNNING"}
-
-
- # --------------------------------------------------------
- # Ingest Route
- # --------------------------------------------------------
-
- @app.post("/ingest")
- def ingest(payload: IngestPayload):
-     """
-     Ingest raw text:
-     - Chunk text
-     - Embed chunks
-     - Store in Postgres
-     """
-     try:
-         chunks = extract_text(payload.content)
-
-         if not chunks:
-             raise HTTPException(400, "No text found to ingest.")
-
-         inserted = 0
-
-         for chunk in chunks:
-             embedding = embed_text(chunk)
-             insert_document_chunks(payload.tenant_id, chunk, embedding)
-             inserted += 1
-
-         return {
-             "status": "ok",
-             "tenant_id": payload.tenant_id,
-             "chunks_stored": inserted
-         }
-
-     except Exception as e:
-         raise HTTPException(status_code=500, detail=str(e))
-
-
- # --------------------------------------------------------
- # Search Route
- # --------------------------------------------------------
-
- @app.post("/search")
- def search(payload: SearchPayload):
-     """
-     Semantic search using pgvector + MiniLM embeddings.
-     Results are filtered by tenant_id in the database query.
-     """
-     try:
-         # Validate tenant_id is provided
-         if not payload.tenant_id or not payload.tenant_id.strip():
-             raise HTTPException(status_code=400, detail="tenant_id is required")
-
-         query_embedding = embed_text(payload.query)
-         # search_vectors filters by tenant_id in the SQL query
-         results = search_vectors(payload.tenant_id.strip(), query_embedding, limit=10)
-
-         # Log for debugging (remove in production)
-         print(f"[RAG Search] tenant_id={payload.tenant_id}, query={payload.query[:50]}, results_count={len(results)}")
-
-         return {
-             "tenant_id": payload.tenant_id,
-             "query": payload.query,
-             "results": results
-         }
-
-     except HTTPException:
-         raise
-     except Exception as e:
-         print(f"[RAG Search Error] tenant_id={payload.tenant_id}, error={str(e)}")
-         raise HTTPException(status_code=500, detail=str(e))
-
-
- # --------------------------------------------------------
- # List All Documents Route
- # --------------------------------------------------------
-
- @app.get("/list")
- def list_documents(tenant_id: str, limit: int = 1000, offset: int = 0):
-     """
-     List all documents for a tenant with pagination.
-     """
-     try:
-         result = list_all_documents(tenant_id, limit=limit, offset=offset)
-         return result
-     except Exception as e:
-         raise HTTPException(status_code=500, detail=str(e))
-
-
- # --------------------------------------------------------
- # Delete Document Route
- # --------------------------------------------------------
-
- @app.delete("/delete/{document_id}")
- def delete_doc(document_id: int, tenant_id: str):
-     """
-     Delete a specific document by ID for a tenant.
-     """
-     try:
-         deleted = delete_document(tenant_id, document_id)
-         if not deleted:
-             raise HTTPException(status_code=404, detail="Document not found or access denied")
-
-         return {
-             "status": "ok",
-             "tenant_id": tenant_id,
-             "document_id": document_id,
-             "message": "Document deleted successfully"
-         }
-     except HTTPException:
-         raise
-     except Exception as e:
-         raise HTTPException(status_code=500, detail=str(e))
-
-
- @app.delete("/delete-all")
- def delete_all_docs(tenant_id: str):
-     """
-     Delete all documents for a tenant.
-     """
-     try:
-         deleted_count = delete_all_documents(tenant_id)
-         return {
-             "status": "ok",
-             "tenant_id": tenant_id,
-             "deleted_count": deleted_count,
-             "message": f"Deleted {deleted_count} document(s)"
-         }
-     except Exception as e:
-         raise HTTPException(status_code=500, detail=str(e))
-
-
- # --------------------------------------------------------
- # Allow "python main.py" to start server
- # --------------------------------------------------------
-
- if __name__ == "__main__":
-     import uvicorn
-
-     print("Starting RAG MCP Server on http://0.0.0.0:8001")
-     print("API Documentation: http://localhost:8001/docs")
-     print("Note: Reload mode disabled when running directly")
-
-     # Run the app directly (reload doesn't work with app object)
-     uvicorn.run(
-         app, # Pass the app object directly
-         host="0.0.0.0",
-         port=8001,
-         reload=False # Reload requires module path, not app object
-     )
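The standalone RAG server deleted above served `/ingest`, `/search`, `/list`, `/delete/{id}`, and `/delete-all` on port 8001; those operations now sit behind the unified server's `rag.*` tools. A rough before/after sketch follows (the new `rag.ingest` payload shape is an assumption carried over from the old `IngestPayload` model; the unified server also defaults to port 8001):

```python
# Hedged migration sketch: old standalone route vs. new namespaced route.
import requests

doc = {"tenant_id": "tenant-a", "content": "Refunds are processed within 5 business days."}

# Before: POST /ingest on the standalone RAG MCP server (this file, now deleted)
# requests.post("http://localhost:8001/ingest", json=doc)

# After: POST /rag/ingest on the unified MCP server (backend/mcp_server/server.py);
# the accepted fields are defined by the rag.ingest handler and may differ.
requests.post("http://localhost:8001/rag/ingest", json=doc)
```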
backend/mcp_servers/models/__init__.py DELETED
@@ -1,18 +0,0 @@
- """
- MCP Server Models Package
-
- This package contains all Pydantic models used across MCP servers.
- """
-
- from .admin import EvalRequest, AlertPayload
- from .rag import IngestRequest, SearchRequest
- from .web import WebSearchRequest
-
- __all__ = [
-     "EvalRequest",
-     "AlertPayload",
-     "IngestRequest",
-     "SearchRequest",
-     "WebSearchRequest",
- ]
-
backend/mcp_servers/models/admin.py DELETED
@@ -1,14 +0,0 @@
- from pydantic import BaseModel
- from typing import Optional
-
-
- class EvalRequest(BaseModel):
-     tenant_id: str
-     query: str
-
-
- class AlertPayload(BaseModel):
-     tenant_id: str
-     violations: list
-     source: Optional[dict] = None
-
backend/mcp_servers/models/rag.py DELETED
@@ -1,12 +0,0 @@
- from pydantic import BaseModel
-
-
- class IngestRequest(BaseModel):
-     tenant_id: str
-     content: str
-
-
- class SearchRequest(BaseModel):
-     tenant_id: str
-     query: str
-
backend/mcp_servers/models/web.py DELETED
@@ -1,7 +0,0 @@
- from pydantic import BaseModel
-
-
- class WebSearchRequest(BaseModel):
-     tenant_id: str
-     query: str
-
backend/mcp_servers/placeholder.txt DELETED
@@ -1,4 +0,0 @@
- This directory contains the MCP server implementations.
- For the Hugging Face Space submission, only placeholder files are included.
- The full MCP server code exists separately.
-
backend/mcp_servers/rag_server.py DELETED
@@ -1,84 +0,0 @@
- # =============================================================
- # File: backend/mcp_servers/rag_server.py
- # =============================================================
-
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- import sys
- import os
-
- # Fix Python module paths
- current_dir = os.path.dirname(__file__)
- sys.path.insert(0, current_dir)
-
- from typing import Any, Dict, List
-
- from embeddings import embed_text
- from database import insert_document_chunks, search_vectors
- from models.rag import IngestRequest, SearchRequest
-
-
- rag_app = FastAPI(title="RAG MCP Server")
-
- # Enable CORS
- rag_app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_credentials=True,
-     allow_methods=["*"],
-     allow_headers=["*"],
- )
-
-
- # Wrapper functions to match expected interface
- def db_insert(tenant_id: str, content: str, vector: list):
-     """Wrapper for insert_document_chunks to match expected interface."""
-     return insert_document_chunks(tenant_id, content, vector)
-
-
- def db_search(tenant_id: str, vector: list, limit: int = 5):
-     """Wrapper for search_vectors to match expected interface."""
-     results = search_vectors(tenant_id, vector, limit)
-     # search_vectors returns list of dicts with "text" and "similarity"
-     # Preserve the structure and use similarity as relevance
-     return [
-         {
-             "text": result.get("text", ""),
-             "relevance": result.get("similarity", 0.0)
-         }
-         for result in results
-     ]
-
-
- @rag_app.post("/ingest")
- async def ingest(req: IngestRequest):
-     vector = embed_text(req.content)
-     db_insert(req.tenant_id, req.content, vector)
-     return {"status": "ok"}
-
-
- @rag_app.post("/search")
- async def search(req: SearchRequest):
-     """
-     Search documents for a specific tenant.
-     Results are already filtered by tenant_id in the database query.
-     """
-     vector = embed_text(req.query)
-     # db_search already filters by tenant_id and returns results sorted by similarity
-     results = db_search(req.tenant_id, vector, limit=10) # Get more results for filtering
-     # Filter by relevance threshold and limit to top 3
-     filtered = [chunk for chunk in results if chunk.get("relevance", 0.0) >= 0.55][:3]
-     return {
-         "results": filtered,
-         "metadata": {
-             "total_retrieved": len(results),
-             "returned": len(filtered),
-             "threshold": 0.55
-         }
-     }
-
-
- if __name__ == "__main__":
-     import uvicorn
-     uvicorn.run(rag_app, host="0.0.0.0", port=8001)
-
backend/mcp_servers/web_server.py DELETED
@@ -1,71 +0,0 @@
- # =============================================================
- # File: backend/mcp_servers/web_server.py
- # =============================================================
-
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from duckduckgo_search import DDGS
- import sys
- import os
-
- # Fix Python module paths
- current_dir = os.path.dirname(__file__)
- sys.path.insert(0, current_dir)
-
- from models.web import WebSearchRequest
-
- web_app = FastAPI(title="Web Search MCP Server")
-
- # Enable CORS
- web_app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_credentials=True,
-     allow_methods=["*"],
-     allow_headers=["*"],
- )
-
-
- @web_app.post("/search")
- async def web_search(req: WebSearchRequest):
-     """
-     Web search endpoint using DuckDuckGo.
-     Accepts tenant_id for multi-tenant support (currently not used but kept for API consistency).
-     Forces English language results by using region parameter and query modification.
-     """
-     try:
-         ddg = DDGS()
-
-         # Modify query to prefer English results
-         # Add language hint to help get English content
-         query = req.query
-         # Only add language hint if not already present
-         if "lang:en" not in query.lower() and "site:en" not in query.lower():
-             query = f"{query} lang:en"
-
-         # Try to use region parameter for English results
-         # Common region codes: 'us-en' for US English, 'uk-en' for UK English
-         try:
-             results = ddg.text(query, max_results=5, region='us-en')
-         except (TypeError, KeyError):
-             # If region parameter not supported, try without it
-             # The lang:en in query should still help
-             results = ddg.text(query, max_results=5)
-
-         formatted = []
-         for r in results:
-             formatted.append({
-                 "title": r.get("title"),
-                 "snippet": r.get("body"),
-                 "url": r.get("href"),
-             })
-
-         return {"results": formatted}
-
-     except Exception as e:
-         return {"error": str(e), "results": []}
-
-
- if __name__ == "__main__":
-     import uvicorn
-     uvicorn.run(web_app, host="0.0.0.0", port=8002)