# agents.py - Core Analysis & Orchestration Agents
import asyncio
import logging
from typing import Optional, Dict, Any, List, AsyncGenerator
import time

from utils import call_openai_chat, load_pdf_text_cached, load_pdf_text_chunked, get_document_metadata, create_hierarchical_summary
from config import Config

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class BaseAgent:
    def __init__(self, name: str, model: str, tasks_completed: int = 0):
        self.name = name
        self.model = model
        self.tasks_completed = tasks_completed

    async def handle(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__}.handle must be implemented.")
    
    async def handle_streaming(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> AsyncGenerator[str, None]:
        """Streaming version of handle - override in subclasses for streaming support"""
        result = await self.handle(user_id, prompt, file_path, context)
        # Default implementation: yield the result as a single chunk
        for key, value in result.items():
            yield f"{key}: {value}"


# --------------------
# Core Analysis Agent
# --------------------
class AnalysisAgent(BaseAgent):
    async def handle(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None):
        start_time = time.time()
        
        if file_path:
            # Get document metadata
            metadata = get_document_metadata(file_path)
            
            # Load text with caching
            text = load_pdf_text_cached(file_path)
            
            # Check if document needs chunking
            if len(text) > Config.CHUNK_SIZE:
                return await self._handle_large_document(prompt, text, metadata)
            else:
                content = f"User prompt: {prompt}\n\nDocument text:\n{text}"
        else:
            content = f"User prompt: {prompt}"
            metadata = {}
        
        system = "You are AnalysisAgent: produce concise insights and structured summaries. Adapt your language and complexity to the target audience. Provide clear, actionable insights with appropriate examples and analogies for complex topics."
        
        try:
            response = await call_openai_chat(
                model=self.model,
                messages=[{"role": "system", "content": system},
                         {"role": "user", "content": content}],
                temperature=Config.OPENAI_TEMPERATURE,
                max_tokens=Config.OPENAI_MAX_TOKENS
            )
        except Exception as e:
            logger.exception("AnalysisAgent failed")
            response = f"Error during analysis: {str(e)}"
        
        self.tasks_completed += 1
        
        # Add processing metadata
        processing_time = time.time() - start_time
        result = {
            "analysis": response,
            "metadata": {
                "processing_time": round(processing_time, 2),
                "document_metadata": metadata,
                "agent": self.name,
                "tasks_completed": self.tasks_completed
            }
        }
        
        return result
    
    async def _handle_large_document(self, prompt: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Handle large documents by processing in chunks"""
        from utils import chunk_text
        chunks = chunk_text(text, Config.CHUNK_SIZE)
        chunk_results = []
        
        system = "You are AnalysisAgent: produce concise insights and structured summaries. Adapt your language and complexity to the target audience. Provide clear, actionable insights with appropriate examples and analogies for complex topics."
        
        for i, chunk in enumerate(chunks):
            content = f"User prompt: {prompt}\n\nDocument chunk {i+1}/{len(chunks)}:\n{chunk}"
            
            try:
                response = await call_openai_chat(
                    model=self.model,
                    messages=[{"role": "system", "content": system},
                             {"role": "user", "content": content}],
                    temperature=Config.OPENAI_TEMPERATURE,
                    max_tokens=Config.OPENAI_MAX_TOKENS
                )
                chunk_results.append(f"--- Chunk {i+1} Analysis ---\n{response}")
            except Exception as e:
                logger.exception(f"AnalysisAgent failed on chunk {i+1}")
                chunk_results.append(f"--- Chunk {i+1} Error ---\nError: {str(e)}")
        
        # Keep the combined per-chunk output as a fallback in case the
        # hierarchical summary fails
        combined_analysis = "\n\n".join(chunk_results)
        
        # Create final summary using hierarchical approach to avoid token limits
        try:
            final_summary = await create_hierarchical_summary(
                chunk_results=chunk_results,
                prompt=prompt,
                model=self.model,
                max_tokens=6000  # Conservative limit to avoid context length errors
            )
        except Exception as e:
            logger.exception("AnalysisAgent failed on final summary")
            final_summary = f"Error creating final summary: {str(e)}\n\nChunk Results:\n{combined_analysis}"
        
        return {
            "analysis": final_summary,
            "metadata": {
                "processing_method": "chunked",
                "chunks_processed": len(chunks),
                "document_metadata": metadata,
                "agent": self.name,
                "tasks_completed": self.tasks_completed
            }
        }
    
    async def handle_streaming(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> AsyncGenerator[str, None]:
        """Streaming version of analysis"""
        yield "πŸ” Starting analysis..."
        
        if file_path:
            metadata = get_document_metadata(file_path)
            yield f"πŸ“„ Document loaded: {metadata.get('page_count', 0)} pages, {metadata.get('file_size', 0) / 1024:.1f} KB"
            
            text = load_pdf_text_cached(file_path)
            
            if len(text) > Config.CHUNK_SIZE:
                yield "πŸ“š Large document detected, processing in chunks..."
                from utils import chunk_text
                chunks = chunk_text(text, Config.CHUNK_SIZE)
                yield f"πŸ“Š Document split into {len(chunks)} chunks"
                
                # Process chunks with progress updates
                for i, chunk in enumerate(chunks):
                    yield f"⏳ Processing chunk {i+1}/{len(chunks)}..."
                    # Process chunk (simplified for streaming)
                    await asyncio.sleep(0.1)  # Simulate processing time
                
                yield "πŸ”„ Combining chunk results..."
                await asyncio.sleep(0.2)
                yield "βœ… Analysis complete!"
            else:
                yield "⚑ Processing document..."
                await asyncio.sleep(0.3)
                yield "βœ… Analysis complete!"
        else:
            yield "⚑ Processing request..."
            await asyncio.sleep(0.2)
            yield "βœ… Analysis complete!"
        
        # Get the actual result
        result = await self.handle(user_id, prompt, file_path, context)
        yield f"\nπŸ“‹ Analysis Result:\n{result.get('analysis', 'No result')}"


# --------------------
# Collaboration Agent
# --------------------
class CollaborationAgent(BaseAgent):
    async def handle(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None):
        system = "You are CollaborationAgent: produce reviewer-style comments and suggestions for improvement. Focus on constructive feedback and actionable recommendations."
        content = prompt if isinstance(prompt, str) else str(prompt)
        try:
            response = await call_openai_chat(model=self.model,
                                              messages=[{"role": "system", "content": system},
                                                        {"role": "user", "content": content}],
                                              temperature=0.2,
                                              max_tokens=800)
        except Exception as e:
            logger.exception("CollaborationAgent failed")
            response = f"Error during collaboration: {str(e)}"
        self.tasks_completed += 1
        return {"collaboration": response}


# --------------------
# Conversation Agent
# --------------------
class ConversationAgent(BaseAgent):
    async def handle(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None):
        system = "You are ConversationAgent: respond politely and helpfully. Provide context-aware responses and guide users on how to get the best results from the analysis system."
        try:
            response = await call_openai_chat(model=self.model,
                                              messages=[{"role": "system", "content": system},
                                                        {"role": "user", "content": prompt}],
                                              temperature=0.3,
                                              max_tokens=400)
        except Exception as e:
            logger.exception("ConversationAgent failed")
            response = f"Error in conversation: {str(e)}"
        self.tasks_completed += 1
        return {"conversation": response}


# --------------------
# Senior Research Analyst Agent
# --------------------
class ResearchAnalystAgent(BaseAgent):
    async def handle(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None):
        start_time = time.time()
        
        if file_path:
            # Get document metadata
            metadata = get_document_metadata(file_path)
            
            # Load text with caching
            text = load_pdf_text_cached(file_path)
            
            # Check if document needs chunking
            if len(text) > Config.CHUNK_SIZE:
                return await self._handle_large_document_research(prompt, text, metadata)
            else:
                content = f"User prompt: {prompt}\n\nDocument text:\n{text}"
        else:
            content = f"User prompt: {prompt}"
            metadata = {}
        
        system = """You are a Senior Research Analyst with deep expertise in product and engineering R&D pipelines. Your role is to:

1. **Extract High-Value Insights**: Identify novel ideas, breakthrough concepts, and innovative approaches that could drive significant product/engineering impact.

2. **Assess Commercial Viability**: Evaluate the potential for practical application, market readiness, and competitive advantage.

3. **Generate R&D Pipeline Outcomes**: Convert insights into concrete, actionable items for:
   - **Experiments**: Specific hypotheses to test, methodologies to validate
   - **Prototypes**: Technical implementations to build and demonstrate
   - **Product Decisions**: Strategic choices for development priorities and resource allocation

4. **Prioritize by Impact**: Focus on ideas with the highest potential for transformative change and measurable business value.

Provide structured analysis with clear next steps that engineering and product teams can immediately act upon."""
        
        try:
            response = await call_openai_chat(
                model=self.model,
                messages=[{"role": "system", "content": system},
                         {"role": "user", "content": content}],
                temperature=0.1,  # Lower temperature for more focused analysis
                max_tokens=Config.OPENAI_MAX_TOKENS * 2  # More tokens for detailed research analysis
            )
        except Exception as e:
            logger.exception("ResearchAnalystAgent failed")
            response = f"Error during research analysis: {str(e)}"
        
        self.tasks_completed += 1
        
        # Add processing metadata
        processing_time = time.time() - start_time
        result = {
            "research_analysis": response,
            "metadata": {
                "processing_time": round(processing_time, 2),
                "document_metadata": metadata,
                "agent": self.name,
                "tasks_completed": self.tasks_completed,
                "analysis_type": "research_and_development"
            }
        }
        
        return result
    
    async def _handle_large_document_research(self, prompt: str, text: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Handle large documents with research-focused chunking strategy"""
        from utils import chunk_text
        chunks = chunk_text(text, Config.CHUNK_SIZE)
        chunk_results = []
        
        system = """You are a Senior Research Analyst extracting high-value insights from document sections. Focus on:
- Novel technical concepts and methodologies
- Innovation opportunities and breakthrough potential
- Practical applications and commercial viability
- R&D pipeline implications

Provide structured insights that can feed into experiments, prototypes, and product decisions."""
        
        for i, chunk in enumerate(chunks):
            content = f"User prompt: {prompt}\n\nDocument section {i+1}/{len(chunks)}:\n{chunk}"
            
            try:
                response = await call_openai_chat(
                    model=self.model,
                    messages=[{"role": "system", "content": system},
                             {"role": "user", "content": content}],
                    temperature=0.1,
                    max_tokens=Config.OPENAI_MAX_TOKENS
                )
                chunk_results.append(f"--- Research Insights from Section {i+1} ---\n{response}")
            except Exception as e:
                logger.exception(f"ResearchAnalystAgent failed on chunk {i+1}")
                chunk_results.append(f"--- Section {i+1} Analysis Error ---\nError: {str(e)}")
        
        # Combine chunk results with research synthesis
        try:
            research_summary = await self._synthesize_research_insights(
                chunk_results=chunk_results,
                prompt=prompt,
                model=self.model
            )
        except Exception as e:
            logger.exception("ResearchAnalystAgent failed on research synthesis")
            research_summary = f"Error creating research synthesis: {str(e)}\n\nSection Results:\n{chr(10).join(chunk_results)}"
        
        return {
            "research_analysis": research_summary,
            "metadata": {
                "processing_method": "research_chunked",
                "chunks_processed": len(chunks),
                "document_metadata": metadata,
                "agent": self.name,
                "tasks_completed": self.tasks_completed,
                "analysis_type": "research_and_development"
            }
        }
    
    async def _synthesize_research_insights(self, chunk_results: List[str], prompt: str, model: str) -> str:
        """Synthesize research insights from multiple document sections"""
        synthesis_prompt = f"""
As a Senior Research Analyst, synthesize the following research insights into a comprehensive R&D pipeline strategy:

Original Analysis Request: {prompt}

Section Analysis Results:
{chr(10).join(chunk_results)}

Provide a structured synthesis that includes:

1. **Key Innovation Opportunities**: The most promising novel ideas with highest impact potential
2. **Technical Breakthroughs**: Specific technical concepts that could drive significant advancement
3. **R&D Pipeline Roadmap**:
   - **Phase 1 Experiments**: Immediate hypotheses to test (3-5 specific experiments)
   - **Phase 2 Prototypes**: Technical implementations to build (2-3 prototype concepts)
   - **Phase 3 Product Decisions**: Strategic choices for development priorities (2-3 key decisions)

4. **Impact Assessment**: Expected outcomes and measurable business value
5. **Risk Mitigation**: Potential challenges and mitigation strategies

Focus on actionable outcomes that engineering and product teams can immediately implement.
"""
        
        try:
            response = await call_openai_chat(
                model=model,
                messages=[{"role": "user", "content": synthesis_prompt}],
                temperature=0.1,
                max_tokens=8000  # Larger context for comprehensive synthesis
            )
            return response
        except Exception as e:
            logger.exception("Research synthesis failed")
            return f"Research synthesis error: {str(e)}"
    
    async def handle_streaming(self, user_id: str, prompt: str, file_path: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> AsyncGenerator[str, None]:
        """Streaming version of research analysis"""
        yield "πŸ”¬ Starting senior research analysis..."
        
        if file_path:
            metadata = get_document_metadata(file_path)
            yield f"πŸ“„ Research document loaded: {metadata.get('page_count', 0)} pages, {metadata.get('file_size', 0) / 1024:.1f} KB"
            
            text = load_pdf_text_cached(file_path)
            
            if len(text) > Config.CHUNK_SIZE:
                yield "πŸ“š Large document detected, applying research-focused chunking strategy..."
                from utils import chunk_text
                chunks = chunk_text(text, Config.CHUNK_SIZE)
                yield f"πŸ” Analyzing {len(chunks)} sections for innovation opportunities..."
                
                # Process chunks with research focus
                for i, chunk in enumerate(chunks):
                    yield f"βš—οΈ Extracting insights from research section {i+1}/{len(chunks)}..."
                    await asyncio.sleep(0.1)  # Simulate processing time
                
                yield "πŸ”„ Synthesizing research insights into R&D pipeline strategy..."
                await asyncio.sleep(0.3)
                yield "🎯 Generating concrete experiments, prototypes, and product decisions..."
                await asyncio.sleep(0.2)
                yield "βœ… Research analysis complete!"
            else:
                yield "⚑ Analyzing document for high-value R&D insights..."
                await asyncio.sleep(0.3)
                yield "🎯 Converting insights into actionable R&D pipeline outcomes..."
                await asyncio.sleep(0.2)
                yield "βœ… Research analysis complete!"
        else:
            yield "⚑ Processing research analysis request..."
            await asyncio.sleep(0.2)
            yield "βœ… Research analysis complete!"
        
        # Get the actual result
        result = await self.handle(user_id, prompt, file_path, context)
        yield f"\nπŸ“‹ Research Analysis Result:\n{result.get('research_analysis', 'No result')}"


# --------------------
# Master Orchestrator - Focused on Analysis
# --------------------
class MasterOrchestrator:
    def __init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        # References to fire-and-forget tasks, so they are not garbage
        # collected before completion
        self._background_tasks: set = set()

    async def handle_user_prompt(self, user_id: str, prompt: str, file_path: Optional[str] = None, targets: Optional[List[str]] = None) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        targets = targets or []

        # Always start with conversation agent for context
        if "conversation" in self.agents:
            try:
                conv_res = await self.agents["conversation"].handle(user_id, prompt, file_path)
                results.update(conv_res)
            except Exception:
                logger.exception("Conversation context step failed; continuing without it")

        # Core analysis functionality
        if "analysis" in targets and "analysis" in self.agents:
            analysis_res = await self.agents["analysis"].handle(user_id, prompt, file_path)
            results.update(analysis_res)
            payload = analysis_res.get("analysis", "")
            
            # Trigger collaboration agent asynchronously for additional insights;
            # hold a reference so the task is not garbage collected mid-flight
            if "collab" in self.agents:
                task = asyncio.create_task(self.agents["collab"].handle(user_id, payload, file_path))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)

        # Research analysis functionality
        if "research" in targets and "research" in self.agents:
            research_res = await self.agents["research"].handle(user_id, prompt, file_path)
            results.update(research_res)

        return results
    
    async def handle_user_prompt_streaming(self, user_id: str, prompt: str, file_path: Optional[str] = None, targets: Optional[List[str]] = None) -> AsyncGenerator[str, None]:
        """Streaming version of handle_user_prompt"""
        targets = targets or []
        
        # Stream analysis if requested
        if "analysis" in targets and "analysis" in self.agents:
            async for chunk in self.agents["analysis"].handle_streaming(user_id, prompt, file_path):
                yield chunk
        elif "research" in targets and "research" in self.agents:
            async for chunk in self.agents["research"].handle_streaming(user_id, prompt, file_path):
                yield chunk
        else:
            # Fallback to regular handling
            result = await self.handle_user_prompt(user_id, prompt, file_path, targets)
            yield str(result)
    
    async def handle_batch_analysis(self, user_id: str, prompt: str, file_paths: List[str], targets: Optional[List[str]] = None) -> Dict[str, Any]:
        """Handle batch analysis of multiple PDFs"""
        results = {
            "batch_results": [],
            "summary": {},
            "total_files": len(file_paths),
            "successful": 0,
            "failed": 0
        }
        
        targets = targets or ["analysis"]
        
        for i, file_path in enumerate(file_paths):
            try:
                file_result = await self.handle_user_prompt(user_id, prompt, file_path, targets)
                file_result["file_index"] = i
                file_result["file_path"] = file_path
                results["batch_results"].append(file_result)
                results["successful"] += 1
            except Exception as e:
                error_result = {
                    "file_index": i,
                    "file_path": file_path,
                    "error": str(e),
                    "analysis": f"Error processing file: {str(e)}"
                }
                results["batch_results"].append(error_result)
                results["failed"] += 1
        
        # Create batch summary using hierarchical approach
        if results["successful"] > 0:
            successful_analyses = [
                r.get("analysis") or r.get("research_analysis", "")
                for r in results["batch_results"] if "error" not in r
            ]
            
            try:
                summary_response = await create_hierarchical_summary(
                    chunk_results=successful_analyses,
                    prompt=f"Batch analysis summary for: {prompt}",
                    model=Config.OPENAI_MODEL,
                    max_tokens=6000
                )
                results["summary"]["batch_analysis"] = summary_response
            except Exception as e:
                results["summary"]["batch_analysis"] = f"Error creating batch summary: {str(e)}"
        
        results["summary"]["processing_stats"] = {
            "total_files": len(file_paths),
            "successful": results["successful"],
            "failed": results["failed"],
            "success_rate": f"{(results['successful'] / len(file_paths)) * 100:.1f}%" if file_paths else "0%"
        }
        
        return results
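

# A minimal wiring sketch, assuming Config.OPENAI_MODEL is set and the utils
# helpers resolve at import time; the agent keys ("conversation", "analysis",
# "collab", "research") must match the names MasterOrchestrator looks up, and
# the PDF paths below are placeholders.
async def _demo() -> None:
    agents: Dict[str, BaseAgent] = {
        "conversation": ConversationAgent("conversation", Config.OPENAI_MODEL),
        "analysis": AnalysisAgent("analysis", Config.OPENAI_MODEL),
        "collab": CollaborationAgent("collab", Config.OPENAI_MODEL),
        "research": ResearchAnalystAgent("research", Config.OPENAI_MODEL),
    }
    orchestrator = MasterOrchestrator(agents)

    # Single-document analysis
    result = await orchestrator.handle_user_prompt(
        user_id="demo", prompt="Summarize the key findings.",
        file_path="paper.pdf", targets=["analysis"],
    )
    print(result.get("analysis"))

    # Batch analysis across several PDFs
    batch = await orchestrator.handle_batch_analysis(
        user_id="demo", prompt="Summarize the key findings.",
        file_paths=["paper.pdf", "appendix.pdf"],
    )
    print(batch["summary"]["processing_stats"])


if __name__ == "__main__":
    asyncio.run(_demo())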