Spaces:
Sleeping
Sleeping
File size: 7,006 Bytes
"""The agentic RAG loop.
Pipeline:
1. Self-RAG router: RETRIEVE / ANSWER_DIRECTLY / CLARIFY
2. (RETRIEVE branch) plan -> tools -> answer -> self-critique -> refine
3. Self-Healing layer: hallucination check, chunk expansion, gap detection
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from config import AGENT_CONFIG
from agent.critic import critique, refine_query
from agent.planner import plan
from agent.router import route
from agent.tools import ToolResult, vector_search
from llm.client_factory import get_llm
from retrieval.dense import Hit
@dataclass
class TraceStep:
    """A single recorded event in an agent run.

    Steps are appended in order to the run's trace so the pipeline's
    decisions can be inspected after the fact.
    """

    # Category label for the event (the agent loop records values such as
    # "router", "plan", "tool", "answer", "critique", "refine", "healing").
    kind: str
    # Free-form payload describing the event; each instance gets its own dict.
    detail: dict[str, Any] = field(default_factory=dict)
@dataclass
class AgentResult:
    """Everything an agent run hands back to its caller."""

    # Final answer text shown to the user.
    answer: str
    # One record per passage used as context (empty when nothing was retrieved).
    citations: list[dict]
    # Confidence figure attached to the answer.
    confidence: float
    # Ordered log of the steps taken during the run.
    trace: list[TraceStep]
    # Number of retrieve/answer/critique rounds that were executed.
    iterations: int
    # Name of the branch the router selected.
    route: str
    # Per-attempt records from the self-healing layer; empty if it did not run.
    healing_trace: list[dict] = field(default_factory=list)
    # Health score from the self-healing layer; stays at 100.0 if it did not run.
    health_score: float = 100.0
# System prompt for the grounded-answer generation call: restricts the model
# to the supplied passages and asks for inline [N] citations.
ANSWER_SYSTEM = (
"You are a careful research assistant. Use ONLY the provided passages to "
"answer the question. Cite sources inline with [N] where N is the passage "
"number. If the passages are insufficient, say so explicitly."
)
# User prompt template for the same call. It is filled with str.format and
# expects two fields: {question} and {context} (the numbered passage block).
# NOTE(review): whitespace inside this literal is part of the prompt sent to
# the model — do not re-wrap or re-indent it.
ANSWER_PROMPT = """Question: {question}
Passages:
{context}
Write a concise, well-grounded answer. Use inline citations like [1], [2] that
match the passage numbers above. If multiple passages support a claim, cite
them all. If the passages do not contain enough information, say so plainly.
"""
def _format_context_block(hits: list[Hit]) -> tuple[str, list[dict]]:
    """Render hits as a numbered passage block and build matching citations.

    Each hit becomes a ``[N] title (p.start-end)`` header followed by the hit
    text; passages are separated by a blank line. ``N`` starts at 1 and is the
    same number stored under ``"n"`` in the corresponding citation record.

    Args:
        hits: Retrieved passages. Each must expose ``metadata`` (a mapping),
            ``text``, ``chunk_id`` and ``score``.

    Returns:
        ``(context, citations)`` where ``context`` is the joined passage block
        and ``citations`` holds one dict per hit with keys ``n``, ``chunk_id``,
        ``title``, ``source_path``, ``page_start``, ``page_end`` and ``score``.
        An empty ``hits`` list yields ``("", [])``.
    """
    lines: list[str] = []
    citations: list[dict] = []
    for i, h in enumerate(hits, start=1):
        meta = h.metadata
        # Chain the fallbacks with ``or`` so a key that is present but
        # None/empty still ends at "?" instead of leaking None into the header.
        title = meta.get("title") or meta.get("source_path") or "?"

        page_start = meta.get("page_start")
        page_end = meta.get("page_end")
        # Only mention pages that are actually known; the original rendered
        # "p.None-None" for sources without page metadata.
        if page_start is not None and page_end is not None:
            pages = f"p.{page_start}-{page_end}"
        elif page_start is not None or page_end is not None:
            known = page_start if page_start is not None else page_end
            pages = f"p.{known}"
        else:
            pages = ""

        head = f"[{i}] {title} ({pages})" if pages else f"[{i}] {title}"
        lines.append(f"{head}\n{h.text}")
        citations.append(
            {
                "n": i,
                "chunk_id": h.chunk_id,
                "title": title,
                "source_path": meta.get("source_path"),
                "page_start": page_start,
                "page_end": page_end,
                "score": float(h.score),
            }
        )
    return "\n\n".join(lines), citations
def _dedupe_hits(hits: list[Hit], limit: int) -> list[Hit]:
    """Return the first ``limit`` hits with distinct ``chunk_id``, in order.

    Args:
        hits: Candidate passages; earlier entries win over later duplicates.
        limit: Maximum number of hits to return. Zero or a negative value
            yields an empty list.

    Returns:
        A new list; ``hits`` itself is not modified.
    """
    # The size check below runs only after an append, so without this guard a
    # non-positive limit would still let one hit through.
    if limit <= 0:
        return []

    seen: set[str] = set()
    out: list[Hit] = []
    for h in hits:
        if h.chunk_id in seen:
            continue
        seen.add(h.chunk_id)
        out.append(h)
        if len(out) >= limit:
            break
    return out
def run_agent(
    query: str,
    llm=None,
    enable_healing: bool = True,
) -> AgentResult:
    """Route *query* and answer it directly, ask to clarify, or run retrieval.

    The router's ``"action"`` decides the branch:

    * ``"ANSWER_DIRECTLY"`` - one LLM call on the raw query, no citations,
      confidence 1.0.
    * ``"CLARIFY"`` - one LLM call that asks a clarifying question,
      confidence 0.0.
    * anything else - the retrieval loop: plan, run ``vector_search`` for each
      planned step, answer from up to 8 de-duplicated passages, critique, and
      refine the query until the critique is confident and grounded or
      ``AGENT_CONFIG["max_iterations"]`` rounds have run. Hits accumulate
      across rounds.

    Args:
        query: The user's question.
        llm: Object providing ``generate(prompt=..., system=...,
            temperature=...)``; ``get_llm()`` is used when this is falsy.
        enable_healing: When true and at least one passage was retrieved, the
            draft answer is passed through ``self_heal`` and its answer,
            citations, healing trace and health score replace the drafts.

    Returns:
        An ``AgentResult``. On the retrieval branch ``confidence`` is the last
        critique's ``"confidence"`` (0.0 if no critique ran) and
        ``iterations`` is the number of rounds executed.
    """
    llm = llm or get_llm()
    trace: list[TraceStep] = []

    # 1. Router
    decision = route(query, llm=llm)
    trace.append(TraceStep("router", decision))

    if decision["action"] == "ANSWER_DIRECTLY":
        ans = llm.generate(
            prompt=query,
            system="You are a helpful research assistant. Be concise.",
            temperature=0.2,
        )
        return AgentResult(
            answer=ans, citations=[], confidence=1.0,
            trace=trace, iterations=0, route="ANSWER_DIRECTLY",
        )

    if decision["action"] == "CLARIFY":
        ans = llm.generate(
            prompt=(
                "The user asked: " + query + "\n\n"
                "It is too ambiguous to answer well. Ask one short clarifying "
                "question to narrow it down."
            ),
            system="You are a helpful research assistant.",
            temperature=0.2,
        )
        return AgentResult(
            answer=ans, citations=[], confidence=0.0,
            trace=trace, iterations=0, route="CLARIFY",
        )

    # 2. RETRIEVE branch — agentic loop
    current_query = query
    last_critique: dict[str, Any] = {}
    accumulated: list[Hit] = []
    answer = ""
    citations: list[dict] = []
    unique_hits: list[Hit] = []
    final_iteration = 0
    max_iterations = AGENT_CONFIG["max_iterations"]

    for iteration in range(max_iterations):
        prior_summary = ""
        if accumulated:
            # ``get("title", "?")`` would return None for a key that exists
            # with a None value, and None cannot be sorted against or joined
            # with strings; ``or "?"`` covers both missing and empty titles.
            titles = sorted({h.metadata.get("title") or "?" for h in accumulated})
            prior_summary = "Already gathered passages from: " + ", ".join(titles)

        steps = plan(current_query, prior_summary=prior_summary, llm=llm)
        trace.append(TraceStep("plan", {"iteration": iteration, "steps": steps}))

        for step in steps:
            tool_res: ToolResult = vector_search(step["query"])
            accumulated.extend(tool_res.hits)
            trace.append(
                TraceStep(
                    "tool",
                    {
                        "tool": "vector_search",
                        "query": step["query"],
                        "n_hits": len(tool_res.hits),
                        "top_titles": [h.metadata.get("title") for h in tool_res.hits[:3]],
                    },
                )
            )

        unique_hits = _dedupe_hits(accumulated, limit=8)
        context_block, citations = _format_context_block(unique_hits)
        # The answer and critique always target the ORIGINAL question; only
        # the planner sees the refined query.
        answer = llm.generate(
            prompt=ANSWER_PROMPT.format(question=query, context=context_block),
            system=ANSWER_SYSTEM,
            temperature=0.1,
        )
        trace.append(TraceStep("answer", {"iteration": iteration, "n_passages": len(unique_hits)}))

        crit = critique(query, answer, context_block, llm=llm)
        last_critique = crit
        final_iteration = iteration + 1
        trace.append(TraceStep("critique", {"iteration": iteration, **crit}))

        if crit["confidence"] >= AGENT_CONFIG["confidence_threshold"] and crit["grounded"]:
            break
        # No further round will consume a refined query, so skip the extra
        # LLM call (and the misleading "refine" trace entry) on the last pass.
        if iteration + 1 >= max_iterations:
            break

        current_query = refine_query(query, crit.get("missing", ""), llm=llm)
        trace.append(TraceStep("refine", {"new_query": current_query}))

    # 3. Self-Healing layer
    healing_trace: list[dict] = []
    health_score = 100.0
    if enable_healing and unique_hits:
        # NOTE(review): imported at call time rather than at module top —
        # presumably to avoid an import cycle or load cost; confirm before moving.
        from healing.healing_loop import self_heal

        healed = self_heal(query, answer, unique_hits, citations, llm=llm)
        answer = healed.answer
        citations = healed.citations
        healing_trace = healed.healing_trace
        health_score = healed.health_score
        trace.append(
            TraceStep(
                "healing",
                {
                    "attempts": healed.attempts_used,
                    "health_score": health_score,
                    # Only attempts not flagged healthy contribute their issues.
                    "issues_found": [
                        a["issues"]
                        for a in healing_trace
                        if not a.get("healthy", False)
                    ],
                },
            )
        )

    return AgentResult(
        answer=answer,
        citations=citations,
        confidence=last_critique.get("confidence", 0.0),
        trace=trace,
        iterations=final_iteration,
        route="RETRIEVE",
        healing_trace=healing_trace,
        health_score=health_score,
    )
|