File size: 18,566 Bytes
d6ea378
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""Deep Research Multi-Agent System implementation.

This implementation focuses on:
- Building a multi-agent system for comprehensive research
- Using LangGraph with MessagesState for proper state management
- Synthesizing research findings into structured reports
"""

from typing import Dict, List, Optional, Any, Annotated
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END, START, MessagesState
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field

from core.chat_interface import ChatInterface
from opik.integrations.langchain import OpikTracer
from agents.prompts import (
    RESEARCH_MANAGER_PROMPT,
    REPORT_FINALIZER_PROMPT,
)
from dotenv import load_dotenv
load_dotenv()


class ResearchQuestion(BaseModel):
    """A research question with a title and description."""
    title: str = Field(description="The title of the research question/section")
    description: str = Field(description="Description of what to research for this section")
    completed: bool = Field(default=False, description="Whether research has been completed for this section")


class ResearchPlan(BaseModel):
    """The overall research plan created by the Research Manager."""
    topic: str = Field(description="The main research topic")
    questions: List[ResearchQuestion] = Field(description="The list of research questions to investigate")
    current_question_index: int = Field(default=0, description="Index of the current question being researched")


class Report(BaseModel):
    """The final research report structure."""
    executive_summary: Optional[str] = Field(default=None, description="Executive summary of the research")
    key_findings: Optional[str] = Field(default=None, description="Key findings from the research")
    detailed_analysis: List[Dict[str, Any]] = Field(default_factory=list, description="Detailed analysis sections")
    limitations: Optional[str] = Field(default=None, description="Limitations and further research")


class ResearchState(MessagesState):
    """State tracking for the deep research workflow using MessagesState as base."""
    research_plan: Optional[ResearchPlan] = None
    report: Optional[Report] = None
    next_step: str = "research_manager"
    
    # MessagesState already handles messages with add_messages reducer


class DeepResearchChat(ChatInterface):
    """Deep research implementation using multi-agent system with proper state management."""
    
    def __init__(self):
        self.llm = None
        self.research_manager = None
        self.specialized_research_agent = None
        self.finalizer = None
        self.workflow = None
        self.tavily_search_tool = None
    
    def initialize(self) -> None:
        """Initialize components for the deep research system."""
        # Initialize LLM model
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        
        # Create Tavily search tool for agents
        self.tavily_search_tool = TavilySearchResults(max_results=5)
        
        # Create components
        self.research_manager = self._create_research_manager()
        self.specialized_research_agent = self._create_specialized_research_agent()
        self.finalizer = self._create_finalizer()
        
        # Create the workflow graph using these agents
        self.workflow = self._create_workflow()
        
        # Optional: Create Opik Tracer for monitoring
        try:
            self.tracer = OpikTracer(
                graph=self.workflow.get_graph(xray=True),
                project_name="nexus-research-workflow"
            )
        except:
            self.tracer = None
            print("Opik tracer not available, continuing without monitoring")
    
    def _create_research_manager(self) -> Any:
        """Create the research manager agent."""
        research_manager = (
            RESEARCH_MANAGER_PROMPT 
            | self.llm.with_structured_output(ResearchPlan)
        )
        
        return research_manager

    def _create_specialized_research_agent(self) -> Any:
        """Create specialized research agents."""
        # Create search tool for the agent
        @tool("web_search")
        def search_web(query: str) -> str:
            """Search the web for information on the research topic."""
            results = self.tavily_search_tool.invoke(query)
            formatted_results = []
            
            for i, result in enumerate(results, 1):
                formatted_results.append(f"Result {i}:")
                formatted_results.append(f"Title: {result.get('title', 'N/A')}")
                formatted_results.append(f"Content: {result.get('content', 'N/A')}")
                formatted_results.append(f"URL: {result.get('url', 'N/A')}")
                formatted_results.append("")
            
            return "\n".join(formatted_results)
        
        # Create the specialized agent
        tools = [search_web]
        
        # Define the system message for the specialized research agent
        system_message = """You are a Specialized Research Agent responsible for thoroughly researching a specific topic section.

        Process:
        1. Analyze the research question and description
        2. Generate effective search queries to gather information
        3. Use the web_search tool to find relevant information
        4. Synthesize findings into a comprehensive section
        5. Include proper citations to your sources

        Your response should be:
        - Thorough (at least 500 words)
        - Well-structured with subsections
        - Based on factual information (not made up)
        - Include proper citations to sources

        Always critically evaluate information and ensure you cover the topic comprehensively.
        """
        
        # Create the specialized research agent
        specialized_agent = create_react_agent(
            model=self.llm,
            tools=tools,
            prompt=system_message
        )
        
        return specialized_agent
    
    def _create_finalizer(self) -> Any:
        """Create the finalizer component."""
        finalizer = REPORT_FINALIZER_PROMPT | self.llm | StrOutputParser()
        return finalizer
    
    def _create_workflow(self) -> Any:
        """Create the multi-agent deep research workflow with proper MessagesState usage."""
        # Create a state graph
        workflow = StateGraph(ResearchState)
        
        # Define the nodes
        
        # Research Manager Node
        def research_manager_node(state: ResearchState) -> Dict:
            """Create the research plan and update messages."""
            print("\n=== RESEARCH MANAGER NODE ===")
            
            # Get the topic from the LAST user message (not first)
            # This handles conversation context better
            user_messages = [msg for msg in state["messages"] if isinstance(msg, HumanMessage)]
            topic = user_messages[-1].content if user_messages else state["messages"][-1].content
            
            print(f"Planning research for topic: {topic}")
            
            # Generate research plan
            research_plan = self.research_manager.invoke({"topic": topic})
            print(f"Created research plan with {len(research_plan.questions)} questions")
            
            # Initialize empty report structure
            report = Report(
                detailed_analysis=[
                    {"title": q.title, "content": None, "sources": []} 
                    for q in research_plan.questions
                ]
            )
    
            # Add planning message to state
            planning_msg = AIMessage(
                content=f"Research plan created with {len(research_plan.questions)} sections to investigate."
            )
            
            return {
                "messages": [planning_msg],
                "research_plan": research_plan,
                "report": report,
            }
        
        # Specialized Research Node
        def specialized_research_node(state: ResearchState) -> Dict:
            """Conduct research on the current question and update messages."""
            print("\n=== SPECIALIZED RESEARCH NODE ===")
            
            research_plan = state["research_plan"]
            assert research_plan is not None, "Research plan is None"
            current_index = research_plan.current_question_index
            
            if current_index >= len(research_plan.questions):
                print("All research questions completed")
                return {}
            
            current_question = research_plan.questions[current_index]
            print(f"Researching question {current_index + 1}/{len(research_plan.questions)}: "
                  f"{current_question.title}")
            
            # Create input for the specialized agent
            research_input = {
                "messages": [
                    ("user", f"""Research the following topic thoroughly:
                    
                    Topic: {current_question.title}
                    
                    Description: {current_question.description}
                    
                    Provide a detailed analysis with proper citations to sources.
                    """)
                ]
            }
            
            # Invoke the specialized agent
            result = self.specialized_research_agent.invoke(research_input)
            
            # Extract content from the result
            last_message = result["messages"][-1]
            if isinstance(last_message, tuple):
                content = last_message[1]
            else:
                content = last_message.content
            
            # Parse out sources from the content
            sources = []
            for line in content.split("\n"):
                if "http" in line and "://" in line:
                    sources.append(line.strip())
            
            # Update the research plan
            research_plan.questions[current_index].completed = True
            
            # Update the report
            report = state["report"]
            assert report is not None, "Report is None"
            report.detailed_analysis[current_index]["content"] = content
            report.detailed_analysis[current_index]["sources"] = sources
            
            # Move to the next question
            research_plan.current_question_index += 1
            
            # Add research progress message
            progress_msg = AIMessage(
                content=f"Completed research for section: {current_question.title}"
            )
            
            return {
                "messages": [progress_msg],
                "research_plan": research_plan,
                "report": report,
            }
            
        # Research Evaluator Node
        def evaluator_node(state: ResearchState) -> Dict:
            """Evaluate the research progress and determine next steps."""
            print("\n=== EVALUATOR NODE ===")
            
            research_plan = state["research_plan"]
            assert research_plan is not None, "Research plan is None"
            
            # Check if we've completed all questions
            all_completed = research_plan.current_question_index >= len(research_plan.questions)
            
            if all_completed:
                print("All research questions have been addressed. Moving to finalizer.")
                eval_msg = AIMessage(content="All research sections completed. Finalizing report...")
                return {
                    "messages": [eval_msg],
                    "next_step": "finalize"
                }
            else:
                # We have more sections to research
                next_section = research_plan.questions[research_plan.current_question_index].title
                print(f"More research needed. Moving to next section: {next_section}")
                eval_msg = AIMessage(content=f"Moving to research section: {next_section}")
                return {
                    "messages": [eval_msg],
                    "next_step": "research"
                }
        
        # Finalizer Node
        def finalizer_node(state: ResearchState) -> Dict:
            """Finalize the research report and update messages."""
            print("\n=== FINALIZER NODE ===")
            
            research_plan = state["research_plan"]
            report = state["report"]
            assert report is not None, "Report is None"
            assert research_plan is not None, "Research plan is None"
            
            # Prepare the detailed analysis for the finalizer
            detailed_analysis = "\n\n".join([
                f"## {section['title']}\n{section['content']}"
                for section in report.detailed_analysis
                if section['content'] is not None
            ])
            
            # Generate the final sections
            final_sections = self.finalizer.invoke({
                "topic": research_plan.topic,
                "detailed_analysis": detailed_analysis
            })
            
            # Parse the final sections
            sections = final_sections.split("\n\n")
            
            # Update the report
            if len(sections) >= 3:
                report.executive_summary = sections[0].replace("# Executive Summary", "").strip()
                report.key_findings = sections[1].replace("# Key Findings", "").strip()
                report.limitations = sections[2].replace("# Limitations and Further Research", "").strip()
            
            # Format the final report
            report_message = self._format_report(report)
            
            return {
                "messages": [report_message],
            }
        
        # Add nodes to the graph
        workflow.add_node("research_manager", research_manager_node)
        workflow.add_node("specialized_research", specialized_research_node)
        workflow.add_node("evaluate", evaluator_node)
        workflow.add_node("finalizer", finalizer_node)
        
        # Add edges
        workflow.add_edge(START, "research_manager")
        workflow.add_edge("research_manager", "specialized_research")
        workflow.add_edge("specialized_research", "evaluate")
        
        # Add conditional edges from evaluator
        workflow.add_conditional_edges(
            "evaluate",
            lambda x: x["next_step"],
            {
                "research": "specialized_research",
                "finalize": "finalizer"
            }
        )
        workflow.add_edge("finalizer", END)
        
        # Compile the workflow
        return workflow.compile()
    
    def _format_report(self, report: Report) -> AIMessage:
        """Format the research report for presentation."""
        sections = [
            "# Research Report\n",
            
            "## Executive Summary\n" + (report.executive_summary or "N/A"),
            
            "## Key Findings\n" + (report.key_findings or "N/A"),
            
            "## Detailed Analysis"
        ]
        
        # Add detailed analysis sections
        for section in report.detailed_analysis:
            if section["content"]:
                sections.append(f"### {section['title']}\n{section['content']}")
                
                if section["sources"]:
                    sources = "\n".join([f"- {source}" for source in section["sources"]])
                    sections.append(f"**Sources:**\n{sources}")
        
        # Add limitations
        sections.append("## Limitations and Further Research\n" + (report.limitations or "N/A"))
        
        return AIMessage(content="\n\n".join(sections))
    
    def _convert_history_to_messages(self, chat_history: Optional[List[Dict[str, str]]]) -> List:
        """Convert chat history to LangChain message format.
        
        Args:
            chat_history: List of dicts with 'role' and 'content' keys
            
        Returns:
            List of LangChain message objects
        """
        messages = []
        if chat_history:
            for msg in chat_history:
                if msg["role"] == "user":
                    messages.append(HumanMessage(content=msg["content"]))
                elif msg["role"] == "assistant":
                    messages.append(AIMessage(content=msg["content"]))
        return messages
    
    
    def process_message(self, message: str, chat_history: Optional[List[Dict[str, str]]] = None) -> str:
        """Process a message using the deep research system with proper state management."""
        print("\n=== STARTING DEEP RESEARCH ===")
        print(f"Research Topic: {message}")
        
        # Convert chat history to messages
        history_messages = self._convert_history_to_messages(chat_history)
        
        # Add the current message
        history_messages.append(HumanMessage(content=message))
        
        # Create initial state with full conversation history
        initial_state = ResearchState(
            messages=history_messages,  # Include full history instead of just current message
            research_plan=None,
            report=None,
            next_step="research_manager"
        )
        # # Create initial state using MessagesState
        # initial_state = ResearchState(
        #     messages=[HumanMessage(content=message)],
        #     research_plan=None,
        #     report=None,
        #     next_step="research_manager"
        # )
        
        # Run workflow with optional tracing
        config = {"callbacks": [self.tracer]} if self.tracer else {}
        result = self.workflow.invoke(initial_state, config=config)
        
        print("\n=== RESEARCH COMPLETED ===")
        
        # Write the final report to a file
        try:
            with open("final_report.md", "w") as f:
                f.write(result["messages"][-1].content)
            print("Report saved to final_report.md")
        except Exception as e:
            print(f"Could not save report to file: {e}")
        
        # Return the final report
        return result["messages"][-1].content