Spaces:
Running
Running
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.state import CompiledStateGraph | |
| from app.models.pipeline import PipelineState | |
| from app.pipeline.nodes.guard import make_guard_node | |
| from app.pipeline.nodes.cache import make_cache_node | |
| from app.pipeline.nodes.enumerate_query import make_enumerate_query_node | |
| from app.pipeline.nodes.gemini_fast import make_gemini_fast_node | |
| from app.pipeline.nodes.retrieve import make_retrieve_node | |
| from app.pipeline.nodes.rewrite_query import make_rewrite_query_node, _has_meaningful_token | |
| from app.pipeline.nodes.generate import make_generate_node | |
| from app.pipeline.nodes.log_eval import make_log_eval_node | |
| from app.core.portfolio_context import is_portfolio_relevant | |
# Relevance gate threshold — matches the retrieve.py constant.
_MIN_TOP_SCORE: float = -3.5

# CRAG low-confidence threshold. When retrieval returns chunks but the best
# cross-encoder score is below this value (weak match, not an outright miss),
# rewrite the query and retry once. Separate from _MIN_TOP_SCORE: chunks above
# that floor are not filtered out, but the LLM may get poor context without a
# retry. Empirically, scores between -1.5 and -3.5 indicate borderline relevance
# where a vocabulary-shifted query usually finds much better chunks.
_CRAG_LOW_CONFIDENCE_SCORE: float = -1.5
def route_guard(state: PipelineState) -> str:
    """Route after the guard node: "pass" when the guard cleared the query, else "block"."""
    return "pass" if state.get("guard_passed", False) else "block"
def route_enumerate(state: PipelineState) -> str:
    """
    Fix 1: after the enumerate_query node, decide whether to skip the normal
    retrieval pipeline and go straight to generate.

    Returns:
        "skip_to_generate" — enumeration intent detected; reranked_chunks is
            already populated with the complete Qdrant scroll result.
        "continue" — no enumeration intent; proceed to cache → gemini_fast → retrieve.
    """
    is_enumeration = state.get("is_enumeration_query", False)
    return "skip_to_generate" if is_enumeration else "continue"
def route_cache(state: PipelineState) -> str:
    """Route after the cache node: "hit" on a semantic-cache hit, otherwise "miss"."""
    return "hit" if state.get("cached", False) else "miss"
def route_gemini(state: PipelineState) -> str:
    """
    Route after the Gemini fast-path node.

    Returns:
        "answered" — Gemini answered directly; skip RAG, log and done.
        "research" — Gemini called search_knowledge_base(); run full RAG.
    """
    answer = state.get("answer", "")
    if not answer:
        return "research"
    return "answered"
def _is_weak_retrieval(state: PipelineState) -> bool:
    """Return True when retrieval was empty or the best rerank score is borderline.

    "Borderline" means the top cross-encoder score fell below
    _CRAG_LOW_CONFIDENCE_SCORE; a missing score is treated as acceptable.
    """
    if not state.get("reranked_chunks", []):
        return True
    top_score = state.get("top_rerank_score")
    return top_score is not None and top_score < _CRAG_LOW_CONFIDENCE_SCORE


def route_retrieve_result(state: PipelineState) -> str:
    """
    CRAG routing: trigger a query rewrite when retrieval was weak or empty.

    Fix 2 Rule 1: portfolio-noun queries are allowed a SECOND CRAG retry after
    the first retry also finds nothing. This prevents the not-found response from
    firing on queries where the corpus genuinely should have results (e.g. a typo
    in a project name or a synonym mismatch that's specific to portfolio content).

    Attempt tracking (via retrieval_attempts):
        First retrieve  → retrieval_attempts = 1
        First rewrite   → retrieval_attempts = 2 (rewrite_query increments by +1)
        Second retrieve → retrieval_attempts = 3
        Second rewrite  → retrieval_attempts = 4 (portfolio queries only)
        Third retrieve  → retrieval_attempts = 5

    Any attempt ≥ 5 (or ≥ 3 for non-portfolio queries) goes to generate.
    Routing terminates because retrieval_attempts grows monotonically.

    Returns:
        "rewrite" to run the CRAG query rewrite, "generate" otherwise.
    """
    attempts = state.get("retrieval_attempts", 1)
    query = state.get("query", "")

    # First CRAG attempt — applies to all queries with meaningful tokens.
    if attempts == 1 and _has_meaningful_token(query) and _is_weak_retrieval(state):
        return "rewrite"

    # Fix 2 Rule 1: second CRAG attempt for portfolio-noun queries only.
    # attempts==3 means: first retrieve failed → rewrite fired → second retrieve
    # also failed (still empty after the first CRAG rewrite). When the query
    # mentions a known portfolio entity, attempt one more vocabulary-shifted rewrite
    # before admitting the not-found path.
    if attempts == 3 and is_portfolio_relevant(query) and _is_weak_retrieval(state):
        return "rewrite"

    return "generate"
def build_pipeline(services: dict) -> CompiledStateGraph:
    """
    Assemble and compile the LangGraph RAG pipeline.

    Args:
        services: dependency container; must provide "classifier",
            "vector_store", "cache", "embedder", "gemini", "reranker",
            "llm" and "db_path", and may provide "github_log".

    Returns:
        The compiled state graph, ready to be invoked with a PipelineState.
    """
    graph = StateGraph(PipelineState)

    graph.add_node("guard", make_guard_node(services["classifier"]))
    graph.add_node("enumerate_query", make_enumerate_query_node(services["vector_store"]))
    graph.add_node("cache", make_cache_node(services["cache"], services["embedder"]))
    graph.add_node("gemini_fast", make_gemini_fast_node(services["gemini"]))
    graph.add_node("retrieve", make_retrieve_node(
        services["vector_store"],
        services["embedder"],
        services["reranker"]))
    # CRAG: query rewrite on failed retrieval — runs up to twice for portfolio queries.
    graph.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
    graph.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
    graph.add_node("log_eval", make_log_eval_node(services["db_path"], services.get("github_log")))

    graph.set_entry_point("guard")
    # Blocked queries skip everything and go straight to logging.
    graph.add_conditional_edges("guard", route_guard,
        {"pass": "enumerate_query", "block": "log_eval"})
    # Fix 1: enumerate_query either skips straight to generate (full list fetched)
    # or falls through to the normal cache → gemini_fast → retrieve pipeline.
    graph.add_conditional_edges("enumerate_query", route_enumerate,
        {"skip_to_generate": "generate", "continue": "cache"})
    graph.add_conditional_edges("cache", route_cache,
        {"hit": "log_eval", "miss": "gemini_fast"})
    graph.add_conditional_edges("gemini_fast", route_gemini,
        {"answered": "log_eval", "research": "retrieve"})
    # After retrieve: either run CRAG rewrite (up to twice for portfolio queries)
    # or proceed to generate.
    graph.add_conditional_edges("retrieve", route_retrieve_result,
        {"rewrite": "rewrite_query", "generate": "generate"})
    # After rewrite: go straight back to retrieve for the next attempt.
    # The cycle terminates because route_retrieve_result checks retrieval_attempts.
    graph.add_edge("rewrite_query", "retrieve")
    graph.add_edge("generate", "log_eval")
    graph.add_edge("log_eval", END)

    return graph.compile()