GitHub Actions
Clean sync from GitHub - no large files in history
aca8ab4
"""
Synthesis Agent: Compare findings across papers and identify patterns.
"""
import os
import json
import logging
from typing import Dict, Any, List
from openai import AzureOpenAI
from utils.schemas import Analysis, SynthesisResult, ConsensusPoint, Contradiction, Paper
from rag.retrieval import RAGRetriever
from utils.langfuse_client import observe
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SynthesisAgent:
"""Agent for synthesizing findings across multiple papers."""
def __init__(
self,
rag_retriever: RAGRetriever,
model=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
temperature: float = 0.0,
timeout: int = 90
):
"""
Initialize Synthesis Agent.
Args:
rag_retriever: RAGRetriever instance
model: Azure OpenAI model deployment name
temperature: Temperature for generation (0 for deterministic)
timeout: Request timeout in seconds (default: 90, longer than analyzer)
"""
self.rag_retriever = rag_retriever
self.model = model
self.temperature = temperature
self.timeout = timeout
# Initialize Azure OpenAI client with timeout
self.client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
#api_version="2024-02-01",
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
timeout=timeout
)
def _create_synthesis_prompt(
self,
papers: List[Paper],
analyses: List[Analysis],
query: str
) -> str:
"""Create prompt for synthesis."""
# Format paper summaries
paper_summaries = []
for paper, analysis in zip(papers, analyses):
summary = f"""
Paper ID: {paper.arxiv_id}
Title: {paper.title}
Authors: {", ".join(paper.authors)}
Analysis:
- Methodology: {analysis.methodology}
- Key Findings: {", ".join(analysis.key_findings)}
- Conclusions: {analysis.conclusions}
- Contributions: {", ".join(analysis.main_contributions)}
- Limitations: {", ".join(analysis.limitations)}
"""
paper_summaries.append(summary)
prompt = f"""You are a research synthesis expert. Analyze the following papers in relation to the user's research question.
Research Question: {query}
Papers Analyzed:
{"=" * 80}
{chr(10).join(paper_summaries)}
{"=" * 80}
Synthesize these findings and provide:
1. Consensus points - areas where papers agree
2. Contradictions - areas where papers disagree
3. Research gaps - what's missing or needs further investigation
4. Executive summary addressing the research question
Provide your synthesis in the following JSON format:
{{
"consensus_points": [
{{
"statement": "Clear consensus statement",
"supporting_papers": ["arxiv_id1", "arxiv_id2"],
"citations": ["Specific evidence from papers"],
"confidence": 0.0-1.0
}}
],
"contradictions": [
{{
"topic": "Topic of disagreement",
"viewpoint_a": "First viewpoint",
"papers_a": ["arxiv_id1"],
"viewpoint_b": "Second viewpoint",
"papers_b": ["arxiv_id2"],
"citations": ["Evidence for both sides"],
"confidence": 0.0-1.0
}}
],
"research_gaps": [
"Gap 1: What's missing",
"Gap 2: What needs further research"
],
"summary": "Executive summary addressing the research question with synthesis of all findings",
"confidence_score": 0.0-1.0
}}
CRITICAL JSON FORMATTING RULES:
- Ground all statements in the provided analyses
- Be specific about which papers support which claims
- Identify both agreements and disagreements
- Provide confidence scores based on consistency and evidence strength
- For ALL array fields (citations, supporting_papers, papers_a, papers_b, research_gaps):
* MUST be flat arrays of strings ONLY: ["item1", "item2"]
* NEVER nest arrays: [[], "text"] or [["nested"]] are INVALID
* NEVER include null, empty strings, or non-string values
* Each array element must be a non-empty string
"""
return prompt
def _normalize_synthesis_response(self, data: dict) -> dict:
"""
Normalize synthesis LLM response to ensure all list fields contain only strings.
Handles nested lists, None values, and mixed types in:
- consensus_points[].citations
- consensus_points[].supporting_papers
- contradictions[].citations
- contradictions[].papers_a
- contradictions[].papers_b
- research_gaps
Args:
data: Raw synthesis data dictionary from LLM
Returns:
Normalized dictionary with correct types for all fields
"""
def flatten_and_clean(value):
"""Recursively flatten nested lists and clean values."""
if isinstance(value, str):
return [value.strip()] if value.strip() else []
elif isinstance(value, list):
cleaned = []
for item in value:
if isinstance(item, str):
if item.strip():
cleaned.append(item.strip())
elif isinstance(item, list):
cleaned.extend(flatten_and_clean(item))
elif item is not None and str(item).strip():
cleaned.append(str(item).strip())
return cleaned
elif value is not None:
str_value = str(value).strip()
return [str_value] if str_value else []
else:
return []
# Normalize top-level research_gaps
if "research_gaps" in data:
data["research_gaps"] = flatten_and_clean(data["research_gaps"])
else:
data["research_gaps"] = []
# Normalize consensus_points
if "consensus_points" in data and isinstance(data["consensus_points"], list):
for cp in data["consensus_points"]:
if isinstance(cp, dict):
cp["citations"] = flatten_and_clean(cp.get("citations", []))
cp["supporting_papers"] = flatten_and_clean(cp.get("supporting_papers", []))
# Normalize contradictions
if "contradictions" in data and isinstance(data["contradictions"], list):
for contr in data["contradictions"]:
if isinstance(contr, dict):
contr["citations"] = flatten_and_clean(contr.get("citations", []))
contr["papers_a"] = flatten_and_clean(contr.get("papers_a", []))
contr["papers_b"] = flatten_and_clean(contr.get("papers_b", []))
logger.debug("Synthesis response normalized successfully")
return data
def synthesize(
self,
papers: List[Paper],
analyses: List[Analysis],
query: str,
state: Dict[str, Any]
) -> SynthesisResult:
"""
Synthesize findings across papers.
Args:
papers: List of Paper objects
analyses: List of Analysis objects
query: Original research question
state: Agent state for token tracking
Returns:
SynthesisResult object
"""
try:
logger.info(f"Synthesizing {len(papers)} papers")
# Create synthesis prompt
prompt = self._create_synthesis_prompt(papers, analyses, query)
# Call Azure OpenAI with temperature=0 and output limits
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are a research synthesis expert. Provide accurate, grounded synthesis based only on the provided analyses."},
{"role": "user", "content": prompt}
],
temperature=self.temperature,
max_tokens=2500, # Larger limit for multi-paper synthesis
response_format={"type": "json_object"}
)
# Track token usage
if hasattr(response, 'usage') and response.usage:
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
state["token_usage"]["input_tokens"] += prompt_tokens
state["token_usage"]["output_tokens"] += completion_tokens
logger.info(f"Synthesis token usage: {prompt_tokens} input, {completion_tokens} output")
# Parse response
synthesis_data = json.loads(response.choices[0].message.content)
# Normalize response to handle nested lists and mixed types
synthesis_data = self._normalize_synthesis_response(synthesis_data)
# Create structured objects
consensus_points = [
ConsensusPoint(**cp) for cp in synthesis_data.get("consensus_points", [])
]
contradictions = [
Contradiction(**c) for c in synthesis_data.get("contradictions", [])
]
# Create SynthesisResult
synthesis = SynthesisResult(
consensus_points=consensus_points,
contradictions=contradictions,
research_gaps=synthesis_data.get("research_gaps", []),
summary=synthesis_data.get("summary", ""),
confidence_score=synthesis_data.get("confidence_score", 0.5),
papers_analyzed=[p.arxiv_id for p in papers]
)
logger.info(f"Synthesis completed with confidence {synthesis.confidence_score:.2f}")
return synthesis
except Exception as e:
logger.error(f"Error during synthesis: {str(e)}")
# Return minimal synthesis on error
return SynthesisResult(
consensus_points=[],
contradictions=[],
research_gaps=["Synthesis failed - unable to identify gaps"],
summary="Synthesis failed due to an error",
confidence_score=0.0,
papers_analyzed=[p.arxiv_id for p in papers]
)
@observe(name="synthesis_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute synthesis agent.
Args:
state: Current agent state
Returns:
Updated state with synthesis
"""
try:
logger.info("=== Synthesis Agent Started ===")
papers = state.get("papers", [])
analyses = state.get("analyses", [])
query = state.get("query", "")
if not papers or not analyses:
error_msg = "No papers or analyses available for synthesis"
logger.error(error_msg)
state["errors"].append(error_msg)
return state
if len(papers) != len(analyses):
error_msg = f"Mismatch: {len(papers)} papers but {len(analyses)} analyses"
logger.warning(error_msg)
# Use minimum length
min_len = min(len(papers), len(analyses))
papers = papers[:min_len]
analyses = analyses[:min_len]
# Perform synthesis
synthesis = self.synthesize(papers, analyses, query, state)
state["synthesis"] = synthesis
logger.info("=== Synthesis Agent Completed ===")
return state
except Exception as e:
error_msg = f"Synthesis Agent error: {str(e)}"
logger.error(error_msg)
state["errors"].append(error_msg)
return state