"""
Lead Agent - Orchestrates the multi-agent workflow.

The Lead Agent is responsible for:
1. Analyzing user queries and determining next steps
2. Managing the iterative research/code loop
3. Deciding when enough information has been gathered
4. Coordinating between specialized agents
5. Maintaining the overall workflow state
"""
| |
|
| | import os |
| | from typing import Dict, Any, Literal |
| | from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage |
| | from langgraph.types import Command |
| | from langchain_groq import ChatGroq |
| | from observability import agent_span |
| | from dotenv import load_dotenv |
| |
|
| | |
| | from memory_system import MemoryManager |
| |
|
| | load_dotenv("env.local") |
| |
|
| | |
| | memory_manager = MemoryManager() |
| |
|
def load_system_prompt() -> str:
    """Load and extend the base system prompt for the lead agent.

    Reads the shared base prompt from ``archive/prompts/system_prompt.txt``
    and appends the lead-agent-specific coordination instructions.

    Returns:
        The composed lead-agent prompt, or a minimal fallback prompt if the
        prompt file cannot be read for any reason.
    """
    try:
        # Explicit encoding: without it, decoding of the prompt file depends
        # on the platform's locale.
        with open("archive/prompts/system_prompt.txt", "r", encoding="utf-8") as f:
            base_prompt = f.read()

        lead_prompt = f"""
{base_prompt}

As the Lead Agent, you coordinate a team of specialists:
- Research Agent: Gathers information from web, papers, and knowledge bases
- Code Agent: Performs calculations and executes Python code

Your responsibilities:
1. Analyze the user's question to determine what information and computations are needed
2. Decide whether to delegate to research, code, both, or proceed to final answer
3. Synthesize results from specialists into a coherent draft answer
4. Determine when sufficient information has been gathered

Decision criteria:
- If the question requires factual information, current events, or research → delegate to research
- If the question requires calculations, data analysis, or code execution → delegate to code
- If you have sufficient information to answer → proceed to formatting
- Maximum 3 iterations to prevent infinite loops

Always maintain the exact formatting requirements specified in the system prompt.
"""
        return lead_prompt
    except OSError:
        # Broader than the original FileNotFoundError: permission or read
        # errors also fall back to a usable prompt instead of crashing.
        return """You are a helpful assistant coordinating a team of specialists to answer questions accurately."""
| |
|
| |
|
def _build_decision_prompt(user_query: str, loop_counter: int,
                           research_notes: str, code_outputs: str,
                           similar_context: str) -> str:
    """Build the routing prompt the LLM uses to pick the next workflow step."""
    return f"""
Based on the user's question and current progress, decide the next action.

Original Question: {user_query}

Current Progress:
- Loop iteration: {loop_counter}
- Research gathered: {len(research_notes)} characters
- Code outputs: {len(code_outputs)} characters

Research Notes So Far:
{research_notes if research_notes else "None yet"}

Code Outputs So Far:
{code_outputs if code_outputs else "None yet"}

{similar_context}

Analyze what's still needed:
1. Is factual information, current events, or research missing? → route to "research"
2. Are calculations, data analysis, or code execution needed? → route to "code"
3. Do we have sufficient information to provide a complete answer? → route to "formatter"

Respond with ONLY one of: research, code, formatter
"""


def _create_draft_answer(llm, system_prompt: str, user_query: str,
                         research_notes: str, code_outputs: str) -> str:
    """Synthesize a draft answer from gathered research and code results.

    Falls back to a plain concatenation of the raw materials if the LLM call
    fails, so the formatter always receives something to work with.
    """
    draft_prompt = f"""
Create a comprehensive answer based on all gathered information:

Original Question: {user_query}

Research Information:
{research_notes}

Code Results:
{code_outputs}

Instructions:
1. Synthesize all available information to answer the question
2. If computational results are available, include them
3. If research provides context, incorporate it
4. Provide a clear, direct answer to the user's question
5. Focus on accuracy and completeness

What is your answer to the user's question?
"""
    draft_messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=draft_prompt),
    ]
    try:
        draft_response = llm.invoke(draft_messages)
        draft_content = draft_response.content if hasattr(draft_response, 'content') else str(draft_response)
        print(f"π Lead Agent: Created draft answer ({len(draft_content)} characters)")
        return draft_content
    except Exception as e:
        print(f"β οΈ Error creating draft answer: {e}")
        # Best-effort fallback so downstream formatting never sees an empty draft.
        return f"Based on the available information:\n\nResearch: {research_notes}\nCalculations: {code_outputs}"


def lead_agent(state: Dict[str, Any]) -> Command[Literal["research", "code", "formatter", "__end__"]]:
    """
    Lead Agent node that orchestrates the workflow.

    Makes decisions about:
    - Whether more research is needed
    - Whether code execution is needed
    - When to proceed to final formatting
    - When the loop should terminate

    Args:
        state: Graph state dict; reads ``messages``, ``research_notes``,
            ``code_outputs``, ``loop_counter``, ``max_iterations``,
            ``user_id`` and ``session_id``.

    Returns:
        Command with the routing decision (``research``/``code``/``formatter``)
        and state updates (incremented ``loop_counter``, ``next``, and a
        ``draft_answer`` when routing to the formatter).
    """
    loop_counter = state.get('loop_counter', 0)
    max_iterations = state.get('max_iterations', 3)

    print(f"π― Lead Agent: Processing request (iteration {loop_counter})")

    # Hard stop: never loop past the configured iteration budget.
    if loop_counter >= max_iterations:
        print("π Maximum iterations reached, proceeding to formatter")
        return Command(
            goto="formatter",
            update={
                "loop_counter": loop_counter + 1,
                "next": "formatter",
            },
        )

    try:
        system_prompt = load_system_prompt()

        # Low temperature: routing decisions should be near-deterministic.
        llm = ChatGroq(
            model="llama-3.3-70b-versatile",
            temperature=0.1,
            max_tokens=1024,
        )

        with agent_span(
            "lead",
            metadata={
                "loop_counter": loop_counter,
                "research_notes_length": len(state.get("research_notes", "")),
                "code_outputs_length": len(state.get("code_outputs", "")),
                "user_id": state.get("user_id", "unknown"),
                "session_id": state.get("session_id", "unknown"),
            },
        ) as span:
            messages = state.get("messages", [])
            research_notes = state.get("research_notes", "")
            code_outputs = state.get("code_outputs", "")

            # The original user question is the first human message, if any.
            user_query = next(
                (msg.content for msg in messages if isinstance(msg, HumanMessage)),
                "",
            )

            # Best-effort lookup of a similar previous Q&A for extra context;
            # a memory failure must never abort routing.
            similar_context = ""
            if user_query:
                try:
                    similar_qa = memory_manager.get_similar_qa(user_query)
                    if similar_qa:
                        similar_context = f"\n\nSimilar previous Q&A:\n{similar_qa}"
                except Exception as e:
                    # BUGFIX: previously printed "Memory cache hit" on failure,
                    # the opposite of what actually happened.
                    print(f"β οΈ Memory lookup failed: {e}")

            decision_messages = [
                SystemMessage(content=system_prompt),
                HumanMessage(content=_build_decision_prompt(
                    user_query, loop_counter, research_notes,
                    code_outputs, similar_context,
                )),
            ]

            response = llm.invoke(decision_messages)
            decision = response.content.strip().lower()

            valid_decisions = ("research", "code", "formatter")
            if decision not in valid_decisions:
                # Tolerate verbose replies like 'Answer: research.' by
                # scanning for a valid token before giving up.
                decision = next(
                    (d for d in valid_decisions if d in decision), None
                )
                if decision is None:
                    print(f"β οΈ Invalid decision '{response.content.strip().lower()}', defaulting to 'research'")
                    decision = "research"

            updates = {
                "loop_counter": loop_counter + 1,
                "next": decision,
            }

            if decision == "formatter":
                # Synthesize the draft the formatter node will polish.
                updates["draft_answer"] = _create_draft_answer(
                    llm, system_prompt, user_query, research_notes, code_outputs
                )

            print(f"π― Lead Agent Decision: {decision} (iteration {loop_counter + 1})")

            if span:
                span.update_trace(output={"decision": decision, "updates": updates})

            return Command(
                goto=decision,
                update=updates,
            )

    except Exception as e:
        # Any unexpected failure degrades gracefully to the formatter with an
        # explanatory draft instead of crashing the graph.
        print(f"β Lead Agent Error: {e}")
        return Command(
            goto="formatter",
            update={
                "draft_answer": f"I encountered an error while processing your request: {str(e)}",
                "loop_counter": loop_counter + 1,
                "next": "formatter",
            },
        )