from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph

from app.models.pipeline import PipelineState
from app.pipeline.nodes.guard import make_guard_node
from app.pipeline.nodes.cache import make_cache_node
from app.pipeline.nodes.enumerate_query import make_enumerate_query_node
from app.pipeline.nodes.gemini_fast import make_gemini_fast_node
from app.pipeline.nodes.retrieve import make_retrieve_node
from app.pipeline.nodes.rewrite_query import make_rewrite_query_node, _has_meaningful_token
from app.pipeline.nodes.generate import make_generate_node
from app.pipeline.nodes.log_eval import make_log_eval_node
from app.core.portfolio_context import is_portfolio_relevant

# Relevance gate threshold — matches retrieve.py constant.
_MIN_TOP_SCORE: float = -3.5

# CRAG low-confidence threshold. When retrieval returns chunks but the best
# cross-encoder score is below this value (weak match, not an outright miss),
# rewrite the query and retry once. Separate from _MIN_TOP_SCORE: chunks above
# that floor are not filtered out, but the LLM may get poor context without a
# retry. Empirically, scores between -1.5 and -3.5 indicate borderline relevance
# where a vocabulary-shifted query usually finds much better chunks.
_CRAG_LOW_CONFIDENCE_SCORE: float = -1.5


def route_guard(state: PipelineState) -> str:
    """Route after the guard node: "pass" when the guard let the query
    through, "block" otherwise (blocked queries go straight to logging)."""
    if state.get("guard_passed", False):
        return "pass"
    return "block"


def route_enumerate(state: PipelineState) -> str:
    """
    Fix 1: after the enumerate_query node, decide whether to skip the normal
    retrieval pipeline and go straight to generate.

    "skip_to_generate" — enumeration intent detected; reranked_chunks is
    already populated with the complete Qdrant scroll result.
    "continue" — no enumeration intent; proceed to cache → gemini_fast →
    retrieve.
    """
    if state.get("is_enumeration_query", False):
        return "skip_to_generate"
    return "continue"


def route_cache(state: PipelineState) -> str:
    """Route after the cache node: "hit" skips straight to logging,
    "miss" falls through to the Gemini fast path."""
    if state.get("cached", False):
        return "hit"
    return "miss"


def route_gemini(state: PipelineState) -> str:
    """
    Route after the Gemini fast-path node.

    "answered" — Gemini answered directly; skip RAG, log and done.
    "research" — Gemini called search_knowledge_base(); run full RAG.
    """
    if state.get("answer", ""):
        return "answered"
    return "research"


def _retrieval_was_weak(state: PipelineState) -> bool:
    """
    Shared CRAG weakness check: True when retrieval came back empty, or when
    the best cross-encoder score exists but falls below
    _CRAG_LOW_CONFIDENCE_SCORE (borderline relevance worth a rewrite).

    Extracted because both CRAG retry rules in route_retrieve_result apply
    the exact same criterion; keeping it in one place avoids the two copies
    drifting apart.
    """
    if not state.get("reranked_chunks", []):
        return True
    top_score = state.get("top_rerank_score")
    return top_score is not None and top_score < _CRAG_LOW_CONFIDENCE_SCORE


def route_retrieve_result(state: PipelineState) -> str:
    """
    CRAG routing: trigger a query rewrite when retrieval was weak or empty.

    Fix 2 Rule 1: portfolio-noun queries are allowed a SECOND CRAG retry after
    the first retry also finds nothing. This prevents the not-found response
    from firing on queries where the corpus genuinely should have results
    (e.g. a typo in a project name or a synonym mismatch that's specific to
    portfolio content).

    Attempt tracking (via retrieval_attempts):
        First retrieve  → retrieval_attempts = 1
        First rewrite   → retrieval_attempts = 2 (rewrite_query increments by +1)
        Second retrieve → retrieval_attempts = 3
        Second rewrite  → retrieval_attempts = 4 (portfolio queries only)
        Third retrieve  → retrieval_attempts = 5

    Any attempt ≥ 5 (or ≥ 3 for non-portfolio queries) goes to generate.
    Routing terminates because retrieval_attempts grows monotonically.
    """
    attempts = state.get("retrieval_attempts", 1)
    query = state.get("query", "")

    # First CRAG attempt — applies to all queries with meaningful tokens.
    if attempts == 1 and _has_meaningful_token(query) and _retrieval_was_weak(state):
        return "rewrite"

    # Fix 2 Rule 1: second CRAG attempt for portfolio-noun queries only.
    # attempts==3 means: first retrieve failed → rewrite fired → second retrieve
    # also failed (still empty after the first CRAG rewrite). When the query
    # mentions a known portfolio entity, attempt one more vocabulary-shifted
    # rewrite before admitting the not-found path.
    if attempts == 3 and is_portfolio_relevant(query) and _retrieval_was_weak(state):
        return "rewrite"

    return "generate"


def build_pipeline(services: dict) -> CompiledStateGraph:
    """
    Assemble and compile the LangGraph pipeline.

    services — dict of constructed dependencies keyed by name (classifier,
    vector_store, cache, embedder, gemini, reranker, llm, db_path, and the
    optional github_log). Each node factory closes over the services it needs.

    Flow: guard → enumerate_query → cache → gemini_fast → retrieve
          (with a CRAG rewrite_query ↔ retrieve loop) → generate → log_eval.
    """
    graph = StateGraph(PipelineState)

    graph.add_node("guard", make_guard_node(services["classifier"]))
    graph.add_node("enumerate_query", make_enumerate_query_node(services["vector_store"]))
    graph.add_node("cache", make_cache_node(services["cache"], services["embedder"]))
    graph.add_node("gemini_fast", make_gemini_fast_node(services["gemini"]))
    graph.add_node("retrieve", make_retrieve_node(
        services["vector_store"], services["embedder"], services["reranker"]))
    # CRAG: query rewrite on failed retrieval — runs up to twice for portfolio queries.
    graph.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
    graph.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
    graph.add_node("log_eval", make_log_eval_node(services["db_path"], services.get("github_log")))

    graph.set_entry_point("guard")
    graph.add_conditional_edges("guard", route_guard,
                                {"pass": "enumerate_query", "block": "log_eval"})
    # Fix 1: enumerate_query either skips straight to generate (full list fetched)
    # or falls through to the normal cache → gemini_fast → retrieve pipeline.
    graph.add_conditional_edges("enumerate_query", route_enumerate,
                                {"skip_to_generate": "generate", "continue": "cache"})
    graph.add_conditional_edges("cache", route_cache,
                                {"hit": "log_eval", "miss": "gemini_fast"})
    graph.add_conditional_edges("gemini_fast", route_gemini,
                                {"answered": "log_eval", "research": "retrieve"})
    # After retrieve: either run CRAG rewrite (up to twice for portfolio queries)
    # or proceed to generate.
    graph.add_conditional_edges("retrieve", route_retrieve_result,
                                {"rewrite": "rewrite_query", "generate": "generate"})
    # After rewrite: go straight back to retrieve for the next attempt.
    # The cycle terminates because route_retrieve_result checks retrieval_attempts.
    graph.add_edge("rewrite_query", "retrieve")
    graph.add_edge("generate", "log_eval")
    graph.add_edge("log_eval", END)

    return graph.compile()