File size: 11,996 Bytes
aca8ab4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 | """
Synthesis Agent: Compare findings across papers and identify patterns.
"""
import os
import json
import logging
from typing import Dict, Any, List
from openai import AzureOpenAI
from utils.schemas import Analysis, SynthesisResult, ConsensusPoint, Contradiction, Paper
from rag.retrieval import RAGRetriever
from utils.langfuse_client import observe
# Module-level logging setup.
# NOTE(review): calling logging.basicConfig() at import time from a library
# module configures the *root* logger and can clobber the host application's
# logging setup; consider moving this to the application entry point.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Per-module logger, named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class SynthesisAgent:
    """Agent for synthesizing findings across multiple papers.

    Given a list of papers and their per-paper analyses, this agent prompts an
    Azure OpenAI chat model to produce a cross-paper synthesis (consensus
    points, contradictions, research gaps, executive summary) as structured
    JSON, then normalizes and validates it into a SynthesisResult.
    """

    def __init__(
        self,
        rag_retriever: RAGRetriever,
        model=None,
        temperature: float = 0.0,
        timeout: int = 90
    ):
        """
        Initialize Synthesis Agent.

        Args:
            rag_retriever: RAGRetriever instance
            model: Azure OpenAI model deployment name. Defaults to the
                AZURE_OPENAI_DEPLOYMENT_NAME environment variable, read at
                construction time (a default argument of os.getenv(...) would
                freeze the value at import time, before dotenv/config may
                have populated the environment).
            temperature: Temperature for generation (0 for deterministic)
            timeout: Request timeout in seconds (default: 90, longer than analyzer)
        """
        self.rag_retriever = rag_retriever
        # Resolve the deployment name now rather than at import time.
        self.model = model if model is not None else os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
        self.temperature = temperature
        self.timeout = timeout
        # Azure OpenAI client; the per-request timeout guards against hangs
        # on long multi-paper synthesis calls.
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            timeout=timeout
        )

    def _create_synthesis_prompt(
        self,
        papers: List[Paper],
        analyses: List[Analysis],
        query: str
    ) -> str:
        """Create the synthesis prompt from paired papers and analyses.

        Args:
            papers: Paper objects (zipped pairwise with ``analyses``; any
                excess entries in the longer list are ignored by zip).
            analyses: Analysis objects, one per paper.
            query: The user's research question.

        Returns:
            A single prompt string embedding all paper summaries plus the
            required JSON output schema and formatting rules.
        """
        # Format one summary section per (paper, analysis) pair.
        paper_summaries = []
        for paper, analysis in zip(papers, analyses):
            summary = f"""
Paper ID: {paper.arxiv_id}
Title: {paper.title}
Authors: {", ".join(paper.authors)}
Analysis:
- Methodology: {analysis.methodology}
- Key Findings: {", ".join(analysis.key_findings)}
- Conclusions: {analysis.conclusions}
- Contributions: {", ".join(analysis.main_contributions)}
- Limitations: {", ".join(analysis.limitations)}
"""
            paper_summaries.append(summary)
        # Join outside the f-string (clearer than the chr(10) workaround for
        # the pre-3.12 "no backslash in f-string expression" restriction;
        # output bytes are identical).
        joined_summaries = "\n".join(paper_summaries)
        separator = "=" * 80
        prompt = f"""You are a research synthesis expert. Analyze the following papers in relation to the user's research question.
Research Question: {query}
Papers Analyzed:
{separator}
{joined_summaries}
{separator}
Synthesize these findings and provide:
1. Consensus points - areas where papers agree
2. Contradictions - areas where papers disagree
3. Research gaps - what's missing or needs further investigation
4. Executive summary addressing the research question
Provide your synthesis in the following JSON format:
{{
"consensus_points": [
{{
"statement": "Clear consensus statement",
"supporting_papers": ["arxiv_id1", "arxiv_id2"],
"citations": ["Specific evidence from papers"],
"confidence": 0.0-1.0
}}
],
"contradictions": [
{{
"topic": "Topic of disagreement",
"viewpoint_a": "First viewpoint",
"papers_a": ["arxiv_id1"],
"viewpoint_b": "Second viewpoint",
"papers_b": ["arxiv_id2"],
"citations": ["Evidence for both sides"],
"confidence": 0.0-1.0
}}
],
"research_gaps": [
"Gap 1: What's missing",
"Gap 2: What needs further research"
],
"summary": "Executive summary addressing the research question with synthesis of all findings",
"confidence_score": 0.0-1.0
}}
CRITICAL JSON FORMATTING RULES:
- Ground all statements in the provided analyses
- Be specific about which papers support which claims
- Identify both agreements and disagreements
- Provide confidence scores based on consistency and evidence strength
- For ALL array fields (citations, supporting_papers, papers_a, papers_b, research_gaps):
* MUST be flat arrays of strings ONLY: ["item1", "item2"]
* NEVER nest arrays: [[], "text"] or [["nested"]] are INVALID
* NEVER include null, empty strings, or non-string values
* Each array element must be a non-empty string
"""
        return prompt

    def _normalize_synthesis_response(self, data: dict) -> dict:
        """
        Normalize synthesis LLM response to ensure all list fields contain only strings.

        Handles nested lists, None values, and mixed types in:
        - consensus_points[].citations
        - consensus_points[].supporting_papers
        - contradictions[].citations
        - contradictions[].papers_a
        - contradictions[].papers_b
        - research_gaps

        Args:
            data: Raw synthesis data dictionary from LLM

        Returns:
            Normalized dictionary (mutated in place and returned) with
            correct types for all list fields.
        """
        def flatten_and_clean(value):
            """Recursively flatten nested lists into a flat list of non-empty, stripped strings."""
            if isinstance(value, str):
                stripped = value.strip()
                return [stripped] if stripped else []
            if isinstance(value, list):
                cleaned = []
                for item in value:
                    if isinstance(item, str):
                        stripped = item.strip()
                        if stripped:
                            cleaned.append(stripped)
                    elif isinstance(item, list):
                        # Recurse to flatten arbitrarily nested arrays.
                        cleaned.extend(flatten_and_clean(item))
                    elif item is not None and str(item).strip():
                        # Coerce numbers/bools/etc. to their string form.
                        cleaned.append(str(item).strip())
                return cleaned
            if value is not None:
                stripped = str(value).strip()
                return [stripped] if stripped else []
            return []

        # Top-level research_gaps: always present and flat afterwards.
        data["research_gaps"] = flatten_and_clean(data.get("research_gaps", []))

        # Per-item list fields inside consensus_points.
        if isinstance(data.get("consensus_points"), list):
            for cp in data["consensus_points"]:
                if isinstance(cp, dict):
                    cp["citations"] = flatten_and_clean(cp.get("citations", []))
                    cp["supporting_papers"] = flatten_and_clean(cp.get("supporting_papers", []))

        # Per-item list fields inside contradictions.
        if isinstance(data.get("contradictions"), list):
            for contr in data["contradictions"]:
                if isinstance(contr, dict):
                    contr["citations"] = flatten_and_clean(contr.get("citations", []))
                    contr["papers_a"] = flatten_and_clean(contr.get("papers_a", []))
                    contr["papers_b"] = flatten_and_clean(contr.get("papers_b", []))

        logger.debug("Synthesis response normalized successfully")
        return data

    def synthesize(
        self,
        papers: List[Paper],
        analyses: List[Analysis],
        query: str,
        state: Dict[str, Any]
    ) -> SynthesisResult:
        """
        Synthesize findings across papers.

        Args:
            papers: List of Paper objects
            analyses: List of Analysis objects
            query: Original research question
            state: Agent state for token tracking

        Returns:
            SynthesisResult object. On any error a minimal zero-confidence
            SynthesisResult is returned instead of raising.
        """
        try:
            logger.info("Synthesizing %d papers", len(papers))
            prompt = self._create_synthesis_prompt(papers, analyses, query)

            # Deterministic (temperature=0 by default), JSON-mode call with a
            # larger output budget than the per-paper analyzer since this
            # response spans all papers at once.
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a research synthesis expert. Provide accurate, grounded synthesis based only on the provided analyses."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                max_tokens=2500,  # Larger limit for multi-paper synthesis
                response_format={"type": "json_object"}
            )

            # Track token usage. Use setdefault so a state dict missing the
            # "token_usage" key does not raise KeyError here — previously that
            # KeyError was swallowed by the broad except below and silently
            # turned a successful LLM response into a failure result.
            if getattr(response, 'usage', None):
                prompt_tokens = response.usage.prompt_tokens
                completion_tokens = response.usage.completion_tokens
                token_usage = state.setdefault("token_usage", {})
                token_usage["input_tokens"] = token_usage.get("input_tokens", 0) + prompt_tokens
                token_usage["output_tokens"] = token_usage.get("output_tokens", 0) + completion_tokens
                logger.info("Synthesis token usage: %d input, %d output", prompt_tokens, completion_tokens)

            # Parse, then normalize away nested lists / mixed types before
            # handing the data to the pydantic-style schema constructors.
            synthesis_data = json.loads(response.choices[0].message.content)
            synthesis_data = self._normalize_synthesis_response(synthesis_data)

            consensus_points = [
                ConsensusPoint(**cp) for cp in synthesis_data.get("consensus_points", [])
            ]
            contradictions = [
                Contradiction(**c) for c in synthesis_data.get("contradictions", [])
            ]

            synthesis = SynthesisResult(
                consensus_points=consensus_points,
                contradictions=contradictions,
                research_gaps=synthesis_data.get("research_gaps", []),
                summary=synthesis_data.get("summary", ""),
                confidence_score=synthesis_data.get("confidence_score", 0.5),
                papers_analyzed=[p.arxiv_id for p in papers]
            )
            logger.info("Synthesis completed with confidence %.2f", synthesis.confidence_score)
            return synthesis
        except Exception:
            # logger.exception preserves the traceback (logger.error with an
            # f-string did not).
            logger.exception("Error during synthesis")
            # Return minimal synthesis on error so the pipeline can continue.
            return SynthesisResult(
                consensus_points=[],
                contradictions=[],
                research_gaps=["Synthesis failed - unable to identify gaps"],
                summary="Synthesis failed due to an error",
                confidence_score=0.0,
                papers_analyzed=[p.arxiv_id for p in papers]
            )

    @observe(name="synthesis_agent_run", as_type="generation")
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute synthesis agent.

        Args:
            state: Current agent state

        Returns:
            Updated state with synthesis (or with an appended error message).
        """
        try:
            logger.info("=== Synthesis Agent Started ===")
            papers = state.get("papers", [])
            analyses = state.get("analyses", [])
            query = state.get("query", "")

            if not papers or not analyses:
                error_msg = "No papers or analyses available for synthesis"
                logger.error(error_msg)
                # setdefault: don't KeyError if the caller's state has no
                # "errors" list yet.
                state.setdefault("errors", []).append(error_msg)
                return state

            if len(papers) != len(analyses):
                error_msg = f"Mismatch: {len(papers)} papers but {len(analyses)} analyses"
                logger.warning(error_msg)
                # Degrade gracefully: synthesize only the properly paired prefix.
                min_len = min(len(papers), len(analyses))
                papers = papers[:min_len]
                analyses = analyses[:min_len]

            synthesis = self.synthesize(papers, analyses, query, state)
            state["synthesis"] = synthesis
            logger.info("=== Synthesis Agent Completed ===")
            return state
        except Exception as e:
            error_msg = f"Synthesis Agent error: {str(e)}"
            logger.exception(error_msg)
            state.setdefault("errors", []).append(error_msg)
            return state
|