Rifqi Hafizuddin commited on
Commit
a25febe
·
1 Parent(s): 110ee34

[KM-520] Integrate db query executor pipeline with existing rag retrieve pipeline

Browse files
Files changed (1) hide show
  1. src/api/v1/chat.py +40 -0
src/api/v1/chat.py CHANGED
@@ -9,6 +9,9 @@ from src.db.postgres.models import ChatMessage, MessageSource
9
  from src.agents.orchestration import orchestrator
10
  from src.agents.chatbot import chatbot
11
  from src.rag.retriever import retriever
 
 
 
12
  from src.db.redis.connection import get_redis
13
  from src.config.settings import settings
14
  from src.middlewares.logging import get_logger, log_execution
@@ -88,6 +91,22 @@ def _extract_sources(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
88
  return sources
89
 
90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
92
  cached = await redis.get(cache_key)
93
  if cached:
@@ -208,6 +227,27 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
208
  context = _format_context(raw_results)
209
  sources = _extract_sources(raw_results)
210
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
211
  # Step 3: Direct response for greetings / non-document intents
212
  if intent_result.get("direct_response"):
213
  response = intent_result["direct_response"]
 
9
  from src.agents.orchestration import orchestrator
10
  from src.agents.chatbot import chatbot
11
  from src.rag.retriever import retriever
12
+ from src.rag.base import RetrievalResult
13
+ from src.query.query_executor import query_executor
14
+ from src.query.base import QueryResult
15
  from src.db.redis.connection import get_redis
16
  from src.config.settings import settings
17
  from src.middlewares.logging import get_logger, log_execution
 
91
  return sources
92
 
93
 
94
+ def _format_query_results(results: list[QueryResult]) -> str:
95
+ if not results:
96
+ return ""
97
+ lines = []
98
+ for r in results:
99
+ name = r.metadata.get("client_name", r.source_id)
100
+ lines.append(f"[Query result — {name}, tables: {r.table_or_file}]")
101
+ lines.append(f"SQL: {r.metadata.get('sql', '')}")
102
+ if r.columns and r.rows:
103
+ lines.append(" | ".join(r.columns))
104
+ for row in r.rows[:20]:
105
+ lines.append(" | ".join(str(row.get(c, "")) for c in r.columns))
106
+ lines.append(f"({r.row_count} rows total)\n")
107
+ return "\n".join(lines)
108
+
109
+
110
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
111
  cached = await redis.get(cache_key)
112
  if cached:
 
227
  context = _format_context(raw_results)
228
  sources = _extract_sources(raw_results)
229
 
230
+ source_hint = intent_result.get("source_hint", "both")
231
+ if source_hint in ("schema", "both"):
232
+ retrieval_objects = [
233
+ RetrievalResult(
234
+ content=r["content"],
235
+ metadata=r["metadata"],
236
+ score=0.0,
237
+ source_type=r["metadata"].get("source_type", ""),
238
+ )
239
+ for r in raw_results
240
+ ]
241
+ query_results = await query_executor.execute(
242
+ results=retrieval_objects,
243
+ user_id=request.user_id,
244
+ db=db,
245
+ question=request.message,
246
+ )
247
+ query_context = _format_query_results(query_results)
248
+ if query_context:
249
+ context = query_context + "\n\n" + context
250
+
251
  # Step 3: Direct response for greetings / non-document intents
252
  if intent_result.get("direct_response"):
253
  response = intent_result["direct_response"]