Spaces:
Running
Running
| import asyncio | |
| import logging | |
| import re | |
| from typing import Callable | |
| from langgraph.config import get_stream_writer | |
| logger = logging.getLogger(__name__) | |
| from app.models.pipeline import PipelineState, Chunk | |
| from app.services.vector_store import VectorStore | |
| from app.services.embedder import Embedder | |
| from app.services.reranker import Reranker | |
| from app.services.sparse_encoder import SparseEncoder | |
# Cross-encoder ms-marco-MiniLM-L-6-v2 returns raw logits (not sigmoid).
# Highly relevant docs score 0-15; completely off-topic score below -5.
# Biographical/skill queries ("Has Darshan used Java?") score lower than
# factual QA pairs even when the passage IS the correct answer, because
# ms-marco is trained on document-passage pairs not person-biography pairs.
# -3.5 is the empirical floor below which retrieved chunks are genuinely
# unrelated (noise), while -3.5 to -1.0 still captures valid skill/project
# passages that answer tech-stack or experience questions.
_MIN_TOP_SCORE: float = -3.5
# Floor for the relevance-gate "rescue" path: top scores in [-6.0, -3.5)
# may still be accepted for capability/focused queries (see retrieve_node).
_MIN_RESCUE_SCORE: float = -6.0
# Default cap: max chunks per source document for BROAD queries.
# Without this, a verbose doc can crowd out all 5 context slots, hiding other
# relevant sources and making the answer look one-dimensional.
_MAX_CHUNKS_PER_DOC_BROAD: int = 2
# For FOCUSED queries, the matching source is allowed more depth (4 slots)
# while all other sources are capped at 1. This prevents the cap from
# removing the 3rd-most-relevant resume section on an experience question.
_MAX_CHUNKS_PER_DOC_FOCUSED: int = 4
_MAX_CHUNKS_OTHER_FOCUSED: int = 1
# Document-graph sibling expansion — after RRF fusion, fetch additional chunks
# from the same source documents as the top-N results.
_SIBLING_EXPAND_TOP_N: int = 10  # rank depth to consider for expansion
_SIBLING_FETCH_LIMIT: int = 20  # max chunks fetched via Qdrant doc_id query
_SIBLING_TOTAL_CAP: int = 15  # max new chunks to inject before reranker
# Keywords that imply the visitor wants depth from a specific source type.
# Values are the source_type values set by ingest (ChunkMetadata.source_type).
# NOTE: dict insertion order is the match priority in _focused_source_type —
# a query containing both "cv" and "project" vocabulary resolves to "cv".
_FOCUS_KEYWORDS: dict[frozenset[str], str] = {
    frozenset({"experience", "work", "job", "role", "career", "internship",
               "skills", "skill", "education", "degree", "university",
               "certification", "certifications", "qualified", "resume", "cv",
               "employment", "professional", "placement", "history",
               "tech", "stack", "technology", "technologies", "framework",
               "frameworks", "tool", "tools", "tooling", "language", "languages"}): "cv",
    frozenset({"project", "built", "build", "developed", "architecture",
               "system", "platform", "app", "application"}): "project",
    frozenset({"blog", "post", "article", "wrote", "writing", "published"}): "blog",
    frozenset({"github", "repo", "repos", "repository", "repositories", "readme"}): "github_readme",
}
# RRF rank fusion constant. k=60 is the original Cormack et al. default.
# Higher k reduces the influence of top-1 rank advantage.
_RRF_K: int = 60
# Module-level singleton — BM25 model downloads once (~5 MB), cached in memory.
_sparse_encoder = SparseEncoder()
# Vocabulary marking "capability" questions (tech stack, tools, skills).
# Used by the relevance gate in retrieve_node: capability queries qualify for
# the low-score rescue path instead of an immediate empty-retrieval return.
_CAPABILITY_QUERY_HINTS: frozenset[str] = frozenset(
    {
        "tech",
        "stack",
        "technology",
        "technologies",
        "framework",
        "frameworks",
        "tool",
        "tools",
        "tooling",
        "language",
        "languages",
        "skills",
        "skill",
    }
)
# Filler words (all >= 4 chars) that the STT typo normaliser must leave
# untouched — without this list, _best_focus_replacement could "correct"
# common question words into nearby focus keywords.
_NORMALISATION_STOPWORDS: frozenset[str] = frozenset(
    {
        "tell",
        "about",
        "what",
        "which",
        "where",
        "when",
        "could",
        "would",
        "should",
        "your",
        "with",
        "from",
        "that",
        "this",
    }
)
def _focused_source_type(query: str) -> str | None:
    """
    Classify *query* as focused on one source_type, or None for broad queries.

    A query counts as focused when it shares at least one token with a
    keyword set in _FOCUS_KEYWORDS; the first matching set (dict insertion
    order) wins. Broad queries that match no category keep the 2-per-doc
    default diversity cap so no single source dominates the context slots.
    """
    query_tokens = set(re.findall(r"[a-z0-9]+", query.lower()))
    matches = (
        source_type
        for keywords, source_type in _FOCUS_KEYWORDS.items()
        if not query_tokens.isdisjoint(keywords)
    )
    return next(matches, None)
def _rrf_merge(ranked_lists: list[tuple[float, list[Chunk]]]) -> list[Chunk]:
    """
    Fuse several ranked chunk lists with weighted Reciprocal Rank Fusion.

    Each entry of *ranked_lists* is (list_weight, chunks). A chunk's fused
    score is the sum over lists of weight * 1/(rank + 1 + _RRF_K), where
    rank is its first position within that list (later duplicates in the
    same list are ignored). Chunks are identified — and deduplicated — by
    their "doc_id::section" fingerprint; when a fingerprint appears in
    several lists, the chunk object from the last list is the one returned.
    Result is ordered by descending fused score (stable on ties).
    """
    fused_score: dict[str, float] = {}
    chunk_for: dict[str, Chunk] = {}
    for list_weight, chunk_list in ranked_lists:
        counted: set[str] = set()
        for position, candidate in enumerate(chunk_list):
            meta = candidate["metadata"]
            fingerprint = f"{meta['doc_id']}::{meta['section']}"
            if fingerprint in counted:
                continue  # only the first occurrence per list contributes
            counted.add(fingerprint)
            contribution = list_weight * (1.0 / (position + 1 + _RRF_K))
            fused_score[fingerprint] = fused_score.get(fingerprint, 0.0) + contribution
            chunk_for[fingerprint] = candidate
    ranking = sorted(fused_score, key=fused_score.get, reverse=True)
    return [chunk_for[fingerprint] for fingerprint in ranking]
# Maps ingest-time source_type values to the display labels emitted in the
# frontend "reading" / "sources" event payloads.
_TYPE_REMAP: dict[str, str] = {
    "github": "readme",
    "github_readme": "readme",  # RC-6: ingestion uses "github_readme"; map to display label
    "bio": "resume",
    "cv": "resume",
    "blog": "blog",
    "project": "project",
    "resume": "resume",  # RC-3: explicit pass-through so resume chunks aren't "unknown"
}
# Flat pool of every single-word focus keyword — the candidate vocabulary
# for STT typo correction (_best_focus_replacement / _normalise_focus_typos).
_FOCUS_VOCAB: frozenset[str] = frozenset(
    {
        keyword
        for keys in _FOCUS_KEYWORDS.keys()
        for keyword in keys
        if " " not in keyword
    }
)
| def _edit_distance(a: str, b: str) -> int: | |
| la, lb = len(a), len(b) | |
| dp = list(range(lb + 1)) | |
| for i in range(1, la + 1): | |
| prev = dp[0] | |
| dp[0] = i | |
| for j in range(1, lb + 1): | |
| cur = dp[j] | |
| cost = 0 if a[i - 1] == b[j - 1] else 1 | |
| dp[j] = min(dp[j] + 1, dp[j - 1] + 1, prev + cost) | |
| prev = cur | |
| return dp[lb] | |
def _best_focus_replacement(token: str) -> str | None:
    """
    Return the closest focus-vocabulary word for *token*, or None.

    A candidate must share the first letter with *token*, differ in length
    by at most one character, and be within edit distance 2. The lowest
    distance wins.

    Fix: the original iterated the _FOCUS_VOCAB frozenset directly, so when
    two candidates tied on distance the winner depended on hash iteration
    order (which varies with PYTHONHASHSEED across processes). Iterating in
    sorted order makes tie-breaking deterministic: the alphabetically first
    candidate at the best distance is returned.
    """
    best: str | None = None
    # One above the maximum accepted distance (2), so `score < best_score`
    # doubles as the `score <= 2` acceptance test.
    best_score = 3
    for candidate in sorted(_FOCUS_VOCAB):
        # Cheap pre-filters before running the DP edit distance.
        if token[0] != candidate[0]:
            continue
        if abs(len(token) - len(candidate)) > 1:
            continue
        score = _edit_distance(token, candidate)
        if score < best_score:
            best_score = score
            best = candidate
    return best
def _normalise_focus_typos(query: str) -> str:
    """
    Correct minor STT typos for intent words used by focused retrieval.

    Example: "walk experience" -> "work experience".

    The query is lowercased and re-joined with single spaces. Short tokens
    (< 4 chars after stripping punctuation), exact focus-vocabulary words,
    and normalisation stopwords pass through unchanged.
    """
    words = query.lower().split()
    if not words:
        return query
    corrected_words: list[str] = []
    for word in words:
        core = word.strip(".,!?;:\"'()[]{}")
        skip = (
            len(core) < 4
            or core in _FOCUS_VOCAB
            or core in _NORMALISATION_STOPWORDS
        )
        if skip:
            corrected_words.append(word)
            continue
        replacement = _best_focus_replacement(core)
        corrected_words.append(word.replace(core, replacement) if replacement else word)
    return " ".join(corrected_words)
def _is_capability_query(query: str) -> bool:
    """Return True when *query* uses tech-stack / skills vocabulary."""
    words = set(re.findall(r"[a-z0-9]+", query.lower()))
    return not words.isdisjoint(_CAPABILITY_QUERY_HINTS)
def make_retrieve_node(
    vector_store: VectorStore, embedder: Embedder, reranker: Reranker
) -> Callable[[PipelineState], dict]:
    """
    Build the LangGraph retrieval node as a closure over the shared services.

    The returned coroutine runs hybrid retrieval: multi-query dense search +
    sparse BM25 + exact keyword filter, fused with weighted RRF, expanded via
    same-document siblings, cross-encoder reranked, then diversity-capped.
    Streams status / reading / sources events through the LangGraph writer.
    """

    async def retrieve_node(state: PipelineState) -> dict:
        """
        Execute one retrieval attempt and return the state delta
        (retrieved_chunks, reranked_chunks, retrieval_attempts,
        top_rerank_score, ...). Empty chunk lists signal the CRAG loop
        to rewrite the query and retry.
        """
        writer = get_stream_writer()
        attempts = state.get("retrieval_attempts", 0)
        query = state["query"]
        # Stage 2: use the self-contained decontextualized rewrite for embedding
        # when one was produced. "Tell me more about that ML project" has terrible
        # cosine similarity against "PersonaBot RAG pipeline" passages; the rewrite
        # "What ML projects has Darshan built?" dramatically improves recall.
        retrieval_query = state.get("decontextualized_query") or query
        retrieval_query = _normalise_focus_typos(retrieval_query)
        # Reuse the topic computed by the guard node — no recomputation needed.
        topic = state.get("query_topic") or ""
        searching_label = (
            f"Searching portfolio for {topic}..."
            if topic
            else "Searching portfolio..."
        )
        writer({"type": "status", "label": searching_label})
        # On a CRAG retry (attempts >= 1) the query has been rewritten and
        # query_embedding is explicitly set to None — always re-embed.
        # On the first attempt, reuse the embedding computed by the cache node.
        cached_embedding: list[float] | None = state.get("query_embedding")
        if attempts >= 1:
            # Second attempt: re-embed the rewritten query with is_query=True.
            cached_embedding = None
        expanded = [retrieval_query]  # gemini_fast may fill expanded_queries on first attempt
        if attempts == 0:
            expanded = state.get("expanded_queries") or [retrieval_query]
            # Ensure decontextualized form is the primary search query if present.
            if retrieval_query != query and retrieval_query not in expanded:
                expanded = [retrieval_query] + expanded
        # Bug 4: cap total dense searches to avoid Qdrant rate limits.
        # Original + up to 3 semantic expansions = 4 maximum.
        expanded = expanded[:4]
        # Embed all query variants in one batched call (is_query=True for asymmetric BGE).
        if cached_embedding is not None and len(expanded) == 1:
            query_vectors = [cached_embedding]
        else:
            query_vectors = await embedder.embed(expanded, is_query=True)
        # ── Dense search (all query variants) ─────────────────────────────────
        # No chunk_type filter: raptor_summary and question_proxy nodes participate
        # so we can expand them to real leaves below.
        dense_results: list[list[Chunk]] = []
        for vec in query_vectors:
            chunks = vector_store.search(query_vector=vec, top_k=15)
            dense_results.append(chunks)
        # ── Split dense hits into leaf candidates and navigation nodes ────────
        # raptor_summary and question_proxy are navigation-only; they are expanded
        # to their real leaf pages via Qdrant point UUID lookups.
        # NOTE(review): leaf_candidates is built (and deduped) here but is never
        # consumed downstream — the RRF fusion below reads the raw dense_results,
        # which still contain the nav nodes. Presumably the fusion was meant to
        # use these expanded leaves; confirm intent (this also explains why the
        # non-leaf guard after reranking can fire).
        leaf_candidates: list[Chunk] = []
        leaf_fps_seen: set[str] = set()
        nav_expansion_ids: set[str] = set()
        for hit_list in dense_results:
            for chunk in hit_list:
                ct = chunk["metadata"].get("chunk_type", "leaf")
                if ct == "leaf":
                    fp = f"{chunk['metadata']['doc_id']}::{chunk['metadata']['section']}"
                    if fp not in leaf_fps_seen:
                        leaf_fps_seen.add(fp)
                        leaf_candidates.append(chunk)
                elif ct == "raptor_summary":
                    for uid in (chunk["metadata"].get("child_leaf_ids") or []):
                        nav_expansion_ids.add(uid)
                elif ct == "question_proxy":
                    uid = chunk["metadata"].get("parent_leaf_id", "")
                    if uid:
                        nav_expansion_ids.add(uid)
        # Expand nav nodes to their leaf pages in a single Qdrant retrieve call.
        if nav_expansion_ids:
            expanded_leaves = vector_store.fetch_by_point_ids(list(nav_expansion_ids))
            for leaf in expanded_leaves:
                fp = f"{leaf['metadata']['doc_id']}::{leaf['metadata']['section']}"
                if fp not in leaf_fps_seen:
                    leaf_fps_seen.add(fp)
                    leaf_candidates.append(leaf)
            logger.debug("UUID expansion: +%d leaves from %d nav node UUIDs.",
                         len(expanded_leaves), len(nav_expansion_ids))
        # ── Query Normalization & Alias Generation ────────────────────────────
        # If the user asks for "xsilica", generate "x silica" and "x-silica".
        # If they ask for "x silica", generate "xsilica" and "x-silica".
        from app.core.topic import _STOPWORDS
        canonical_forms: list[str] = state.get("query_canonical_forms") or []
        normalized_forms: set[str] = set(canonical_forms)
        # Base normalization: stripping out simple hyphens/spaces for exact matches
        tokens = retrieval_query.lower().split()
        for i in range(len(tokens) - 1):
            if tokens[i] not in _STOPWORDS and tokens[i+1] not in _STOPWORDS:
                collapsed = tokens[i] + tokens[i+1]
                hyphened = tokens[i] + "-" + tokens[i+1]
                normalized_forms.add(collapsed)
                normalized_forms.add(hyphened)
        # Separator split (x-silica -> x silica, xsilica)
        if "-" in retrieval_query:
            normalized_forms.add(retrieval_query.replace("-", ""))
            normalized_forms.add(retrieval_query.replace("-", " "))
        # ── Exact Keyword Filter Search (Database hit) ────────────────────────
        # Runs a MatchAny query on Qdrant's `keywords` payload field.
        keyword_results: list[Chunk] = []
        extracted_keywords = []
        for word in retrieval_query.lower().split():
            extracted_keywords.append(word)
        for norm in normalized_forms:
            extracted_keywords.append(norm)
        # Only query strong >= 4 char keywords to avoid noise matching
        strong_keywords = [k for k in extracted_keywords if len(k) >= 4 and k not in _STOPWORDS]
        if strong_keywords:
            keyword_results = vector_store.keyword_filter_search(strong_keywords, top_k=15)
        # ── Sparse (BM25) search (primary query only) ─────────────────────────
        sparse_results: list[Chunk] = []
        if _sparse_encoder.available:
            bm25_query = " ".join([retrieval_query] + list(normalized_forms)).lower()
            indices, values = _sparse_encoder.encode_one(bm25_query)
            sparse_results = vector_store.search_sparse(indices, values, top_k=20)
        # ── Reciprocal Rank Fusion ────────────────────────────────────────────
        # Merge dense (per variant) + sparse + keyword into one ranked list.
        # Dynamic Weighting: Explicit keyword entity matches get a 1.5x boost
        # over semantic proximity in the RRF formula.
        all_ranked_lists: list[tuple[float, list[Chunk]]] = []
        for dense_res in dense_results:
            all_ranked_lists.append((1.0, dense_res))
        if sparse_results:
            all_ranked_lists.append((1.0, sparse_results))
        if keyword_results:
            all_ranked_lists.append((1.5, keyword_results))
        fused: list[Chunk] = _rrf_merge(all_ranked_lists)
        # ── Reading events — one per unique source document ───────────────────
        # Emitted BEFORE deduplication so the user sees sources appear in
        # real time as Qdrant returns them, matching Perplexity's "Reading..."
        # display. Deduplication here is by source_url so blog posts with
        # multiple chunk hits fire only one event.
        seen_urls: set[str] = set()
        for chunk in fused:
            meta = chunk["metadata"]
            url = meta.get("source_url") or ""
            dedup_key = url if url else meta.get("doc_id", "")
            if dedup_key and dedup_key not in seen_urls:
                seen_urls.add(dedup_key)
                writer({
                    "type": "reading",
                    "title": meta.get("source_title", ""),
                    "url": url or None,
                    "source_type": _TYPE_REMAP.get(meta.get("source_type", ""), meta.get("source_type", "")),
                })
        # ── Deduplication (question-proxy collapse) ───────────────────────────
        # Dedup by doc_id::section fingerprint so a chunk that appeared in
        # multiple ranked lists is counted once.
        # NOTE(review): the original comment claimed fused is already leaf-only;
        # as noted above, dense_results still contains nav nodes, so fused may
        # include raptor_summary / question_proxy chunks. The non-leaf guard
        # after reranking strips them.
        seen: set[str] = set()
        unique_chunks: list[Chunk] = []
        for c in fused:
            fp = f"{c['metadata']['doc_id']}::{c['metadata']['section']}"
            if fp not in seen:
                seen.add(fp)
                unique_chunks.append(c)
        writer({
            "type": "status",
            "label": f"Comparing {len(unique_chunks)} sources for relevance...",
        })
        # ── Document-graph sibling expansion ──────────────────────────────────
        # For the top _SIBLING_EXPAND_TOP_N chunks by RRF rank, fetch neighbouring
        # chunks from the same source document via doc_id filter (no vector needed).
        # If chunk 4 of a blog post matched, chunks 1-3 and 5-6 are now candidates too.
        # This is the document-graph connectivity layer: doc_id is the edge linking chunks.
        if unique_chunks:
            sibling_fps: set[str] = {f"{c['metadata']['doc_id']}::{c['metadata']['section']}" for c in unique_chunks}
            sibling_count = 0
            for seed in unique_chunks[:_SIBLING_EXPAND_TOP_N]:
                if sibling_count >= _SIBLING_TOTAL_CAP:
                    break
                doc_id = seed["metadata"]["doc_id"]
                seed_idx = seed["metadata"].get("chunk_index")
                if seed_idx is None:
                    seed_idx = -1  # sentinel: seed position unknown
                siblings = vector_store.fetch_by_doc_id(doc_id, limit=_SIBLING_FETCH_LIMIT)
                # RC-2 fix: sort by chunk_index so we can prefer adjacent chunks.
                # Guard against chunk_index being actual None in Qdrant payloads.
                siblings.sort(key=lambda c: c["metadata"].get("chunk_index") or 0)
                # If seed position is known, prefer adjacent indices (±2) first,
                # then fall through to remaining siblings in document order.
                if seed_idx >= 0:
                    adjacent = [s for s in siblings
                                if abs((s["metadata"].get("chunk_index") or -999) - seed_idx) <= 2]
                    rest = [s for s in siblings
                           if abs((s["metadata"].get("chunk_index") or -999) - seed_idx) > 2]
                    ordered_siblings = adjacent + rest
                else:
                    ordered_siblings = siblings
                for sib in ordered_siblings:
                    fp = f"{sib['metadata']['doc_id']}::{sib['metadata']['section']}"
                    if fp not in sibling_fps:
                        sibling_fps.add(fp)
                        unique_chunks.append(sib)
                        sibling_count += 1
                        if sibling_count >= _SIBLING_TOTAL_CAP:
                            break
        try:
            reranked = await reranker.rerank(retrieval_query, unique_chunks, top_k=10)  # RC-5: raised from 7
        # NOTE(review): catching asyncio.CancelledError here swallows task
        # cancellation (it is a BaseException on modern Python) — confirm the
        # fallback-instead-of-propagate behaviour is intentional.
        except (Exception, asyncio.CancelledError) as exc:
            logger.error("retrieve: reranker failed (%s); falling back to base retrieval scores.", exc)
            writer({"type": "status", "label": "Reranker offline; using base retrieval scores..."})
            reranked = unique_chunks[:10]
            # mock top_score so relevance gate allows it through if unique_chunks exist
            if reranked:
                reranked[0]["metadata"]["rerank_score"] = 1.0
        # Guard: assert all reranker inputs were leaf chunks.
        # Non-leaf nodes (raptor_summary / question_proxy) reaching the reranker
        # indicates a pipeline bug; log and strip rather than pass synthetic text
        # to the LLM or produce navigation-node source cards.
        non_leaf = [c for c in reranked if c["metadata"].get("chunk_type", "leaf") != "leaf"]
        if non_leaf:
            logger.error(
                "retrieve: %d non-leaf chunks reached reranker (chunk_types: %s); stripping.",
                len(non_leaf),
                [c["metadata"].get("chunk_type") for c in non_leaf],
            )
            reranked = [c for c in reranked if c["metadata"].get("chunk_type", "leaf") == "leaf"]
        # ── Relevance gate ────────────────────────────────────────────────────
        top_score = reranked[0]["metadata"].get("rerank_score", 0.0) if reranked else None
        low_confidence = top_score is not None and top_score < _MIN_TOP_SCORE
        capability_query = _is_capability_query(retrieval_query)
        # Rescue: accept a low-confidence rerank when the score is above the
        # rescue floor AND the query shape suggests the corpus should have an
        # answer (capability/focused query, a retry, or a rich candidate pool).
        rescue_low_confidence = bool(
            reranked
            and low_confidence
            and top_score is not None
            and top_score >= _MIN_RESCUE_SCORE
            and (
                capability_query
                or _focused_source_type(retrieval_query) is not None
                or attempts >= 1
                or len(unique_chunks) >= 6
            )
        )
        # On a retry with candidates available, prefer the un-reranked fusion
        # order over returning nothing at all.
        if low_confidence and not rescue_low_confidence and attempts >= 1 and unique_chunks:
            writer(
                {
                    "type": "status",
                    "label": "Using broader retrieval fallback after low-confidence rerank...",
                }
            )
            reranked = unique_chunks[:10]
            top_score = reranked[0]["metadata"].get("rerank_score", top_score)
            low_confidence = False
        if not reranked or (low_confidence and not rescue_low_confidence):
            # Empty delta: tells the CRAG loop to rewrite the query and retry.
            return {
                "answer": "",
                "retrieved_chunks": [],
                "reranked_chunks": [],
                "retrieval_attempts": attempts + 1,
                "top_rerank_score": top_score,
            }
        if rescue_low_confidence:
            writer(
                {
                    "type": "status",
                    "label": "Applying retrieval rescue for portfolio capability query...",
                }
            )
        # ── Source diversity cap (query-aware) ────────────────────────────────
        focused_type = _focused_source_type(retrieval_query)
        doc_counts: dict[str, int] = {}
        diverse_chunks: list[Chunk] = []
        # RC-3: "cv" focus type must also match source_type="resume" (pdf_parser uses "resume",
        # not "cv"). Without this alias, experience/skills queries cap resume chunks at 1 instead of 4.
        _FOCUS_TYPE_ALIASES: dict[str, frozenset[str]] = {
            "cv": frozenset({"cv", "resume", "bio"}),
            "github_readme": frozenset({"github_readme", "github"}),
        }
        for chunk in reranked:
            doc_id = chunk["metadata"]["doc_id"]
            src_type = chunk["metadata"].get("source_type", "")
            focused_set = _FOCUS_TYPE_ALIASES.get(focused_type, frozenset({focused_type})) if focused_type else frozenset()
            if focused_type and src_type in focused_set:
                cap = _MAX_CHUNKS_PER_DOC_FOCUSED
            elif focused_type:
                cap = _MAX_CHUNKS_OTHER_FOCUSED
            else:
                cap = _MAX_CHUNKS_PER_DOC_BROAD
            if doc_counts.get(doc_id, 0) < cap:
                diverse_chunks.append(chunk)
                doc_counts[doc_id] = doc_counts.get(doc_id, 0) + 1
        # Non-leaf guard after diversity cap — should be a no-op if the assertion
        # above fired correctly, but kept as a belt-and-suspenders safety net.
        diverse_chunks = [c for c in diverse_chunks if c["metadata"].get("chunk_type", "leaf") == "leaf"]
        if not diverse_chunks:
            # All post-rerank candidates were cluster nodes — treat as empty retrieval.
            return {
                "answer": "",
                "retrieved_chunks": [],
                "reranked_chunks": [],
                "retrieval_attempts": attempts + 1,
                "top_rerank_score": top_score,
            }
        # ── Sources event — final selected sources shown before the answer ────
        # This is the Perplexity-style source card row that appears before tokens.
        # Emitted here so the frontend can display source cards before Groq starts.
        sources_payload = []
        for chunk in diverse_chunks:
            meta = chunk["metadata"]
            url = meta.get("source_url") or None
            sources_payload.append({
                "title": meta.get("source_title", ""),
                "url": url,
                "source_type": _TYPE_REMAP.get(meta.get("source_type", ""), meta.get("source_type", "")),
                "section": meta.get("section", ""),
            })
        writer({"type": "sources", "sources": sources_payload})
        # Let the user know what top source the answer will be written from.
        top_title = diverse_chunks[0]["metadata"].get("source_title", "sources")
        writer({"type": "status", "label": f"Writing answer from {top_title}..."})
        return {
            "retrieved_chunks": unique_chunks,
            "reranked_chunks": diverse_chunks,
            "retrieval_attempts": attempts + 1,
            "top_rerank_score": top_score,
            # sibling_count is only bound inside the `if unique_chunks:` branch;
            # the conditional below mirrors that guard.
            "sibling_expansion_count": sibling_count if unique_chunks else 0,  # RC-13
            "focused_source_type": focused_type,  # RC-13
        }

    return retrieve_node