"""
Synthesis Agent: Compare findings across papers and identify patterns.
"""
import os
import json
import logging
from typing import Dict, Any, List
from openai import AzureOpenAI

from utils.schemas import Analysis, SynthesisResult, ConsensusPoint, Contradiction, Paper
from rag.retrieval import RAGRetriever
from utils.langfuse_client import observe

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class SynthesisAgent:
    """Agent for synthesizing findings across multiple papers."""

    def __init__(
        self,
        rag_retriever: RAGRetriever,
        model: str = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
        temperature: float = 0.0,
        timeout: int = 90
    ):
        """
        Initialize Synthesis Agent.

        Args:
            rag_retriever: RAGRetriever instance
            model: Azure OpenAI model deployment name
            temperature: Temperature for generation (0 for deterministic)
            timeout: Request timeout in seconds (default: 90, longer than analyzer)
        """
        self.rag_retriever = rag_retriever
        self.model = model
        self.temperature = temperature
        self.timeout = timeout

        # Initialize Azure OpenAI client with timeout
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            timeout=timeout
        )

    def _create_synthesis_prompt(
        self,
        papers: List[Paper],
        analyses: List[Analysis],
        query: str
    ) -> str:
        """Create prompt for synthesis."""
        # Format paper summaries
        paper_summaries = []
        for paper, analysis in zip(papers, analyses):
            summary = f"""
Paper ID: {paper.arxiv_id}
Title: {paper.title}
Authors: {", ".join(paper.authors)}

Analysis:
- Methodology: {analysis.methodology}
- Key Findings: {", ".join(analysis.key_findings)}
- Conclusions: {analysis.conclusions}
- Contributions: {", ".join(analysis.main_contributions)}
- Limitations: {", ".join(analysis.limitations)}
"""
            paper_summaries.append(summary)

        prompt = f"""You are a research synthesis expert. Analyze the following papers in relation to the user's research question.

Research Question: {query}

Papers Analyzed:
{"=" * 80}
{chr(10).join(paper_summaries)}
{"=" * 80}

Synthesize these findings and provide:
1. Consensus points - areas where papers agree
2. Contradictions - areas where papers disagree
3. Research gaps - what's missing or needs further investigation
4. Executive summary addressing the research question

Provide your synthesis in the following JSON format:
{{
    "consensus_points": [
        {{
            "statement": "Clear consensus statement",
            "supporting_papers": ["arxiv_id1", "arxiv_id2"],
            "citations": ["Specific evidence from papers"],
            "confidence": 0.0-1.0
        }}
    ],
    "contradictions": [
        {{
            "topic": "Topic of disagreement",
            "viewpoint_a": "First viewpoint",
            "papers_a": ["arxiv_id1"],
            "viewpoint_b": "Second viewpoint",
            "papers_b": ["arxiv_id2"],
            "citations": ["Evidence for both sides"],
            "confidence": 0.0-1.0
        }}
    ],
    "research_gaps": [
        "Gap 1: What's missing",
        "Gap 2: What needs further research"
    ],
    "summary": "Executive summary addressing the research question with synthesis of all findings",
    "confidence_score": 0.0-1.0
}}

CRITICAL RULES:
- Ground all statements in the provided analyses
- Be specific about which papers support which claims
- Identify both agreements and disagreements
- Provide confidence scores based on consistency and evidence strength
- For ALL array fields (citations, supporting_papers, papers_a, papers_b, research_gaps):
  * MUST be flat arrays of strings ONLY: ["item1", "item2"]
  * NEVER nest arrays: [[], "text"] or [["nested"]] are INVALID
  * NEVER include null, empty strings, or non-string values
  * Each array element must be a non-empty string
"""
        return prompt

    def _normalize_synthesis_response(self, data: dict) -> dict:
        """
        Normalize synthesis LLM response to ensure all list fields contain only strings.

        Handles nested lists, None values, and mixed types in:
        - consensus_points[].citations
        - consensus_points[].supporting_papers
        - contradictions[].citations
        - contradictions[].papers_a
        - contradictions[].papers_b
        - research_gaps

        Args:
            data: Raw synthesis data dictionary from LLM

        Returns:
            Normalized dictionary with correct types for all fields
        """
        def flatten_and_clean(value):
            """Recursively flatten nested lists and clean values."""
            if isinstance(value, str):
                return [value.strip()] if value.strip() else []
            elif isinstance(value, list):
                cleaned = []
                for item in value:
                    if isinstance(item, str):
                        if item.strip():
                            cleaned.append(item.strip())
                    elif isinstance(item, list):
                        cleaned.extend(flatten_and_clean(item))
                    elif item is not None and str(item).strip():
                        cleaned.append(str(item).strip())
                return cleaned
            elif value is not None:
                str_value = str(value).strip()
                return [str_value] if str_value else []
            else:
                return []

        # Normalize top-level research_gaps
        if "research_gaps" in data:
            data["research_gaps"] = flatten_and_clean(data["research_gaps"])
        else:
            data["research_gaps"] = []

        # Normalize consensus_points
        if "consensus_points" in data and isinstance(data["consensus_points"], list):
            for cp in data["consensus_points"]:
                if isinstance(cp, dict):
                    cp["citations"] = flatten_and_clean(cp.get("citations", []))
                    cp["supporting_papers"] = flatten_and_clean(cp.get("supporting_papers", []))

        # Normalize contradictions
        if "contradictions" in data and isinstance(data["contradictions"], list):
            for contr in data["contradictions"]:
                if isinstance(contr, dict):
                    contr["citations"] = flatten_and_clean(contr.get("citations", []))
                    contr["papers_a"] = flatten_and_clean(contr.get("papers_a", []))
                    contr["papers_b"] = flatten_and_clean(contr.get("papers_b", []))

        logger.debug("Synthesis response normalized successfully")
        return data

    def synthesize(
        self,
        papers: List[Paper],
        analyses: List[Analysis],
        query: str,
        state: Dict[str, Any]
    ) -> SynthesisResult:
        """
        Synthesize findings across papers.

        Args:
            papers: List of Paper objects
            analyses: List of Analysis objects
            query: Original research question
            state: Agent state for token tracking

        Returns:
            SynthesisResult object
        """
        try:
            logger.info(f"Synthesizing {len(papers)} papers")

            # Create synthesis prompt
            prompt = self._create_synthesis_prompt(papers, analyses, query)

            # Call Azure OpenAI with temperature=0 and output limits
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a research synthesis expert. Provide accurate, grounded synthesis based only on the provided analyses."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                max_tokens=2500,  # Larger limit for multi-paper synthesis
                response_format={"type": "json_object"}
            )

            # Track token usage
            if hasattr(response, 'usage') and response.usage:
                prompt_tokens = response.usage.prompt_tokens
                completion_tokens = response.usage.completion_tokens
                usage = state.setdefault("token_usage", {"input_tokens": 0, "output_tokens": 0})
                usage["input_tokens"] += prompt_tokens
                usage["output_tokens"] += completion_tokens
                logger.info(f"Synthesis token usage: {prompt_tokens} input, {completion_tokens} output")

            # Parse response
            synthesis_data = json.loads(response.choices[0].message.content)

            # Normalize response to handle nested lists and mixed types
            synthesis_data = self._normalize_synthesis_response(synthesis_data)

            # Create structured objects
            consensus_points = [
                ConsensusPoint(**cp) for cp in synthesis_data.get("consensus_points", [])
            ]

            contradictions = [
                Contradiction(**c) for c in synthesis_data.get("contradictions", [])
            ]

            # Create SynthesisResult
            synthesis = SynthesisResult(
                consensus_points=consensus_points,
                contradictions=contradictions,
                research_gaps=synthesis_data.get("research_gaps", []),
                summary=synthesis_data.get("summary", ""),
                confidence_score=synthesis_data.get("confidence_score", 0.5),
                papers_analyzed=[p.arxiv_id for p in papers]
            )

            logger.info(f"Synthesis completed with confidence {synthesis.confidence_score:.2f}")
            return synthesis

        except Exception as e:
            logger.error(f"Error during synthesis: {e}", exc_info=True)
            # Return minimal synthesis on error
            return SynthesisResult(
                consensus_points=[],
                contradictions=[],
                research_gaps=["Synthesis failed - unable to identify gaps"],
                summary="Synthesis failed due to an error",
                confidence_score=0.0,
                papers_analyzed=[p.arxiv_id for p in papers]
            )

    @observe(name="synthesis_agent_run", as_type="generation")
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute synthesis agent.

        Args:
            state: Current agent state

        Returns:
            Updated state with synthesis
        """
        try:
            logger.info("=== Synthesis Agent Started ===")

            papers = state.get("papers", [])
            analyses = state.get("analyses", [])
            query = state.get("query", "")

            if not papers or not analyses:
                error_msg = "No papers or analyses available for synthesis"
                logger.error(error_msg)
                state.setdefault("errors", []).append(error_msg)
                return state

            if len(papers) != len(analyses):
                error_msg = f"Mismatch: {len(papers)} papers but {len(analyses)} analyses"
                logger.warning(error_msg)
                # Use minimum length
                min_len = min(len(papers), len(analyses))
                papers = papers[:min_len]
                analyses = analyses[:min_len]

            # Perform synthesis
            synthesis = self.synthesize(papers, analyses, query, state)
            state["synthesis"] = synthesis

            logger.info("=== Synthesis Agent Completed ===")
            return state

        except Exception as e:
            error_msg = f"Synthesis Agent error: {str(e)}"
            logger.error(error_msg)
            state.setdefault("errors", []).append(error_msg)
            return state
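

# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption, not part of the production agent): the
# flattening contract that _normalize_synthesis_response enforces on list
# fields, restated as a hypothetical standalone helper so it can be run and
# inspected without Azure OpenAI credentials or a RAGRetriever instance.
# ---------------------------------------------------------------------------
def _flatten_demo(value):
    """Re-statement of the flatten-and-clean rules, for illustration only."""
    if isinstance(value, str):
        # Strings become single-element lists; blanks are dropped.
        return [value.strip()] if value.strip() else []
    if isinstance(value, list):
        # Nested lists are flattened recursively.
        cleaned = []
        for item in value:
            cleaned.extend(_flatten_demo(item))
        return cleaned
    if value is not None:
        # Non-string scalars are coerced to strings.
        text = str(value).strip()
        return [text] if text else []
    return []


if __name__ == "__main__":
    # Nested lists, None, numbers, and blank strings all collapse to a flat
    # list of non-empty strings, matching the prompt's formatting rules.
    messy = [["2401.00001", None], "  2401.00002 ", "", 3]
    print(_flatten_demo(messy))  # ['2401.00001', '2401.00002', '3']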