# Web-viewer header (extraction residue): Spaces: Sleeping | File size: 6,971 Bytes | a1da592
import os
from typing import TypedDict
from dotenv import load_dotenv
from langgraph.graph import StateGraph, START, END
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults
# Load environment variables from .env file
load_dotenv()
# ------------------------------------------------------------------------------
# 1. State Definition
# ------------------------------------------------------------------------------
class ResearchState(TypedDict):
    """
    Shared state passed between the nodes of the research workflow.

    Every field is a plain string. The script's entry point seeds `topic`
    and leaves the other three fields empty; each node then fills in the
    single field it is responsible for.
    """

    # Subject to research; read by every node.
    topic: str
    # Formatted web-search output written by the researcher node.
    raw_data: str
    # First-pass synthesis written by the analyst node.
    draft_summary: str
    # Reviewed report written by the reviewer node.
    final_report: str
# ------------------------------------------------------------------------------
# 2. Node Functions
# ------------------------------------------------------------------------------
def researcher_node(state: ResearchState) -> ResearchState:
    """
    Search the web for the state's topic and store the findings in `raw_data`.

    Runs a Tavily search (at most 4 results) using the topic as the query.
    When the tool returns a list, each entry is rendered as a
    "Source: <url>" / "Content: <text>" pair; any other return value is
    stringified as-is.

    Returns a state update containing only the `raw_data` key. On success
    the value starts with "--- Search Results ---". On failure (blank
    topic or any exception raised by the search) the value is a message
    starting with "Error during web search:" -- the function reports
    errors through the state instead of raising.
    """
    print("-> Running [Researcher Node]...")
    topic = state.get("topic") or ""

    # A blank query cannot produce useful results, so fail fast without
    # spending an API call on it.
    if not str(topic).strip():
        error_msg = "Error during web search: no topic was provided."
        print(f" [Error] {error_msg}")
        return {"raw_data": error_msg}

    try:
        search_tool = TavilySearchResults(max_results=4)
        results = search_tool.invoke({"query": topic})

        if isinstance(results, list):
            formatted_data = "\n\n".join(
                f"Source: {res.get('url', 'N/A')}\nContent: {res.get('content', 'N/A')}"
                for res in results
            )
        else:
            # Fallback if the tool did not hand back a list of hits.
            formatted_data = str(results)

        return {"raw_data": f"--- Search Results ---\n{formatted_data}"}
    except Exception as e:
        # Deliberate best-effort: the workflow keeps running and the
        # failure text travels downstream in `raw_data`.
        error_msg = f"Error during web search: {str(e)}"
        print(f" [Error] {error_msg}")
        return {"raw_data": error_msg}
def analyst_node(state: ResearchState) -> ResearchState:
    """
    Turn the collected `raw_data` into a structured draft summary via Groq.

    Returns a state update containing only the `draft_summary` key:

    * If `raw_data` is a researcher failure message (it starts with
      "Error during web search:"), that message is passed through
      unchanged and no LLM call is made.
    * If `raw_data` is empty or whitespace, an "Error generating draft:"
      message is returned and no LLM call is made.
    * Otherwise the LLM response content is returned; any exception from
      the LLM call is converted into an "Error generating draft:" message.
    """
    print("-> Running [Analyst Node]...")
    topic = state.get("topic", "")
    raw_data = state.get("raw_data") or ""

    # The researcher node reports failures as text in `raw_data`. Feeding
    # that text to the model as if it were research would only produce an
    # invented summary, so surface the original failure instead.
    if raw_data.startswith("Error during web search:"):
        print(" [Error] Skipping draft: the research step failed.")
        return {"draft_summary": raw_data}
    if not raw_data.strip():
        error_msg = "Error generating draft: no research data was provided."
        print(f" [Error] {error_msg}")
        return {"draft_summary": error_msg}

    try:
        llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0.3)
        prompt = (
            "\n"
            "You are an expert analyst. Your task is to write a well-formatted, comprehensive\n"
            "draft summary based on the provided raw research data.\n"
            f"Topic: {topic}\n"
            "Raw Data:\n"
            f"{raw_data}\n"
            "Please synthesize this information into a structured draft.\n"
        )
        response = llm.invoke(prompt)
        # The content of the response is the draft itself.
        return {"draft_summary": response.content}
    except Exception as e:
        # Deliberate best-effort: report the failure through the state.
        error_msg = f"Error generating draft: {str(e)}"
        print(f" [Error] {error_msg}")
        return {"draft_summary": error_msg}
def reviewer_node(state: ResearchState) -> ResearchState:
    """
    Fact-check `draft_summary` against `raw_data` via Groq and emit the report.

    Returns a state update containing only the `final_report` key:

    * If `raw_data` starts with "Error during web search:" or
      `draft_summary` starts with "Error generating draft:" (the failure
      messages produced by the researcher and analyst nodes), that
      message becomes the final report and no LLM call is made.
    * If `draft_summary` is empty or whitespace, an "Error during review:"
      message is returned and no LLM call is made.
    * Otherwise the LLM response content is returned; any exception from
      the LLM call is converted into an "Error during review:" message.
    """
    print("-> Running [Reviewer Node]...")
    topic = state.get("topic", "")
    raw_data = state.get("raw_data") or ""
    draft_summary = state.get("draft_summary") or ""

    # Upstream nodes report failures as text in their output field. Asking
    # the model to "review" such text would dress an error up as a report,
    # so surface the earliest failure unchanged. The search failure is
    # checked first because it is the root cause when both are present.
    if raw_data.startswith("Error during web search:"):
        print(" [Error] Skipping review: the research step failed.")
        return {"final_report": raw_data}
    if draft_summary.startswith("Error generating draft:"):
        print(" [Error] Skipping review: the drafting step failed.")
        return {"final_report": draft_summary}
    if not draft_summary.strip():
        error_msg = "Error during review: no draft summary was provided."
        print(f" [Error] {error_msg}")
        return {"final_report": error_msg}

    try:
        # Low temperature: the reviewer should correct, not embellish.
        llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0.1)
        prompt = (
            "\n"
            "You are a meticulous reviewer and fact-checker. Your task is to review\n"
            "the draft summary below against the provided raw data.\n"
            "Goal:\n"
            "1. Ensure the draft does not contain any hallucinations or claims NOT supported by the raw data.\n"
            "2. Correct any inaccuracies.\n"
            "3. Output the final, polished report in a clear, highly-readable format.\n"
            f"Topic: {topic}\n"
            "--- RAW DATA ---\n"
            f"{raw_data}\n"
            "--- DRAFT SUMMARY ---\n"
            f"{draft_summary}\n"
            "Output ONLY the final, refined report. Do not include any extra conversational filler.\n"
        )
        response = llm.invoke(prompt)
        return {"final_report": response.content}
    except Exception as e:
        # Deliberate best-effort: report the failure through the state.
        error_msg = f"Error during review: {str(e)}"
        print(f" [Error] {error_msg}")
        return {"final_report": error_msg}
# ------------------------------------------------------------------------------
# 3. Graph Construction
# ------------------------------------------------------------------------------
def build_research_graph():
    """
    Assemble the three-stage research pipeline and return it compiled.

    The graph is strictly linear:
    START -> researcher -> analyst -> reviewer -> END.
    """
    # Stage label paired with the function that implements it, in run order.
    pipeline = (
        ("researcher", researcher_node),
        ("analyst", analyst_node),
        ("reviewer", reviewer_node),
    )

    workflow = StateGraph(ResearchState)
    for label, handler in pipeline:
        workflow.add_node(label, handler)

    # Chain consecutive stops so each one feeds the next.
    stops = [START, *(label for label, _ in pipeline), END]
    for source, target in zip(stops, stops[1:]):
        workflow.add_edge(source, target)

    return workflow.compile()
# ------------------------------------------------------------------------------
# 4. Main Execution
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    # Both services are needed: Tavily for the search step, Groq for the
    # two LLM steps. Bail out early with a non-zero status if either key
    # is missing.
    if not os.getenv("GROQ_API_KEY") or not os.getenv("TAVILY_API_KEY"):
        print("⚠️ Missing API Keys!")
        print("Please ensure both GROQ_API_KEY and TAVILY_API_KEY are set in your .env file or environment.")
        # `exit` is only injected by the `site` module; SystemExit works
        # in every interpreter configuration.
        raise SystemExit(1)

    # Build the computational graph
    app = build_research_graph()

    # Define our initial state with the requested topic
    initial_topic = "Latest advancements in Agentic AI workflows"
    initial_state = {
        "topic": initial_topic,
        "raw_data": "",
        "draft_summary": "",
        "final_report": "",
    }

    print("=" * 60)
    print("STARTING RESEARCH WORKFLOW")
    print(f"Topic: {initial_topic}")
    print("=" * 60)

    try:
        # Only the workflow run itself is guarded.
        final_state = app.invoke(initial_state)
    except Exception as e:
        print(f"\n[Fatal Error] Workflow execution failed: {e}")
        # Signal the failure to the calling shell instead of exiting 0.
        raise SystemExit(1) from e
    else:
        # Output the Final Report
        print("\n\n" + "=" * 60)
        print(" " * 20 + "FINAL REPORT")
        print("=" * 60 + "\n")
        print(final_state.get("final_report", "No report generated."))
        print("\n" + "=" * 60)
|