sofhiaazzhr Claude Sonnet 4.6 committed on
Commit
0700e2b
·
1 Parent(s): 59b14af

[NOTICKET] cleanup PR: wire ChatHandler, drop Phase 1 remnants, implement catalog rebuild

Browse files

- wire chat.py to ChatHandler; remove inline intent routing, retrieval, and answer generation
- drop Phase 1 dual-write in db_client /ingest: only on_db_registered runs now
- implement on_catalog_rebuild_requested + POST /api/v1/data-catalog/rebuild endpoint
- delete dead modules: agents/chatbot (Phase 1), pipeline/orchestrator, query/base, api/v1/knowledge, config/agents/
- rename answer_agent.py -> chatbot.py, AnswerAgent -> ChatbotAgent
- fix ChatHandler to use RetrievalRouter (restores Redis retrieval cache)
- remove dead _build_csv_documents/_build_excel_documents from processing_service
- add top_values to ColumnStats for low-cardinality columns (query planner context)
- replace deprecated @app.on_event("startup") with lifespan context manager

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

main.py CHANGED
@@ -1,5 +1,7 @@
1
  """Main application entry point."""
2
 
 
 
3
  from fastapi import FastAPI
4
  from src.middlewares.logging import configure_logging, get_logger
5
  from src.middlewares.cors import add_cors_middleware
@@ -9,7 +11,6 @@ from src.api.v1.document import router as document_router
9
  from src.api.v1.chat import router as chat_router
10
  from src.api.v1.room import router as room_router
11
  from src.api.v1.users import router as users_router
12
- from src.api.v1.knowledge import router as knowledge_router
13
  from src.api.v1.db_client import router as db_client_router
14
  from src.api.v1.data_catalog import router as data_catalog_router
15
  from src.db.postgres.init_db import init_db
@@ -19,11 +20,21 @@ import uvicorn
19
  configure_logging()
20
  logger = get_logger("main")
21
 
 
 
 
 
 
 
 
 
 
22
  # Create FastAPI app
23
  app = FastAPI(
24
  title="DataEyond Agentic Service",
25
  description="Multi-agent AI backend with RAG capabilities",
26
- version="0.1.0"
 
27
  )
28
 
29
  # Add middleware
@@ -34,21 +45,12 @@ app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
34
  # Include routers
35
  app.include_router(users_router)
36
  app.include_router(document_router)
37
- app.include_router(knowledge_router)
38
  app.include_router(room_router)
39
  app.include_router(chat_router)
40
  app.include_router(db_client_router)
41
  app.include_router(data_catalog_router)
42
 
43
 
44
- @app.on_event("startup")
45
- async def startup_event():
46
- """Initialize database on startup."""
47
- logger.info("Starting application...")
48
- await init_db()
49
- logger.info("Database initialized")
50
-
51
-
52
  @app.get("/")
53
  async def root():
54
  """Root endpoint."""
 
1
  """Main application entry point."""
2
 
3
+ from contextlib import asynccontextmanager
4
+
5
  from fastapi import FastAPI
6
  from src.middlewares.logging import configure_logging, get_logger
7
  from src.middlewares.cors import add_cors_middleware
 
11
  from src.api.v1.chat import router as chat_router
12
  from src.api.v1.room import router as room_router
13
  from src.api.v1.users import router as users_router
 
14
  from src.api.v1.db_client import router as db_client_router
15
  from src.api.v1.data_catalog import router as data_catalog_router
16
  from src.db.postgres.init_db import init_db
 
20
  configure_logging()
21
  logger = get_logger("main")
22
 
23
+
24
+ @asynccontextmanager
25
+ async def lifespan(app: FastAPI):
26
+ logger.info("Starting application...")
27
+ await init_db()
28
+ logger.info("Database initialized")
29
+ yield
30
+
31
+
32
  # Create FastAPI app
33
  app = FastAPI(
34
  title="DataEyond Agentic Service",
35
  description="Multi-agent AI backend with RAG capabilities",
36
+ version="0.1.0",
37
+ lifespan=lifespan,
38
  )
39
 
40
  # Add middleware
 
45
  # Include routers
46
  app.include_router(users_router)
47
  app.include_router(document_router)
 
48
  app.include_router(room_router)
49
  app.include_router(chat_router)
50
  app.include_router(db_client_router)
51
  app.include_router(data_catalog_router)
52
 
53
 
 
 
 
 
 
 
 
 
54
  @app.get("/")
55
  async def root():
56
  """Root endpoint."""
src/agents/answer_agent.py DELETED
@@ -1,170 +0,0 @@
1
- """AnswerAgent — final answer formation. Phase 2 chatbot.
2
-
3
- Receives one of:
4
- - a `QueryResult` (structured query path),
5
- - a list of document chunks (unstructured path), or
6
- - nothing (chat-only path: greeting, farewell, meta question).
7
-
8
- Streams the answer token-by-token so the chat handler can wrap each token
9
- into an SSE event. Conversation history is supported.
10
-
11
- Lives at `agents/answer_agent.py` rather than `agents/chatbot.py` to avoid
12
- colliding with the Phase 1 chatbot still imported by the legacy chat
13
- endpoint. PR7 cleanup will rename this to `chatbot.py` after Phase 1's
14
- chat endpoint is rewired to call this through `agents/chat_handler.py`.
15
- """
16
-
17
- from __future__ import annotations
18
-
19
- from collections.abc import AsyncIterator
20
- from dataclasses import dataclass
21
- from pathlib import Path
22
- from typing import Any
23
-
24
- from langchain_core.messages import BaseMessage
25
- from langchain_core.output_parsers import StrOutputParser
26
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
27
- from langchain_core.runnables import Runnable
28
- from langchain_openai import AzureChatOpenAI
29
-
30
- from src.middlewares.logging import get_logger
31
-
32
- from ..query.executor.base import QueryResult
33
-
34
- logger = get_logger("answer_agent")
35
-
36
-
37
- _PROMPT_DIR = Path(__file__).resolve().parent.parent / "config" / "prompts"
38
- _SYSTEM_PROMPT_PATH = _PROMPT_DIR / "chatbot_system.md"
39
- _GUARDRAILS_PATH = _PROMPT_DIR / "guardrails.md"
40
-
41
-
42
- @dataclass
43
- class DocumentChunk:
44
- """One retrieved document chunk for the unstructured path."""
45
-
46
- content: str
47
- filename: str | None = None
48
- page_label: str | None = None
49
-
50
-
51
- def _load_system_prompt() -> str:
52
- """Compose system prompt = chatbot_system.md + guardrails.md.
53
-
54
- Guardrails appended last so they take precedence in conflict (matches
55
- the docstring at the top of guardrails.md).
56
- """
57
- chatbot = _SYSTEM_PROMPT_PATH.read_text(encoding="utf-8")
58
- guardrails = _GUARDRAILS_PATH.read_text(encoding="utf-8")
59
- return f"{chatbot}\n\n{guardrails}"
60
-
61
-
62
- def _format_query_result(qr: QueryResult) -> str:
63
- """Render a QueryResult as a compact context block for the LLM."""
64
- if qr.error:
65
- return (
66
- f"[Query result — FAILED]\n"
67
- f"source_id={qr.source_id}\n"
68
- f"error: {qr.error}"
69
- )
70
- lines: list[str] = [
71
- "[Query result]",
72
- f"source_id: {qr.source_id}",
73
- f"backend: {qr.backend}",
74
- f"row_count: {qr.row_count}"
75
- + (" (truncated)" if qr.truncated else ""),
76
- f"elapsed_ms: {qr.elapsed_ms}",
77
- ]
78
- if qr.rows:
79
- # Cap rendering at 25 rows; the LLM doesn't need the full set
80
- cap = min(len(qr.rows), 25)
81
- columns = list(qr.rows[0].keys())
82
- lines.append("columns: " + ", ".join(columns))
83
- lines.append("rows:")
84
- for row in qr.rows[:cap]:
85
- lines.append(" " + ", ".join(f"{k}={row[k]!r}" for k in columns))
86
- if cap < len(qr.rows):
87
- lines.append(f" ... (+{len(qr.rows) - cap} more rows omitted from prompt)")
88
- return "\n".join(lines)
89
-
90
-
91
- def _format_document_chunks(chunks: list[DocumentChunk]) -> str:
92
- if not chunks:
93
- return ""
94
- blocks: list[str] = []
95
- for c in chunks:
96
- label_parts = [p for p in (c.filename, c.page_label) if p]
97
- label = ", ".join(label_parts) if label_parts else "Unknown source"
98
- blocks.append(f"[Source: {label}]\n{c.content}")
99
- return "\n\n".join(blocks)
100
-
101
-
102
- def _build_context_block(
103
- query_result: QueryResult | None,
104
- chunks: list[DocumentChunk] | None,
105
- ) -> str:
106
- parts: list[str] = []
107
- if query_result is not None:
108
- parts.append(_format_query_result(query_result))
109
- if chunks:
110
- parts.append(_format_document_chunks(chunks))
111
- return "\n\n".join(parts) if parts else "(no data context — answer conversationally)"
112
-
113
-
114
- def _build_default_chain() -> Runnable:
115
- from src.config.settings import settings
116
-
117
- llm = AzureChatOpenAI(
118
- azure_deployment=settings.azureai_deployment_name_4o,
119
- openai_api_version=settings.azureai_api_version_4o,
120
- azure_endpoint=settings.azureai_endpoint_url_4o,
121
- api_key=settings.azureai_api_key_4o,
122
- temperature=0.3,
123
- )
124
- prompt = ChatPromptTemplate.from_messages(
125
- [
126
- ("system", _load_system_prompt()),
127
- MessagesPlaceholder(variable_name="history", optional=True),
128
- ("human", "{message}"),
129
- ("system", "Data context for this turn:\n\n{context}"),
130
- ]
131
- )
132
- return prompt | llm | StrOutputParser()
133
-
134
-
135
- class AnswerAgent:
136
- """Formats and streams the final user-facing answer.
137
-
138
- `chain` is injectable: tests pass a fake that yields canned tokens.
139
- Default constructs the production Azure OpenAI streaming chain on
140
- first use.
141
- """
142
-
143
- def __init__(self, chain: Runnable | None = None) -> None:
144
- self._chain = chain
145
-
146
- def _ensure_chain(self) -> Runnable:
147
- if self._chain is None:
148
- self._chain = _build_default_chain()
149
- return self._chain
150
-
151
- async def astream(
152
- self,
153
- message: str,
154
- history: list[BaseMessage] | None = None,
155
- query_result: QueryResult | None = None,
156
- chunks: list[DocumentChunk] | None = None,
157
- ) -> AsyncIterator[str]:
158
- """Stream tokens of the final answer.
159
-
160
- Caller wraps each token into the SSE format. Empty `history` and
161
- no context = pure chat reply.
162
- """
163
- chain = self._ensure_chain()
164
- payload: dict[str, Any] = {
165
- "message": message,
166
- "history": history or [],
167
- "context": _build_context_block(query_result, chunks),
168
- }
169
- async for token in chain.astream(payload):
170
- yield token
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/agents/chat_handler.py CHANGED
@@ -4,11 +4,11 @@ End-to-end flow per user message:
4
 
5
  1. `IntentRouter.classify` → `chat` / `unstructured` / `structured`.
6
  2. Route:
7
- - `chat` → no context. Pass straight to AnswerAgent.
8
  - `structured` → CatalogReader → QueryService → QueryResult.
9
  - `unstructured` → DocumentRetriever (placeholder, raises until TAB
10
  ships) → list[DocumentChunk].
11
- 3. `AnswerAgent.astream` → yield text tokens.
12
  4. Wrap each step into an SSE-style event dict so the API endpoint can
13
  stream them as Server-Sent Events.
14
 
@@ -30,13 +30,13 @@ from langchain_core.messages import BaseMessage
30
  from src.middlewares.logging import get_logger
31
 
32
  from src.retrieval.base import RetrievalResult
33
- from .answer_agent import AnswerAgent, DocumentChunk
34
  from .intent_router import IntentRouter
35
 
36
  if TYPE_CHECKING:
37
  from ..catalog.reader import CatalogReader
38
  from ..query.service import QueryService
39
- from ..retrieval.document import DocumentRetriever
40
 
41
  logger = get_logger("chat_handler")
42
 
@@ -55,10 +55,10 @@ class ChatHandler:
55
  def __init__(
56
  self,
57
  intent_router: IntentRouter | None = None,
58
- answer_agent: AnswerAgent | None = None,
59
  catalog_reader: CatalogReader | None = None,
60
  query_service: QueryService | None = None,
61
- document_retriever: DocumentRetriever | None = None,
62
  ) -> None:
63
  self._intent_router = intent_router
64
  self._answer_agent = answer_agent
@@ -75,9 +75,9 @@ class ChatHandler:
75
  self._intent_router = IntentRouter()
76
  return self._intent_router
77
 
78
- def _get_answer_agent(self) -> AnswerAgent:
79
  if self._answer_agent is None:
80
- self._answer_agent = AnswerAgent()
81
  return self._answer_agent
82
 
83
  def _get_catalog_reader(self) -> CatalogReader:
@@ -95,11 +95,11 @@ class ChatHandler:
95
  self._query_service = QueryService()
96
  return self._query_service
97
 
98
- def _get_document_retriever(self) -> DocumentRetriever:
99
  if self._document_retriever is None:
100
- from ..retrieval.document import DocumentRetriever
101
 
102
- self._document_retriever = DocumentRetriever()
103
  return self._document_retriever
104
 
105
  # ------------------------------------------------------------------
 
4
 
5
  1. `IntentRouter.classify` → `chat` / `unstructured` / `structured`.
6
  2. Route:
7
+ - `chat` → no context. Pass straight to ChatbotAgent.
8
  - `structured` → CatalogReader → QueryService → QueryResult.
9
  - `unstructured` → DocumentRetriever (placeholder, raises until TAB
10
  ships) → list[DocumentChunk].
11
+ 3. `ChatbotAgent.astream` → yield text tokens.
12
  4. Wrap each step into an SSE-style event dict so the API endpoint can
13
  stream them as Server-Sent Events.
14
 
 
30
  from src.middlewares.logging import get_logger
31
 
32
  from src.retrieval.base import RetrievalResult
33
+ from .chatbot import ChatbotAgent, DocumentChunk
34
  from .intent_router import IntentRouter
35
 
36
  if TYPE_CHECKING:
37
  from ..catalog.reader import CatalogReader
38
  from ..query.service import QueryService
39
+ from ..retrieval.router import RetrievalRouter
40
 
41
  logger = get_logger("chat_handler")
42
 
 
55
  def __init__(
56
  self,
57
  intent_router: IntentRouter | None = None,
58
+ answer_agent: ChatbotAgent | None = None,
59
  catalog_reader: CatalogReader | None = None,
60
  query_service: QueryService | None = None,
61
+ document_retriever: RetrievalRouter | None = None,
62
  ) -> None:
63
  self._intent_router = intent_router
64
  self._answer_agent = answer_agent
 
75
  self._intent_router = IntentRouter()
76
  return self._intent_router
77
 
78
+ def _get_answer_agent(self) -> ChatbotAgent:
79
  if self._answer_agent is None:
80
+ self._answer_agent = ChatbotAgent()
81
  return self._answer_agent
82
 
83
  def _get_catalog_reader(self) -> CatalogReader:
 
95
  self._query_service = QueryService()
96
  return self._query_service
97
 
98
+ def _get_document_retriever(self) -> RetrievalRouter:
99
  if self._document_retriever is None:
100
+ from ..retrieval.router import RetrievalRouter
101
 
102
+ self._document_retriever = RetrievalRouter()
103
  return self._document_retriever
104
 
105
  # ------------------------------------------------------------------
src/agents/chatbot.py CHANGED
@@ -1,85 +1,165 @@
1
- """Chatbot agent with RAG capabilities."""
2
 
3
- import tiktoken
4
- from langchain_openai import AzureChatOpenAI
5
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  from langchain_core.output_parsers import StrOutputParser
7
- from src.config.settings import settings
 
 
 
8
  from src.middlewares.logging import get_logger
9
- from langchain_core.messages import HumanMessage, AIMessage
 
10
 
11
  logger = get_logger("chatbot")
12
 
13
- _enc = tiktoken.get_encoding("cl100k_base")
14
 
 
 
 
15
 
16
- def _count_tokens(messages: list, context: str) -> dict:
17
- msg_tokens = sum(len(_enc.encode(m.content)) for m in messages)
18
- ctx_tokens = len(_enc.encode(context))
19
- return {"messages_tokens": msg_tokens, "context_tokens": ctx_tokens, "total": msg_tokens + ctx_tokens}
20
 
 
 
 
21
 
22
- class ChatbotAgent:
23
- """Chatbot agent with RAG capabilities."""
24
-
25
- def __init__(self):
26
- self.llm = AzureChatOpenAI(
27
- azure_deployment=settings.azureai_deployment_name_4o,
28
- openai_api_version=settings.azureai_api_version_4o,
29
- azure_endpoint=settings.azureai_endpoint_url_4o,
30
- api_key=settings.azureai_api_key_4o,
31
- temperature=0.7
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
- # Read system prompt
35
- try:
36
- with open("src/config/agents/system_prompt.md", "r") as f:
37
- system_prompt = f.read()
38
- except FileNotFoundError:
39
- system_prompt = "You are a helpful AI assistant with access to user's uploaded documents."
40
 
41
- # Create prompt template
42
- self.prompt = ChatPromptTemplate.from_messages([
43
- ("system", system_prompt),
44
- MessagesPlaceholder(variable_name="messages"),
45
- ("system", "Relevant documents:\n{context}")
46
- ])
47
 
48
- # Create chain
49
- self.chain = self.prompt | self.llm | StrOutputParser()
 
 
50
 
51
- async def generate_response(
52
  self,
53
- messages: list,
54
- context: str = ""
55
- ) -> str:
56
- """Generate response with optional RAG context."""
57
- try:
58
- logger.info("Generating chatbot response")
59
-
60
- # Generate response
61
- response = await self.chain.ainvoke({
62
- "messages": messages,
63
- "context": context
64
- })
65
-
66
- logger.info(f"Generated response: {response[:100]}...")
67
- return response
68
-
69
- except Exception as e:
70
- logger.error("Response generation failed", error=str(e))
71
- raise
72
-
73
- async def astream_response(self, messages: list, context: str = ""):
74
- """Stream response tokens as they are generated."""
75
- try:
76
- token_counts = _count_tokens(messages, context)
77
- logger.info("LLM input tokens", **token_counts)
78
- async for token in self.chain.astream({"messages": messages, "context": context}):
79
- yield token
80
- except Exception as e:
81
- logger.error("Response streaming failed", error=str(e))
82
- raise
83
-
84
-
85
- chatbot = ChatbotAgent()
 
1
+ """ChatbotAgent final answer formation. Phase 2 chatbot.
2
 
3
+ Receives one of:
4
+ - a `QueryResult` (structured query path),
5
+ - a list of document chunks (unstructured path), or
6
+ - nothing (chat-only path: greeting, farewell, meta question).
7
+
8
+ Streams the answer token-by-token so the chat handler can wrap each token
9
+ into an SSE event. Conversation history is supported.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ from collections.abc import AsyncIterator
15
+ from dataclasses import dataclass
16
+ from pathlib import Path
17
+ from typing import Any
18
+
19
+ from langchain_core.messages import BaseMessage
20
  from langchain_core.output_parsers import StrOutputParser
21
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
22
+ from langchain_core.runnables import Runnable
23
+ from langchain_openai import AzureChatOpenAI
24
+
25
  from src.middlewares.logging import get_logger
26
+
27
+ from ..query.executor.base import QueryResult
28
 
29
  logger = get_logger("chatbot")
30
 
 
31
 
32
+ _PROMPT_DIR = Path(__file__).resolve().parent.parent / "config" / "prompts"
33
+ _SYSTEM_PROMPT_PATH = _PROMPT_DIR / "chatbot_system.md"
34
+ _GUARDRAILS_PATH = _PROMPT_DIR / "guardrails.md"
35
 
 
 
 
 
36
 
37
+ @dataclass
38
+ class DocumentChunk:
39
+ """One retrieved document chunk for the unstructured path."""
40
 
41
+ content: str
42
+ filename: str | None = None
43
+ page_label: str | None = None
44
+
45
+
46
+ def _load_system_prompt() -> str:
47
+ """Compose system prompt = chatbot_system.md + guardrails.md.
48
+
49
+ Guardrails appended last so they take precedence in conflict (matches
50
+ the docstring at the top of guardrails.md).
51
+ """
52
+ chatbot = _SYSTEM_PROMPT_PATH.read_text(encoding="utf-8")
53
+ guardrails = _GUARDRAILS_PATH.read_text(encoding="utf-8")
54
+ return f"{chatbot}\n\n{guardrails}"
55
+
56
+
57
+ def _format_query_result(qr: QueryResult) -> str:
58
+ """Render a QueryResult as a compact context block for the LLM."""
59
+ if qr.error:
60
+ return (
61
+ f"[Query result — FAILED]\n"
62
+ f"source_id={qr.source_id}\n"
63
+ f"error: {qr.error}"
64
  )
65
+ lines: list[str] = [
66
+ "[Query result]",
67
+ f"source_id: {qr.source_id}",
68
+ f"backend: {qr.backend}",
69
+ f"row_count: {qr.row_count}"
70
+ + (" (truncated)" if qr.truncated else ""),
71
+ f"elapsed_ms: {qr.elapsed_ms}",
72
+ ]
73
+ if qr.rows:
74
+ # Cap rendering at 25 rows; the LLM doesn't need the full set
75
+ cap = min(len(qr.rows), 25)
76
+ columns = list(qr.rows[0].keys())
77
+ lines.append("columns: " + ", ".join(columns))
78
+ lines.append("rows:")
79
+ for row in qr.rows[:cap]:
80
+ lines.append(" " + ", ".join(f"{k}={row[k]!r}" for k in columns))
81
+ if cap < len(qr.rows):
82
+ lines.append(f" ... (+{len(qr.rows) - cap} more rows omitted from prompt)")
83
+ return "\n".join(lines)
84
+
85
+
86
+ def _format_document_chunks(chunks: list[DocumentChunk]) -> str:
87
+ if not chunks:
88
+ return ""
89
+ blocks: list[str] = []
90
+ for c in chunks:
91
+ label_parts = [p for p in (c.filename, c.page_label) if p]
92
+ label = ", ".join(label_parts) if label_parts else "Unknown source"
93
+ blocks.append(f"[Source: {label}]\n{c.content}")
94
+ return "\n\n".join(blocks)
95
+
96
+
97
+ def _build_context_block(
98
+ query_result: QueryResult | None,
99
+ chunks: list[DocumentChunk] | None,
100
+ ) -> str:
101
+ parts: list[str] = []
102
+ if query_result is not None:
103
+ parts.append(_format_query_result(query_result))
104
+ if chunks:
105
+ parts.append(_format_document_chunks(chunks))
106
+ return "\n\n".join(parts) if parts else "(no data context — answer conversationally)"
107
+
108
+
109
+ def _build_default_chain() -> Runnable:
110
+ from src.config.settings import settings
111
+
112
+ llm = AzureChatOpenAI(
113
+ azure_deployment=settings.azureai_deployment_name_4o,
114
+ openai_api_version=settings.azureai_api_version_4o,
115
+ azure_endpoint=settings.azureai_endpoint_url_4o,
116
+ api_key=settings.azureai_api_key_4o,
117
+ temperature=0.3,
118
+ )
119
+ prompt = ChatPromptTemplate.from_messages(
120
+ [
121
+ ("system", _load_system_prompt()),
122
+ MessagesPlaceholder(variable_name="history", optional=True),
123
+ ("human", "{message}"),
124
+ ("system", "Data context for this turn:\n\n{context}"),
125
+ ]
126
+ )
127
+ return prompt | llm | StrOutputParser()
128
+
129
+
130
+ class ChatbotAgent:
131
+ """Formats and streams the final user-facing answer.
132
 
133
+ `chain` is injectable: tests pass a fake that yields canned tokens.
134
+ Default constructs the production Azure OpenAI streaming chain on
135
+ first use.
136
+ """
 
 
137
 
138
+ def __init__(self, chain: Runnable | None = None) -> None:
139
+ self._chain = chain
 
 
 
 
140
 
141
+ def _ensure_chain(self) -> Runnable:
142
+ if self._chain is None:
143
+ self._chain = _build_default_chain()
144
+ return self._chain
145
 
146
+ async def astream(
147
  self,
148
+ message: str,
149
+ history: list[BaseMessage] | None = None,
150
+ query_result: QueryResult | None = None,
151
+ chunks: list[DocumentChunk] | None = None,
152
+ ) -> AsyncIterator[str]:
153
+ """Stream tokens of the final answer.
154
+
155
+ Caller wraps each token into the SSE format. Empty `history` and
156
+ no context = pure chat reply.
157
+ """
158
+ chain = self._ensure_chain()
159
+ payload: dict[str, Any] = {
160
+ "message": message,
161
+ "history": history or [],
162
+ "context": _build_context_block(query_result, chunks),
163
+ }
164
+ async for token in chain.astream(payload):
165
+ yield token
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/api/v1/chat.py CHANGED
@@ -1,33 +1,33 @@
1
  """Chat endpoint with streaming support."""
2
 
3
  import uuid
 
 
 
4
  from fastapi import APIRouter, Depends, HTTPException
 
 
 
5
  from sqlalchemy.ext.asyncio import AsyncSession
 
 
 
 
6
  from src.db.postgres.connection import get_db
7
  from src.db.postgres.models import ChatMessage, MessageSource
8
- from src.agents.intent_router import IntentRouter
9
- from src.agents.answer_agent import AnswerAgent, DocumentChunk
10
- from src.retrieval.router import retrieval_router as retriever
11
- from src.retrieval.base import RetrievalResult
12
- from src.catalog.reader import CatalogReader
13
- from src.catalog.store import CatalogStore
14
- from src.query.service import QueryService
15
  from src.db.redis.connection import get_redis
16
- from src.config.settings import settings
17
  from src.middlewares.logging import get_logger, log_execution
18
- from sse_starlette.sse import EventSourceResponse
19
- from langchain_core.messages import HumanMessage, AIMessage
20
- from sqlalchemy import select
21
- from pydantic import BaseModel
22
- from typing import List, Dict, Any, Optional
23
- import json
24
 
25
  _GREETINGS = frozenset(["hi", "hello", "hey", "halo", "hai", "hei"])
26
  _GOODBYES = frozenset(["bye", "goodbye", "thanks", "thank you", "terima kasih", "sampai jumpa"])
27
 
28
 
29
  def _fast_intent(message: str) -> Optional[str]:
30
- """Return a direct response string for obvious greetings/farewells, else None."""
31
  lower = message.lower().strip().rstrip("!.,?")
32
  if lower in _GREETINGS:
33
  return "Hello! How can I assist you today?"
@@ -36,62 +36,12 @@ def _fast_intent(message: str) -> Optional[str]:
36
  return None
37
 
38
 
39
- logger = get_logger("chat_api")
40
-
41
- router = APIRouter(prefix="/api/v1", tags=["Chat"])
42
-
43
-
44
  class ChatRequest(BaseModel):
45
  user_id: str
46
  room_id: str
47
  message: str
48
 
49
 
50
- def _extract_sources(results: List[RetrievalResult]) -> List[Dict[str, Any]]:
51
- """Extract deduplicated source references from retrieval results."""
52
- seen = set()
53
- sources = []
54
- for result in results:
55
- meta = result.metadata
56
- data = meta.get("data", {})
57
- if "document_id" in data:
58
- key = (data.get("document_id"), data.get("page_label"))
59
- if key not in seen:
60
- seen.add(key)
61
- sources.append({
62
- "document_id": data.get("document_id"),
63
- "filename": data.get("filename", "Unknown"),
64
- "page_label": data.get("page_label", "Unknown"),
65
- })
66
- else:
67
- key = (data.get("table_name"), data.get("column_name"))
68
- if key not in seen:
69
- seen.add(key)
70
- table_name = data.get("table_name")
71
- user_id = meta.get("user_id")
72
- sources.append({
73
- "document_id": f"{user_id}_{table_name}",
74
- "filename": data.get("table_name", "Unknown"),
75
- "page_label": data.get("column_name", "Unknown"),
76
- })
77
- logger.debug(f"Extracted sources: {sources}")
78
- return sources
79
-
80
-
81
- def _to_document_chunks(results: List[RetrievalResult]) -> List[DocumentChunk]:
82
- """Convert Phase 1 RetrievalResult list to Phase 2 DocumentChunk list."""
83
- chunks = []
84
- for r in results:
85
- data = r.metadata.get("data", {})
86
- page = data.get("page_label")
87
- chunks.append(DocumentChunk(
88
- content=r.content,
89
- filename=data.get("filename"),
90
- page_label=str(page) if page is not None else None,
91
- ))
92
- return chunks
93
-
94
-
95
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
96
  cached = await redis.get(cache_key)
97
  if cached:
@@ -147,7 +97,7 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
147
  """Chat endpoint with streaming response.
148
 
149
  SSE event sequence:
150
- 1. sources — JSON array of {document_id, filename, page_label}
151
  2. chunk — text fragments of the answer
152
  3. done — signals end of stream
153
  """
@@ -181,49 +131,24 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
181
 
182
  return EventSourceResponse(stream_direct())
183
 
184
- # Load history for intent routing
185
- history = await load_history(db, request.room_id, limit=6)
186
-
187
- # Phase 2: IntentRouter classifies message
188
- decision = await IntentRouter().classify(request.message, history)
189
- rewritten = decision.rewritten_query or request.message
190
-
191
- query_result = None
192
- chunks: List[DocumentChunk] | None = None
193
- sources: List[Dict[str, Any]] = []
194
-
195
- if decision.source_hint == "structured":
196
- catalog = await CatalogReader(CatalogStore()).read(request.user_id, "structured")
197
- query_result = await QueryService().run(request.user_id, rewritten, catalog)
198
 
199
- elif decision.source_hint == "unstructured":
200
- raw_results = await retriever.retrieve(
201
- query=rewritten,
202
- user_id=request.user_id,
203
- db=db,
204
- k=5,
205
- )
206
- chunks = _to_document_chunks(raw_results)
207
- sources = _extract_sources(raw_results)
208
-
209
- # Load full history for answer generation
210
- full_history = await load_history(db, request.room_id, limit=10)
211
-
212
- # Phase 2: AnswerAgent streams answer tokens
213
  async def stream_response():
214
  full_response = ""
215
- yield {"event": "sources", "data": json.dumps(sources)}
216
- async for token in AnswerAgent().astream(
217
- request.message,
218
- history=full_history,
219
- query_result=query_result,
220
- chunks=chunks,
221
- ):
222
- full_response += token
223
- yield {"event": "chunk", "data": token}
224
- yield {"event": "done", "data": ""}
225
- await cache_response(redis, cache_key, full_response)
226
- await save_messages(db, request.room_id, request.message, full_response, sources=sources)
 
227
 
228
  return EventSourceResponse(stream_response())
229
 
 
1
  """Chat endpoint with streaming support."""
2
 
3
  import uuid
4
+ import json
5
+ from typing import List, Dict, Any, Optional
6
+
7
  from fastapi import APIRouter, Depends, HTTPException
8
+ from langchain_core.messages import HumanMessage, AIMessage
9
+ from pydantic import BaseModel
10
+ from sqlalchemy import select
11
  from sqlalchemy.ext.asyncio import AsyncSession
12
+ from sse_starlette.sse import EventSourceResponse
13
+
14
+ from src.agents.chat_handler import ChatHandler
15
+ from src.config.settings import settings
16
  from src.db.postgres.connection import get_db
17
  from src.db.postgres.models import ChatMessage, MessageSource
 
 
 
 
 
 
 
18
  from src.db.redis.connection import get_redis
 
19
  from src.middlewares.logging import get_logger, log_execution
20
+
21
+ logger = get_logger("chat_api")
22
+
23
+ router = APIRouter(prefix="/api/v1", tags=["Chat"])
 
 
24
 
25
  _GREETINGS = frozenset(["hi", "hello", "hey", "halo", "hai", "hei"])
26
  _GOODBYES = frozenset(["bye", "goodbye", "thanks", "thank you", "terima kasih", "sampai jumpa"])
27
 
28
 
29
  def _fast_intent(message: str) -> Optional[str]:
30
+ """Return a direct response for obvious greetings/farewells, else None."""
31
  lower = message.lower().strip().rstrip("!.,?")
32
  if lower in _GREETINGS:
33
  return "Hello! How can I assist you today?"
 
36
  return None
37
 
38
 
 
 
 
 
 
39
  class ChatRequest(BaseModel):
40
  user_id: str
41
  room_id: str
42
  message: str
43
 
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  async def get_cached_response(redis, cache_key: str) -> Optional[str]:
46
  cached = await redis.get(cache_key)
47
  if cached:
 
97
  """Chat endpoint with streaming response.
98
 
99
  SSE event sequence:
100
+ 1. sources — JSON array of source references (empty until retrieval is wired into ChatHandler)
101
  2. chunk — text fragments of the answer
102
  3. done — signals end of stream
103
  """
 
131
 
132
  return EventSourceResponse(stream_direct())
133
 
134
+ history = await load_history(db, request.room_id, limit=10)
135
+ handler = ChatHandler()
 
 
 
 
 
 
 
 
 
 
 
 
136
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  async def stream_response():
138
  full_response = ""
139
+ yield {"event": "sources", "data": json.dumps([])}
140
+ async for event in handler.handle(request.message, request.user_id, history):
141
+ if event["event"] == "chunk":
142
+ full_response += event["data"]
143
+ yield event
144
+ elif event["event"] == "done":
145
+ await cache_response(redis, cache_key, full_response)
146
+ await save_messages(db, request.room_id, request.message, full_response, sources=[])
147
+ yield event
148
+ elif event["event"] == "error":
149
+ yield event
150
+ return
151
+ # "intent" event: consumed internally, not forwarded to frontend
152
 
153
  return EventSourceResponse(stream_response())
154
 
src/api/v1/data_catalog.py CHANGED
@@ -9,11 +9,12 @@ here on purpose.
9
 
10
  from typing import List
11
 
12
- from fastapi import APIRouter, HTTPException, status
13
 
14
  from src.catalog.store import CatalogStore
15
  from src.middlewares.logging import get_logger, log_execution
16
  from src.models.api.catalog import CatalogIndexEntry
 
17
 
18
  logger = get_logger("data_catalog_api")
19
 
@@ -65,3 +66,35 @@ async def list_data_catalog_index(user_id: str):
65
  )
66
  for s in catalog.sources
67
  ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
  from typing import List
11
 
12
+ from fastapi import APIRouter, HTTPException, Query, status
13
 
14
  from src.catalog.store import CatalogStore
15
  from src.middlewares.logging import get_logger, log_execution
16
  from src.models.api.catalog import CatalogIndexEntry
17
+ from src.pipeline.triggers import on_catalog_rebuild_requested
18
 
19
  logger = get_logger("data_catalog_api")
20
 
 
66
  )
67
  for s in catalog.sources
68
  ]
69
+
70
+
71
@router.post(
    "/data-catalog/rebuild",
    status_code=status.HTTP_200_OK,
    summary="Rebuild the catalog for a user",
    response_description="Confirmation that the rebuild was triggered.",
    responses={
        200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
        500: {"description": "Unexpected error before the rebuild loop started."},
    },
)
@log_execution(logger)
async def rebuild_data_catalog(
    user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
):
    """
    Re-introspect every source in the user's catalog and upsert the results.

    Delegates to ``on_catalog_rebuild_requested`` and awaits its completion.
    Each source (DB connection or tabular file) is processed independently.
    A failure on one source is logged but does not abort the remaining sources.
    If the user has no catalog yet, the call is a no-op and still returns success.

    Raises:
        HTTPException: 500 when the rebuild trigger itself raises.
    """
    try:
        await on_catalog_rebuild_requested(user_id)
    except Exception as e:
        logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Catalog rebuild failed: {e}",
        ) from e
    return {"status": "success", "user_id": user_id}
src/api/v1/db_client.py CHANGED
@@ -27,9 +27,7 @@ from src.models.credentials import ( # noqa: F401 — re-exported for Swagger s
27
  SqlServerCredentials,
28
  SupabaseCredentials,
29
  )
30
- from src.pipeline.db_pipeline import db_pipeline_service
31
  from src.pipeline.triggers import on_db_registered
32
- from src.utils.db_credential_encryption import decrypt_credentials_dict
33
 
34
  logger = get_logger("database_client_api")
35
 
@@ -414,14 +412,14 @@ async def delete_database_client(
414
  @router.post(
415
  "/database-clients/{client_id}/ingest",
416
  status_code=status.HTTP_200_OK,
417
- summary="Ingest schema from a registered database into the vector store",
418
- response_description="Count of chunks ingested.",
419
  responses={
420
- 200: {"description": "Ingestion completed successfully."},
421
  403: {"description": "Access denied — user_id does not own this connection."},
422
  404: {"description": "Connection not found."},
423
- 501: {"description": "The connection's db_type is not yet supported by the pipeline."},
424
- 500: {"description": "Ingestion failed (connection error, profiling error, etc.)."},
425
  },
426
  )
427
  @limiter.limit("5/minute")
@@ -433,11 +431,9 @@ async def ingest_database_client(
433
  db: AsyncSession = Depends(get_db),
434
  ):
435
  """
436
- Decrypt the stored credentials, connect to the user's database, introspect
437
- its schema, profile each column, embed the descriptions, and store them in
438
- the shared PGVector collection tagged with `source_type="database"`.
439
-
440
- Chunks become retrievable via the same retriever used for document chunks.
441
  """
442
  client = await database_client_service.get(db, client_id)
443
 
@@ -454,26 +450,12 @@ async def ingest_database_client(
454
  )
455
 
456
  try:
457
- creds = decrypt_credentials_dict(client.credentials)
458
- with db_pipeline_service.engine_scope(
459
- db_type=client.db_type,
460
- credentials=creds,
461
- ) as engine:
462
- total = await db_pipeline_service.run(user_id=user_id, client_id=client_id, engine=engine)
463
- except NotImplementedError as e:
464
- raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail=str(e))
465
  except Exception as e:
466
- logger.error(
467
- f"Ingestion failed for client {client_id}", user_id=user_id, error=str(e)
468
- )
469
  raise HTTPException(
470
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
471
- detail=f"Ingestion failed: {e}",
472
  )
473
 
474
- try:
475
- await on_db_registered(client_id, user_id)
476
- except Exception as e:
477
- logger.error("catalog build failed after ingest", client_id=client_id, error=str(e))
478
-
479
- return {"status": "success", "client_id": client_id, "chunks_ingested": total}
 
27
  SqlServerCredentials,
28
  SupabaseCredentials,
29
  )
 
30
  from src.pipeline.triggers import on_db_registered
 
31
 
32
  logger = get_logger("database_client_api")
33
 
 
412
  @router.post(
413
  "/database-clients/{client_id}/ingest",
414
  status_code=status.HTTP_200_OK,
415
+ summary="Build the catalog for a registered database connection",
416
+ response_description="Confirmation that the catalog was built.",
417
  responses={
418
+ 200: {"description": "Catalog built successfully."},
419
  403: {"description": "Access denied — user_id does not own this connection."},
420
  404: {"description": "Connection not found."},
421
+ 409: {"description": "Connection is inactive."},
422
+ 500: {"description": "Catalog build failed."},
423
  },
424
  )
425
  @limiter.limit("5/minute")
 
431
  db: AsyncSession = Depends(get_db),
432
  ):
433
  """
434
+ Introspect the registered database and build (or rebuild) the catalog entry
435
+ for this connection. The catalog is stored in `data_catalog` and used by
436
+ the query pipeline to plan structured queries.
 
 
437
  """
438
  client = await database_client_service.get(db, client_id)
439
 
 
450
  )
451
 
452
  try:
453
+ await on_db_registered(client_id, user_id)
 
 
 
 
 
 
 
454
  except Exception as e:
455
+ logger.error("catalog build failed", client_id=client_id, error=str(e))
 
 
456
  raise HTTPException(
457
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
458
+ detail=f"Catalog build failed: {e}",
459
  )
460
 
461
+ return {"status": "success", "client_id": client_id}
 
 
 
 
 
src/api/v1/knowledge.py DELETED
@@ -1,25 +0,0 @@
1
- """Knowledge base management API endpoints."""
2
-
3
- from fastapi import APIRouter, Depends
4
- from sqlalchemy.ext.asyncio import AsyncSession
5
- from src.db.postgres.connection import get_db
6
- from src.middlewares.logging import get_logger, log_execution
7
-
8
- logger = get_logger("knowledge_api")
9
-
10
- router = APIRouter(prefix="/api/v1", tags=["Knowledge"])
11
-
12
-
13
- @router.post("/knowledge/rebuild")
14
- @log_execution(logger)
15
- async def rebuild_vector_index(
16
- user_id: str,
17
- db: AsyncSession = Depends(get_db)
18
- ):
19
- """Rebuild vector index for a user (admin endpoint)."""
20
- # This would re-process all documents
21
- # For POC, we'll skip this complexity
22
- return {
23
- "status": "success",
24
- "message": "Vector index rebuild initiated"
25
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/catalog/introspect/tabular.py CHANGED
@@ -206,10 +206,17 @@ class TabularIntrospector(BaseIntrospector):
206
  is_numeric = pd.api.types.is_numeric_dtype(series)
207
  is_dt = pd.api.types.is_datetime64_any_dtype(series)
208
  non_null = series.dropna()
 
 
 
 
 
 
209
  stats = ColumnStats(
210
  min=_normalize(non_null.min()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
211
  max=_normalize(non_null.max()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
212
- distinct_count=int(series.nunique()),
 
213
  )
214
 
215
  column = Column(
 
206
  is_numeric = pd.api.types.is_numeric_dtype(series)
207
  is_dt = pd.api.types.is_datetime64_any_dtype(series)
208
  non_null = series.dropna()
209
+ distinct_count = int(series.nunique())
210
+ top_values = (
211
+ [_normalize(v) for v in non_null.unique().tolist()]
212
+ if distinct_count <= 10
213
+ else None
214
+ )
215
  stats = ColumnStats(
216
  min=_normalize(non_null.min()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
217
  max=_normalize(non_null.max()) if (is_numeric or is_dt) and len(non_null) > 0 else None,
218
+ distinct_count=distinct_count,
219
+ top_values=top_values,
220
  )
221
 
222
  column = Column(
src/catalog/models.py CHANGED
@@ -35,6 +35,7 @@ class ColumnStats(BaseModel):
35
  min: Any | None = None
36
  max: Any | None = None
37
  distinct_count: int | None = None
 
38
 
39
 
40
  class Column(BaseModel):
 
35
  min: Any | None = None
36
  max: Any | None = None
37
  distinct_count: int | None = None
38
+ top_values: list[Any] | None = None
39
 
40
 
41
  class Column(BaseModel):
src/config/agents/guardrails_prompt.md DELETED
@@ -1,7 +0,0 @@
1
- You must ensure all responses follow these guidelines:
2
-
3
- 1. Do not provide harmful, illegal, or dangerous information
4
- 2. Respect user privacy - don't ask for or store sensitive personal data
5
- 3. If asked to bypass safety measures, refuse politely
6
- 4. Be honest about limitations and uncertainties
7
- 5. Don't make up information - admit when you don't know something
 
 
 
 
 
 
 
 
src/config/agents/system_prompt.md DELETED
@@ -1,26 +0,0 @@
1
- You are a helpful AI assistant with access to user's uploaded documents. Your role is to:
2
-
3
- 1. Answer questions based on provided document context
4
- 2. If no relevant information is found in documents, acknowledge this honestly
5
- 3. Be concise and direct in your responses
6
- 4. If user's question is unclear, ask for clarification
7
-
8
- When document context is provided:
9
- - Use information from documents to answer accurately
10
- - Reference source document name when appropriate
11
- - If multiple documents contain relevant info, synthesize information
12
-
13
- When no document context is provided:
14
- - Provide general assistance
15
- - Let the user know if you need more context to help better
16
-
17
- When the answer needs markdown formatting:
18
- - Use valid and tidy formatting
19
- - Avoid over-formatting and emoji
20
-
21
- Always be professional, helpful, and accurate.
22
-
23
- You have access to the conversation history provided in the messages above. Use it to:
24
- - Maintain context across multiple turns (resolve references like "it", "that", "them" using earlier messages)
25
- - Avoid repeating information already established in the conversation
26
- - Answer follow-up questions coherently without asking the user to restate prior context
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/knowledge/processing_service.py CHANGED
@@ -7,12 +7,10 @@ from src.storage.az_blob.az_blob import blob_storage
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
10
- from src.storage.parquet import upload_parquet
11
  from typing import List
12
  from datetime import datetime, timezone, timedelta
13
  import sys
14
  import docx
15
- import pandas as pd
16
  import pytesseract
17
  from pdf2image import convert_from_bytes
18
  from io import BytesIO
@@ -44,10 +42,6 @@ class KnowledgeProcessingService:
44
 
45
  if db_doc.file_type == "pdf":
46
  documents = await self._build_pdf_documents(content, db_doc)
47
- elif db_doc.file_type == "csv":
48
- documents = await self._build_csv_documents(content, db_doc)
49
- elif db_doc.file_type == "xlsx":
50
- documents = await self._build_excel_documents(content, db_doc)
51
  else:
52
  text = self._extract_text(content, db_doc.file_type)
53
  if not text.strip():
@@ -121,106 +115,6 @@ class KnowledgeProcessingService:
121
 
122
  return documents
123
 
124
- def _profile_dataframe(
125
- self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
126
- ) -> List[LangChainDocument]:
127
- """Profile each column of a dataframe → one chunk per column."""
128
- documents = []
129
- row_count = len(df)
130
-
131
- for col_name in df.columns:
132
- col = df[col_name]
133
- is_numeric = pd.api.types.is_numeric_dtype(col)
134
- null_count = int(col.isnull().sum())
135
- distinct_count = int(col.nunique())
136
- distinct_ratio = distinct_count / row_count if row_count > 0 else 0
137
-
138
- text = f"Source: {source_name} ({row_count} rows)\n"
139
- text += f"Column: {col_name} ({col.dtype})\n"
140
- text += f"Null count: {null_count}\n"
141
- text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n"
142
-
143
- if is_numeric:
144
- text += f"Min: {col.min()}, Max: {col.max()}\n"
145
- text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n"
146
-
147
- if 0 < distinct_ratio <= 0.05:
148
- top_values = col.value_counts().head(10)
149
- top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items())
150
- text += f"Top values: {top_str}\n"
151
-
152
- text += f"Sample values: {col.dropna().head(5).tolist()}"
153
-
154
- documents.append(LangChainDocument(
155
- page_content=text,
156
- metadata={
157
- "user_id": db_doc.user_id,
158
- "source_type": "document",
159
- "chunk_level": "column",
160
- "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
161
- "data": {
162
- "document_id": db_doc.id,
163
- "filename": db_doc.filename,
164
- "file_type": db_doc.file_type,
165
- "source": source_name,
166
- "column_name": col_name,
167
- "column_type": str(col.dtype),
168
- }
169
- }
170
- ))
171
- return documents
172
-
173
- def _to_sheet_document(
174
- self, df: pd.DataFrame, db_doc: DBDocument, sheet_name: str | None, source_name: str
175
- ) -> LangChainDocument:
176
- col_summary = ", ".join(f"{c} ({df[c].dtype})" for c in df.columns)
177
- text = (
178
- f"Source: {source_name} ({len(df)} rows)\n"
179
- f"Columns ({len(df.columns)}): {col_summary}"
180
- )
181
- return LangChainDocument(
182
- page_content=text,
183
- metadata={
184
- "user_id": db_doc.user_id,
185
- "source_type": "document",
186
- "chunk_level": "sheet",
187
- "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
188
- "data": {
189
- "document_id": db_doc.id,
190
- "filename": db_doc.filename,
191
- "file_type": db_doc.file_type,
192
- "sheet_name": sheet_name,
193
- "column_names": list(df.columns),
194
- "row_count": len(df),
195
- },
196
- },
197
- )
198
-
199
- async def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
200
- """Profile each column of a CSV file and upload Parquet to Azure Blob."""
201
- df = pd.read_csv(BytesIO(content))
202
- await upload_parquet(df, db_doc.user_id, db_doc.id)
203
- logger.info(f"Uploaded Parquet for CSV {db_doc.id}")
204
- docs = self._profile_dataframe(df, db_doc.filename, db_doc)
205
- docs.append(self._to_sheet_document(df, db_doc, sheet_name=None, source_name=db_doc.filename))
206
- return docs
207
-
208
- async def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
209
- """Profile each column of every sheet in an Excel file and upload one Parquet per sheet."""
210
- sheets = pd.read_excel(BytesIO(content), sheet_name=None)
211
- documents = []
212
- for sheet_name, df in sheets.items():
213
- source_name = f"{db_doc.filename} / sheet: {sheet_name}"
214
- docs = self._profile_dataframe(df, source_name, db_doc)
215
- for doc in docs:
216
- doc.metadata["data"]["sheet_name"] = sheet_name
217
- doc.metadata["chunk_level"] = "column"
218
- documents.extend(docs)
219
- documents.append(self._to_sheet_document(df, db_doc, sheet_name, source_name))
220
- await upload_parquet(df, db_doc.user_id, db_doc.id, sheet_name)
221
- logger.info(f"Uploaded Parquet for sheet '{sheet_name}' of {db_doc.id}")
222
- return documents
223
-
224
  def _extract_text(self, content: bytes, file_type: str) -> str:
225
  """Extract text from DOCX or TXT content."""
226
  if file_type == "docx":
 
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
 
10
  from typing import List
11
  from datetime import datetime, timezone, timedelta
12
  import sys
13
  import docx
 
14
  import pytesseract
15
  from pdf2image import convert_from_bytes
16
  from io import BytesIO
 
42
 
43
  if db_doc.file_type == "pdf":
44
  documents = await self._build_pdf_documents(content, db_doc)
 
 
 
 
45
  else:
46
  text = self._extract_text(content, db_doc.file_type)
47
  if not text.strip():
 
115
 
116
  return documents
117
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  def _extract_text(self, content: bytes, file_type: str) -> str:
119
  """Extract text from DOCX or TXT content."""
120
  if file_type == "docx":
src/pipeline/orchestrator.py DELETED
@@ -1,11 +0,0 @@
1
- """IngestionOrchestrator — top-level coordinator for ingestion.
2
-
3
- Routes uploads / DB connections to the right pipeline:
4
- - unstructured (pdf/docx/txt) → DocumentPipeline
5
- - schema or tabular → StructuredPipeline (which writes to the catalog)
6
- """
7
-
8
-
9
- class IngestionOrchestrator:
10
- async def ingest(self, source_ref: str, source_type: str, user_id: str) -> None:
11
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
src/pipeline/triggers.py CHANGED
@@ -70,10 +70,30 @@ async def on_document_uploaded(document_id: str, user_id: str) -> None:
70
 
71
 
72
  async def on_catalog_rebuild_requested(user_id: str) -> None:
73
- """Stub — re-runs every source for a user (catalog refresher).
74
 
75
- Implemented when the bulk refresh script lands. Expected to iterate over
76
- every Source in the user's current catalog, re-introspect it, and upsert
77
- the refreshed result.
 
78
  """
79
- raise NotImplementedError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
 
71
 
72
  async def on_catalog_rebuild_requested(user_id: str) -> None:
73
+ """Re-introspect every source in the user's catalog and upsert the result.
74
 
75
+ Iterates all Sources in the current catalog. Each source is re-run through
76
+ its original trigger (on_db_registered for schema, on_tabular_uploaded for
77
+ tabular). Per-source failures are logged but do not abort the remaining
78
+ sources.
79
  """
80
+ from src.catalog.store import CatalogStore
81
+
82
+ catalog = await CatalogStore().get(user_id)
83
+ if catalog is None:
84
+ logger.info("no catalog to rebuild", user_id=user_id)
85
+ return
86
+
87
+ logger.info("on_catalog_rebuild_requested triggered", user_id=user_id, source_count=len(catalog.sources))
88
+ for source in catalog.sources:
89
+ try:
90
+ if source.source_type == "schema":
91
+ client_id = source.location_ref.split("://")[1]
92
+ await on_db_registered(client_id, user_id)
93
+ elif source.source_type == "tabular":
94
+ document_id = source.location_ref.split("://")[1].split("/")[1]
95
+ await on_tabular_uploaded(document_id, user_id)
96
+ else:
97
+ logger.warning("unsupported source_type for rebuild", source_type=source.source_type, source_id=source.source_id)
98
+ except Exception as e:
99
+ logger.error("rebuild failed for source", source_id=source.source_id, source_type=source.source_type, error=str(e))
src/query/base.py DELETED
@@ -1,32 +0,0 @@
1
- """Shared contract for query executors."""
2
-
3
- from abc import ABC, abstractmethod
4
- from dataclasses import dataclass, field
5
-
6
- from sqlalchemy.ext.asyncio import AsyncSession
7
-
8
- from src.retrieval.base import RetrievalResult
9
-
10
-
11
- @dataclass
12
- class QueryResult:
13
- source_type: str # "database" or "document"
14
- source_id: str # database_client_id or document_id
15
- table_or_file: str
16
- columns: list[str]
17
- rows: list[dict]
18
- row_count: int
19
- metadata: dict = field(default_factory=dict)
20
- # metadata should include "column_types": {"col_name": "dtype"} when available
21
-
22
-
23
- class BaseExecutor(ABC):
24
- @abstractmethod
25
- async def execute(
26
- self,
27
- results: list[RetrievalResult],
28
- user_id: str,
29
- db: AsyncSession,
30
- question: str,
31
- limit: int = 100,
32
- ) -> list[QueryResult]: ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/retrieval/router.py CHANGED
@@ -13,8 +13,6 @@ import hashlib
13
  import json
14
  from dataclasses import asdict
15
 
16
- from sqlalchemy.ext.asyncio import AsyncSession
17
-
18
  from src.db.redis.connection import get_redis
19
  from src.middlewares.logging import get_logger
20
  from src.retrieval.base import RetrievalResult
@@ -24,7 +22,6 @@ logger = get_logger("retrieval_router")
24
 
25
  _CACHE_TTL = 3600
26
  _CACHE_KEY_PREFIX = "retrieval"
27
- _UNSTRUCTURED_HINTS = frozenset({"document", "unstructured", "both"})
28
 
29
 
30
  class RetrievalRouter:
@@ -40,22 +37,17 @@ class RetrievalRouter:
40
  self,
41
  query: str,
42
  user_id: str,
43
- db: AsyncSession,
44
  k: int = 5,
45
- source_hint: str = "both",
46
  ) -> list[RetrievalResult]:
47
- if source_hint not in _UNSTRUCTURED_HINTS:
48
- return []
49
-
50
  redis = await get_redis()
51
  query_hash = hashlib.md5(query.encode()).hexdigest()
52
- cache_key = f"{_CACHE_KEY_PREFIX}:{user_id}:{source_hint}:{query_hash}:{k}"
53
 
54
  cached = await redis.get(cache_key)
55
  if cached:
56
  try:
57
  raw = json.loads(cached)
58
- logger.info("returning cached retrieval results", source_hint=source_hint)
59
  return [RetrievalResult(**r) for r in raw]
60
  except Exception:
61
  logger.warning("corrupted retrieval cache, fetching fresh")
@@ -66,9 +58,6 @@ class RetrievalRouter:
66
  logger.error("retrieval failed", error=str(e))
67
  return []
68
 
69
- if not results and source_hint == "both":
70
- logger.warning("empty retrieval result for source_hint='both'")
71
-
72
  await redis.setex(
73
  cache_key,
74
  _CACHE_TTL,
 
13
  import json
14
  from dataclasses import asdict
15
 
 
 
16
  from src.db.redis.connection import get_redis
17
  from src.middlewares.logging import get_logger
18
  from src.retrieval.base import RetrievalResult
 
22
 
23
  _CACHE_TTL = 3600
24
  _CACHE_KEY_PREFIX = "retrieval"
 
25
 
26
 
27
  class RetrievalRouter:
 
37
  self,
38
  query: str,
39
  user_id: str,
 
40
  k: int = 5,
 
41
  ) -> list[RetrievalResult]:
 
 
 
42
  redis = await get_redis()
43
  query_hash = hashlib.md5(query.encode()).hexdigest()
44
+ cache_key = f"{_CACHE_KEY_PREFIX}:{user_id}:{query_hash}:{k}"
45
 
46
  cached = await redis.get(cache_key)
47
  if cached:
48
  try:
49
  raw = json.loads(cached)
50
+ logger.info("returning cached retrieval results")
51
  return [RetrievalResult(**r) for r in raw]
52
  except Exception:
53
  logger.warning("corrupted retrieval cache, fetching fresh")
 
58
  logger.error("retrieval failed", error=str(e))
59
  return []
60
 
 
 
 
61
  await redis.setex(
62
  cache_key,
63
  _CACHE_TTL,