# personabot-api / app/pipeline/nodes/retrieve.py
# NOTE(review): the three lines that followed the path above were GitHub
# web-UI paste residue ("GitHub Actions / Deploy 8e14626 / 8da917e") and are
# not valid Python; preserved here as a comment so the module parses.
import asyncio
import logging
import re
from typing import Callable
from langgraph.config import get_stream_writer
logger = logging.getLogger(__name__)
from app.models.pipeline import PipelineState, Chunk
from app.services.vector_store import VectorStore
from app.services.embedder import Embedder
from app.services.reranker import Reranker
from app.services.sparse_encoder import SparseEncoder
# ── Relevance-gate thresholds ─────────────────────────────────────────────────
# Cross-encoder ms-marco-MiniLM-L-6-v2 returns raw logits (not sigmoid).
# Highly relevant docs score 0–15; completely off-topic score below –5.
# Biographical/skill queries ("Has Darshan used Java?") score lower than
# factual QA pairs even when the passage IS the correct answer, because
# ms-marco is trained on document-passage pairs not person-biography pairs.
# –3.5 is the empirical floor below which retrieved chunks are genuinely
# unrelated (noise), while –3.5 to –1.0 still captures valid skill/project
# passages that answer tech-stack or experience questions.
_MIN_TOP_SCORE: float = -3.5
# Absolute floor for the low-confidence "rescue" path: a top hit scoring below
# this is treated as noise even for capability/focused queries.
_MIN_RESCUE_SCORE: float = -6.0
# ── Source-diversity caps ─────────────────────────────────────────────────────
# Default cap: max chunks per source document for BROAD queries.
# Without this, a verbose doc can crowd out all 5 context slots, hiding other
# relevant sources and making the answer look one-dimensional.
_MAX_CHUNKS_PER_DOC_BROAD: int = 2
# For FOCUSED queries, the matching source is allowed more depth (4 slots)
# while all other sources are capped at 1. This prevents the cap from
# removing the 3rd-most-relevant resume section on an experience question.
_MAX_CHUNKS_PER_DOC_FOCUSED: int = 4
_MAX_CHUNKS_OTHER_FOCUSED: int = 1
# ── Sibling expansion tuning ──────────────────────────────────────────────────
# Document-graph sibling expansion — after RRF fusion, fetch additional chunks
# from the same source documents as the top-N results.
_SIBLING_EXPAND_TOP_N: int = 10  # rank depth to consider for expansion
_SIBLING_FETCH_LIMIT: int = 20   # max chunks fetched via Qdrant doc_id query
_SIBLING_TOTAL_CAP: int = 15     # max new chunks to inject before reranker
# ── Focused-query keyword taxonomy ────────────────────────────────────────────
# Keywords that imply the visitor wants depth from a specific source type.
# Values are the source_type values set by ingest (ChunkMetadata.source_type).
# Matching is first-hit in insertion order (see _focused_source_type).
_FOCUS_KEYWORDS: dict[frozenset[str], str] = {
    frozenset({"experience", "work", "job", "role", "career", "internship",
               "skills", "skill", "education", "degree", "university",
               "certification", "certifications", "qualified", "resume", "cv",
               "employment", "professional", "placement", "history",
               "tech", "stack", "technology", "technologies", "framework",
               "frameworks", "tool", "tools", "tooling", "language", "languages"}): "cv",
    frozenset({"project", "built", "build", "developed", "architecture",
               "system", "platform", "app", "application"}): "project",
    frozenset({"blog", "post", "article", "wrote", "writing", "published"}): "blog",
    frozenset({"github", "repo", "repos", "repository", "repositories", "readme"}): "github_readme",
}
# RRF rank fusion constant. k=60 is the original Cormack et al. default.
# Higher k reduces the influence of top-1 rank advantage.
_RRF_K: int = 60
# Module-level singleton — BM25 model downloads once (~5 MB), cached in memory.
_sparse_encoder = SparseEncoder()
# Tokens whose presence marks a "capability" query (tech stack / skills).
# Used by _is_capability_query to qualify low-confidence rerank results for
# the rescue path in the relevance gate.
_CAPABILITY_QUERY_HINTS: frozenset[str] = frozenset(
    {
        "tech",
        "stack",
        "technology",
        "technologies",
        "framework",
        "frameworks",
        "tool",
        "tools",
        "tooling",
        "language",
        "languages",
        "skills",
        "skill",
    }
)
# Common function words (all >= 4 chars, so they survive the length guard in
# _normalise_focus_typos) that must never be "corrected" into focus keywords
# by the STT typo normaliser — e.g. "what" must not become "chat"/"stack".
_NORMALISATION_STOPWORDS: frozenset[str] = frozenset(
    {
        "tell",
        "about",
        "what",
        "which",
        "where",
        "when",
        "could",
        "would",
        "should",
        "your",
        "with",
        "from",
        "that",
        "this",
    }
)
def _focused_source_type(query: str) -> str | None:
    """
    Map *query* to the source_type it is focused on, or None for a broad query.

    A query counts as focused when at least one of its alphanumeric tokens
    appears in a _FOCUS_KEYWORDS category (resume, project pages, blog posts,
    GitHub readmes); the first matching category in insertion order wins.
    Broad queries that match nothing keep the default 2-per-doc diversity cap
    so no single source dominates the 5 context slots.
    """
    query_tokens = frozenset(re.findall(r"[a-z0-9]+", query.lower()))
    return next(
        (
            source
            for keywords, source in _FOCUS_KEYWORDS.items()
            if not query_tokens.isdisjoint(keywords)
        ),
        None,
    )
def _rrf_merge(ranked_lists: list[tuple[float, list[Chunk]]]) -> list[Chunk]:
    """
    Fuse several ranked chunk lists with weighted Reciprocal Rank Fusion.

    Each input is a (list_weight, chunk_list) pair. A chunk contributes
    weight * (1 / (rank + 1 + _RRF_K)) for its first occurrence in each list;
    contributions are summed across lists. Chunks are identified (and
    deduplicated) by a "doc_id::section" fingerprint. Returns all unique
    chunks ordered by descending fused score (ties keep first-scored order,
    since the sort is stable).
    """
    fused_scores: dict[str, float] = {}
    chunk_for: dict[str, Chunk] = {}
    for list_weight, chunk_list in ranked_lists:
        counted_here: set[str] = set()
        for position, candidate in enumerate(chunk_list):
            meta = candidate["metadata"]
            fingerprint = f"{meta['doc_id']}::{meta['section']}"
            # Only the best (first) rank of a chunk within one list counts.
            if fingerprint in counted_here:
                continue
            counted_here.add(fingerprint)
            contribution = list_weight * (1.0 / (position + 1 + _RRF_K))
            fused_scores[fingerprint] = fused_scores.get(fingerprint, 0.0) + contribution
            chunk_for[fingerprint] = candidate
    by_score = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
    return [chunk_for[fingerprint] for fingerprint, _ in by_score]
# Maps ingest-time source_type values to the display labels emitted in the
# frontend "reading" / "sources" events.
_TYPE_REMAP: dict[str, str] = {
    "github": "readme",
    "github_readme": "readme",  # RC-6: ingestion uses "github_readme"; map to display label
    "bio": "resume",
    "cv": "resume",
    "blog": "blog",
    "project": "project",
    "resume": "resume",  # RC-3: explicit pass-through so resume chunks aren't "unknown"
}
# Flat vocabulary of every single-word focus keyword across all categories.
# Used by the speech-to-text typo normaliser to decide whether a token is
# already a known intent word and as the candidate pool for corrections.
_FOCUS_VOCAB: frozenset[str] = frozenset(
    {
        keyword
        for keys in _FOCUS_KEYWORDS.keys()
        for keyword in keys
        if " " not in keyword
    }
)
def _edit_distance(a: str, b: str) -> int:
la, lb = len(a), len(b)
dp = list(range(lb + 1))
for i in range(1, la + 1):
prev = dp[0]
dp[0] = i
for j in range(1, lb + 1):
cur = dp[j]
cost = 0 if a[i - 1] == b[j - 1] else 1
dp[j] = min(dp[j] + 1, dp[j - 1] + 1, prev + cost)
prev = cur
return dp[lb]
def _best_focus_replacement(token: str) -> str | None:
    """
    Return the closest focus-vocabulary word to *token*, or None when no
    candidate lies within edit distance 2.

    Candidates are pre-filtered cheaply before the quadratic edit-distance
    computation: the first character must match and the length difference
    must be at most 1.

    Fix: iterate the vocabulary in sorted order so tie-breaking between
    equidistant candidates is deterministic. Iterating the frozenset directly
    made the winner depend on set hash order, which varies across processes
    because of string hash randomization (PYTHONHASHSEED) — the same typo
    could be "corrected" differently after a server restart.
    """
    best: str | None = None
    best_score = 99
    for candidate in sorted(_FOCUS_VOCAB):
        # Cheap guards: same initial letter, near-identical length.
        if token[0] != candidate[0]:
            continue
        if abs(len(token) - len(candidate)) > 1:
            continue
        score = _edit_distance(token, candidate)
        # Strict < keeps the alphabetically-first candidate on score ties.
        if score <= 2 and score < best_score:
            best_score = score
            best = candidate
    return best
def _normalise_focus_typos(query: str) -> str:
    """
    Correct minor speech-to-text typos in the intent words used by focused
    retrieval. Example: "walk experience" -> "work experience".

    The query is lowercased and split on whitespace. A token is left alone
    when its punctuation-stripped core is shorter than 4 characters, already
    a focus keyword, or a known stopword; otherwise the closest focus word
    within edit distance 2 (if any) is substituted into the token.
    """
    words = query.lower().split()
    if not words:
        return query
    rewritten: list[str] = []
    for word in words:
        core = word.strip(".,!?;:\"'()[]{}")
        leave_unchanged = (
            len(core) < 4
            or core in _FOCUS_VOCAB
            or core in _NORMALISATION_STOPWORDS
        )
        if leave_unchanged:
            rewritten.append(word)
            continue
        fixed = _best_focus_replacement(core)
        rewritten.append(word.replace(core, fixed) if fixed else word)
    return " ".join(rewritten)
def _is_capability_query(query: str) -> bool:
    """True when *query* contains tech-stack / skills vocabulary."""
    found = re.findall(r"[a-z0-9]+", query.lower())
    return not _CAPABILITY_QUERY_HINTS.isdisjoint(found)
def make_retrieve_node(
    vector_store: VectorStore, embedder: Embedder, reranker: Reranker
) -> Callable[[PipelineState], dict]:
    """
    Build the LangGraph retrieval node with its service dependencies bound.

    Parameters
    ----------
    vector_store : Qdrant-backed store providing dense, sparse, keyword,
        point-id, and doc_id lookups.
    embedder : batched query embedder (called with ``is_query=True``).
    reranker : cross-encoder reranker; failures fall back to RRF order.

    Returns the async ``retrieve_node`` callable invoked by LangGraph with
    the current PipelineState; it returns a partial state-update dict.
    """

    async def retrieve_node(state: PipelineState) -> dict:
        """
        Hybrid retrieval pipeline: multi-query dense search + sparse BM25 +
        exact-keyword filter, RRF fusion, navigation-node expansion, sibling
        expansion, cross-encoder reranking, a relevance gate with rescue
        heuristics, and a query-aware source-diversity cap.

        Streams "status" / "reading" / "sources" events via the LangGraph
        stream writer as it progresses. Returns retrieved_chunks,
        reranked_chunks, retrieval_attempts and top_rerank_score (plus
        sibling_expansion_count and focused_source_type on success).
        """
        writer = get_stream_writer()
        attempts = state.get("retrieval_attempts", 0)
        query = state["query"]
        # Stage 2: use the self-contained decontextualized rewrite for embedding
        # when one was produced. "Tell me more about that ML project" has terrible
        # cosine similarity against "PersonaBot RAG pipeline" passages; the rewrite
        # "What ML projects has Darshan built?" dramatically improves recall.
        retrieval_query = state.get("decontextualized_query") or query
        retrieval_query = _normalise_focus_typos(retrieval_query)
        # Reuse the topic computed by the guard node — no recomputation needed.
        topic = state.get("query_topic") or ""
        searching_label = (
            f"Searching portfolio for {topic}..."
            if topic
            else "Searching portfolio..."
        )
        writer({"type": "status", "label": searching_label})
        # On a CRAG retry (attempts >= 1) the query has been rewritten and
        # query_embedding is explicitly set to None — always re-embed.
        # On the first attempt, reuse the embedding computed by the cache node.
        cached_embedding: list[float] | None = state.get("query_embedding")
        if attempts >= 1:
            # Second attempt: re-embed the rewritten query with is_query=True.
            cached_embedding = None
        expanded = [retrieval_query]  # gemini_fast may fill expanded_queries on first attempt
        if attempts == 0:
            expanded = state.get("expanded_queries") or [retrieval_query]
            # Ensure decontextualized form is the primary search query if present.
            if retrieval_query != query and retrieval_query not in expanded:
                expanded = [retrieval_query] + expanded
        # Bug 4: cap total dense searches to avoid Qdrant rate limits.
        # Original + up to 3 semantic expansions = 4 maximum.
        expanded = expanded[:4]
        # Embed all query variants in one batched call (is_query=True for asymmetric BGE).
        if cached_embedding is not None and len(expanded) == 1:
            query_vectors = [cached_embedding]
        else:
            query_vectors = await embedder.embed(expanded, is_query=True)
        # ── Dense search (all query variants) ─────────────────────────────────
        # No chunk_type filter: raptor_summary and question_proxy nodes participate
        # so we can expand them to real leaves below.
        dense_results: list[list[Chunk]] = []
        for vec in query_vectors:
            chunks = vector_store.search(query_vector=vec, top_k=15)
            dense_results.append(chunks)
        # ── Split dense hits into leaf candidates and navigation nodes ─────────
        # raptor_summary and question_proxy are navigation-only; they are expanded
        # to their real leaf pages via Qdrant point UUID lookups.
        # NOTE(review): leaf_candidates (including the UUID-expanded leaves) is
        # never fed into the RRF fusion below — all_ranked_lists is built from
        # the raw dense_results instead, so nav nodes still enter fusion and
        # expanded leaves are dropped. The post-rerank non-leaf guard masks
        # this; confirm whether fusion should consume leaf_candidates.
        leaf_candidates: list[Chunk] = []
        leaf_fps_seen: set[str] = set()
        nav_expansion_ids: set[str] = set()
        for hit_list in dense_results:
            for chunk in hit_list:
                ct = chunk["metadata"].get("chunk_type", "leaf")
                if ct == "leaf":
                    fp = f"{chunk['metadata']['doc_id']}::{chunk['metadata']['section']}"
                    if fp not in leaf_fps_seen:
                        leaf_fps_seen.add(fp)
                        leaf_candidates.append(chunk)
                elif ct == "raptor_summary":
                    for uid in (chunk["metadata"].get("child_leaf_ids") or []):
                        nav_expansion_ids.add(uid)
                elif ct == "question_proxy":
                    uid = chunk["metadata"].get("parent_leaf_id", "")
                    if uid:
                        nav_expansion_ids.add(uid)
        # Expand nav nodes to their leaf pages in a single Qdrant retrieve call.
        if nav_expansion_ids:
            expanded_leaves = vector_store.fetch_by_point_ids(list(nav_expansion_ids))
            for leaf in expanded_leaves:
                fp = f"{leaf['metadata']['doc_id']}::{leaf['metadata']['section']}"
                if fp not in leaf_fps_seen:
                    leaf_fps_seen.add(fp)
                    leaf_candidates.append(leaf)
            logger.debug("UUID expansion: +%d leaves from %d nav node UUIDs.",
                         len(expanded_leaves), len(nav_expansion_ids))
        # ── Query Normalization & Alias Generation ─────────────────────────────
        # If the user asks for "xsilica", generate "x silica" and "x-silica".
        # If they ask for "x silica", generate "xsilica" and "x-silica".
        from app.core.topic import _STOPWORDS
        canonical_forms: list[str] = state.get("query_canonical_forms") or []
        normalized_forms: set[str] = set(canonical_forms)
        # Base normalization: stripping out simple hyphens/spaces for exact matches
        tokens = retrieval_query.lower().split()
        for i in range(len(tokens) - 1):
            if tokens[i] not in _STOPWORDS and tokens[i+1] not in _STOPWORDS:
                collapsed = tokens[i] + tokens[i+1]
                hyphened = tokens[i] + "-" + tokens[i+1]
                normalized_forms.add(collapsed)
                normalized_forms.add(hyphened)
        # Separator split (x-silica -> x silica, xsilica)
        if "-" in retrieval_query:
            normalized_forms.add(retrieval_query.replace("-", ""))
            normalized_forms.add(retrieval_query.replace("-", " "))
        # ── Exact Keyword Filter Search (Database hit) ─────────────────────────
        # Runs a MatchAny query on Qdrant's `keywords` payload field.
        keyword_results: list[Chunk] = []
        extracted_keywords = []
        for word in retrieval_query.lower().split():
            extracted_keywords.append(word)
        for norm in normalized_forms:
            extracted_keywords.append(norm)
        # Only query strong >= 4 char keywords to avoid noise matching
        strong_keywords = [k for k in extracted_keywords if len(k) >= 4 and k not in _STOPWORDS]
        if strong_keywords:
            keyword_results = vector_store.keyword_filter_search(strong_keywords, top_k=15)
        # ── Sparse (BM25) search (primary query only) ─────────────────────────────
        sparse_results: list[Chunk] = []
        if _sparse_encoder.available:
            bm25_query = " ".join([retrieval_query] + list(normalized_forms)).lower()
            indices, values = _sparse_encoder.encode_one(bm25_query)
            sparse_results = vector_store.search_sparse(indices, values, top_k=20)
        # ── Reciprocal Rank Fusion ─────────────────────────────────────────────
        # Merge dense (per variant) + sparse + keyword into one ranked list.
        # Dynamic Weighting: Explicit keyword entity matches get a 1.5x boost
        # over semantic proximity in the RRF formula.
        all_ranked_lists: list[tuple[float, list[Chunk]]] = []
        for dense_res in dense_results:
            all_ranked_lists.append((1.0, dense_res))
        if sparse_results:
            all_ranked_lists.append((1.0, sparse_results))
        if keyword_results:
            all_ranked_lists.append((1.5, keyword_results))
        fused: list[Chunk] = _rrf_merge(all_ranked_lists)
        # ── Reading events — one per unique source document ────────────────────
        # Emitted BEFORE deduplication so the user sees sources appear in
        # real time as Qdrant returns them, matching Perplexity's "Reading..."
        # display. Deduplication here is by source_url so blog posts with
        # multiple chunk hits fire only one event.
        seen_urls: set[str] = set()
        for chunk in fused:
            meta = chunk["metadata"]
            url = meta.get("source_url") or ""
            dedup_key = url if url else meta.get("doc_id", "")
            if dedup_key and dedup_key not in seen_urls:
                seen_urls.add(dedup_key)
                writer({
                    "type": "reading",
                    "title": meta.get("source_title", ""),
                    "url": url or None,
                    "source_type": _TYPE_REMAP.get(meta.get("source_type", ""), meta.get("source_type", "")),
                })
        # ── Deduplication (question-proxy collapse) ────────────────────────────
        # Dedup by doc_id::section fingerprint so a chunk that appeared in
        # multiple ranked lists is counted once.
        # NOTE(review): a comment here previously claimed fused is "already
        # leaf-only"; the dense lists are NOT filtered to leaves (see the
        # dense-search comment above), so non-leaf nodes can survive to the
        # reranker — which is exactly what the post-rerank guard detects.
        seen: set[str] = set()
        unique_chunks: list[Chunk] = []
        for c in fused:
            fp = f"{c['metadata']['doc_id']}::{c['metadata']['section']}"
            if fp not in seen:
                seen.add(fp)
                unique_chunks.append(c)
        writer({
            "type": "status",
            "label": f"Comparing {len(unique_chunks)} sources for relevance...",
        })
        # ── Document-graph sibling expansion ───────────────────────────────────────
        # For the top _SIBLING_EXPAND_TOP_N chunks by RRF rank, fetch neighbouring
        # chunks from the same source document via doc_id filter (no vector needed).
        # If chunk 4 of a blog post matched, chunks 1-3 and 5-6 are now candidates too.
        # This is the document-graph connectivity layer: doc_id is the edge linking chunks.
        if unique_chunks:
            sibling_fps: set[str] = {f"{c['metadata']['doc_id']}::{c['metadata']['section']}" for c in unique_chunks}
            sibling_count = 0
            for seed in unique_chunks[:_SIBLING_EXPAND_TOP_N]:
                if sibling_count >= _SIBLING_TOTAL_CAP:
                    break
                doc_id = seed["metadata"]["doc_id"]
                seed_idx = seed["metadata"].get("chunk_index")
                if seed_idx is None:
                    seed_idx = -1
                siblings = vector_store.fetch_by_doc_id(doc_id, limit=_SIBLING_FETCH_LIMIT)
                # RC-2 fix: sort by chunk_index so we can prefer adjacent chunks.
                # Guard against chunk_index being actual None in Qdrant payloads.
                siblings.sort(key=lambda c: c["metadata"].get("chunk_index") or 0)
                # If seed position is known, prefer adjacent indices (±2) first,
                # then fall through to remaining siblings in document order.
                if seed_idx >= 0:
                    adjacent = [s for s in siblings
                                if abs((s["metadata"].get("chunk_index") or -999) - seed_idx) <= 2]
                    rest = [s for s in siblings
                            if abs((s["metadata"].get("chunk_index") or -999) - seed_idx) > 2]
                    ordered_siblings = adjacent + rest
                else:
                    ordered_siblings = siblings
                for sib in ordered_siblings:
                    fp = f"{sib['metadata']['doc_id']}::{sib['metadata']['section']}"
                    if fp not in sibling_fps:
                        sibling_fps.add(fp)
                        unique_chunks.append(sib)
                        sibling_count += 1
                        if sibling_count >= _SIBLING_TOTAL_CAP:
                            break
        # ── Cross-encoder rerank (with fallback to RRF order on failure) ───────
        try:
            reranked = await reranker.rerank(retrieval_query, unique_chunks, top_k=10)  # RC-5: raised from 7
        except (Exception, asyncio.CancelledError) as exc:
            logger.error("retrieve: reranker failed (%s); falling back to base retrieval scores.", exc)
            writer({"type": "status", "label": "Reranker offline; using base retrieval scores..."})
            reranked = unique_chunks[:10]
            # mock top_score so relevance gate allows it through if unique_chunks exist
            if reranked:
                reranked[0]["metadata"]["rerank_score"] = 1.0
        # Guard: assert all reranker inputs were leaf chunks.
        # Non-leaf nodes (raptor_summary / question_proxy) reaching the reranker
        # indicates a pipeline bug; log and strip rather than pass synthetic text
        # to the LLM or produce navigation-node source cards.
        non_leaf = [c for c in reranked if c["metadata"].get("chunk_type", "leaf") != "leaf"]
        if non_leaf:
            logger.error(
                "retrieve: %d non-leaf chunks reached reranker (chunk_types: %s); stripping.",
                len(non_leaf),
                [c["metadata"].get("chunk_type") for c in non_leaf],
            )
            reranked = [c for c in reranked if c["metadata"].get("chunk_type", "leaf") == "leaf"]
        # ── Relevance gate ─────────────────────────────────────────────────────
        # top_score is None only when nothing survived to this point.
        top_score = reranked[0]["metadata"].get("rerank_score", 0.0) if reranked else None
        low_confidence = top_score is not None and top_score < _MIN_TOP_SCORE
        capability_query = _is_capability_query(retrieval_query)
        # Rescue path: a low-confidence top hit is still served when the score
        # is above the hard floor AND the query looks like a capability /
        # focused question, this is already a retry, or recall was plentiful.
        rescue_low_confidence = bool(
            reranked
            and low_confidence
            and top_score is not None
            and top_score >= _MIN_RESCUE_SCORE
            and (
                capability_query
                or _focused_source_type(retrieval_query) is not None
                or attempts >= 1
                or len(unique_chunks) >= 6
            )
        )
        # Final retry that still reranked poorly: serve the RRF order instead
        # of returning empty — broader but non-empty beats a refusal.
        if low_confidence and not rescue_low_confidence and attempts >= 1 and unique_chunks:
            writer(
                {
                    "type": "status",
                    "label": "Using broader retrieval fallback after low-confidence rerank...",
                }
            )
            reranked = unique_chunks[:10]
            top_score = reranked[0]["metadata"].get("rerank_score", top_score)
            low_confidence = False
        if not reranked or (low_confidence and not rescue_low_confidence):
            # Empty retrieval — the graph will route to a rewrite/retry or a
            # "no answer" path based on retrieval_attempts.
            return {
                "answer": "",
                "retrieved_chunks": [],
                "reranked_chunks": [],
                "retrieval_attempts": attempts + 1,
                "top_rerank_score": top_score,
            }
        if rescue_low_confidence:
            writer(
                {
                    "type": "status",
                    "label": "Applying retrieval rescue for portfolio capability query...",
                }
            )
        # ── Source diversity cap (query-aware) ─────────────────────────────────
        focused_type = _focused_source_type(retrieval_query)
        doc_counts: dict[str, int] = {}
        diverse_chunks: list[Chunk] = []
        # RC-3: "cv" focus type must also match source_type="resume" (pdf_parser uses "resume",
        # not "cv"). Without this alias, experience/skills queries cap resume chunks at 1 instead of 4.
        _FOCUS_TYPE_ALIASES: dict[str, frozenset[str]] = {
            "cv": frozenset({"cv", "resume", "bio"}),
            "github_readme": frozenset({"github_readme", "github"}),
        }
        for chunk in reranked:
            doc_id = chunk["metadata"]["doc_id"]
            src_type = chunk["metadata"].get("source_type", "")
            # Loop-invariant for a given query; recomputed per chunk (cheap).
            focused_set = _FOCUS_TYPE_ALIASES.get(focused_type, frozenset({focused_type})) if focused_type else frozenset()
            if focused_type and src_type in focused_set:
                cap = _MAX_CHUNKS_PER_DOC_FOCUSED
            elif focused_type:
                cap = _MAX_CHUNKS_OTHER_FOCUSED
            else:
                cap = _MAX_CHUNKS_PER_DOC_BROAD
            if doc_counts.get(doc_id, 0) < cap:
                diverse_chunks.append(chunk)
                doc_counts[doc_id] = doc_counts.get(doc_id, 0) + 1
        # Non-leaf guard after diversity cap — should be a no-op if the assertion
        # above fired correctly, but kept as a belt-and-suspenders safety net.
        diverse_chunks = [c for c in diverse_chunks if c["metadata"].get("chunk_type", "leaf") == "leaf"]
        if not diverse_chunks:
            # All post-rerank candidates were cluster nodes — treat as empty retrieval.
            return {
                "answer": "",
                "retrieved_chunks": [],
                "reranked_chunks": [],
                "retrieval_attempts": attempts + 1,
                "top_rerank_score": top_score,
            }
        # ── Sources event — final selected sources shown before the answer ──────
        # This is the Perplexity-style source card row that appears before tokens.
        # Emitted here so the frontend can display source cards before Groq starts.
        sources_payload = []
        for chunk in diverse_chunks:
            meta = chunk["metadata"]
            url = meta.get("source_url") or None
            sources_payload.append({
                "title": meta.get("source_title", ""),
                "url": url,
                "source_type": _TYPE_REMAP.get(meta.get("source_type", ""), meta.get("source_type", "")),
                "section": meta.get("section", ""),
            })
        writer({"type": "sources", "sources": sources_payload})
        # Let the user know what top source the answer will be written from.
        top_title = diverse_chunks[0]["metadata"].get("source_title", "sources")
        writer({"type": "status", "label": f"Writing answer from {top_title}..."})
        return {
            "retrieved_chunks": unique_chunks,
            "reranked_chunks": diverse_chunks,
            "retrieval_attempts": attempts + 1,
            "top_rerank_score": top_score,
            # sibling_count exists whenever unique_chunks is non-empty (the
            # guard above), hence the conditional.
            "sibling_expansion_count": sibling_count if unique_chunks else 0,  # RC-13
            "focused_source_type": focused_type,  # RC-13
        }

    return retrieve_node