|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
import operator |
|
|
from typing import List, TypedDict, Annotated, Dict |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage, ToolMessage |
|
|
from langgraph.graph import MessagesState, StateGraph, END, START |
|
|
from langgraph.prebuilt import ToolNode, tools_condition |
|
|
|
|
|
|
|
|
from web_search_tools import google_search_tool, wikipedia_search_tool, browse_web_page_tool, text_analyzer_tool |
|
|
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file into the process environment.
load_dotenv()
# Credentials and model selection for the OpenAI-backed LLM below.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# NOTE(review): the env var is OPENAI_API_WEB_MODEL (a web-research-specific
# model), not OPENAI_API_MODEL; both values are None when the var is unset.
OPENAI_API_MODEL = os.getenv("OPENAI_API_WEB_MODEL")
|
|
|
|
|
|
|
|
|
|
|
# Structured-output schema that planning_node asks the LLM to fill in.
# NOTE: pydantic exposes the class docstring and the Field description to the
# model as part of the schema, so that text is behavior — edit with care.
class ResearchPlan(BaseModel):
    """A step-by-step research plan."""
    steps: List[str] = Field(description="A list of concise, sequential steps for the research task.")
|
|
|
|
|
class ResearchState(MessagesState):
    # Extends LangGraph's MessagesState (which contributes the reducer-managed
    # `messages` channel) with the fields the research pipeline uses.
    task: str  # the user's original research question
    plan: ResearchPlan  # produced once by planning_node
    current_plan_step: int  # index into plan.steps; advanced by step_synthesis_node
    context_summary: str  # NOTE(review): no node in this file writes this key — possibly dead
    step_results: Annotated[List[str], operator.add]  # per-step summaries; updates are list-concatenated
|
|
|
|
|
|
|
|
|
|
|
# Deterministic (temperature=0) chat model shared by every node below.
llm = ChatOpenAI(model=OPENAI_API_MODEL, api_key=OPENAI_API_KEY, temperature=0)
# Tool-calling variant used by browse_node to pick which page to read.
# (google_search_tool is invoked directly in search_node, so it is not bound.)
llm_with_tools = llm.bind_tools([wikipedia_search_tool, browse_web_page_tool])
|
|
|
|
|
|
|
|
|
|
|
def planning_node(state: ResearchState):
    """Node 1: Generate the initial research plan.

    Reads ``task`` from state, asks the LLM (constrained to structured
    output) for a ResearchPlan, and resets the step cursor to 0.
    """
    print("--- 📝 PLANNING NODE ---")

    task = state.get('task')  # the user's research question (None if absent)
    # Force the model to return a valid ResearchPlan instance.
    structured_llm = llm.with_structured_output(ResearchPlan)
    planning_prompt = f"""
You are an expert and efficient research planner. Your goal is to create the SHORTEST POSSIBLE, logical, step-by-step plan to solve a user's research task.

**Core Principles:**
1. **Analyze Complexity**: First, determine if the task is simple or complex.
- A **simple task** can be solved with a single, well-formulated search and analysis (e.g., "Who won the 1998 World Cup?").
- A **complex task** requires finding one piece of information to unlock the next (e.g., "Who is the manager of the team that won the 1998 World Cup?").
2. **Create the Plan**:
- For a **simple task**, create a plan with ONLY ONE step: a clear instruction to find the final answer.
- For a **complex task**, break it down into the minimum number of sequential steps required. Each step must build upon the previous one.
3. **Focus on Actions**: Each step should describe an action to find a specific piece of information.

---
**Example 1: Simple Task**
* **User Task:** "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia."
* **Your Output (Plan):**
"steps": [
"Search Wikipedia for the discography of Mercedes Sosa, find all studio albums released between 2000 and 2009, and count them."
]

**Example 2: Complex Task**
* **User Task:** "Who did the actor who played Ray in the Polish-language version of Everybody Loves Raymond play in Magda M.? Give only the first name."
* **Your Output (Plan):**
"steps": [
"Find the name of the actor who played Ray in the Polish version of 'Everybody Loves Raymond'.",
"Using the actor's name, find their role in the show 'Magda M.' and extract the character's first name."
]
---

Now, analyze the following user task and generate the most efficient, step-by-step research plan.
**User Task:** {task}
**Your Output (Plan):**
"""

    response_plan = structured_llm.invoke([SystemMessage(content=planning_prompt)])
    print("--- ✅ PLANNING COMPLETE ---")
    print("Generated Plan:", response_plan.steps)
    # Partial state update: only these two channels change.
    return {"plan": response_plan, "current_plan_step": 0}
|
|
|
|
|
|
|
|
def search_node(state: ResearchState):
    """Node 2: Performs a web search for a single step of the plan."""
    step_index = state["current_plan_step"]  # which plan step we are executing
    plan_steps = state["plan"].steps
    current_step_instruction = plan_steps[step_index]
    # NOTE(review): this is the *list* of per-step summaries (step_results),
    # not a single string — it is interpolated into the prompt via its list
    # repr, which the LLM tolerates but which reads oddly; confirm intended.
    context_summary = state["step_results"]

    print(f"--- 🔎 SEARCH NODE (Executing step: '{current_step_instruction}') ---")
    query_prompt = f"""
You are an expert at generating search engine queries.
Your goal is to create a single, concise, and effective Google search query to accomplish the given plan step, using the context from previous steps.

**Current Plan Step to Execute:** "{current_step_instruction}"
**Context from Previous Steps' Findings:**
---
{context_summary}
---

Based on the **Current Plan Step** and the **Context**, generate the single best possible search query to find the next piece of information.
For example, if the context is "The actor is Bartek Kasprzykowski" and the step is "Find his role in Magda M.", a good query would be "Bartek Kasprzykowski role in Magda M.".
"""

    # Plain (tool-less) LLM call; strip the surrounding double quotes the
    # model frequently wraps queries in.
    query = llm.invoke([SystemMessage(content=query_prompt)]).content.strip('"')
    print(f"--- Generated Context-Aware Query: '{query}' ---")

    # The search tool is invoked directly here, not via LLM tool-calling.
    search_results = google_search_tool.invoke(query)

    # Append the raw results to the message history for browse_node to read.
    return {"messages": [AIMessage(content=search_results)]}
|
|
|
|
|
|
|
|
def browse_node(state: ResearchState):
    """Node 3: Analyzes search results and decides which URL to browse, prioritizing Wikipedia."""

    # The last message was appended by search_node and holds the raw
    # Google results text.
    search_results = state["messages"][-1].content

    print(f"--- 📖 BROWSE NODE (Analyzing search results) ---")

    browse_prompt = f"""
You are an expert at selecting the best information source.
Given a list of Google search results, your goal is to choose the SINGLE best URL to browse to accomplish the current research step.

**Current Research Step:** "{state['plan'].steps[state['current_plan_step']]}"

**Decision Hierarchy (Strict):**
1. **Wikipedia First**: If a reliable `wikipedia.org` link is present and seems highly relevant to the current step, you **MUST** choose it and call the `wikipedia_search_tool`.
2. **Browse Other Sources**: If there are no good Wikipedia links, choose the single most promising URL from another reputable source and call the `browse_web_page_tool`.

**Search Results:**
---
{search_results}
---

Based on the hierarchy and the current research step, which single tool call should you make?
"""

    # Tool-bound LLM: expected to answer with exactly one tool call
    # (wikipedia_search_tool or browse_web_page_tool).
    message = llm_with_tools.invoke([SystemMessage(content=browse_prompt)])

    # Fallback: the model answered with plain text instead of a tool call.
    # NOTE(review): downstream routing must tolerate this tool-less AIMessage.
    if not hasattr(message, "tool_calls") or not message.tool_calls:
        print("--- ⚠️ BROWSE NODE: LLM failed to choose a tool. Skipping browse step. ---")
        return {"messages": [AIMessage(content="No relevant page found to browse.")]}

    # Only the first tool call is logged; any others are still carried along.
    print(f"--- Browse Node decision: Call '{message.tool_calls[0]['name']}' on '{message.tool_calls[0]['args']}' ---")
    return {"messages": message}
|
|
|
|
|
|
|
|
def step_synthesis_node(state: ResearchState):
    """Node 4: Summarize the information from the current step and prepare for the next one."""
    print(" --- 🔄 STEP SYNTHESIS NODE ---")

    current_step_instruction = state["plan"].steps[state["current_plan_step"]]
    # Last message is whatever the browse stage produced (tool output, or the
    # fallback text when no page was browsed).
    browsed_content = state["messages"][-1].content

    summary_prompt = f"""
You are a factual extractor and research analyst.
Your goal is to extract key pieces of information from the provided content to satisfy a specific sub-task and prepare for the next step.

**Sub-Task (Instruction to accomplish):** "{current_step_instruction}"

**Content Gathered in this Step:**
---
{browsed_content}
---

**Analysis:**
1. **Extract Key Facts**: From the "Content Gathered", pull out the specific names, dates, numbers, or links that directly answer the "Sub-Task".
2. **Assess Step Completion**: Was the sub-task successfully completed with this information?
3. **Synthesize for Next Step**: Create a very concise summary of your findings. This summary will be used as context for the next step in the plan. If the sub-task was not completed, state what is still missing.

**Your Output:**
Provide a concise summary of your findings. For example:
"Successfully found the actor's name: Bartek Kasprzykowski."
or
"Failed to find the specific NASA award number on this page, but confirmed the paper was written by the correct team."
"""

    step_summary = llm.invoke([SystemMessage(content=summary_prompt)]).content
    print(f"--- ✅ STEP {state['current_plan_step'] + 1} COMPLETE. Summary: '{step_summary}' ---")

    # step_results is reduced with operator.add, so this one-element list is
    # appended to prior summaries; the plan cursor advances by one.
    return {"step_results": [step_summary], "current_plan_step": state["current_plan_step"] + 1}
|
|
|
|
|
|
|
|
def final_synthesis_node(state: ResearchState):
    """Node 5: Takes all the summarized results from each step and combines them into a complete and final answer for the original task."""
    print("--- ✍️ FINAL SYNTHESIS NODE ---")

    step_summaries = state.get("step_results", [])

    # Guard: nothing was gathered — emit a canned report instead of calling
    # the LLM with empty context.
    if not step_summaries:
        final_report = "The research process concluded, but no conclusive information was gathered to answer the task."
        return {"messages": [AIMessage(content=final_report)]}

    # Number the findings so the model can cite them per step.
    full_context = "\n\n".join(
        [f"Finding from Step {i+1}: {summary}" for i, summary in enumerate(step_summaries)]
    )

    final_prompt = f"""
You are an expert data analyst and report writer.
Your final and most important task is to synthesize the provided research findings to answer the user's original task with extreme precision.

**User's Original Task:**
---
"{state['task']}"
---

**Summary of Findings from Each Research Step:**
---
{full_context}
---

**Your Analytical Process (You MUST follow this):**
1. **Re-read the Original Task**: Pay extremely close attention to all constraints, especially dates, numbers, and specific conditions (e.g., "between 2000 and 2009, included", "first name only").
2. **Verify Information**: Scan the "Summary of Findings" and ensure you have all the necessary pieces to construct the answer. Do not invent or infer information that is not present.
3. **Construct the Final Answer**: Write a clear, direct, and accurate answer based solely on the verified findings. Address every part of the user's original task.

Based on this rigorous process, generate the final answer.
"""

    final_report = llm.invoke([SystemMessage(content=final_prompt)])
    print("--- ✅ FINAL REPORT GENERATED ---")

    # NOTE(review): this path returns a bare AIMessage while the guard above
    # returns a list — confirm the messages reducer accepts both forms.
    return {"messages": final_report}
|
|
|
|
|
|
|
|
|
|
|
def router(state: ResearchState):
    """Route after a synthesis step: loop back for the next plan step, or
    finish with the final summary once every step has been executed.

    Returns "continue_pipeline" while steps remain, "end_pipeline" otherwise.
    """
    print("--- 🔍 ROUTER ---")
    steps_remaining = state["current_plan_step"] < len(state["plan"].steps)
    if not steps_remaining:
        print(" - Decision: Plan complete. Proceed to final synthesis.")
        return "end_pipeline"
    print(" - Decision: Continue to next pipeline cycle.")
    return "continue_pipeline"
|
|
|
|
|
|
|
|
# Assemble the research pipeline:
# planning -> (search -> browse -> [tools] -> synthesis) loop -> final synthesis.
builder = StateGraph(ResearchState)

builder.add_node("planning", planning_node)
builder.add_node("search", search_node)
builder.add_node("browse", browse_node)
builder.add_node("tools", ToolNode([wikipedia_search_tool, browse_web_page_tool]))
builder.add_node("synthesis", step_synthesis_node)
builder.add_node("final_synthesizer", final_synthesis_node)

builder.add_edge(START, "planning")
builder.add_edge("planning", "search")
builder.add_edge("search", "browse")

# BUG FIX: the original wired browse -> tools unconditionally, but
# browse_node deliberately returns a plain AIMessage (no tool_calls) when
# the LLM fails to choose a tool, and ToolNode errors on a message without
# tool calls. Route through tools_condition instead: a tool-calling message
# goes to the ToolNode, a tool-less one skips straight to synthesis (which
# will summarize the fallback text, exactly as it would have received it).
builder.add_conditional_edges(
    "browse",
    tools_condition,
    {
        "tools": "tools",
        END: "synthesis",
    },
)
builder.add_edge("tools", "synthesis")

# After each step summary, either loop back for the next plan step or
# proceed to the final report (decision made by router).
builder.add_conditional_edges(
    "synthesis",
    router,
    {
        "continue_pipeline": "search",
        "end_pipeline": "final_synthesizer",
    },
)
builder.add_edge("final_synthesizer", END)

web_search_graph = builder.compile()
|
|
|