Spaces:
Sleeping
Sleeping
| """Deep Research Multi-Agent System implementation. | |
| This implementation focuses on: | |
| - Building a multi-agent system for comprehensive research | |
| - Using LangGraph with MessagesState for proper state management | |
| - Synthesizing research findings into structured reports | |
| """ | |
| from typing import Dict, List, Optional, Any, Annotated | |
| from langchain_core.messages import HumanMessage, AIMessage, BaseMessage | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_openai import ChatOpenAI | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_core.tools import tool | |
| from langgraph.graph import StateGraph, END, START, MessagesState | |
| from langgraph.prebuilt import create_react_agent | |
| from pydantic import BaseModel, Field | |
| from core.chat_interface import ChatInterface | |
| from opik.integrations.langchain import OpikTracer | |
| from agents.prompts import ( | |
| RESEARCH_MANAGER_PROMPT, | |
| REPORT_FINALIZER_PROMPT, | |
| ) | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class ResearchQuestion(BaseModel):
    """One question (report section) that the research workflow investigates."""

    # Heading used for this section in the report.
    title: str = Field(
        description="The title of the research question/section",
    )
    # Guidance handed to the research agent for this section.
    description: str = Field(
        description="Description of what to research for this section",
    )
    # Set to True once the section has been researched.
    completed: bool = Field(
        default=False,
        description="Whether research has been completed for this section",
    )
class ResearchPlan(BaseModel):
    """Plan produced by the Research Manager: a topic plus ordered questions."""

    topic: str = Field(
        description="The main research topic",
    )
    questions: List[ResearchQuestion] = Field(
        description="The list of research questions to investigate",
    )
    # Cursor into ``questions``.
    current_question_index: int = Field(
        default=0,
        description="Index of the current question being researched",
    )
class Report(BaseModel):
    """Container for the final research report.

    The three summary fields default to ``None``; ``detailed_analysis``
    defaults to an empty list of per-section dictionaries.
    """

    executive_summary: Optional[str] = Field(
        default=None,
        description="Executive summary of the research",
    )
    key_findings: Optional[str] = Field(
        default=None,
        description="Key findings from the research",
    )
    detailed_analysis: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Detailed analysis sections",
    )
    limitations: Optional[str] = Field(
        default=None,
        description="Limitations and further research",
    )
class ResearchState(MessagesState):
    """Workflow state for deep research, built on top of ``MessagesState``.

    Adds the research plan, the report under construction and a
    ``next_step`` value to the inherited ``messages`` channel.
    """

    # ``messages`` (with its add_messages reducer) comes from MessagesState.
    # NOTE(review): MessagesState appears to be a TypedDict, so the class-level
    # defaults below may not be applied when a state is built - confirm.
    research_plan: Optional[ResearchPlan] = None
    report: Optional[Report] = None
    next_step: str = "research_manager"
class DeepResearchChat(ChatInterface):
    """Deep research implementation using a multi-agent LangGraph workflow.

    The compiled graph runs four nodes:

    * ``research_manager`` turns the user's topic into a ``ResearchPlan``.
    * ``specialized_research`` researches one question per pass.
    * ``evaluate`` decides whether to research another question or finish.
    * ``finalizer`` generates the summary sections and formats the report.

    ``initialize()`` must be called before ``process_message()``.
    """

    # Upper bound on graph steps for one run. LangGraph's default limit is 25
    # and every question costs two steps (research + evaluate), so plans with
    # about a dozen questions would otherwise abort.
    _RECURSION_LIMIT = 100

    # Lower-cased heading text emitted by the finalizer -> Report attribute.
    _FINAL_SECTION_FIELDS = {
        "executive summary": "executive_summary",
        "key findings": "key_findings",
        "limitations and further research": "limitations",
    }

    def __init__(self):
        """Create an uninitialized instance; ``initialize()`` builds the parts."""
        self.llm = None
        self.research_manager = None
        self.specialized_research_agent = None
        self.finalizer = None
        self.workflow = None
        self.tavily_search_tool = None
        # Defined here so process_message() can always read it, even when
        # initialize() has not run or failed before the tracer was created.
        self.tracer = None

    def initialize(self) -> None:
        """Initialize the LLM, search tool, agents, workflow and tracer."""
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.tavily_search_tool = TavilySearchResults(max_results=5)

        self.research_manager = self._create_research_manager()
        self.specialized_research_agent = self._create_specialized_research_agent()
        self.finalizer = self._create_finalizer()

        # The workflow graph wires the components above together.
        self.workflow = self._create_workflow()

        # Tracing is optional: any failure here disables it instead of
        # aborting. ``Exception`` (not a bare except) keeps Ctrl-C working.
        try:
            self.tracer = OpikTracer(
                graph=self.workflow.get_graph(xray=True),
                project_name="nexus-research-workflow",
            )
        except Exception:
            self.tracer = None
            print("Opik tracer not available, continuing without monitoring")

    def _create_research_manager(self) -> Any:
        """Return the chain that maps ``{"topic": ...}`` to a ``ResearchPlan``."""
        return RESEARCH_MANAGER_PROMPT | self.llm.with_structured_output(ResearchPlan)

    def _create_specialized_research_agent(self) -> Any:
        """Return a ReAct agent equipped with a web-search tool."""

        def search_web(query: str) -> str:
            """Search the web for information on the research topic."""
            # The docstring above is kept as-is on purpose: it is the text
            # shown to the model as this tool's description.
            results = self.tavily_search_tool.invoke(query)
            # NOTE(review): the Tavily tool appears to return an error string
            # rather than a list when the search fails - pass it through
            # instead of crashing on ``.get``. Confirm against the tool docs.
            if isinstance(results, str):
                return results

            formatted_results = []
            for i, result in enumerate(results, 1):
                formatted_results.extend([
                    f"Result {i}:",
                    f"Title: {result.get('title', 'N/A')}",
                    f"Content: {result.get('content', 'N/A')}",
                    f"URL: {result.get('url', 'N/A')}",
                    "",
                ])
            return "\n".join(formatted_results)

        # The tool name in the prompt must match the function name above
        # (``search_web``); the previous text referred to ``web_search``.
        system_message = "\n".join([
            "You are a Specialized Research Agent responsible for thoroughly researching a specific topic section.",
            "Process:",
            "1. Analyze the research question and description",
            "2. Generate effective search queries to gather information",
            "3. Use the search_web tool to find relevant information",
            "4. Synthesize findings into a comprehensive section",
            "5. Include proper citations to your sources",
            "Your response should be:",
            "- Thorough (at least 500 words)",
            "- Well-structured with subsections",
            "- Based on factual information (not made up)",
            "- Include proper citations to sources",
            "Always critically evaluate information and ensure you cover the topic comprehensively.",
            "",
        ])

        return create_react_agent(
            model=self.llm,
            tools=[search_web],
            prompt=system_message,
        )

    def _create_finalizer(self) -> Any:
        """Return the chain that writes the report's summary sections as text."""
        return REPORT_FINALIZER_PROMPT | self.llm | StrOutputParser()

    def _research_manager_node(self, state: ResearchState) -> Dict:
        """Create the research plan and an empty report skeleton."""
        print("\n=== RESEARCH MANAGER NODE ===")

        # Use the LAST user message as the topic so earlier conversation
        # turns do not override the current request.
        user_messages = [
            msg for msg in state["messages"] if isinstance(msg, HumanMessage)
        ]
        topic = user_messages[-1].content if user_messages else state["messages"][-1].content
        print(f"Planning research for topic: {topic}")

        research_plan = self.research_manager.invoke({"topic": topic})
        print(f"Created research plan with {len(research_plan.questions)} questions")

        # One placeholder section per question, filled in by the research node.
        report = Report(
            detailed_analysis=[
                {"title": q.title, "content": None, "sources": []}
                for q in research_plan.questions
            ]
        )

        planning_msg = AIMessage(
            content=f"Research plan created with {len(research_plan.questions)} sections to investigate."
        )
        return {
            "messages": [planning_msg],
            "research_plan": research_plan,
            "report": report,
        }

    def _specialized_research_node(self, state: ResearchState) -> Dict:
        """Research the current question and record the result in the report.

        Raises:
            ValueError: if the state has no research plan or no report.
        """
        print("\n=== SPECIALIZED RESEARCH NODE ===")

        research_plan = state["research_plan"]
        if research_plan is None:
            raise ValueError("Research plan is None")

        current_index = research_plan.current_question_index
        if current_index >= len(research_plan.questions):
            print("All research questions completed")
            return {}

        # Validate before the (slow, billable) agent call rather than after.
        report = state["report"]
        if report is None:
            raise ValueError("Report is None")

        current_question = research_plan.questions[current_index]
        print(f"Researching question {current_index + 1}/{len(research_plan.questions)}: "
              f"{current_question.title}")

        request = (
            "Research the following topic thoroughly:\n"
            f"Topic: {current_question.title}\n"
            f"Description: {current_question.description}\n"
            "Provide a detailed analysis with proper citations to sources.\n"
        )
        result = self.specialized_research_agent.invoke(
            {"messages": [("user", request)]}
        )

        # The agent's answer is the last message; tolerate (role, text) tuples.
        last_message = result["messages"][-1]
        if isinstance(last_message, tuple):
            content = last_message[1]
        else:
            content = last_message.content

        # Any line containing a URL is treated as a source. dict.fromkeys
        # drops repeated lines while keeping first-seen order.
        sources = list(dict.fromkeys(
            line.strip()
            for line in content.split("\n")
            if "http" in line and "://" in line
        ))

        # The plan and report objects from the state are updated in place
        # and handed back so the graph state carries the changes.
        research_plan.questions[current_index].completed = True
        report.detailed_analysis[current_index]["content"] = content
        report.detailed_analysis[current_index]["sources"] = sources
        research_plan.current_question_index += 1

        progress_msg = AIMessage(
            content=f"Completed research for section: {current_question.title}"
        )
        return {
            "messages": [progress_msg],
            "research_plan": research_plan,
            "report": report,
        }

    def _evaluator_node(self, state: ResearchState) -> Dict:
        """Set ``next_step`` to ``"research"`` or ``"finalize"``.

        Raises:
            ValueError: if the state has no research plan.
        """
        print("\n=== EVALUATOR NODE ===")

        research_plan = state["research_plan"]
        if research_plan is None:
            raise ValueError("Research plan is None")

        if research_plan.current_question_index >= len(research_plan.questions):
            print("All research questions have been addressed. Moving to finalizer.")
            eval_msg = AIMessage(content="All research sections completed. Finalizing report...")
            return {"messages": [eval_msg], "next_step": "finalize"}

        # More sections remain: loop back to the research node.
        next_section = research_plan.questions[research_plan.current_question_index].title
        print(f"More research needed. Moving to next section: {next_section}")
        eval_msg = AIMessage(content=f"Moving to research section: {next_section}")
        return {"messages": [eval_msg], "next_step": "research"}

    @classmethod
    def _parse_final_sections(cls, text: str) -> Dict[str, str]:
        """Split the finalizer output into the three summary sections.

        Lines are grouped under the markdown headings named in
        ``_FINAL_SECTION_FIELDS`` (any ``#`` depth, case-insensitive, optional
        trailing colon), so a section may span several paragraphs. If not all
        three headings are present, this falls back to taking the first three
        blank-line-separated chunks.

        Returns:
            A dict with keys ``executive_summary``, ``key_findings`` and
            ``limitations``, or an empty dict when neither strategy applies.
        """
        collected: Dict[str, List[str]] = {}
        current = None
        for line in text.splitlines():
            stripped = line.strip()
            if stripped.startswith("#"):
                heading = stripped.lstrip("#").strip().rstrip(":").strip().lower()
                field_name = cls._FINAL_SECTION_FIELDS.get(heading)
                if field_name is not None:
                    current = field_name
                    collected.setdefault(field_name, [])
                    continue
            # Unrecognized headings (sub-sections) stay part of the content.
            if current is not None:
                collected[current].append(line)

        if len(collected) == len(cls._FINAL_SECTION_FIELDS):
            return {name: "\n".join(lines).strip() for name, lines in collected.items()}

        # Fallback: positional split on blank lines.
        chunks = text.split("\n\n")
        if len(chunks) >= 3:
            return {
                "executive_summary": chunks[0].replace("# Executive Summary", "").strip(),
                "key_findings": chunks[1].replace("# Key Findings", "").strip(),
                "limitations": chunks[2].replace("# Limitations and Further Research", "").strip(),
            }
        return {}

    def _finalizer_node(self, state: ResearchState) -> Dict:
        """Generate the summary sections and emit the formatted report.

        Raises:
            ValueError: if the state has no report or no research plan.
        """
        print("\n=== FINALIZER NODE ===")

        research_plan = state["research_plan"]
        report = state["report"]
        if report is None:
            raise ValueError("Report is None")
        if research_plan is None:
            raise ValueError("Research plan is None")

        # Only sections that were actually researched are passed on.
        detailed_analysis = "\n\n".join(
            f"## {section['title']}\n{section['content']}"
            for section in report.detailed_analysis
            if section['content'] is not None
        )

        final_sections = self.finalizer.invoke({
            "topic": research_plan.topic,
            "detailed_analysis": detailed_analysis,
        })

        parsed = self._parse_final_sections(final_sections)
        if parsed:
            report.executive_summary = parsed["executive_summary"]
            report.key_findings = parsed["key_findings"]
            report.limitations = parsed["limitations"]

        report_message = self._format_report(report)
        return {
            "messages": [report_message],
            # Return the report too so the final state holds the summaries.
            "report": report,
        }

    def _create_workflow(self) -> Any:
        """Build and compile the research graph.

        Flow: START -> research_manager -> specialized_research -> evaluate,
        then either back to specialized_research or on to finalizer -> END,
        depending on the ``next_step`` value written by the evaluator.
        """
        workflow = StateGraph(ResearchState)

        workflow.add_node("research_manager", self._research_manager_node)
        workflow.add_node("specialized_research", self._specialized_research_node)
        workflow.add_node("evaluate", self._evaluator_node)
        workflow.add_node("finalizer", self._finalizer_node)

        workflow.add_edge(START, "research_manager")
        workflow.add_edge("research_manager", "specialized_research")
        workflow.add_edge("specialized_research", "evaluate")
        workflow.add_conditional_edges(
            "evaluate",
            lambda x: x["next_step"],
            {
                "research": "specialized_research",
                "finalize": "finalizer",
            },
        )
        workflow.add_edge("finalizer", END)

        return workflow.compile()

    def _format_report(self, report: Report) -> AIMessage:
        """Render ``report`` as a markdown document wrapped in an ``AIMessage``.

        Missing summary sections are rendered as ``N/A``; analysis sections
        without content are skipped.
        """
        sections = [
            "# Research Report\n",
            "## Executive Summary\n" + (report.executive_summary or "N/A"),
            "## Key Findings\n" + (report.key_findings or "N/A"),
            "## Detailed Analysis",
        ]

        for section in report.detailed_analysis:
            if not section["content"]:
                continue
            sections.append(f"### {section['title']}\n{section['content']}")
            if section["sources"]:
                sources = "\n".join(f"- {source}" for source in section["sources"])
                sections.append(f"**Sources:**\n{sources}")

        sections.append("## Limitations and Further Research\n" + (report.limitations or "N/A"))
        return AIMessage(content="\n\n".join(sections))

    def _convert_history_to_messages(self, chat_history: Optional[List[Dict[str, str]]]) -> List:
        """Convert chat history to LangChain message format.

        Args:
            chat_history: List of dicts with 'role' and 'content' keys.

        Returns:
            List of LangChain message objects. Entries whose role is neither
            ``"user"`` nor ``"assistant"`` are skipped.
        """
        messages = []
        for msg in chat_history or []:
            if msg["role"] == "user":
                messages.append(HumanMessage(content=msg["content"]))
            elif msg["role"] == "assistant":
                messages.append(AIMessage(content=msg["content"]))
        return messages

    def process_message(self, message: str, chat_history: Optional[List[Dict[str, str]]] = None) -> str:
        """Run the research workflow for ``message`` and return the report.

        Args:
            message: The research topic / latest user message.
            chat_history: Optional earlier turns as dicts with 'role' and
                'content' keys; they are placed before ``message``.

        Returns:
            The content of the last message produced by the workflow (the
            formatted report). A copy is also written, best effort, to
            ``final_report.md`` in the working directory.

        Raises:
            RuntimeError: if ``initialize()`` has not been called.
        """
        if self.workflow is None:
            raise RuntimeError(
                "DeepResearchChat.initialize() must be called before process_message()"
            )

        print("\n=== STARTING DEEP RESEARCH ===")
        print(f"Research Topic: {message}")

        # Full conversation: prior turns followed by the current message.
        history_messages = self._convert_history_to_messages(chat_history)
        history_messages.append(HumanMessage(content=message))

        initial_state = ResearchState(
            messages=history_messages,
            research_plan=None,
            report=None,
            next_step="research_manager",
        )

        # Raise the step budget above the default so longer plans finish;
        # attach the tracer only when one was created.
        config: Dict[str, Any] = {"recursion_limit": self._RECURSION_LIMIT}
        if self.tracer:
            config["callbacks"] = [self.tracer]

        result = self.workflow.invoke(initial_state, config=config)
        print("\n=== RESEARCH COMPLETED ===")

        final_report = result["messages"][-1].content

        # Saving is best effort: a failure is reported but never loses the
        # report. Explicit UTF-8 avoids encode errors on non-UTF-8 locales.
        try:
            with open("final_report.md", "w", encoding="utf-8") as f:
                f.write(final_report)
            print("Report saved to final_report.md")
        except Exception as e:
            print(f"Could not save report to file: {e}")

        return final_report