# personabot-api — app/pipeline/nodes/enumerate_query.py
# Deployed via GitHub Actions (commits f8b1b4c, d1766f7).
"""
backend/app/pipeline/nodes/enumerate_query.py
Fix 1 β€” Enumeration Query Classifier and Metadata Retrieval.
Inserted immediately after the Guard node (before Cache / Gemini fast-path).
When enumeration intent is detected, this node queries Qdrant using a
payload filter on metadata.source_type β€” no vector embedding, no reranker.
It then deduplicates by source_title, sorts alphabetically, and populates
reranked_chunks so the Generate node receives the complete, accurate list.
Why a database filter beats similarity search for enumeration:
Semantic retrieval cannot guarantee completeness β€” it finds the top-K
most similar chunks, not ALL matching chunks. "List all my projects"
with top_k=20 and 8 projects in the corpus would return the 8 most
similar to the query vector, but which 8 depends on the embedding.
A payload filter returns every matching point, regardless of embedding
position. Completeness is guaranteed; the cosine metric is irrelevant.
Cost: 0 embedding calls, 0 reranker calls, 1 Qdrant scroll.
"""
from __future__ import annotations
import logging
import re
from typing import Callable
from langgraph.config import get_stream_writer
from app.models.pipeline import PipelineState, Chunk
from app.services.vector_store import VectorStore
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enumeration intent patterns
# ---------------------------------------------------------------------------
# Each pattern is checked against the lowercased, whitespace-normalised query.
# Order matters: more specific patterns are checked first.
_ENUM_PREFIXES: tuple[str, ...] = (
"list all",
"list the",
"list every",
"list your",
"list his",
"list ",
"show all",
"show me all",
"show every",
"give me all",
"give me a list",
"what are all",
"what are your",
"what are his",
"how many",
"count ",
"count of",
"enumerate",
"name all",
"name every",
)
# Trailing pattern: "what [are|were|is] all the <noun>?"
_ENUM_TRAILING_RE = re.compile(
r"(?:what|which)\s+(?:are|were|is|were)\s+all\s+(?:the\s+)?",
re.IGNORECASE,
)
def _has_enumeration_intent(query: str) -> bool:
"""
Return True when the lowercased query signals enumeration intent.
Pure string ops β€” no LLM, no embedding. Runs in < 5Β΅s.
"""
q = " ".join(query.lower().split()) # normalise whitespace
for prefix in _ENUM_PREFIXES:
if q.startswith(prefix) or f" {prefix}" in q:
return True
if _ENUM_TRAILING_RE.search(q):
return True
return False
# ---------------------------------------------------------------------------
# Entity-type extractor
# ---------------------------------------------------------------------------
# Maps query tokens β†’ Qdrant source_type values.
# "all source types" is represented as an empty list (caller scrolls without filter).
_TYPE_MAP: dict[str, list[str]] = {
"project": ["project"],
"projects": ["project"],
"blog": ["blog"],
"blogs": ["blog"],
"post": ["blog"],
"posts": ["blog"],
"article": ["blog"],
"articles": ["blog"],
"writing": ["blog"],
"writings": ["blog"],
"experience": ["cv", "bio"],
"experiences": ["cv", "bio"],
"work": ["cv", "bio"],
"jobs": ["cv", "bio"],
"job": ["cv", "bio"],
"role": ["cv", "bio"],
"roles": ["cv", "bio"],
"company": ["cv", "bio"],
"companies": ["cv", "bio"],
"skills": ["cv", "project", "blog"],
"skill": ["cv", "project", "blog"],
"technologies": ["cv", "project", "blog"],
"technology": ["cv", "project", "blog"],
"tech": ["cv", "project", "blog"],
"tools": ["cv", "project", "blog"],
"readme": ["github_readme", "github"], # RC-6: ingest uses "github_readme" as source_type
"repositories": ["github_readme", "github"],
"repos": ["github_readme", "github"],
"github": ["github_readme", "github"],
}
def _extract_source_types(query: str) -> list[str]:
"""
Map query vocabulary to Qdrant source_type values.
Returns a deduplicated list. An empty list means "all types".
"""
tokens = re.findall(r"[a-z]+", query.lower())
found: list[str] = []
seen: set[str] = set()
for tok in tokens:
for st in _TYPE_MAP.get(tok, []):
if st not in seen:
seen.add(st)
found.append(st)
# If no specific type matched, return empty (= all types).
return found
# ---------------------------------------------------------------------------
# Source type display label (used in status event)
# ---------------------------------------------------------------------------
_TYPE_LABEL: dict[str, str] = {
"project": "projects",
"blog": "blog posts",
"cv": "CV/experience",
"bio": "background",
"github": "GitHub repos",
}
def _label_for_types(source_types: list[str]) -> str:
if not source_types:
return "all portfolio content"
return " and ".join(_TYPE_LABEL.get(st, st) for st in source_types[:2])
# ---------------------------------------------------------------------------
# Node factory
# ---------------------------------------------------------------------------
def make_enumerate_query_node(vector_store: VectorStore) -> Callable[[PipelineState], dict]:
    """
    Build the enumeration LangGraph node.

    The returned node:
      1. Classifies whether the query has enumeration intent.
      2. If yes: scrolls Qdrant by source_type, deduplicates by title,
         populates reranked_chunks, sets is_enumeration_query=True.
      3. If no: passes through with is_enumeration_query=False so the
         rest of the pipeline (cache -> gemini_fast -> retrieve) runs normally.

    No I/O unless enumeration intent is detected.
    """

    def enumerate_query_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        query = state["query"]

        if not _has_enumeration_intent(query):
            return {"is_enumeration_query": False}

        # Enumeration intent confirmed.
        source_types = _extract_source_types(query)
        label = _label_for_types(source_types)
        writer({"type": "status", "label": f"Fetching complete list of {label}..."})

        # Scroll Qdrant — payload filter only, no vector, no reranker.
        # Fix: the all-types fallback now includes "github_readme", the
        # source_type ingest actually writes (see RC-6 note on _TYPE_MAP);
        # without it a bare "list everything" query silently dropped READMEs.
        all_chunks = vector_store.scroll_by_source_type(
            source_types=source_types
            or ["project", "blog", "cv", "bio", "github", "github_readme"],
        )
        if not all_chunks:
            # Nothing in the corpus yet — let the normal pipeline handle it.
            logger.info("Enumeration scroll returned 0 results; falling back to RAG path.")
            return {"is_enumeration_query": False}

        # Deduplicate by source_title (many chunks per document; we want a
        # title-level list). `or ""` guards against a null title in payload.
        seen_titles: set[str] = set()
        unique_by_title: list[Chunk] = []
        for chunk in all_chunks:
            title = (chunk["metadata"].get("source_title") or "").strip()
            if title and title not in seen_titles:
                seen_titles.add(title)
                unique_by_title.append(chunk)

        # Sort alphabetically by title for stable output.
        unique_by_title.sort(
            key=lambda c: (c["metadata"].get("source_title") or "").lower()
        )

        logger.info(
            "Enumeration: query=%r source_types=%r -> %d unique titles",
            query, source_types, len(unique_by_title),
        )

        # Emit one "reading" event per unique source so the frontend's source
        # card row is populated (mirrors the retrieve node's contract).
        # Dedup key prefers source_url, falling back to doc_id.
        seen_urls: set[str] = set()
        for chunk in unique_by_title:
            meta = chunk["metadata"]
            url = meta.get("source_url") or ""
            dedup_key = url or meta.get("doc_id", "")
            if dedup_key and dedup_key not in seen_urls:
                seen_urls.add(dedup_key)
                writer({
                    "type": "reading",
                    "title": meta.get("source_title", ""),
                    "url": url or None,
                    "source_type": meta.get("source_type", ""),
                })

        writer({"type": "status", "label": f"Found {len(unique_by_title)} items — composing list..."})

        return {
            "is_enumeration_query": True,
            "reranked_chunks": unique_by_title,
            # Mark path early so log_eval tags enumeration turns separately.
            "path": "enumeration",
        }

    return enumerate_query_node