Ananth Shyam
feat: implement NL-to-SQL agent with PostgreSQL integration and enhance related documentation
Enterprise Knowledge Copilot: Agent Module
LangGraph-based multi-agent RAG system with Gemini, Qdrant, BGE-M3, and streaming SSE.
Architecture
POST /agent/query
      │
      ▼
planner_node (Gemini 2.5 Pro)
      │ ExecutionPlan
      ▼
      ├────────────────┬────────────────┐
      │                │                │   (parallel)
  doc_search     ticket_lookup      sql_query
      │                │                │
  live_docs            │                │
 (conditional)         │                │
      ├────────────────┴────────────────┘
      │
join_node (fan-in)
      │
synthesiser_node (Gemini 2.5 Pro, streaming)
      │
guardrail_node (Gemini 2.5 Flash)
      │
done / escalate
Two-level orchestration
- Planner (Level 1): Gemini analyses the query and returns a structured `ExecutionPlan`: which agents to run and which can be parallelised.
- LangGraph (Level 2): Executes the plan, running independent nodes concurrently via `asyncio`.
Parallelism rules
- `doc_search` and `ticket_lookup` always run in parallel when both are needed.
- `sql_query` runs in parallel with other agents when the query is about structured/aggregated data.
- `live_docs` runs after `doc_search` only if confidence is low OR the query names an external library.
- Each agent node calls exactly one tool. No agent calls two tools.
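The fan-out and the conditional `live_docs` step can be sketched with plain asyncio. This is only an illustration of the rules, not the module's graph code: the stub agent functions, the `needs_live_docs` helper, and the `names_external_library` flag are hypothetical stand-ins.

```python
import asyncio

# Hypothetical stand-ins for the real agent nodes; each calls exactly one tool.
async def doc_search(query: str) -> dict:
    await asyncio.sleep(0.01)  # simulate I/O
    return {"agent": "doc_search", "confidence": "low"}

async def ticket_lookup(query: str) -> dict:
    await asyncio.sleep(0.01)
    return {"agent": "ticket_lookup", "confidence": "high"}

async def live_docs(query: str) -> dict:
    await asyncio.sleep(0.01)
    return {"agent": "live_docs", "confidence": "medium"}

def needs_live_docs(doc_result: dict, names_external_library: bool) -> bool:
    """live_docs runs only if doc_search confidence is low OR a library is named."""
    return doc_result["confidence"] == "low" or names_external_library

async def run_plan(query: str, names_external_library: bool = False) -> list[dict]:
    # Independent agents run concurrently (fan-out).
    doc_result, ticket_result = await asyncio.gather(
        doc_search(query), ticket_lookup(query)
    )
    results = [doc_result, ticket_result]
    # live_docs depends on doc_search, so it runs afterwards and only when needed.
    if needs_live_docs(doc_result, names_external_library):
        results.append(await live_docs(query))
    return results
```

Calling `asyncio.run(run_plan("how do I rotate API keys?"))` returns the results in plan order; `asyncio.gather` preserves the order of its arguments regardless of which coroutine finishes first.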
Confidence gating
After BGE reranker scoring:
- ≥ 0.6 → high
- 0.4–0.6 → medium
- < 0.4 → low
The synthesiser adjusts its tone and the guardrail applies stricter escalation at low confidence.
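As a minimal sketch, the thresholds map to labels as follows. The function name `confidence_level` is hypothetical, and it assumes a single reranker score is passed in; how the module aggregates scores across chunks is not stated here.

```python
def confidence_level(score: float) -> str:
    """Map a reranker score to the confidence label used for gating."""
    if score >= 0.6:
        return "high"
    if score >= 0.4:
        return "medium"
    return "low"
```

A score of exactly 0.4 falls in the medium band, since only scores below 0.4 count as low.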
Setup
# 1. Install dependencies
pip install -r requirements.txt
# 2. Copy env file and fill in keys
cp .env.example .env
# Set at minimum: GOOGLE_API_KEY
# 3. Start Qdrant locally
docker run -p 6333:6333 qdrant/qdrant
# 4. Run the API
uvicorn main:app --reload
Your main.py should include:
from fastapi import FastAPI
from agent.api import router
app = FastAPI()
app.include_router(router)
Environment variables
| Variable | Required | Description |
|---|---|---|
| `GOOGLE_API_KEY` | required | Google AI Studio key |
| `QDRANT_HOST` | optional | Default: localhost |
| `QDRANT_PORT` | optional | Default: 6333 |
| `JIRA_BASE_URL` | optional | Enables ticket_lookup |
| `JIRA_API_TOKEN` | optional | Enables ticket_lookup |
| `FIRECRAWL_API_KEY` | optional | Enables live_docs |
| `TAVILY_API_KEY` | optional | Enables live_docs |
| `DATABASE_URL` | optional | Direct PostgreSQL URL; enables sql_query. e.g. postgresql://postgres:pw@localhost:5432/postgres |
| `SQL_MAX_ROWS` | optional | Max rows returned per SQL query (default: 20) |
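One way to read these variables with their documented defaults is sketched below. The `load_settings` helper and the returned keys are hypothetical. Two points are assumptions rather than documented behaviour: that `ticket_lookup` needs both Jira variables, and that either of the two keys is enough for `live_docs`.

```python
import os
from typing import Mapping

def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read the documented variables, applying the documented defaults."""
    if not env.get("GOOGLE_API_KEY"):
        raise RuntimeError("GOOGLE_API_KEY is required")
    return {
        "qdrant_host": env.get("QDRANT_HOST", "localhost"),
        "qdrant_port": int(env.get("QDRANT_PORT", "6333")),
        "sql_max_rows": int(env.get("SQL_MAX_ROWS", "20")),
        # Assumption: ticket_lookup needs both Jira variables.
        "ticket_lookup_enabled": bool(
            env.get("JIRA_BASE_URL") and env.get("JIRA_API_TOKEN")
        ),
        # Assumption: either key is enough to enable live_docs.
        "live_docs_enabled": bool(
            env.get("FIRECRAWL_API_KEY") or env.get("TAVILY_API_KEY")
        ),
        "sql_query_enabled": bool(env.get("DATABASE_URL")),
    }
```

Passing the environment as a mapping keeps the function easy to test, for example `load_settings({"GOOGLE_API_KEY": "k"})`.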
BM25 index
doc_search expects a BM25 index at data/bm25_index.pkl as a pickle with:
{
"index": BM25Okapi(...),
"corpus": ["doc text 1", "doc text 2", ...],
"doc_ids": ["chunk_id_1", "chunk_id_2", ...]
}
If the file is missing, BM25 is silently skipped and only Qdrant vectors are used.
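The missing-file fallback could look roughly like this. The `load_bm25` function and the validation checks are a hypothetical sketch, not the module's loader. The length check assumes `corpus` and `doc_ids` are parallel lists. Unpickling a real index also requires the package that defines `BM25Okapi` to be importable.

```python
import pickle
from pathlib import Path
from typing import Optional

REQUIRED_KEYS = {"index", "corpus", "doc_ids"}

def load_bm25(path: str = "data/bm25_index.pkl") -> Optional[dict]:
    """Return the BM25 bundle, or None when the file is missing."""
    file = Path(path)
    if not file.exists():
        return None  # caller falls back to Qdrant vectors only
    with file.open("rb") as fh:
        # Only unpickle files you created yourself: pickle can run arbitrary code.
        bundle = pickle.load(fh)
    missing = REQUIRED_KEYS - set(bundle)
    if missing:
        raise ValueError(f"BM25 pickle is missing keys: {sorted(missing)}")
    # Assumption: corpus and doc_ids are parallel lists.
    if len(bundle["corpus"]) != len(bundle["doc_ids"]):
        raise ValueError("corpus and doc_ids must have the same length")
    return bundle
```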
Qdrant collection schema
Collection name: knowledge_base
dense vector: name="dense", size=1024
sparse vector: name="sparse"
payload: chunk_id, text, source, source_type, team_id
Data is filtered by team_id on every query, so teams see only their own documents.
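In Qdrant's JSON filter syntax, a per-team restriction can be expressed as a `must` clause on the `team_id` payload field. The `team_filter` helper below is a hypothetical sketch of that condition; how the module actually builds and attaches its filter is not shown in this document.

```python
def team_filter(team_id: str) -> dict:
    """Build a Qdrant filter (JSON form) that matches only one team's points."""
    return {"must": [{"key": "team_id", "match": {"value": team_id}}]}
```

The resulting dictionary would go in the `filter` field of a search request against the `knowledge_base` collection.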
Adding a new agent
- Add a new tool in `tools/my_tool.py` with `async def run_my_tool(query, team_id) -> list[RetrievedChunk]`.
- Add `"my_tool"` to the `Literal` in `models.py` → `AgentTask.agent`.
- Add a node function in `graph.py`:
async def my_tool_node(state: KnowledgeGraphState) -> dict:
await _push_event(queue, "agent_started", {"agent": "my_tool"})
chunks = await run_my_tool(task_input, state.query_input.team_id)
...
- Register the node and wire its edges in `build_graph()`.
- Update `PLANNER_SYSTEM_PROMPT` in `prompts.py` to describe when to use the new agent.
SSE event stream
Events emitted in order:
| Event | Payload |
|---|---|
| `plan_ready` | `{tasks, reasoning}` |
| `agent_started` | `{agent}`; agent names: doc_search, ticket_lookup, live_docs, sql_query |
| `agent_done` | `{agent, retrieval_confidence}` |
| `synthesis_started` | `{}` |
| `answer_chunk` | `{chunk}` (one per token) |
| `guardrail_result` | `{score, escalate}` |
| `done` | `{}` |
| `error` | `{message}` |
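For reference, a Server-Sent Events frame is an `event:` line and a `data:` line followed by a blank line. The sketch below shows how one of these events might be serialised. The `format_sse` helper is hypothetical, and it assumes the event name travels in the `event:` field with a JSON payload in `data:`, which this document does not confirm.

```python
import json

def format_sse(event: str, payload: dict) -> str:
    """Serialise one event in Server-Sent Events wire format."""
    # json.dumps emits a single line by default, so one data: field is enough.
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"
```

For example, `format_sse("answer_chunk", {"chunk": "Hi"})` yields an `event: answer_chunk` line, a `data: {"chunk": "Hi"}` line, and the terminating blank line.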