GodSpeed / agent /README.md
Ananth Shyam
feat: implement NL-to-SQL agent with PostgreSQL integration and enhance related documentation
825e852

Enterprise Knowledge Copilot β€” Agent Module

LangGraph-based multi-agent RAG system with Gemini, Qdrant, BGE-M3, and streaming SSE.

Architecture

POST /agent/query
       β”‚
       β–Ό
  planner_node  (Gemini 2.5 Pro)
       β”‚  ExecutionPlan
       β–Ό
 β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
 β”‚                β”‚              β”‚    (parallel)
doc_search    ticket_lookup  sql_query
 β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
 β”‚  live_docs   (conditional)
 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚
    join_node   (fan-in)
       β”‚
  synthesiser_node  (Gemini 2.5 Pro, streaming)
       β”‚
  guardrail_node   (Gemini 2.5 Flash)
       β”‚
    done / escalate

Two-level orchestration

  1. Planner (Level 1): Gemini analyses the query and returns a structured ExecutionPlan β€” which agents to run and which can be parallelised.
  2. LangGraph (Level 2): Executes the plan, running independent nodes concurrently via asyncio.

Parallelism rules

  • doc_search and ticket_lookup always run in parallel when both are needed.
  • sql_query runs in parallel with other agents when the query is about structured/aggregated data.
  • live_docs runs after doc_search only if confidence is low OR the query names an external library.
  • Each agent node calls exactly one tool. No agent calls two tools.

Confidence gating

After BGE reranker scoring:

  • β‰₯ 0.6 β†’ high
  • 0.4–0.6 β†’ medium
  • < 0.4 β†’ low

The synthesiser adjusts its tone and the guardrail applies stricter escalation at low confidence.

Setup

# 1. Install dependencies
pip install -r requirements.txt

# 2. Copy env file and fill in keys
cp .env.example .env
# Set at minimum: GOOGLE_API_KEY

# 3. Start Qdrant locally
docker run -p 6333:6333 qdrant/qdrant

# 4. Run the API
uvicorn main:app --reload

Your main.py should include:

from fastapi import FastAPI
from agent.api import router

app = FastAPI()
app.include_router(router)

Environment variables

Variable Required Description
GOOGLE_API_KEY βœ… Google AI Studio key
QDRANT_HOST optional Default: localhost
QDRANT_PORT optional Default: 6333
JIRA_BASE_URL optional Enables ticket_lookup
JIRA_API_TOKEN optional Enables ticket_lookup
FIRECRAWL_API_KEY optional Enables live_docs
TAVILY_API_KEY optional Enables live_docs
DATABASE_URL optional Direct PostgreSQL URL β€” enables sql_query. e.g. postgresql://postgres:pw@localhost:5432/postgres
SQL_MAX_ROWS optional Max rows returned per SQL query (default: 20)

BM25 index

doc_search expects a BM25 index at data/bm25_index.pkl as a pickle with:

{
  "index": BM25Okapi(...),
  "corpus": ["doc text 1", "doc text 2", ...],
  "doc_ids": ["chunk_id_1", "chunk_id_2", ...]
}

If the file is missing, BM25 is silently skipped and only Qdrant vectors are used.

Qdrant collection schema

Collection name: knowledge_base

dense vector:  name="dense",  size=1024
sparse vector: name="sparse"
payload:       chunk_id, text, source, source_type, team_id

Data is filtered by team_id on every query β€” teams see only their own documents.

Adding a new agent

  1. Add a new tool in tools/my_tool.py with async def run_my_tool(query, team_id) -> list[RetrievedChunk].
  2. Add "my_tool" to the Literal in models.py β†’ AgentTask.agent.
  3. Add a node function in graph.py:
async def my_tool_node(state: KnowledgeGraphState) -> dict:
    await _push_event(queue, "agent_started", {"agent": "my_tool"})
    chunks = await run_my_tool(task_input, state.query_input.team_id)
    ...
  1. Register the node and wire its edges in build_graph().
  2. Update PLANNER_SYSTEM_PROMPT in prompts.py to describe when to use the new agent.

SSE event stream

Events emitted in order:

Event Payload
plan_ready {tasks, reasoning}
agent_started {agent} β€” agent names: doc_search, ticket_lookup, live_docs, sql_query
agent_done {agent, retrieval_confidence}
synthesis_started {}
answer_chunk {chunk} (one per token)
guardrail_result {score, escalate}
done {}
error {message}