personabot-api / app /pipeline /nodes /rewrite_query.py
GitHub Actions
Deploy d8ad462
e7c9ee6
"""
backend/app/pipeline/nodes/rewrite_query.py
CRAG (Corrective RAG) query rewriter — fires exactly once per request when:
1. The first retrieval attempt returned no chunks above the relevance threshold.
2. The query contains at least one meaningful non-stop-word token.
Calls Gemini Flash (temp 0.7) to produce one alternative phrasing that preserves
the visitor's intent but uses different vocabulary. The pipeline then runs Retrieve
and Rerank a second time with this new query. There is exactly one retry — the
graph routing enforces this via the retrieval_attempts counter in state.
"""
from __future__ import annotations

import logging
import re
from typing import Any

from app.models.pipeline import PipelineState
from app.services.gemini_client import GeminiClient
logger = logging.getLogger(__name__)
_REWRITE_PROMPT = """\
A search query failed to find relevant results in a portfolio knowledge base about Darshan Chheda.
The knowledge base contains his blog posts, project descriptions, CV/resume, and GitHub README files.
Original query: {query}
Rephrase this query using different vocabulary that might better match how the content is written.
Strategies: expand abbreviations, use synonyms, reframe as "did Darshan..." if the query uses a name/tech.
Output ONLY the rewritten query β€” one sentence, no explanation, no quotes.
"""
# Same stop-word set as generate.py β€” keeps modules consistent.
_STOP_WORDS = frozenset({
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "can", "to", "of", "in", "on", "for",
"with", "at", "by", "from", "and", "or", "but", "not", "what",
"who", "how", "why", "when", "where", "tell", "me", "about", "his",
"he", "him", "any", "some", "that", "this", "it", "its",
})
def _has_meaningful_token(query: str) -> bool:
"""True when the query has at least one non-stop-word token of length >= 3."""
return any(
w not in _STOP_WORDS and len(w) >= 3
for w in __import__("re").findall(r"[a-z]+", query.lower())
)
def make_rewrite_query_node(gemini_client: GeminiClient) -> Any:
    """Build the CRAG query-rewrite node bound to *gemini_client*.

    Returns an async callable that takes the pipeline state and returns a
    partial state update (a plain dict).
    """

    async def rewrite_query_node(state: PipelineState) -> dict:
        """Ask Gemini for one alternative phrasing of ``state["query"]``.

        The returned update always contains:

        * ``rewritten_query`` - the new phrasing, or the original query when
          Gemini is not configured, the call fails, or the reply is empty or
          identical to the original.
        * ``retrieval_attempts`` - the previous value (default 1) plus one.
        * ``query_embedding`` - ``None``, so the query gets re-embedded.

        When Gemini is configured the update also sets ``query`` to the same
        value as ``rewritten_query``.
        """
        query = state["query"]
        logger.info("CRAG: rewriting failed query %r", query)

        # Incremented on every path so the graph does not loop again after the
        # second retrieval attempt.
        next_attempt = state.get("retrieval_attempts", 1) + 1

        if not gemini_client.is_configured:
            # No Gemini — pass query through unchanged; second retrieve will also fail
            # and generate will handle the not-found path gracefully.
            logger.debug("Gemini not configured; skipping query rewrite.")
            return {
                "rewritten_query": query,
                "retrieval_attempts": next_attempt,
                "query_embedding": None,  # Force re-embed so retrieve doesn't use stale embedding
            }

        try:
            # NOTE(review): this reaches into private attributes of GeminiClient
            # (_client, _model); a public method on the client would be safer.
            response = await gemini_client._client.aio.models.generate_content(
                model=gemini_client._model,
                contents=_REWRITE_PROMPT.format(query=query),
                config={"temperature": 0.7},
            )
            # Models sometimes wrap the answer in quotes despite the prompt.
            rewritten = (response.text or query).strip().strip('"').strip("'")
        except Exception as exc:
            # Deliberately broad: the rewrite is best-effort, and any failure
            # falls back to the original query instead of failing the request.
            logger.warning("Query rewrite Gemini call failed (%s); using original.", exc)
            rewritten = query

        if not rewritten or rewritten == query:
            logger.debug("Rewrite produced no change; using original query.")
            rewritten = query
        else:
            logger.info("CRAG rewrite: %r → %r", query, rewritten)

        # Clearing query_embedding forces the retrieve node to re-embed the new query.
        return {
            "query": rewritten,
            "rewritten_query": rewritten,
            "retrieval_attempts": next_attempt,
            "query_embedding": None,
        }

    return rewrite_query_node