Spaces:
Running
Running
| """ | |
| backend/app/pipeline/nodes/enumerate_query.py | |
| Fix 1 β Enumeration Query Classifier and Metadata Retrieval. | |
| Inserted immediately after the Guard node (before Cache / Gemini fast-path). | |
| When enumeration intent is detected, this node queries Qdrant using a | |
| payload filter on metadata.source_type β no vector embedding, no reranker. | |
| It then deduplicates by source_title, sorts alphabetically, and populates | |
| reranked_chunks so the Generate node receives the complete, accurate list. | |
| Why a database filter beats similarity search for enumeration: | |
| Semantic retrieval cannot guarantee completeness β it finds the top-K | |
| most similar chunks, not ALL matching chunks. "List all my projects" | |
| with top_k=20 and 8 projects in the corpus would return the 8 most | |
| similar to the query vector, but which 8 depends on the embedding. | |
| A payload filter returns every matching point, regardless of embedding | |
| position. Completeness is guaranteed; the cosine metric is irrelevant. | |
| Cost: 0 embedding calls, 0 reranker calls, 1 Qdrant scroll. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from typing import Callable | |
| from langgraph.config import get_stream_writer | |
| from app.models.pipeline import PipelineState, Chunk | |
| from app.services.vector_store import VectorStore | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Enumeration intent patterns | |
| # --------------------------------------------------------------------------- | |
| # Each pattern is checked against the lowercased, whitespace-normalised query. | |
| # Order matters: more specific patterns are checked first. | |
| _ENUM_PREFIXES: tuple[str, ...] = ( | |
| "list all", | |
| "list the", | |
| "list every", | |
| "list your", | |
| "list his", | |
| "list ", | |
| "show all", | |
| "show me all", | |
| "show every", | |
| "give me all", | |
| "give me a list", | |
| "what are all", | |
| "what are your", | |
| "what are his", | |
| "how many", | |
| "count ", | |
| "count of", | |
| "enumerate", | |
| "name all", | |
| "name every", | |
| ) | |
| # Trailing pattern: "what [are|were|is] all the <noun>?" | |
| _ENUM_TRAILING_RE = re.compile( | |
| r"(?:what|which)\s+(?:are|were|is|were)\s+all\s+(?:the\s+)?", | |
| re.IGNORECASE, | |
| ) | |
| def _has_enumeration_intent(query: str) -> bool: | |
| """ | |
| Return True when the lowercased query signals enumeration intent. | |
| Pure string ops β no LLM, no embedding. Runs in < 5Β΅s. | |
| """ | |
| q = " ".join(query.lower().split()) # normalise whitespace | |
| for prefix in _ENUM_PREFIXES: | |
| if q.startswith(prefix) or f" {prefix}" in q: | |
| return True | |
| if _ENUM_TRAILING_RE.search(q): | |
| return True | |
| return False | |
| # --------------------------------------------------------------------------- | |
| # Entity-type extractor | |
| # --------------------------------------------------------------------------- | |
| # Maps query tokens β Qdrant source_type values. | |
| # "all source types" is represented as an empty list (caller scrolls without filter). | |
| _TYPE_MAP: dict[str, list[str]] = { | |
| "project": ["project"], | |
| "projects": ["project"], | |
| "blog": ["blog"], | |
| "blogs": ["blog"], | |
| "post": ["blog"], | |
| "posts": ["blog"], | |
| "article": ["blog"], | |
| "articles": ["blog"], | |
| "writing": ["blog"], | |
| "writings": ["blog"], | |
| "experience": ["cv", "bio"], | |
| "experiences": ["cv", "bio"], | |
| "work": ["cv", "bio"], | |
| "jobs": ["cv", "bio"], | |
| "job": ["cv", "bio"], | |
| "role": ["cv", "bio"], | |
| "roles": ["cv", "bio"], | |
| "company": ["cv", "bio"], | |
| "companies": ["cv", "bio"], | |
| "skills": ["cv", "project", "blog"], | |
| "skill": ["cv", "project", "blog"], | |
| "technologies": ["cv", "project", "blog"], | |
| "technology": ["cv", "project", "blog"], | |
| "tech": ["cv", "project", "blog"], | |
| "tools": ["cv", "project", "blog"], | |
| "readme": ["github_readme", "github"], # RC-6: ingest uses "github_readme" as source_type | |
| "repositories": ["github_readme", "github"], | |
| "repos": ["github_readme", "github"], | |
| "github": ["github_readme", "github"], | |
| } | |
| def _extract_source_types(query: str) -> list[str]: | |
| """ | |
| Map query vocabulary to Qdrant source_type values. | |
| Returns a deduplicated list. An empty list means "all types". | |
| """ | |
| tokens = re.findall(r"[a-z]+", query.lower()) | |
| found: list[str] = [] | |
| seen: set[str] = set() | |
| for tok in tokens: | |
| for st in _TYPE_MAP.get(tok, []): | |
| if st not in seen: | |
| seen.add(st) | |
| found.append(st) | |
| # If no specific type matched, return empty (= all types). | |
| return found | |
| # --------------------------------------------------------------------------- | |
| # Source type display label (used in status event) | |
| # --------------------------------------------------------------------------- | |
| _TYPE_LABEL: dict[str, str] = { | |
| "project": "projects", | |
| "blog": "blog posts", | |
| "cv": "CV/experience", | |
| "bio": "background", | |
| "github": "GitHub repos", | |
| } | |
| def _label_for_types(source_types: list[str]) -> str: | |
| if not source_types: | |
| return "all portfolio content" | |
| return " and ".join(_TYPE_LABEL.get(st, st) for st in source_types[:2]) | |
# ---------------------------------------------------------------------------
# Node factory
# ---------------------------------------------------------------------------
def make_enumerate_query_node(vector_store: VectorStore) -> Callable[[PipelineState], dict]:
    """
    Build the enumeration LangGraph node.

    The returned node:
      1. Classifies whether the query has enumeration intent.
      2. If yes: scrolls Qdrant by source_type, deduplicates by title,
         populates reranked_chunks, sets is_enumeration_query=True.
      3. If no: passes through with is_enumeration_query=False so the
         rest of the pipeline (cache -> gemini_fast -> retrieve) runs normally.

    No I/O unless enumeration intent is detected.

    Args:
        vector_store: store exposing scroll_by_source_type(source_types=...).

    Returns:
        A node callable mapping PipelineState to a partial-state dict.
    """
    # Fallback when the query names no specific type: scroll every known
    # source_type. Fixed: "github_readme" (the value ingest actually writes,
    # per the RC-6 note on _TYPE_MAP) was missing, so README documents were
    # silently dropped from type-less "list everything" queries.
    all_source_types = ["project", "blog", "cv", "bio", "github", "github_readme"]

    def enumerate_query_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        query = state["query"]

        if not _has_enumeration_intent(query):
            return {"is_enumeration_query": False}

        # Enumeration intent confirmed.
        source_types = _extract_source_types(query)
        label = _label_for_types(source_types)
        writer({"type": "status", "label": f"Fetching complete list of {label}..."})

        # Scroll Qdrant — payload filter only, no vector embedding.
        all_chunks = vector_store.scroll_by_source_type(
            source_types=source_types or all_source_types,
        )
        if not all_chunks:
            # Nothing in the corpus yet — let the normal pipeline handle it.
            logger.info("Enumeration scroll returned 0 results; falling back to RAG path.")
            return {"is_enumeration_query": False}

        # Deduplicate by source_title (many chunks per document; we want a
        # title-level list). `or ""` guards against an explicit None title
        # in the payload, which would crash .strip().
        seen_titles: set[str] = set()
        unique_by_title: list[Chunk] = []
        for chunk in all_chunks:
            title = (chunk["metadata"].get("source_title") or "").strip()
            if title and title not in seen_titles:
                seen_titles.add(title)
                unique_by_title.append(chunk)

        # Sort alphabetically by title for stable output.
        unique_by_title.sort(
            key=lambda c: (c["metadata"].get("source_title") or "").lower()
        )

        logger.info(
            "Enumeration: query=%r source_types=%r -> %d unique titles",
            query, source_types, len(unique_by_title),
        )

        # Emit one "reading" event per unique source so the frontend's source
        # card row is populated (mirrors the retrieve node's contract).
        seen_urls: set[str] = set()
        for chunk in unique_by_title:
            meta = chunk["metadata"]
            url = meta.get("source_url") or ""
            dedup_key = url or meta.get("doc_id", "")
            if dedup_key and dedup_key not in seen_urls:
                seen_urls.add(dedup_key)
                writer({
                    "type": "reading",
                    "title": meta.get("source_title", ""),
                    "url": url or None,
                    "source_type": meta.get("source_type", ""),
                })

        writer({"type": "status", "label": f"Found {len(unique_by_title)} items — composing list..."})

        return {
            "is_enumeration_query": True,
            "reranked_chunks": unique_by_title,
            # Mark path early so log_eval tags enumeration turns separately.
            "path": "enumeration",
        }

    return enumerate_query_node