# Deployed via GitHub Actions (deploy 2e8cff3, commit c44df3b).
from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph
from app.models.pipeline import PipelineState
from app.pipeline.nodes.guard import make_guard_node
from app.pipeline.nodes.cache import make_cache_node
from app.pipeline.nodes.enumerate_query import make_enumerate_query_node
from app.pipeline.nodes.gemini_fast import make_gemini_fast_node
from app.pipeline.nodes.retrieve import make_retrieve_node
from app.pipeline.nodes.rewrite_query import make_rewrite_query_node, _has_meaningful_token
from app.pipeline.nodes.generate import make_generate_node
from app.pipeline.nodes.log_eval import make_log_eval_node
from app.core.portfolio_context import is_portfolio_relevant
# Relevance gate threshold -- matches the constant of the same purpose in retrieve.py.
_MIN_TOP_SCORE: float = -3.5
# CRAG low-confidence threshold. When retrieval returns chunks but the best
# cross-encoder score is below this value (weak match, not an outright miss),
# rewrite the query and retry once. Separate from _MIN_TOP_SCORE: chunks above
# that floor are not filtered out, but the LLM may get poor context without a
# retry. Empirically, scores between -3.5 and -1.5 indicate borderline relevance
# where a vocabulary-shifted query usually finds much better chunks.
_CRAG_LOW_CONFIDENCE_SCORE: float = -1.5
def route_guard(state: PipelineState) -> str:
    """Route on the guard node's verdict: "pass" to continue, "block" to log and stop."""
    return "pass" if state.get("guard_passed", False) else "block"
def route_enumerate(state: PipelineState) -> str:
    """
    Fix 1: after the enumerate_query node, decide whether to skip the normal
    retrieval pipeline and go straight to generate.

    "skip_to_generate" -- enumeration intent detected; reranked_chunks is already
    populated with the complete Qdrant scroll result.
    "continue" -- no enumeration intent; proceed to cache -> gemini_fast -> retrieve.
    """
    enumeration_detected = state.get("is_enumeration_query", False)
    return "skip_to_generate" if enumeration_detected else "continue"
def route_cache(state: PipelineState) -> str:
    """Route on the semantic cache result: "hit" skips RAG, "miss" continues."""
    return "hit" if state.get("cached", False) else "miss"
def route_gemini(state: PipelineState) -> str:
    """
    Route after the Gemini fast-path node.

    "answered" -- Gemini answered directly; skip RAG, log and done.
    "research" -- Gemini called search_knowledge_base(); run full RAG.
    """
    direct_answer = state.get("answer", "")
    return "answered" if direct_answer else "research"
def _retrieval_was_weak(state: PipelineState) -> bool:
    """
    Return True when the last retrieval attempt was empty or low-confidence.

    Weak means either no reranked chunks at all, or a best cross-encoder score
    below _CRAG_LOW_CONFIDENCE_SCORE (borderline relevance -- a rewrite usually
    helps). A missing top_rerank_score is treated as not-weak to avoid looping
    on states that never set it.
    """
    if not state.get("reranked_chunks", []):
        return True
    top_score = state.get("top_rerank_score")
    return top_score is not None and top_score < _CRAG_LOW_CONFIDENCE_SCORE


def route_retrieve_result(state: PipelineState) -> str:
    """
    CRAG routing: trigger a query rewrite when retrieval was weak or empty.

    Fix 2 Rule 1: portfolio-noun queries are allowed a SECOND CRAG retry after
    the first retry also finds nothing. This prevents the not-found response from
    firing on queries where the corpus genuinely should have results (e.g. a typo
    in a project name or a synonym mismatch that's specific to portfolio content).

    Attempt tracking (via retrieval_attempts):
        First retrieve  -> retrieval_attempts = 1
        First rewrite   -> retrieval_attempts = 2 (rewrite_query increments by +1)
        Second retrieve -> retrieval_attempts = 3
        Second rewrite  -> retrieval_attempts = 4 (portfolio queries only)
        Third retrieve  -> retrieval_attempts = 5

    Any attempt >= 5 (or >= 3 for non-portfolio queries) goes to generate.
    Routing terminates because retrieval_attempts grows monotonically.
    """
    attempts = state.get("retrieval_attempts", 1)
    query = state.get("query", "")
    # First CRAG attempt -- applies to all queries with meaningful tokens.
    if attempts == 1 and _has_meaningful_token(query):
        if _retrieval_was_weak(state):
            return "rewrite"
    # Fix 2 Rule 1: second CRAG attempt for portfolio-noun queries only.
    # attempts==3 means: first retrieve failed -> rewrite fired -> second retrieve
    # also failed (still weak after the first CRAG rewrite). When the query
    # mentions a known portfolio entity, attempt one more vocabulary-shifted rewrite
    # before admitting the not-found path.
    if attempts == 3 and is_portfolio_relevant(query):
        if _retrieval_was_weak(state):
            return "rewrite"
    return "generate"
def build_pipeline(services: dict) -> CompiledStateGraph:
    """
    Wire and compile the LangGraph state machine for the RAG pipeline.

    Flow: guard -> enumerate_query -> cache -> gemini_fast -> retrieve
    (-> rewrite_query -> retrieve, bounded CRAG loop) -> generate -> log_eval -> END.
    Each node factory closes over the service objects it needs from *services*.
    """
    g = StateGraph(PipelineState)

    # --- nodes -------------------------------------------------------------
    g.add_node("guard", make_guard_node(services["classifier"]))
    g.add_node("enumerate_query", make_enumerate_query_node(services["vector_store"]))
    g.add_node("cache", make_cache_node(services["cache"], services["embedder"]))
    g.add_node("gemini_fast", make_gemini_fast_node(services["gemini"]))
    g.add_node(
        "retrieve",
        make_retrieve_node(
            services["vector_store"], services["embedder"], services["reranker"]
        ),
    )
    # CRAG: query rewrite on failed retrieval -- runs up to twice for portfolio queries.
    g.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
    g.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
    g.add_node("log_eval", make_log_eval_node(services["db_path"], services.get("github_log")))

    # --- edges -------------------------------------------------------------
    g.set_entry_point("guard")
    g.add_conditional_edges("guard", route_guard,
                            {"pass": "enumerate_query", "block": "log_eval"})
    # Fix 1: enumeration queries already carry the full scroll result, so they
    # bypass the cache -> gemini_fast -> retrieve chain entirely.
    g.add_conditional_edges("enumerate_query", route_enumerate,
                            {"skip_to_generate": "generate", "continue": "cache"})
    g.add_conditional_edges("cache", route_cache,
                            {"hit": "log_eval", "miss": "gemini_fast"})
    g.add_conditional_edges("gemini_fast", route_gemini,
                            {"answered": "log_eval", "research": "retrieve"})
    # Weak or empty retrieval enters the CRAG rewrite loop; otherwise generate.
    g.add_conditional_edges("retrieve", route_retrieve_result,
                            {"rewrite": "rewrite_query", "generate": "generate"})
    # rewrite_query loops straight back to retrieve; route_retrieve_result bounds
    # the cycle via retrieval_attempts, so compilation yields a terminating graph.
    g.add_edge("rewrite_query", "retrieve")
    g.add_edge("generate", "log_eval")
    g.add_edge("log_eval", END)
    return g.compile()