|
|
""" |
|
|
Synthesizer Agent |
|
|
|
|
|
Generates grounded answers with proper citations. |
|
|
Applies production-grade engineering practices for retrieval-augmented generation (RAG) systems.
|
|
|
|
|
Key Features: |
|
|
- Structured answer generation with citations |
|
|
- Multi-source synthesis |
|
|
- Confidence estimation |
|
|
- Abstention when information is insufficient |
|
|
- Support for different answer formats (prose, list, table) |
|
|
""" |
|
|
|
|
|
from typing import List, Optional, Dict, Any, Literal |
|
|
from pydantic import BaseModel, Field |
|
|
from loguru import logger |
|
|
from enum import Enum |
|
|
import json |
|
|
import re |
|
|
|
|
|
try: |
|
|
import httpx |
|
|
HTTPX_AVAILABLE = True |
|
|
except ImportError: |
|
|
HTTPX_AVAILABLE = False |
|
|
|
|
|
from .reranker import RankedResult |
|
|
from .query_planner import QueryPlan, QueryIntent |
|
|
|
|
|
|
|
|
class AnswerFormat(str, Enum):
    """Format for generated answer.

    Inherits from ``str`` so members serialize as plain strings
    (e.g. in pydantic models and JSON payloads).
    """

    PROSE = "prose"  # flowing paragraph
    BULLET_POINTS = "bullet_points"  # one bullet per key point
    TABLE = "table"  # markdown table, used for comparisons
    STEP_BY_STEP = "step_by_step"  # numbered procedural steps
|
|
|
|
|
|
|
|
class Citation(BaseModel):
    """A citation reference in the answer."""

    # 1-based source number as rendered in the answer text, e.g. "[1]".
    index: int
    # Identifier of the retrieved chunk this citation points to.
    chunk_id: str
    # Identifier of the document the chunk came from.
    document_id: str
    # Page number within the document, if known.
    # NOTE(review): appears to be stored 0-based (displayed as page + 1
    # elsewhere in this module) — confirm against the retriever.
    page: Optional[int] = None
    # Short excerpt of the cited chunk for display (truncated to ~150 chars).
    text_snippet: str
    # Relevance score carried over from the ranked retrieval result.
    relevance_score: float
|
|
|
|
|
|
|
|
class SynthesisResult(BaseModel):
    """Result from answer synthesis."""

    # Final answer text; contains [N] citation markers when LLM-generated.
    answer: str
    # Citations actually referenced in the answer text.
    citations: List[Citation]
    # Overall confidence in the answer, clamped to [0.0, 1.0].
    confidence: float
    # Format the answer was rendered in.
    format: AnswerFormat

    # Number of distinct sources cited in the answer.
    num_sources_used: int
    # True when the agent declined to answer (no/weak sources).
    abstained: bool = False
    # Human-readable reason for abstaining; set only when abstained is True.
    abstain_reason: Optional[str] = None

    # Context string that was sent to the LLM, retained for debugging
    # only when it is small (< 2000 chars); otherwise None.
    raw_context: Optional[str] = None
|
|
|
|
|
|
|
|
class SynthesizerConfig(BaseModel):
    """Configuration for synthesizer."""

    # --- LLM backend (Ollama-style /api/generate endpoint) ---
    # Model name passed to the generation endpoint.
    model: str = Field(default="llama3.2:3b")
    # Base URL of the LLM server (default is the local Ollama port).
    base_url: str = Field(default="http://localhost:11434")
    # Sampling temperature; low by default to favor factual grounding.
    temperature: float = Field(default=0.2)
    # Maximum tokens to generate (mapped to Ollama's "num_predict").
    max_tokens: int = Field(default=1024)

    # --- Citation policy ---
    # Whether answers are expected to carry citations.
    require_citations: bool = Field(default=True)
    # Minimum number of citations expected in an answer.
    min_citations: int = Field(default=1)
    # Template for rendering a citation marker.
    citation_format: str = Field(default="[{index}]")

    # --- Abstention policy ---
    # Abstain instead of answering when source confidence is low.
    abstain_on_low_confidence: bool = Field(default=True)
    # Average relevance score below which the agent abstains.
    confidence_threshold: float = Field(default=0.4)
    # Minimum number of sources required to attempt an answer.
    min_sources: int = Field(default=1)

    # --- Context assembly ---
    # Character budget for the context block given to the LLM.
    max_context_length: int = Field(default=4000)
|
|
|
|
|
|
|
|
class SynthesizerAgent:
    """
    Generates grounded answers with citations.

    Capabilities:
    1. Context-aware answer generation
    2. Proper citation formatting
    3. Multi-source synthesis
    4. Confidence-based abstention
    5. Format adaptation based on query intent
    """

    SYNTHESIS_PROMPT = """You are a precise document question-answering assistant.
Generate an answer to the query based ONLY on the provided context.

RULES:
1. Only use information from the provided context
2. Cite sources using [N] notation where N matches the source number (e.g., [1], [2])
3. If the context doesn't contain enough information, say "I cannot answer this question based on the available information."
4. Be precise, accurate, and concise
5. Include at least one citation for factual claims
6. Do not make up information not in the context

CONTEXT:
{context}

QUERY: {query}

FORMAT: {format_instruction}

ANSWER:"""

    FORMAT_INSTRUCTIONS = {
        AnswerFormat.PROSE: "Write a clear, flowing paragraph with proper citations.",
        AnswerFormat.BULLET_POINTS: "Use bullet points for each key point, with citations.",
        AnswerFormat.TABLE: "Format as a markdown table if comparing items.",
        AnswerFormat.STEP_BY_STEP: "Number each step clearly with citations.",
    }

    # Compiled once at class load: matches [N] citation markers in answers.
    # (Previously recompiled on every _extract_used_citations call.)
    _CITATION_RE = re.compile(r'\[(\d+)\]')

    def __init__(self, config: Optional[SynthesizerConfig] = None):
        """
        Initialize Synthesizer Agent.

        Args:
            config: Synthesizer configuration; defaults to SynthesizerConfig().
        """
        self.config = config or SynthesizerConfig()
        logger.info(f"SynthesizerAgent initialized (model={self.config.model})")

    def synthesize(
        self,
        query: str,
        results: List[RankedResult],
        plan: Optional[QueryPlan] = None,
        format_override: Optional[AnswerFormat] = None,
    ) -> SynthesisResult:
        """
        Generate answer from ranked results.

        Args:
            query: User's question
            results: Ranked retrieval results
            plan: Optional query plan for context
            format_override: Override auto-detected format

        Returns:
            SynthesisResult with answer and citations
        """
        if not results:
            return self._abstain("No relevant sources found")

        # Gate on average retrieval confidence before spending LLM tokens.
        avg_confidence = sum(r.relevance_score for r in results) / len(results)

        if self.config.abstain_on_low_confidence:
            if avg_confidence < self.config.confidence_threshold:
                return self._abstain(
                    f"Low confidence ({avg_confidence:.2f}) in available sources"
                )
            if len(results) < self.config.min_sources:
                return self._abstain(
                    f"Insufficient sources ({len(results)} < {self.config.min_sources})"
                )

        answer_format = format_override or self._detect_format(query, plan)

        context, citations = self._build_context(results)

        if HTTPX_AVAILABLE:
            try:
                raw_answer = self._generate_answer(query, context, answer_format)
            except httpx.HTTPError as exc:
                # FIX: transport/HTTP failures from the LLM backend previously
                # propagated and crashed the pipeline even though an extractive
                # fallback exists; degrade gracefully instead.
                logger.warning(
                    f"LLM generation failed ({exc}); using extractive fallback"
                )
                raw_answer = self._simple_answer(query, results)
        else:
            raw_answer = self._simple_answer(query, results)

        used_citations = self._extract_used_citations(raw_answer, citations)

        # FIX: require_citations / min_citations were dead config fields.
        # Surface under-cited answers without changing the returned result.
        if self.config.require_citations and len(used_citations) < self.config.min_citations:
            logger.warning(
                f"Answer cites {len(used_citations)} sources "
                f"(min_citations={self.config.min_citations})"
            )

        confidence = self._calculate_confidence(results, used_citations)

        return SynthesisResult(
            answer=raw_answer,
            citations=used_citations,
            confidence=confidence,
            format=answer_format,
            num_sources_used=len(used_citations),
            abstained=False,
            # Keep the prompt context for debugging only when it is small.
            raw_context=context if len(context) < 2000 else None,
        )

    def synthesize_multi_hop(
        self,
        query: str,
        sub_results: Dict[str, List[RankedResult]],
        plan: QueryPlan,
    ) -> SynthesisResult:
        """
        Synthesize answer from multiple sub-query results.

        Args:
            query: Original query
            sub_results: Results for each sub-query
            plan: Query plan with sub-queries

        Returns:
            Synthesized answer combining all sources
        """
        # Flatten results from every sub-query (keys are not needed here),
        # then deduplicate by chunk_id preserving first-seen order.
        all_results = [r for results in sub_results.values() for r in results]

        seen = set()
        unique_results = []
        for result in all_results:
            if result.chunk_id not in seen:
                seen.add(result.chunk_id)
                unique_results.append(result)

        # Strongest evidence first so it survives the context budget.
        unique_results.sort(key=lambda r: r.relevance_score, reverse=True)

        if plan.requires_aggregation:
            return self._synthesize_aggregation(query, unique_results, plan)

        return self.synthesize(query, unique_results, plan)

    def _abstain(self, reason: str) -> SynthesisResult:
        """Create an abstention result with the given human-readable reason."""
        return SynthesisResult(
            answer="I cannot answer this question based on the available information.",
            citations=[],
            confidence=0.0,
            format=AnswerFormat.PROSE,
            num_sources_used=0,
            abstained=True,
            abstain_reason=reason,
        )

    def _detect_format(
        self,
        query: str,
        plan: Optional[QueryPlan],
    ) -> AnswerFormat:
        """Auto-detect best answer format from the plan's intent, then
        fall back to keyword heuristics on the query text."""
        query_lower = query.lower()

        # Planner intent is the most reliable signal when available.
        if plan:
            if plan.intent == QueryIntent.COMPARISON:
                return AnswerFormat.TABLE
            if plan.intent == QueryIntent.PROCEDURAL:
                return AnswerFormat.STEP_BY_STEP
            if plan.intent == QueryIntent.LIST:
                return AnswerFormat.BULLET_POINTS

        # Keyword heuristics as a fallback.
        if any(p in query_lower for p in ["list", "what are all", "enumerate"]):
            return AnswerFormat.BULLET_POINTS
        if any(p in query_lower for p in ["compare", "difference", "vs"]):
            return AnswerFormat.TABLE
        if any(p in query_lower for p in ["how to", "steps", "process"]):
            return AnswerFormat.STEP_BY_STEP

        return AnswerFormat.PROSE

    def _build_context(
        self,
        results: List[RankedResult],
    ) -> tuple[str, List[Citation]]:
        """Build the numbered context string and matching citation list.

        Enforces config.max_context_length by truncating the chunk that
        crosses the budget (when at least ~100 chars remain) and dropping
        the rest.

        Returns:
            (context_text, citations) where citation index i matches the
            "[i]" header in the context.
        """
        context_parts = []
        citations = []
        total_length = 0

        for i, result in enumerate(results, 1):
            chunk_text = result.text
            if total_length + len(chunk_text) > self.config.max_context_length:
                remaining = self.config.max_context_length - total_length
                if remaining > 100:
                    # Partial chunk still worth including.
                    chunk_text = chunk_text[:remaining] + "..."
                else:
                    # Budget exhausted; later (lower-ranked) results dropped.
                    break

            # Header ties the chunk to its [i] citation marker.
            header = f"[{i}]"
            if result.page is not None:
                # Pages displayed 1-based for humans.
                header += f" (Page {result.page + 1})"
            if result.source_path:
                header += f" - {result.source_path}"

            context_parts.append(f"{header}:\n{chunk_text}\n")
            total_length += len(chunk_text)

            citations.append(Citation(
                index=i,
                chunk_id=result.chunk_id,
                document_id=result.document_id,
                page=result.page,
                text_snippet=chunk_text[:150] + ("..." if len(chunk_text) > 150 else ""),
                relevance_score=result.relevance_score,
            ))

        return "\n".join(context_parts), citations

    def _generate_answer(
        self,
        query: str,
        context: str,
        answer_format: AnswerFormat,
    ) -> str:
        """Generate answer using the LLM backend.

        Raises:
            httpx.HTTPError: on transport failures or non-2xx responses
                (handled by the caller's extractive fallback).
        """
        format_instruction = self.FORMAT_INSTRUCTIONS.get(
            answer_format,
            self.FORMAT_INSTRUCTIONS[AnswerFormat.PROSE]
        )

        prompt = self.SYNTHESIS_PROMPT.format(
            context=context,
            query=query,
            format_instruction=format_instruction,
        )

        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                f"{self.config.base_url}/api/generate",
                json={
                    "model": self.config.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": self.config.temperature,
                        "num_predict": self.config.max_tokens,
                    },
                },
            )
            response.raise_for_status()
            result = response.json()

        return result.get("response", "").strip()

    def _simple_answer(
        self,
        query: str,
        results: List[RankedResult],
    ) -> str:
        """Extractive answer without an LLM (fallback path): quotes the
        top-3 chunks verbatim with their citation markers."""
        if not results:
            return "No information found."

        answer_parts = ["Based on the available sources:\n"]
        for i, result in enumerate(results[:3], 1):
            answer_parts.append(f"[{i}] {result.text[:200]}...")

        return "\n\n".join(answer_parts)

    def _extract_used_citations(
        self,
        answer: str,
        all_citations: List[Citation],
    ) -> List[Citation]:
        """Return only the citations whose [N] marker appears in the answer,
        ignoring out-of-range indices the LLM may hallucinate."""
        used_indices = set()

        for match in self._CITATION_RE.findall(answer):
            idx = int(match)
            if 1 <= idx <= len(all_citations):
                used_indices.add(idx)

        return [c for c in all_citations if c.index in used_indices]

    def _calculate_confidence(
        self,
        results: List[RankedResult],
        used_citations: List[Citation],
    ) -> float:
        """Calculate overall confidence in the answer.

        Weighted blend of: average relevance of the cited sources (0.5),
        how many sources were cited relative to 3 (0.3), and a fixed
        consistency placeholder (0.2). Clamped to [0, 1].
        """
        if not results:
            return 0.0

        # Prefer relevance of what was actually cited; fall back to all results.
        if used_citations:
            source_confidence = sum(c.relevance_score for c in used_citations) / len(used_citations)
        else:
            source_confidence = sum(r.relevance_score for r in results) / len(results)

        # More cited sources (up to 3) -> more corroboration.
        source_count_factor = min(len(used_citations) / 3, 1.0) if used_citations else 0.5

        # Placeholder: cross-source consistency is not measured yet.
        consistency_factor = 0.8

        confidence = (
            0.5 * source_confidence +
            0.3 * source_count_factor +
            0.2 * consistency_factor
        )

        return min(max(confidence, 0.0), 1.0)

    def _synthesize_aggregation(
        self,
        query: str,
        results: List[RankedResult],
        plan: QueryPlan,
    ) -> SynthesisResult:
        """Synthesize an aggregation-style answer (forces bullet points,
        which suit enumerated/aggregated facts)."""
        return self.synthesize(
            query,
            results,
            plan,
            format_override=AnswerFormat.BULLET_POINTS,
        )
|
|
|