Spaces:
Sleeping
Sleeping
| """The agentic RAG loop. | |
| Pipeline: | |
| 1. Self-RAG router: RETRIEVE / ANSWER_DIRECTLY / CLARIFY | |
| 2. (RETRIEVE branch) plan -> tools -> answer -> self-critique -> refine | |
| 3. Self-Healing layer: hallucination check, chunk expansion, gap detection | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| from config import AGENT_CONFIG | |
| from agent.critic import critique, refine_query | |
| from agent.planner import plan | |
| from agent.router import route | |
| from agent.tools import ToolResult, vector_search | |
| from llm.client_factory import get_llm | |
| from retrieval.dense import Hit | |
| class TraceStep: | |
| kind: str | |
| detail: dict[str, Any] = field(default_factory=dict) | |
| class AgentResult: | |
| answer: str | |
| citations: list[dict] | |
| confidence: float | |
| trace: list[TraceStep] | |
| iterations: int | |
| route: str | |
| healing_trace: list[dict] = field(default_factory=list) | |
| health_score: float = 100.0 | |
| ANSWER_SYSTEM = ( | |
| "You are a careful research assistant. Use ONLY the provided passages to " | |
| "answer the question. Cite sources inline with [N] where N is the passage " | |
| "number. If the passages are insufficient, say so explicitly." | |
| ) | |
| ANSWER_PROMPT = """Question: {question} | |
| Passages: | |
| {context} | |
| Write a concise, well-grounded answer. Use inline citations like [1], [2] that | |
| match the passage numbers above. If multiple passages support a claim, cite | |
| them all. If the passages do not contain enough information, say so plainly. | |
| """ | |
| def _format_context_block(hits: list[Hit]) -> tuple[str, list[dict]]: | |
| lines = [] | |
| citations = [] | |
| for i, h in enumerate(hits, start=1): | |
| meta = h.metadata | |
| title = meta.get("title") or meta.get("source_path", "?") | |
| pages = f"p.{meta.get('page_start')}-{meta.get('page_end')}" | |
| head = f"[{i}] {title} ({pages})" | |
| lines.append(f"{head}\n{h.text}") | |
| citations.append( | |
| { | |
| "n": i, | |
| "chunk_id": h.chunk_id, | |
| "title": title, | |
| "source_path": meta.get("source_path"), | |
| "page_start": meta.get("page_start"), | |
| "page_end": meta.get("page_end"), | |
| "score": float(h.score), | |
| } | |
| ) | |
| return "\n\n".join(lines), citations | |
| def _dedupe_hits(hits: list[Hit], limit: int) -> list[Hit]: | |
| seen: set[str] = set() | |
| out: list[Hit] = [] | |
| for h in hits: | |
| if h.chunk_id in seen: | |
| continue | |
| seen.add(h.chunk_id) | |
| out.append(h) | |
| if len(out) >= limit: | |
| break | |
| return out | |
| def run_agent( | |
| query: str, | |
| llm=None, | |
| enable_healing: bool = True, | |
| ) -> AgentResult: | |
| llm = llm or get_llm() | |
| trace: list[TraceStep] = [] | |
| # 1. Router | |
| decision = route(query, llm=llm) | |
| trace.append(TraceStep("router", decision)) | |
| if decision["action"] == "ANSWER_DIRECTLY": | |
| ans = llm.generate( | |
| prompt=query, | |
| system="You are a helpful research assistant. Be concise.", | |
| temperature=0.2, | |
| ) | |
| return AgentResult( | |
| answer=ans, citations=[], confidence=1.0, | |
| trace=trace, iterations=0, route="ANSWER_DIRECTLY", | |
| ) | |
| if decision["action"] == "CLARIFY": | |
| ans = llm.generate( | |
| prompt=( | |
| "The user asked: " + query + "\n\n" | |
| "It is too ambiguous to answer well. Ask one short clarifying " | |
| "question to narrow it down." | |
| ), | |
| system="You are a helpful research assistant.", | |
| temperature=0.2, | |
| ) | |
| return AgentResult( | |
| answer=ans, citations=[], confidence=0.0, | |
| trace=trace, iterations=0, route="CLARIFY", | |
| ) | |
| # 2. RETRIEVE branch — agentic loop | |
| current_query = query | |
| last_critique: dict[str, Any] = {} | |
| accumulated: list[Hit] = [] | |
| answer = "" | |
| citations: list[dict] = [] | |
| unique_hits: list[Hit] = [] | |
| final_iteration = 0 | |
| for iteration in range(AGENT_CONFIG["max_iterations"]): | |
| prior_summary = "" | |
| if accumulated: | |
| titles = sorted({h.metadata.get("title", "?") for h in accumulated}) | |
| prior_summary = "Already gathered passages from: " + ", ".join(titles) | |
| steps = plan(current_query, prior_summary=prior_summary, llm=llm) | |
| trace.append(TraceStep("plan", {"iteration": iteration, "steps": steps})) | |
| for step in steps: | |
| tool_res: ToolResult = vector_search(step["query"]) | |
| accumulated.extend(tool_res.hits) | |
| trace.append( | |
| TraceStep( | |
| "tool", | |
| { | |
| "tool": "vector_search", | |
| "query": step["query"], | |
| "n_hits": len(tool_res.hits), | |
| "top_titles": [h.metadata.get("title") for h in tool_res.hits[:3]], | |
| }, | |
| ) | |
| ) | |
| unique_hits = _dedupe_hits(accumulated, limit=8) | |
| context_block, citations = _format_context_block(unique_hits) | |
| answer = llm.generate( | |
| prompt=ANSWER_PROMPT.format(question=query, context=context_block), | |
| system=ANSWER_SYSTEM, | |
| temperature=0.1, | |
| ) | |
| trace.append(TraceStep("answer", {"iteration": iteration, "n_passages": len(unique_hits)})) | |
| crit = critique(query, answer, context_block, llm=llm) | |
| last_critique = crit | |
| final_iteration = iteration + 1 | |
| trace.append(TraceStep("critique", {"iteration": iteration, **crit})) | |
| if crit["confidence"] >= AGENT_CONFIG["confidence_threshold"] and crit["grounded"]: | |
| break | |
| current_query = refine_query(query, crit.get("missing", ""), llm=llm) | |
| trace.append(TraceStep("refine", {"new_query": current_query})) | |
| # 3. Self-Healing layer | |
| healing_trace: list[dict] = [] | |
| health_score = 100.0 | |
| if enable_healing and unique_hits: | |
| from healing.healing_loop import self_heal | |
| healed = self_heal(query, answer, unique_hits, citations, llm=llm) | |
| answer = healed.answer | |
| citations = healed.citations | |
| healing_trace = healed.healing_trace | |
| health_score = healed.health_score | |
| trace.append( | |
| TraceStep( | |
| "healing", | |
| { | |
| "attempts": healed.attempts_used, | |
| "health_score": health_score, | |
| "issues_found": [ | |
| a["issues"] | |
| for a in healing_trace | |
| if not a.get("healthy", False) | |
| ], | |
| }, | |
| ) | |
| ) | |
| return AgentResult( | |
| answer=answer, | |
| citations=citations, | |
| confidence=last_critique.get("confidence", 0.0), | |
| trace=trace, | |
| iterations=final_iteration, | |
| route="RETRIEVE", | |
| healing_trace=healing_trace, | |
| health_score=health_score, | |
| ) | |