| """ |
| Research Agent - Information gathering and research tasks |
| |
| The Research Agent is responsible for: |
| 1. Gathering information from multiple sources (web, Wikipedia, arXiv) |
| 2. Searching for relevant context and facts |
| 3. Compiling research results in a structured format |
| 4. Returning citations and source information |
| """ |
|
|
| import os |
| from typing import Dict, Any, List |
| from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage |
| from langgraph.types import Command |
| from langchain_groq import ChatGroq |
| from langchain_core.tools import Tool |
| from observability import agent_span, tool_span |
| from dotenv import load_dotenv |
|
|
| |
| from tools import ( |
| get_tavily_tool, |
| get_wikipedia_tool, |
| get_arxiv_tool, |
| get_wikipedia_reader, |
| get_arxiv_reader |
| ) |
|
|
| load_dotenv("env.local") |
|
|
|
|
def _make_search_tool(name: str, description: str, run_query: Any, error_prefix: str) -> Tool:
    """Build a LangChain ``Tool`` around a query callable.

    Args:
        name: Tool name exposed to the LLM.
        description: Tool description exposed to the LLM.
        run_query: Callable that takes the query string and returns the raw
            search result.
        error_prefix: Text placed before the exception message when
            ``run_query`` raises.

    Returns:
        A ``Tool`` whose function returns ``str(result)`` on success and
        ``"<error_prefix>: <exception>"`` on failure.
    """
    def search(query: str) -> str:
        try:
            return str(run_query(query))
        except Exception as e:
            # Failures are reported as text so the caller receives a result
            # string rather than an exception.
            return f"{error_prefix}: {str(e)}"

    return Tool(name=name, description=description, func=search)


def create_research_tools() -> List[Tool]:
    """Create LangChain-compatible research tools.

    Each source (Tavily web search, Wikipedia, ArXiv) is set up inside its own
    ``try`` block, so a failure in one source only prints a warning and does
    not prevent the others from loading. A source whose getter returns a falsy
    value is skipped without a message.

    NOTE(review): the objects returned by ``get_tavily_tool`` /
    ``get_wikipedia_tool`` / ``get_arxiv_tool`` are assumed to expose
    ``to_tool_list()`` / ``call({"input": ...})`` as used below — confirm
    against the ``tools`` module.

    Returns:
        The list of tools that loaded successfully (possibly empty).
    """
    tools: List[Tool] = []

    # --- Tavily web search ---
    try:
        tavily_spec = get_tavily_tool()
        if tavily_spec:
            def run_tavily(query: str) -> Any:
                # The spec is expanded on every call; only its first tool is used.
                tavily_tools = tavily_spec.to_tool_list()
                if tavily_tools:
                    return tavily_tools[0].call({"input": query})
                return "No search results found"

            tools.append(
                _make_search_tool(
                    "web_search",
                    "Search the web for current information and facts using Tavily API",
                    run_tavily,
                    "Search error",
                )
            )
            print("✅ Added Tavily web search tool")
    except Exception as e:
        print(f"⚠️ Could not load Tavily tools: {e}")

    # --- Wikipedia ---
    try:
        wikipedia_tool = get_wikipedia_tool()
        if wikipedia_tool:
            tools.append(
                _make_search_tool(
                    "wikipedia_search",
                    "Search Wikipedia for encyclopedic information",
                    lambda query: wikipedia_tool.call({"input": query}),
                    "Wikipedia search error",
                )
            )
            print("✅ Added Wikipedia tool")
    except Exception as e:
        print(f"⚠️ Could not load Wikipedia tool: {e}")

    # --- ArXiv ---
    try:
        arxiv_tool = get_arxiv_tool()
        if arxiv_tool:
            tools.append(
                _make_search_tool(
                    "arxiv_search",
                    "Search ArXiv for academic papers and research",
                    lambda query: arxiv_tool.call({"input": query}),
                    "ArXiv search error",
                )
            )
            print("✅ Added ArXiv tool")
    except Exception as e:
        print(f"⚠️ Could not load ArXiv tool: {e}")

    print(f"🔧 Research Agent loaded {len(tools)} tools")
    return tools
|
|
|
|
def load_research_prompt() -> str:
    """Load the research-specific system prompt.

    The prompt is read from ``archive/prompts/retrieval_prompt.txt``, a
    relative path that is resolved against the current working directory.
    If the file does not exist, or exists but cannot be opened or read
    (permissions, path is a directory, ...), a built-in default prompt is
    returned instead.

    Returns:
        The prompt text.
    """
    import textwrap

    prompt_path = "archive/prompts/retrieval_prompt.txt"
    try:
        # Explicit encoding so decoding does not depend on the platform's
        # default locale encoding.
        with open(prompt_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # A missing file is the expected case: fall through to the default.
        pass
    except OSError as e:
        # Any other I/O problem also falls back, but is made visible.
        print(f"⚠️ Could not read research prompt '{prompt_path}': {e}")

    return textwrap.dedent(
        """
        You are a research specialist focused on gathering accurate information.

        Your goals:
        1. Search for factual, current, and relevant information
        2. Use multiple sources to verify facts
        3. Provide clear citations and sources
        4. Structure findings in an organized manner

        When researching:
        - Use web search for current information and facts
        - Use Wikipedia for encyclopedic knowledge
        - Use ArXiv for academic and technical topics
        - Cross-reference information across sources
        - Note any conflicting information found

        Format your response as:
        ### Research Results
        - **Source 1**: [findings]
        - **Source 2**: [findings]
        - **Source 3**: [findings]

        ### Key Facts
        - Fact 1
        - Fact 2
        - Fact 3

        ### Citations
        - Citation 1
        - Citation 2
        """
    )
|
|
|
|
def _first_human_query(messages: List[Any]) -> Any:
    """Return the content of the first ``HumanMessage`` in ``messages``, or ``""``."""
    return next((m.content for m in messages if isinstance(m, HumanMessage)), "")


def _build_research_request(user_query: Any, notes_length: int) -> str:
    """Compose the human-turn instructions sent to the research LLM."""
    return "\n".join(
        [
            "",
            "Please research the following question using available tools:",
            "",
            f"Question: {user_query}",
            "",
            f"Current research status: {notes_length} characters already gathered",
            "",
            "Instructions:",
            "1. Search for factual information relevant to the question",
            "2. Use multiple sources if possible for verification",
            "3. Focus on accuracy and currency of information",
            "4. Provide clear citations and sources",
            "5. Structure your findings clearly",
            "",
            "Please gather comprehensive information to help answer this question.",
            "",
        ]
    )


def _run_tool_calls(tool_calls: List[Any], tools: List[Tool]) -> List[str]:
    """Execute the requested tool calls and return one markdown entry per call.

    A call that names a tool not present in ``tools``, or whose execution
    raises, yields an ``Error`` entry instead of being dropped.
    """
    results = []
    for tool_call in tool_calls:
        name = tool_call.get("name", "unknown")
        tool = next((t for t in tools if t.name == name), None)
        if tool is None:
            # The model asked for a tool that was never offered; record it so
            # the omission is visible in the notes.
            print(f"⚠️ Tool {name} failed: unknown tool")
            results.append(f"**{name}**: Error - unknown tool")
            continue
        try:
            result = tool.run(tool_call["args"])
            results.append(f"**{tool.name}**: {result}")
        except Exception as e:
            print(f"⚠️ Tool {name} failed: {e}")
            results.append(f"**{name}**: Error - {str(e)}")
    return results


def research_agent(state: Dict[str, Any]) -> Command:
    """
    Research Agent node that gathers information using available tools.

    Reads ``messages``, ``research_notes``, ``loop_counter``, ``user_id`` and
    ``session_id`` from ``state``. The first ``HumanMessage`` is treated as
    the question. The LLM is invoked once; if it requests tool calls, those
    tools are executed and their raw outputs become the findings, otherwise
    the LLM's own text is used.

    Returns a Command routing to "lead" with the formatted findings under
    ``research_notes``. Any exception is caught and reported the same way,
    with an error note instead of findings.
    NOTE(review): whether the update appends to or replaces existing notes
    depends on the graph's state reducer, which is not visible here.
    """
    print("🔍 Research Agent: Gathering information...")

    try:
        research_prompt = load_research_prompt()

        llm = ChatGroq(
            model="llama-3.3-70b-versatile",
            temperature=0.3,
            max_tokens=2048,
        )

        tools = create_research_tools()

        if tools:
            llm_with_tools = llm.bind_tools(tools)
        else:
            llm_with_tools = llm
            print("⚠️ No tools available, proceeding with LLM only")

        with agent_span(
            "research",
            metadata={
                "tools_available": len(tools),
                "user_id": state.get("user_id", "unknown"),
                "session_id": state.get("session_id", "unknown"),
            },
        ) as span:
            # `or` guards against keys that are present but set to None.
            user_query = _first_human_query(state.get("messages") or [])
            existing_notes = state.get("research_notes") or ""
            iteration = (state.get("loop_counter") or 0) + 1

            research_messages = [
                SystemMessage(content=research_prompt),
                HumanMessage(
                    content=_build_research_request(user_query, len(existing_notes))
                ),
            ]

            # llm_with_tools is the plain llm when no tools were loaded.
            response = llm_with_tools.invoke(research_messages)
            research_findings = response.content

            # Tool calls are only honoured when tools were actually bound.
            tool_calls = getattr(response, "tool_calls", None) if tools else None
            if tool_calls:
                print(f"🛠️ Executing {len(tool_calls)} tool calls")
                tool_results = _run_tool_calls(tool_calls, tools)
                if tool_results:
                    research_findings = "\n\n".join(tool_results)

            formatted_results = (
                f"\n### Research Iteration {iteration}\n"
                "\n"
                f"{research_findings}\n"
                "\n"
                "---\n"
            )

            print(f"📝 Research Agent: Gathered {len(formatted_results)} characters")

            # The span may be falsy; only then is the trace update skipped.
            if span:
                span.update_trace(output={
                    "research_length": len(formatted_results),
                    "findings_preview": formatted_results[:300] + "...",
                })

            return Command(
                goto="lead",
                update={
                    "research_notes": formatted_results,
                },
            )

    except Exception as e:
        print(f"❌ Research Agent Error: {e}")

        # Still hand control back to the lead node, carrying the error text.
        error_result = (
            "\n### Research Error\n"
            f"An error occurred during research: {str(e)}\n"
            "\n"
        )
        return Command(
            goto="lead",
            update={
                "research_notes": error_result,
            },
        )