nothingworry committed
Commit 484cae8 · 1 Parent(s): 9c03abd

Fix role propagation in ingestion pipeline and improve error handling

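With this commit the role chosen in the Gradio UI travels the whole ingestion path: app.py sends it as the x-user-role header, backend/api/routes/rag.py checks it with require_api_permission and hands it to process_ingestion, and rag_client.py forwards it to the RAG MCP server as payload["user_role"]. A minimal sketch of exercising the fixed endpoint directly; the backend URL, tenant id, role, and sample content below are illustrative assumptions, not part of this commit:

import requests

# Hypothetical values for illustration only; app.py reads BACKEND_BASE_URL from the environment.
resp = requests.post(
    "http://localhost:8000/rag/ingest-document",
    headers={
        "Content-Type": "application/json",
        "x-tenant-id": "tenant-demo",
        "x-user-role": "editor",  # a 'viewer' role is rejected by require_api_permission with a 403
    },
    json={"source_type": "raw_text", "content": "Alice administers IntegraChat.", "metadata": {}},
    timeout=30,
)
print(resp.status_code, resp.json())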
app.py CHANGED
@@ -2,9 +2,12 @@ import gradio as gr
 import requests
 import json
 import os
+import sys
 from pathlib import Path
 from collections import Counter
 from datetime import datetime
+from dotenv import load_dotenv
+load_dotenv()
 
 try:
     import plotly.graph_objects as go
@@ -334,9 +337,21 @@ def ingest_document(
     doc_id: str,
     metadata_json: str
 ):
+    # Debug: Log the role value
+    print(f"🔍 DEBUG: ingest_document received role='{role}' (type: {type(role)})", file=sys.stderr)
+
+    if not BACKEND_BASE_URL:
+        return "❌ Backend URL is not configured. Please set BACKEND_BASE_URL environment variable or ensure it defaults to http://localhost:8000"
+
     if not tenant_id or not tenant_id.strip():
         return "❗ Tenant ID is required to ingest documents."
 
+    # Ensure role is not None or empty
+    if not role or not role.strip():
+        role = DEFAULT_ROLE
+        print(f"⚠️ WARNING: Role was empty/None in ingest_document, defaulting to '{role}'", file=sys.stderr)
+    role = role.strip()
+
     if not can_ingest_documents(role):
         return "❌ Access Denied: You need Editor, Admin, or Owner role to ingest documents."
 
@@ -373,10 +388,14 @@ def ingest_document(
     }
 
     try:
+        # Ensure role is set correctly for the header
+        final_role = role.strip() if role and role.strip() else DEFAULT_ROLE
+        print(f"🔍 DEBUG: Sending request with role='{final_role}' in x-user-role header", file=sys.stderr)
+
         headers = {
            "Content-Type": "application/json",
            "x-tenant-id": tenant_id,
-           "x-user-role": role if role else DEFAULT_ROLE
+           "x-user-role": final_role
        }
        response = requests.post(
            f"{BACKEND_BASE_URL}/rag/ingest-document",
@@ -413,6 +432,14 @@ def ingest_document(
            message += f"- **Extraction Method:** {method}\n"
 
            return message
+       elif response.status_code == 403:
+           # Permission denied - show clear message
+           try:
+               error_data = response.json()
+               error_detail = error_data.get('detail', response.text)
+           except:
+               error_detail = response.text
+           return f"🔒 **Permission Denied (403):**\n\n{error_detail}\n\n**Solution:** Change your **User Role** dropdown (top right) from 'viewer' to 'editor', 'admin', or 'owner' and try again."
        return f"❌ Ingestion failed ({response.status_code}): {response.text}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach the backend. Make sure the FastAPI server is running."
@@ -423,6 +450,9 @@ def ingest_document(
 
 
 def ingest_file(tenant_id: str, role: str, file_obj):
+    if not BACKEND_BASE_URL:
+        return "❌ Backend URL is not configured. Please set BACKEND_BASE_URL environment variable or ensure it defaults to http://localhost:8000"
+
     if not tenant_id or not tenant_id.strip():
         return "❗ Tenant ID is required to ingest files."
     if file_obj is None:
@@ -1618,32 +1648,21 @@ with gr.Blocks(
        box-shadow: 0 18px 60px rgba(15, 23, 42, 1);
    }
 
-   .chatbot .message {
-       border-radius: 16px;
-       padding: 10px 14px;
-       font-size: 0.95rem;
-       line-height: 1.6;
-       max-width: 80%;
-   }
-
+   /* Keep Gradio's default layout, only adjust colors lightly */
    .chatbot .message.user {
-       margin-left: auto;
        background: #0ea5e9;
        color: #0b1020;
-       box-shadow: 0 12px 32px rgba(15, 23, 42, 0.9);
    }
 
    .chatbot .message.bot {
-       margin-right: auto;
        background: #020617;
-       border: 1px solid rgba(148, 163, 184, 0.8);
+       border-color: rgba(148, 163, 184, 0.8);
        color: #e5e7eb;
-       box-shadow: 0 14px 40px rgba(15, 23, 42, 1);
    }
 
    .chatbot .message.error {
-       background: linear-gradient(135deg, rgba(239, 68, 68, 0.16) 0%, rgba(127, 29, 29, 0.9) 100%);
-       border: 1px solid rgba(248, 113, 113, 0.9);
+       background: rgba(239, 68, 68, 0.18);
+       border-color: rgba(248, 113, 113, 0.9);
    }
    """
 ) as demo:
@@ -1934,10 +1953,18 @@ with gr.Blocks(
        doc_id_value,
        metadata
    ):
+       # Debug: Log the role value received
+       print(f"🔍 DEBUG: handle_ingest_document received role='{role}' (type: {type(role)})", file=sys.stderr)
+
+       # Ensure role is not None or empty
+       if not role or role.strip() == "":
+           role = DEFAULT_ROLE
+           print(f"⚠️ WARNING: Role was empty/None, defaulting to '{role}'", file=sys.stderr)
+
        source_type = "raw_text" if mode == "Raw Text" else "url"
        result = ingest_document(
            tenant_id=tenant_id,
-           role=role,
+           role=role.strip() if role else DEFAULT_ROLE,
            source_type=source_type,
            content=content,
            document_url=doc_url,
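The same normalization rule now appears in both ingest_document and the Blocks handler: blank or missing roles fall back to DEFAULT_ROLE, and everything else is stripped before the permission check. A minimal sketch of that rule in isolation; the DEFAULT_ROLE value here is an assumption, app.py defines its own:

DEFAULT_ROLE = "viewer"  # assumed value for illustration

def normalize_role(role):
    # Blank or None roles fall back to the default; others are stripped.
    if not role or not role.strip():
        return DEFAULT_ROLE
    return role.strip()

assert normalize_role(None) == "viewer"
assert normalize_role("  editor ") == "editor"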
backend/api/mcp_clients/rag_client.py CHANGED
@@ -64,11 +64,19 @@ class RAGClient:
         content: str,
         tenant_id: str,
         metadata: Optional[Dict[str, Any]] = None,
-        doc_id: Optional[str] = None
+        doc_id: Optional[str] = None,
+        user_role: Optional[str] = None
     ):
         """
         Sends content to the RAG server for ingestion with metadata.
         Returns the unwrapped data from the MCP server response.
+
+        Args:
+            content: Text content to ingest
+            tenant_id: Tenant identifier
+            metadata: Optional metadata dictionary
+            doc_id: Optional document ID
+            user_role: User role (viewer, editor, admin, owner) - required for permission checks
         """
 
         try:
@@ -78,6 +86,10 @@ class RAGClient:
                 "content": content
             }
 
+            # Add role to payload (MCP server expects it for permission checks)
+            if user_role:
+                payload["user_role"] = user_role
+
             # Add metadata if provided
             if metadata:
                 payload["metadata"] = metadata
@@ -90,7 +102,14 @@ class RAGClient:
             )
 
             if response.status_code != 200:
-                return {"error": f"HTTP {response.status_code}"}
+                error_text = response.text[:500] if hasattr(response, 'text') else f"HTTP {response.status_code}"
+                raise RuntimeError(
+                    f"RAG server returned error {response.status_code}: {error_text}\n\n"
+                    f"Please check:\n"
+                    f"1. RAG MCP server is running at {self.base_url}\n"
+                    f"2. Database connection (POSTGRESQL_URL) is configured\n"
+                    f"3. The 'documents' table exists in the database"
+                )
 
             data = response.json()
 
@@ -106,9 +125,23 @@ class RAGClient:
             # If not wrapped, return as-is (backward compatibility)
             return data
 
+        except httpx.RequestError as e:
+            error_msg = f"Failed to connect to RAG server at {self.base_url}: {str(e)}"
+            print(f"❌ RAG Ingest Connection Error: {error_msg}")
+            raise RuntimeError(
+                f"{error_msg}\n\n"
+                f"Please check:\n"
+                f"1. RAG_MCP_URL is set correctly (current: {self.base_url})\n"
+                f"2. RAG MCP server is running\n"
+                f"3. Network connectivity to the server"
+            ) from e
         except Exception as e:
-            print("RAG Ingest Error:", e)
-            return {"error": str(e)}
+            error_msg = f"RAG ingestion error: {str(e)}"
+            print(f"❌ {error_msg}")
+            raise RuntimeError(
+                f"{error_msg}\n\n"
+                f"Please check the RAG server logs for more details."
+            ) from e
 
     async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
         """
backend/api/routes/agent.py CHANGED
@@ -148,9 +148,59 @@ Response:"""
 
         # STEP 2: ONLY IF NO RULES MATCHED - Proceed with normal flow
         yield f"data: {json.dumps({'status': 'classifying', 'message': 'Understanding your question...'})}\n\n"
+
+        # Check if this is an admin identity question - handle it specially
+        user_text = agent_req.message.lower().strip()
+        user_text_normalized = " ".join(user_text.split())
+        admin_phrases = [
+            "who is the admin",
+            "who's the admin",
+            "who is admin",
+            "who is the administrator",
+            "who administers this platform",
+            "who is the owner",
+            "who owns this platform",
+            "who is the admin of integrachat",
+            "who administers integrachat",
+        ]
+        is_admin_question = (
+            any(p in user_text_normalized for p in admin_phrases) or
+            ("who" in user_text and "admin" in user_text)
+        )
+
+        # For admin questions, ALWAYS check RAG first and answer directly from knowledge base
+        if is_admin_question:
+            yield f"data: {json.dumps({'status': 'searching', 'message': 'Searching knowledge base for admin information...'})}\n\n"
+            try:
+                rag_prefetch = await orchestrator.mcp.call_rag(agent_req.tenant_id, agent_req.message)
+                rag_results = []
+                if isinstance(rag_prefetch, dict):
+                    rag_results = rag_prefetch.get("results") or rag_prefetch.get("hits") or []
+
+                # If we have RAG hits, return the answer directly from the knowledge base
+                if rag_results:
+                    best_hit = rag_results[0]
+                    admin_text = best_hit.get("text") or best_hit.get("content") or str(best_hit)
+                    response_text = f"According to the tenant knowledge base, {admin_text.strip()}"
+                else:
+                    response_text = "I don't know who administers this platform based on the tenant data."
+
+                # Stream the response word by word
+                yield f"data: {json.dumps({'status': 'streaming', 'message': ''})}\n\n"
+                import asyncio
+                words = response_text.split()
+                for word in words:
+                    yield f"data: {json.dumps({'token': word + ' ', 'done': False})}\n\n"
+                    await asyncio.sleep(0)
+                yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
+                return
+            except Exception as rag_err:
+                # If RAG fails, fall through to normal flow
+                pass
+
         intent = await orchestrator.intent.classify(agent_req.message)
 
-        # Pre-fetch RAG if needed
+        # Pre-fetch RAG if needed (for non-admin questions)
         rag_results = []
         if intent == "rag" or "rag" in intent.lower():
             yield f"data: {json.dumps({'status': 'searching', 'message': 'Searching knowledge base...'})}\n\n"
@@ -161,6 +211,16 @@ Response:"""
             except Exception:
                 pass
 
+        # Also check if we have prefetched RAG results from earlier (for all questions)
+        # This ensures RAG context is used even if intent isn't "rag"
+        if not rag_results:
+            try:
+                rag_prefetch = await orchestrator.mcp.call_rag(agent_req.tenant_id, agent_req.message)
+                if isinstance(rag_prefetch, dict):
+                    rag_results = rag_prefetch.get("results") or rag_prefetch.get("hits") or []
+            except Exception:
+                pass
+
         # Build prompt with context
         if rag_results:
             context = "\n\n".join([r.get("text", "")[:500] for r in rag_results[:3]])
backend/api/routes/rag.py CHANGED
@@ -1,4 +1,4 @@
-from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form
+from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form, Request
 from pydantic import BaseModel
 from typing import Optional, Dict, Any
 from api.mcp_clients.rag_client import RAGClient
@@ -85,6 +85,7 @@ async def rag_ingest(
 @router.post("/ingest-document")
 async def rag_ingest_document(
     req: DocumentIngestRequest,
+    request: Request,
     x_tenant_id: Optional[str] = Header(None),
     x_user_role: str = Header("viewer")
 ):
@@ -114,26 +115,107 @@ async def rag_ingest_document(
     tenant_id = req.tenant_id or x_tenant_id
     if not tenant_id:
         raise HTTPException(status_code=400, detail="Missing tenant ID")
+
+    import sys
+    # Debug: Check actual headers received
+    all_headers = dict(request.headers)
+    print(f"🔍 DEBUG: All headers received: {list(all_headers.keys())}", file=sys.stderr)
+    print(f"🔍 DEBUG: x-user-role header value: '{all_headers.get('x-user-role', 'NOT FOUND')}'", file=sys.stderr)
+    print(f"🔍 DEBUG: x-user-role header value (case-insensitive): '{all_headers.get('X-User-Role', all_headers.get('x-user-role', 'NOT FOUND'))}'", file=sys.stderr)
+    print(f"🔍 DEBUG: Backend received x_user_role parameter='{x_user_role}' (type: {type(x_user_role)})", file=sys.stderr)
+    print(f"🔍 DEBUG: x_tenant_id header='{x_tenant_id}'", file=sys.stderr)
+
     require_api_permission(x_user_role, "ingest_documents")
 
+    content_length = len(req.content) if req.content else 0
+    print(f"📥 Ingestion request received: tenant_id={tenant_id}, source_type={req.source_type}, content_length={content_length}", file=sys.stderr)
+
+    # Validate content is not too short
+    if not req.content or not req.content.strip():
+        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
+
+    if content_length < 10:
+        print(f"⚠️ Warning: Content is very short ({content_length} chars). This may result in no chunks being created.", file=sys.stderr)
+
     try:
+        print("🔧 Step 1: Preparing ingestion payload...", file=sys.stderr)
         # Prepare ingestion payload (async for URL fetching)
-        payload = await prepare_ingestion_payload(
-            tenant_id=tenant_id,
-            content=req.content,
-            source_type=req.source_type,
-            filename=req.metadata.get("filename") if req.metadata else None,
-            url=req.metadata.get("url") if req.metadata else None,
-            doc_id=req.metadata.get("doc_id") if req.metadata else None,
-            metadata=req.metadata
-        )
+        try:
+            payload = await prepare_ingestion_payload(
+                tenant_id=tenant_id,
+                content=req.content,
+                source_type=req.source_type,
+                filename=req.metadata.get("filename") if req.metadata else None,
+                url=req.metadata.get("url") if req.metadata else None,
+                doc_id=req.metadata.get("doc_id") if req.metadata else None,
+                metadata=req.metadata
+            )
+            print(f"✅ Step 1 complete: payload prepared", file=sys.stderr)
+        except Exception as prep_err:
+            print(f"❌ Step 1 FAILED (prepare_ingestion_payload): {prep_err}", file=sys.stderr)
+            import traceback
+            print(traceback.format_exc(), file=sys.stderr)
+            raise
 
+        print("🔧 Step 2: Processing ingestion with RAG client...", file=sys.stderr)
         # Process ingestion with metadata extraction
         extract_metadata = req.metadata.get("extract_metadata", True) if req.metadata else True
-        result = await process_ingestion(payload, rag_client, extract_metadata=extract_metadata)
+        try:
+            result = await process_ingestion(payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role)
+            print(f"✅ Step 2 complete: chunks_stored={result.get('chunks_stored', 0) if isinstance(result, dict) else 'N/A'}", file=sys.stderr)
+        except HTTPException:
+            # Re-raise HTTP exceptions (like 403 permission errors) as-is
+            raise
+        except Exception as proc_err:
+            # Check if it's a permission error with status_code attribute
+            if hasattr(proc_err, 'status_code') and proc_err.status_code == 403:
+                raise HTTPException(status_code=403, detail=getattr(proc_err, 'detail', str(proc_err)))
+
+            print(f"❌ Step 2 FAILED (process_ingestion): {proc_err}", file=sys.stderr)
+            import traceback
+            print(traceback.format_exc(), file=sys.stderr)
+            raise
+
+        # Check if ingestion actually succeeded
+        # First check if the result itself indicates an error
+        if isinstance(result, dict) and result.get('status') == 'error':
+            error_msg = result.get('message') or result.get('error') or "Unknown error from RAG server"
+            error_type = result.get('error_type', 'unknown')
+            print(f"❌ RAG server returned error ({error_type}): {error_msg}", file=sys.stderr)
+
+            # If it's a permission error, return 403
+            if 'permission' in error_msg.lower() or 'not permitted' in error_msg.lower() or error_type == 'validation_error':
+                raise HTTPException(
+                    status_code=403,
+                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
+                )
+            else:
+                raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")
+
+        chunks_stored = result.get('chunks_stored', 0)
+        print(f"🔍 Debug: result keys={list(result.keys()) if isinstance(result, dict) else 'not a dict'}, chunks_stored={chunks_stored}", file=sys.stderr)
+
+        if chunks_stored == 0:
+            # Get more details about why no chunks were stored
+            error_detail = result.get('error') or result.get('warnings') or result.get('message') or "No chunks were stored"
+            warnings = result.get('warnings')
+
+            error_msg = f"Ingestion failed: {error_detail}"
+            if warnings:
+                error_msg += f"\nWarnings: {warnings}"
+            error_msg += (
+                "\n\nPossible causes:\n"
+                "1. Content too short or empty (minimum text required)\n"
+                "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
+                "3. RAG MCP server error (check RAG server logs)\n"
+                "4. Database table 'documents' doesn't exist"
+            )
+
+            print(f"❌ No chunks stored. Error detail: {error_detail}", file=sys.stderr)
+            raise HTTPException(status_code=500, detail=error_msg)
 
         # Build response message
-        message = f"Document ingested successfully. {result.get('chunks_stored', 0)} chunk(s) stored."
+        message = f"Document ingested successfully. {chunks_stored} chunk(s) stored."
         if result.get("extracted_metadata"):
             metadata_info = result["extracted_metadata"]
             if metadata_info.get("title"):
@@ -146,10 +228,55 @@ async def rag_ingest_document(
             "message": message,
             **result
         }
+    except HTTPException:
+        # Re-raise HTTP exceptions as-is
+        raise
     except ValueError as e:
-        raise HTTPException(status_code=400, detail=str(e))
+        import traceback
+        print(f"❌ Ingestion ValueError: {e}")
+        print(traceback.format_exc())
+        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
     except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
+        import traceback
+        import sys
+        error_detail = str(e)
+        error_type = type(e).__name__
+        full_traceback = traceback.format_exc()
+
+        # Log to console with full details (use both stderr and stdout to ensure visibility)
+        error_log = f"❌ Ingestion Error ({error_type}): {error_detail}\nFull traceback:\n{full_traceback}"
+        print(error_log, file=sys.stderr)
+        print(error_log)  # Also print to stdout for uvicorn logs
+
+        # Provide helpful error message
+        if "POSTGRESQL_URL" in error_detail or "database" in error_detail.lower() or "connection" in error_detail.lower():
+            error_msg = (
+                f"Database connection error: {error_detail}\n\n"
+                f"Please check:\n"
+                f"1. POSTGRESQL_URL is set correctly in your .env file\n"
+                f"2. Database is accessible\n"
+                f"3. The 'documents' table exists (run initialize_database() if needed)"
+            )
+        elif "RAG" in error_detail or "rag" in error_detail.lower() or "mcp" in error_detail.lower():
+            error_msg = (
+                f"RAG server error: {error_detail}\n\n"
+                f"Please check:\n"
+                f"1. RAG_MCP_URL is set correctly (default: http://localhost:8001)\n"
+                f"2. RAG MCP server is running\n"
+                f"3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
+            )
        else:
+            # For unknown errors, include the full error message
+            error_msg = f"Ingestion failed ({error_type}): {error_detail}"
+            # If it's a long traceback, include just the first few lines
+            if len(error_detail) > 500:
+                error_msg = f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\nSee server logs for full traceback."
+
+        # Ensure error message is not too long for HTTP response
+        if len(error_msg) > 2000:
+            error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
+
+        raise HTTPException(status_code=500, detail=error_msg)
 
 
 @router.post("/ingest-file")
backend/api/services/agent_orchestrator.py CHANGED
@@ -610,12 +610,157 @@ Response:"""
             return AgentResponse(text=json.dumps(admin_resp), decision=decision, tool_traces=tool_traces, reasoning_trace=reasoning_trace)
 
         if decision.tool == "llm":
+            # If the user is asking who the admin / owner is, try to ground the
+            # answer in tenant-specific RAG before falling back to a generic LLM reply.
+            user_text = req.message.lower()
+            # Normalize whitespace to make matching more robust
+            user_text_normalized = " ".join(user_text.split())
+            admin_phrases = [
+                "who is the admin",
+                "who's the admin",
+                "who is admin",
+                "who is the administrator",
+                "who's the administrator",
+                "who administers this platform",
+                "who administers the platform",
+                "who is the owner",
+                "who's the owner",
+                "who owns this platform",
+                "who owns the platform",
+                "who is the admin of integrachat",
+                "who's the admin of integrachat",
+            ]
+            use_rag_for_admin = any(p in user_text_normalized for p in admin_phrases) or (
+                "admin" in user_text and "who" in user_text
+            )
+
+            prompt_for_llm = req.message
+
+            if use_rag_for_admin:
+                try:
+                    rag_start = time.time()
+                    rag_resp = await self.rag_with_repair(
+                        query=req.message,
+                        tenant_id=req.tenant_id,
+                        original_threshold=0.2,
+                        reasoning_trace=reasoning_trace,
+                        user_id=req.user_id,
+                    )
+                    rag_latency_ms = int((time.time() - rag_start) * 1000)
+                    tools_used.append("rag")
+
+                    rag_formatted = self._format_tool_output("rag", rag_resp, rag_latency_ms)
+                    tool_traces.append({"tool": "rag", "response": rag_formatted})
+
+                    hits = self._extract_hits(rag_formatted)
+                    hits_count = len(hits)
+                    avg_score = rag_formatted.get("avg_score")
+                    top_score = rag_formatted.get("top_score")
+
+                    self._analytics_log_tool_usage(
+                        tenant_id=req.tenant_id,
+                        tool_name="rag",
+                        latency_ms=rag_latency_ms,
+                        success=True,
+                        user_id=req.user_id,
+                    )
+
+                    reasoning_trace.append(
+                        {
+                            "step": "tool_execution",
+                            "tool": "rag",
+                            "hit_count": hits_count,
+                            "top_score": top_score,
+                            "avg_score": avg_score,
+                            "summary": self._summarize_hits(rag_formatted, limit=2),
+                            "note": "admin_identity_override",
+                        }
+                    )
+
+                    # For admin questions, answer directly from RAG and avoid any
+                    # generic LLM behaviour. If there is at least one hit, return
+                    # that snippet; otherwise return an explicit "don't know".
+                    if hits:
+                        best = hits[0]
+                        admin_text = best.get("text") or best.get("content") or str(best)
+                        llm_out = f"According to the tenant knowledge base, {admin_text.strip()}"
+                    else:
+                        llm_out = "I don't know who administers this platform based on the tenant data."
+
+                    llm_latency_ms = 0
+                    estimated_tokens = len(llm_out) // 4 + len(req.message) // 4
+                    total_tokens += estimated_tokens
+
+                    self._analytics_log_tool_usage(
+                        tenant_id=req.tenant_id,
+                        tool_name="llm",
+                        latency_ms=llm_latency_ms,
+                        tokens_used=estimated_tokens,
+                        success=True,
+                        user_id=req.user_id,
+                    )
+
+                    reasoning_trace.append(
+                        {
+                            "step": "llm_response",
+                            "mode": "admin_from_rag_only",
+                            "latency_ms": llm_latency_ms,
+                            "estimated_tokens": estimated_tokens,
+                        }
+                    )
+
+                    total_latency_ms = int((time.time() - start_time) * 1000)
+                    self._analytics_log_agent_query(
+                        tenant_id=req.tenant_id,
+                        message_preview=req.message[:200],
+                        intent=intent,
+                        tools_used=tools_used,
+                        total_tokens=total_tokens,
+                        total_latency_ms=total_latency_ms,
+                        success=True,
+                        user_id=req.user_id,
+                    )
+
+                    return AgentResponse(text=llm_out, decision=decision, reasoning_trace=reasoning_trace)
+
+                except Exception as rag_err:
+                    reasoning_trace.append(
+                        {
+                            "step": "rag_for_admin_fallback",
+                            "status": "error",
+                            "error": str(rag_err),
+                        }
+                    )
+
+            # For all other questions, if we already have RAG hits from pgvector
+            # (rag_results from the prefetch step), reuse them to ground the
+            # LLM response instead of answering purely from the model.
+            if not use_rag_for_admin and rag_results:
+                try:
+                    rag_prefetched_dict: Dict[str, Any] = {"results": rag_results}
+                    prompt_for_llm = self._build_prompt_with_rag(req, rag_prefetched_dict)
+                    reasoning_trace.append(
+                        {
+                            "step": "rag_context_for_llm",
+                            "hit_count": len(rag_results),
+                            "note": "used_prefetched_pgvector_hits",
+                        }
+                    )
+                except Exception as build_err:
+                    reasoning_trace.append(
+                        {
+                            "step": "rag_context_for_llm",
+                            "status": "error",
+                            "error": str(build_err),
+                        }
+                    )
+
             llm_start = time.time()
-            llm_out = await self.llm.simple_call(req.message, temperature=req.temperature)
+            llm_out = await self.llm.simple_call(prompt_for_llm, temperature=req.temperature)
             llm_latency_ms = int((time.time() - llm_start) * 1000)
             tools_used.append("llm")
 
-            estimated_tokens = len(llm_out) // 4 + len(req.message) // 4
+            estimated_tokens = len(llm_out) // 4 + len(prompt_for_llm) // 4
             total_tokens += estimated_tokens
 
             self._analytics_log_tool_usage(
@@ -1046,7 +1191,73 @@ Response:"""
         # Build comprehensive prompt with all collected data
         data_section = "\n---\n".join(collected_data) if collected_data else ""
 
-        # Build final prompt
+        # Build final response. For admin-identity style questions, bypass generic
+        # multi-step LLM behaviour and answer directly from RAG data if available.
+        user_text = req.message.lower()
+        user_text_normalized = " ".join(user_text.split())
+        admin_phrases = [
+            "who is the admin",
+            "who's the admin",
+            "who is admin",
+            "who is the administrator",
+            "who's the administrator",
+            "who administers this platform",
+            "who administers the platform",
+            "who is the owner",
+            "who's the owner",
+            "who owns this platform",
+            "who owns the platform",
+            "who is the admin of integrachat",
+            "who's the admin of integrachat",
+        ]
+        if any(p in user_text_normalized for p in admin_phrases) or ("admin" in user_text and "who" in user_text):
+            hits = self._extract_hits(rag_data) if rag_data else []
+            if hits:
+                best = hits[0]
+                admin_text = best.get("text") or best.get("content") or str(best)
+                llm_out = f"According to the tenant knowledge base, {admin_text.strip()}"
+            else:
+                llm_out = "I don't know who administers this platform based on the tenant data."
+
+            llm_latency_ms = 0
+            estimated_tokens = len(llm_out) // 4 + len(req.message) // 4
+            total_tokens += estimated_tokens
+            tools_used.append("llm")
+
+            self._analytics_log_tool_usage(
+                tenant_id=req.tenant_id,
+                tool_name="llm",
+                latency_ms=llm_latency_ms,
+                tokens_used=estimated_tokens,
+                success=True,
+                user_id=req.user_id
+            )
+
+            total_latency_ms = int((time.time() - start_time) * 1000)
+            self._analytics_log_agent_query(
+                tenant_id=req.tenant_id,
+                message_preview=req.message[:200],
+                intent="multi_step",
+                tools_used=tools_used,
+                total_tokens=total_tokens,
+                total_latency_ms=total_latency_ms,
+                success=True,
+                user_id=req.user_id
+            )
+
+            return AgentResponse(
+                text=llm_out,
+                decision=decision,
+                tool_traces=tool_traces,
+                reasoning_trace=reasoning_trace + [{
+                    "step": "llm_response",
+                    "mode": "multi_step_admin_from_rag_only",
+                    "latency_ms": llm_latency_ms,
+                    "estimated_tokens": estimated_tokens
+                }]
+            )
+
+        # Otherwise, build the normal multi-step synthesis prompt.
         if data_section:
             prompt = (
                 f"You are an assistant helping tenant {req.tenant_id}.\n\n"
@@ -1061,7 +1272,6 @@ Response:"""
                 f"and practical steps whenever possible. If the information is incomplete, explain "
                 f"what can and cannot be concluded from the available data."
             )
-
         else:
             # No data collected, just answer the question
             prompt = req.message
@@ -1072,10 +1282,10 @@ Response:"""
         llm_out = await self.llm.simple_call(prompt, temperature=req.temperature)
         llm_latency_ms = int((time.time() - llm_start) * 1000)
         tools_used.append("llm")
-
+
         estimated_tokens = len(llm_out) // 4 + len(prompt) // 4
         total_tokens += estimated_tokens
-
+
         self._analytics_log_tool_usage(
             tenant_id=req.tenant_id,
             tool_name="llm",
@@ -1084,7 +1294,7 @@ Response:"""
             success=True,
             user_id=req.user_id
         )
-
+
         total_latency_ms = int((time.time() - start_time) * 1000)
         self._analytics_log_agent_query(
             tenant_id=req.tenant_id,
@@ -1096,7 +1306,7 @@ Response:"""
             success=True,
            user_id=req.user_id
         )
-
+
         return AgentResponse(
             text=llm_out,
             decision=decision,
backend/api/services/document_ingestion.py CHANGED
@@ -217,7 +217,8 @@ async def prepare_ingestion_payload(
 async def process_ingestion(
     payload: Dict[str, Any],
     rag_client,
-    extract_metadata: bool = True
+    extract_metadata: bool = True,
+    user_role: Optional[str] = None
 ) -> Dict[str, Any]:
     """
     Process the ingestion payload by sending it to the RAG MCP server.
@@ -260,22 +261,71 @@ async def process_ingestion(
     }
 
     # Send to RAG MCP server with metadata
-    result = await rag_client.ingest_with_metadata(
-        content=content,
-        tenant_id=tenant_id,
-        metadata=final_metadata,
-        doc_id=doc_id
-    )
-
-    # Enhance result with metadata
-    return {
-        "status": "ok",
-        "tenant_id": tenant_id,
-        "source_type": source_type,
-        "doc_id": doc_id,
-        "chunks_stored": result.get("chunks_stored", 0),
-        "metadata": final_metadata,
-        "extracted_metadata": extracted_metadata, # Include extracted metadata in response
-        **result
-    }
+    try:
+        result = await rag_client.ingest_with_metadata(
+            content=content,
+            tenant_id=tenant_id,
+            metadata=final_metadata,
+            doc_id=doc_id,
+            user_role=user_role
+        )
+
+        # Check if result indicates an error (multiple ways the RAG server can signal errors)
+        if isinstance(result, dict):
+            # Check for explicit error status
+            if result.get("status") == "error":
+                error_msg = result.get("message") or result.get("error") or "Unknown error from RAG server"
+                error_type = result.get("error_type", "unknown_error")
+                logger.error(f"RAG ingestion error ({error_type}): {error_msg}")
+
+                # For permission errors, raise a specific exception that can be caught and converted to HTTPException
+                if "permission" in error_msg.lower() or "not permitted" in error_msg.lower() or error_type == "validation_error":
+                    # Create a custom exception that will be caught and converted to HTTPException
+                    class PermissionError(Exception):
+                        pass
+                    perm_err = PermissionError(f"Permission denied: {error_msg}")
+                    perm_err.status_code = 403
+                    perm_err.detail = f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown in app.py."
+                    raise perm_err
+
+                raise ValueError(f"RAG server error ({error_type}): {error_msg}")
+
+            # Check for error field
+            if "error" in result:
+                error_msg = result.get("error", "Unknown error from RAG server")
+                logger.error(f"RAG ingestion error: {error_msg}")
+                raise ValueError(f"RAG server error: {error_msg}")
+
+        chunks_stored = result.get("chunks_stored", 0) if isinstance(result, dict) else 0
+
+        # Enhance result with metadata
+        response = {
+            "status": "ok",
+            "tenant_id": tenant_id,
+            "source_type": source_type,
+            "doc_id": doc_id,
+            "chunks_stored": chunks_stored,
+            "metadata": final_metadata,
+            "extracted_metadata": extracted_metadata, # Include extracted metadata in response
+        }
+
+        # Add any additional fields from result if it's a dict
+        if isinstance(result, dict):
+            response.update(result)
+
+        return response
+    except Exception as e:
+        # Re-raise permission errors as-is (they'll be caught and converted to HTTPException)
+        if hasattr(e, 'status_code') and e.status_code == 403:
+            raise
+
+        logger.error(f"Failed to ingest document to RAG server: {e}", exc_info=True)
+        # Re-raise with more context
+        raise RuntimeError(
+            f"Failed to send document to RAG MCP server: {str(e)}\n\n"
+            f"Please check:\n"
+            f"1. RAG_MCP_URL is set correctly (default: http://localhost:8001)\n"
+            f"2. RAG MCP server is running\n"
+            f"3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
+        ) from e
 
backend/mcp_server/common/database.py CHANGED
@@ -137,11 +137,22 @@ def insert_document_chunks(tenant_id: str, text: str, embedding: list, metadata:
         metadata: Optional JSON metadata (title, summary, tags, topics, etc.)
         doc_id: Optional document ID to group chunks from the same document
     """
+    import json
+    import traceback
+
+    # Normalize tenant_id to ensure consistency
+    tenant_id = tenant_id.strip()
+
+    if not tenant_id:
+        raise ValueError("tenant_id cannot be empty")
+
+    if not text or not text.strip():
+        raise ValueError("text cannot be empty")
+
+    if not embedding or len(embedding) != 384:
+        raise ValueError(f"embedding must be a 384-dimensional vector, got {len(embedding) if embedding else 0} dimensions")
+
     try:
-        import json
-        # Normalize tenant_id to ensure consistency
-        tenant_id = tenant_id.strip()
-
         conn = get_connection()
         cur = conn.cursor()
 
@@ -159,10 +170,25 @@ def insert_document_chunks(tenant_id: str, text: str, embedding: list, metadata:
         conn.commit()
         cur.close()
         conn.close()
+
+        print(f"✅ DB INSERT: Successfully inserted chunk for tenant '{tenant_id}' (doc_id: {doc_id or 'N/A'})")
 
-    except Exception as e:
-        print("DB INSERT ERROR:", e)
+    except ValueError as ve:
+        # Re-raise ValueError as-is (validation errors)
+        print(f"❌ DB INSERT VALIDATION ERROR: {ve}")
         raise
+    except Exception as e:
+        error_msg = f"DB INSERT ERROR (tenant_id='{tenant_id}'): {str(e)}"
+        print(f"❌ {error_msg}")
+        print(traceback.format_exc())
+        # Wrap in a more descriptive error
+        raise RuntimeError(
+            f"Failed to insert document into database: {str(e)}\n"
+            f"Please check:\n"
+            f"1. POSTGRESQL_URL is set correctly in .env\n"
+            f"2. Database is accessible and pgvector extension is installed\n"
+            f"3. Documents table exists (run initialize_database() if needed)"
+        ) from e
 
 
 def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[str, Any]]:
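insert_document_chunks now validates its inputs before touching the database, including a hard requirement that embeddings are 384-dimensional. A small sketch of that check in action; the import path and example values are assumptions, and the ValueError is raised before any connection is opened:

# Hypothetical import path; adjust to how the MCP server package is actually laid out.
from mcp_server.common.database import insert_document_chunks

try:
    # A 10-dimensional vector violates the new 384-dimension requirement.
    insert_document_chunks("tenant-demo", "some text", [0.0] * 10)
except ValueError as err:
    print(err)  # embedding must be a 384-dimensional vector, got 10 dimensions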
backend/mcp_server/rag/ingest.py CHANGED
@@ -45,22 +45,46 @@ async def rag_ingest(context: TenantContext, payload: Mapping[str, object]) -> d
         raise ToolValidationError("no text detected after preprocessing")
 
     stored = 0
-    for chunk in chunks:
-        vector = embed_text(chunk)
-        # Store metadata with each chunk (same metadata for all chunks from same document)
-        insert_document_chunks(
-            context.tenant_id,
-            chunk,
-            vector,
-            metadata=metadata,
-            doc_id=doc_id
+    errors = []
+
+    for i, chunk in enumerate(chunks):
+        try:
+            vector = embed_text(chunk)
+            # Store metadata with each chunk (same metadata for all chunks from same document)
+            insert_document_chunks(
+                context.tenant_id,
+                chunk,
+                vector,
+                metadata=metadata,
+                doc_id=doc_id
+            )
+            stored += 1
+        except Exception as e:
+            error_msg = f"Failed to store chunk {i+1}/{len(chunks)}: {str(e)}"
+            errors.append(error_msg)
+            print(f"❌ {error_msg}")
+            # Continue with other chunks, but log the error
+
+    if stored == 0:
+        # If no chunks were stored, raise an error
+        error_summary = "\n".join(errors) if errors else "Unknown error during database insertion"
+        raise ToolValidationError(
+            f"Failed to store any chunks to database. Errors:\n{error_summary}\n\n"
+            f"Please check:\n"
+            f"1. POSTGRESQL_URL is set correctly in your .env file\n"
+            f"2. Database is accessible and the 'documents' table exists\n"
+            f"3. pgvector extension is installed in your PostgreSQL database"
         )
-        stored += 1
+
+    if errors:
+        # Some chunks failed, but some succeeded - return a warning
+        print(f"⚠️ WARNING: {len(errors)} chunk(s) failed to store, but {stored} chunk(s) were stored successfully")
 
     return {
         "tenant_id": context.tenant_id,
         "chunks_ingested": stored,
         "metadata": {"chunk_words": max_words_value, **(metadata or {})},
         "doc_id": doc_id,
+        "warnings": errors if errors else None,
     }
 
90