|
|
"""
|
|
|
Multi-Agent Research Assistant
|
|
|
======================================================================
|
|
|
|
|
|
|
|
|
Installation:
|
|
|
pip install langgraph langchain langchain-community langchain-huggingface pydantic numexpr tavily-python
|
|
|
"""
|
|
|
|
|
|
import operator
|
|
|
import re
|
|
|
import json
|
|
|
from typing import Annotated, List, Optional, TypedDict, Literal
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
|
import numexpr as ne
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
from langgraph.graph import StateGraph, END
|
|
|
|
|
|
|
|
|
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
|
|
|
from langchain_core.tools import tool
|
|
|
from langchain_core.messages import HumanMessage
|
|
|
from tavily import TavilyClient
|
|
|
|
|
|
|
|
|
# Optional-dependency guard: record whether the Tavily client can be imported
# so that web_search can return a readable error instead of failing.
# NOTE(review): the import section above also imports TavilyClient
# unconditionally; while that line exists an ImportError is raised before this
# guard is reached, so it should be removed for the guard to be effective.
try:
    from tavily import TavilyClient
    TAVILY_AVAILABLE = True
except ImportError:
    print("⚠️ Install tavily: pip install tavily-python")
    # Keep the name bound so later references fail predictably, not with NameError.
    TavilyClient = None
    TAVILY_AVAILABLE = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
    """System configuration"""

    # Hugging Face API token; blank until MultiAgentSystem.__init__ assigns it.
    HF_TOKEN: str = ""
    # Tavily API key; web_search reports an error while this is blank.
    TAVILY_API_KEY: str = ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ResearchOutput(BaseModel):
    """Result produced by the researcher agent for one research pass."""

    # Required fields (``...`` marks them as having no default).
    answer: str = Field(..., description="Direct answer to question")
    sources_used: List[str] = Field(..., description="Tools/sources consulted")
    confidence: float = Field(..., ge=0, le=1, description="Confidence 0-1")

    # Optional bookkeeping filled in by the researcher.
    web_sources: Optional[List[dict]] = Field(
        default=None,
        description="Web sources with URLs",
    )
    needs_web_search: bool = Field(
        default=False,
        description="Whether web search is needed",
    )
    retry_count: int = Field(
        default=0,
        description="Number of retry attempts",
    )
|
|
|
|
|
|
|
|
|
class AnalysisOutput(BaseModel):
    """Insights the analyst agent extracts from the research answer."""

    key_points: List[str] = Field(..., description="2-4 key insights")
    implications: str = Field(..., description="Why this matters")
|
|
|
|
|
|
|
|
|
class ReportOutput(BaseModel):
    """Final report written by the writer agent."""

    title: str = Field(..., description="Report title")
    content: str = Field(..., description="Full report content")
|
|
|
|
|
|
|
|
|
class CritiqueOutput(BaseModel):
    """Critic verdict; route_critique reads the two ``needs_*`` flags."""

    score: float = Field(..., ge=0, le=10, description="Quality score 0-10")
    needs_revision: bool = Field(..., description="Whether revision needed")
    needs_research_retry: bool = Field(
        default=False,
        description="Whether research needs retry",
    )
    feedback: str = Field(..., description="Specific feedback")
    reasoning: str = Field(..., description="Why this score was given")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentState(TypedDict):
    """Shared state dictionary handed from one graph node (agent) to the next."""

    # The user's research question.
    question: str
    # Per-agent outputs; each is None until the corresponding agent has run.
    research_output: Optional[ResearchOutput]
    analysis_output: Optional[AnalysisOutput]
    report_output: Optional[ReportOutput]
    critique_output: Optional[CritiqueOutput]
    # Incremented by the writer each time it produces a report.
    report_iterations: int
    # Set by the researcher to the number of research passes completed.
    research_iterations: int
    # Report-iteration cap; the critic stops requesting revisions once reached.
    max_iterations: int
    # Label of the most recently completed step, e.g. "research_complete".
    current_step: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def calculator(expression: str) -> str:
    """Perform mathematical calculations."""
    # NOTE(review): @tool presumably exposes the docstring above as the tool
    # description, so its wording is deliberately left untouched.
    #
    # Only digits, the four arithmetic operators, parentheses, the decimal
    # point and spaces are accepted; anything else is rejected before the
    # expression reaches numexpr. Every failure is reported as an "Error: ..."
    # string rather than raised, so the caller always receives text.
    permitted_chars = set("0123456789+-*/(). ")
    try:
        cleaned = expression.strip()
        for ch in cleaned:
            if ch not in permitted_chars:
                return "Error: Invalid characters"
        value = ne.evaluate(cleaned)
        return str(float(value))
    except Exception as exc:
        return f"Error: {str(exc)}"
|
|
|
|
|
|
|
|
|
@tool
def search_knowledge(query: str) -> str:
    """Search internal knowledge base."""
    # NOTE(review): @tool presumably exposes the docstring above as the tool
    # description, so its wording is deliberately left untouched.
    #
    # Returns the blurb of the first topic that appears in the query as a whole
    # word/phrase (or that contains the query as a whole word/phrase). Topics
    # are tried longest first so "machine learning" wins over "ai".
    # Whole-word matching matters: plain substring tests made
    # "explain machine learning" hit the "ai" entry via the "ai" in "explain".
    knowledge = {
        "ai": "AI (Artificial Intelligence) simulates human intelligence in machines through machine learning, neural networks, and deep learning.",
        "machine learning": "Machine Learning is a subset of AI enabling systems to learn from data without explicit programming. Types: supervised, unsupervised, reinforcement learning.",
        "python": "Python is a high-level programming language created by Guido van Rossum (1991). Used in web development, data science, AI/ML, automation.",
        "deep learning": "Deep Learning uses multi-layered neural networks to learn hierarchical data representations. Requires large datasets and GPUs.",
        "nlp": "Natural Language Processing enables computers to understand and generate human language using transformers like BERT, GPT.",
        "data science": "Data Science extracts insights from data using statistics, programming, and domain expertise.",
        "blockchain": "Blockchain is distributed ledger technology ensuring secure, transparent transactions through cryptographic hashing.",
        "quantum computing": "Quantum Computing uses quantum mechanical phenomena (superposition, entanglement) for computation.",
        "cloud computing": "Cloud Computing delivers computing services over the internet. Models: IaaS, PaaS, SaaS.",
        "cybersecurity": "Cybersecurity protects systems, networks, and data from digital attacks.",
    }

    query_lower = (query or "").lower().strip()

    # An empty query is a substring of every key, so it must never match.
    if query_lower:
        query_as_word = re.compile(rf"\b{re.escape(query_lower)}\b")
        # sorted() is stable, so equally long topics keep their dict order.
        for key in sorted(knowledge, key=len, reverse=True):
            topic_in_query = re.search(rf"\b{re.escape(key)}\b", query_lower)
            query_in_topic = query_as_word.search(key)
            if topic_in_query or query_in_topic:
                return knowledge[key]

    # The researcher looks for the phrase "needs web search" in this message.
    return f"No information in knowledge base for '{query}'. This query likely needs web search for current information."
|
|
|
|
|
|
|
|
|
@tool
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web using Tavily AI-optimized search."""
    # NOTE(review): @tool presumably exposes the docstring above as the tool
    # description, so its wording is deliberately left untouched.
    #
    # Returns a numbered, human-readable list of results. Each entry has the
    # title on an "N. " line followed by content, "Source: <url>" and
    # "Relevance: <score>" lines. Every failure is returned as a message
    # string rather than raised, because the result is fed back to the LLM.
    if not TAVILY_AVAILABLE:
        return "Error: Tavily not installed. Run: pip install tavily-python"

    if not Config.TAVILY_API_KEY:
        return "Error: TAVILY_API_KEY not set. Get free key from https://tavily.com/"

    # Do not spend an API call on a blank query.
    if not query or not query.strip():
        return f"No results found for: {query}"

    try:
        client = TavilyClient(api_key=Config.TAVILY_API_KEY)
        response = client.search(
            query=query,
            search_depth="advanced",
            max_results=max_results,
        )

        results = response["results"] if response and "results" in response else None
        if not results:
            return f"No results found for: {query}"

        formatted_results = []
        for i, result in enumerate(results, 1):
            # `or` (not a .get default) so explicit None values also fall back;
            # a None score would otherwise break the ".2f" format and discard
            # every result through the except below.
            title = result.get('title') or 'No title'
            content = result.get('content') or 'No content'
            url = result.get('url') or 'No URL'
            try:
                score = float(result.get('score') or 0)
            except (TypeError, ValueError):
                score = 0.0
            formatted_results.append(
                f"{i}. {title}\n"
                f" {content}\n"
                f" Source: {url}\n"
                f" Relevance: {score:.2f}"
            )

        final_output = "\n\n".join(formatted_results)

        # Tavily only fills "answer" when asked to; prepend it when present.
        if "answer" in response and response["answer"]:
            final_output = f"Quick Answer: {response['answer']}\n\n" + final_output

        return final_output

    except Exception as e:
        # Boundary with a network service: report the failure as text.
        return f"Web search error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ToolExecutor:
    """Detect tool requests in LLM output and run the matching tool.

    Tools are registered by their ``name`` attribute and are executed through
    their ``func`` attribute with a single string argument.
    """

    # Opening of an explicit request: ``USE_TOOL: name(``.
    _CALL_HEAD = re.compile(r'USE_TOOL:\s*(\w+)\(', re.IGNORECASE)

    def __init__(self, tools):
        """Index *tools* by name for lookup in :meth:`execute`."""
        self.tools = {t.name: t for t in tools}

    def _canonical_name(self, name: str) -> str:
        """Map *name* onto a registered tool name, ignoring case.

        Detection is case-insensitive, so ``Web_Search`` must resolve to the
        registered ``web_search``; unknown names are returned unchanged and
        are rejected later by :meth:`execute`.
        """
        if name in self.tools:
            return name
        lowered = name.lower()
        for registered in self.tools:
            if registered.lower() == lowered:
                return registered
        return name

    @staticmethod
    def _call_argument(text: str, start: int) -> Optional[str]:
        """Return the argument text of a call whose ``(`` ends just before *start*.

        Parentheses are balanced so ``calculator((2+3)*4)`` yields ``(2+3)*4``.
        The scan never leaves the current line. If the parentheses do not
        balance, the text up to the first ``)`` is used; ``None`` means the
        line has no closing parenthesis at all.
        """
        line_end = text.find("\n", start)
        if line_end == -1:
            line_end = len(text)

        depth = 1
        for pos in range(start, line_end):
            ch = text[pos]
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    return text[start:pos]

        first_close = text.find(")", start, line_end)
        return text[start:first_close] if first_close != -1 else None

    @staticmethod
    def _clean(argument: str) -> str:
        """Trim surrounding whitespace and quote characters from *argument*."""
        return argument.strip().strip('"\'')

    def detect_tool_call(self, text: str) -> Optional[tuple]:
        """Detect tool call in LLM response.

        Two spellings are recognised: ``USE_TOOL: name(arguments)`` and the
        looser ``name: arguments`` for a registered tool name.

        Returns:
            ``(tool_name, arguments)`` or ``None`` when no call is found.
        """
        if not text:
            return None

        for head in self._CALL_HEAD.finditer(text):
            argument = self._call_argument(text, head.end())
            if argument is not None:
                return (self._canonical_name(head.group(1)), self._clean(argument))

        for tool_name in self.tools:
            found = re.search(
                rf'{re.escape(tool_name)}:\s*([^\n]+)', text, re.IGNORECASE
            )
            if found:
                return (tool_name, self._clean(found.group(1)))

        return None

    def execute(self, tool_name: str, arguments: str) -> str:
        """Execute tool.

        Unknown tools and exceptions raised by the tool are reported as
        ``"Error ..."`` strings rather than raised.
        """
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"

        try:
            return self.tools[tool_name].func(arguments)
        except Exception as e:
            return f"Error executing {tool_name}: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Lower-case phrases that signal the model could not answer. They are plain
# substrings (no regex metacharacters), so simple containment is sufficient.
_INSUFFICIENT_PHRASES = (
    "i don't know",
    "i do not know",
    "i don't have information",
    "i cannot provide",
    "i'm not sure",
    "i am not sure",
    "no information available",
    "beyond my knowledge",
    "i lack information",
    "insufficient information",
    "unable to answer",
    "cannot answer",
    "don't have access to",
    "my knowledge cutoff",
    "as of my last update",
)


def detect_insufficient_answer(answer: str) -> bool:
    """Detect if LLM doesn't know the answer.

    Args:
        answer: Text produced by the model (``None`` is tolerated).

    Returns:
        ``True`` when the answer is empty/blank or contains one of the known
        "I don't know"-style phrases, otherwise ``False``.
    """
    if not answer or not answer.strip():
        # Nothing usable was produced at all.
        return True

    # Models frequently emit typographic apostrophes ("don’t"); fold them to
    # the ASCII form used by the phrase list, and collapse whitespace so a
    # phrase broken across lines is still recognised.
    normalized = answer.lower().replace("\u2019", "'").replace("\u2018", "'")
    normalized = " ".join(normalized.split())

    return any(phrase in normalized for phrase in _INSUFFICIENT_PHRASES)
|
|
|
|
|
|
|
|
|
# A JSON object inside a Markdown code fence (``` or ```json).
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*(\{.*?\})\s*```', re.DOTALL)


def extract_json(text: str) -> Optional[dict]:
    """Extract the first non-empty JSON object embedded in *text*.

    Fenced code blocks are tried first. Otherwise every ``{`` in the text is
    tried as the start of an object using the real JSON decoder, so objects
    nested to any depth and braces inside string values are handled (a
    regex-based scan returned inner fragments for deeply nested objects).
    Decoding is non-strict so raw newlines inside string values, which LLMs
    often emit, are accepted.

    Args:
        text: Arbitrary text, typically an LLM reply (``None`` is tolerated).

    Returns:
        The decoded ``dict``, or ``None`` when no non-empty object is found.
    """
    if not text:
        return None

    for candidate in _FENCED_JSON_RE.findall(text):
        try:
            parsed = json.loads(candidate, strict=False)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            continue
        if isinstance(parsed, dict) and parsed:
            return parsed

    decoder = json.JSONDecoder(strict=False)
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one value starting at `start` and ignores
            # whatever follows it.
            parsed, _ = decoder.raw_decode(text, start)
        except ValueError:
            parsed = None
        if isinstance(parsed, dict) and parsed:
            return parsed
        start = text.find("{", start + 1)

    return None
|
|
|
|
|
|
|
|
|
def safe_parse_pydantic(text: str, model: type[BaseModel], fallback: dict) -> BaseModel:
    """Parse text into Pydantic model with fallback.

    Attempts, in order:

    1. JSON extracted from *text* validated as *model*.
    2. The same JSON layered over *fallback*, so a reply that merely omits a
       required field keeps the values it did supply.
    3. *text* validated directly as a JSON document.
    4. *fallback* alone, then *fallback* restricted to the model's fields.

    Args:
        text: Raw LLM reply.
        model: The Pydantic model class to build.
        fallback: Field values to use when the reply cannot be parsed.

    Returns:
        An instance of *model*.

    Raises:
        ValidationError: Only if even the filtered *fallback* is invalid.
    """
    json_data = extract_json(text)

    if json_data:
        try:
            return model(**json_data)
        except (ValidationError, TypeError):
            pass

        # Keep whatever the LLM got right; fill the gaps from the fallback.
        try:
            return model(**{**fallback, **json_data})
        except (ValidationError, TypeError):
            pass

    try:
        return model.model_validate_json(text)
    except (ValidationError, ValueError, TypeError):
        pass

    try:
        return model(**fallback)
    except (ValidationError, TypeError):
        # Last resort: drop fallback keys the model does not declare.
        known = {k: v for k, v in fallback.items() if k in model.model_fields}
        return model(**known)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LLMFactory:
    """Builds the Hugging Face hosted LLM used by all agents."""

    # Model used when the caller does not name one.
    DEFAULT_REPO_ID = "meta-llama/Llama-3.1-8B-Instruct"

    @staticmethod
    def create_llm(token: str, temperature: float = 0.3, repo_id: Optional[str] = None):
        """Create the LLM client.

        A chat wrapper around a conversational endpoint is preferred. If
        building it fails for any reason, the reason is printed and a plain
        endpoint is returned instead.

        Args:
            token: Hugging Face API token.
            temperature: Sampling temperature.
            repo_id: Model repository id; defaults to ``DEFAULT_REPO_ID``.

        Returns:
            A ``ChatHuggingFace`` instance, or a bare ``HuggingFaceEndpoint``
            when the chat wrapper could not be created.
        """
        repo_id = repo_id or LLMFactory.DEFAULT_REPO_ID

        try:
            endpoint = HuggingFaceEndpoint(
                repo_id=repo_id,
                huggingfacehub_api_token=token,
                temperature=temperature,
                max_new_tokens=1500,
                top_p=0.9,
                repetition_penalty=1.1,
                task="conversational",
            )
            return ChatHuggingFace(llm=endpoint)
        except Exception as exc:
            # Say why the preferred path failed instead of hiding it.
            print(f" ⚠️ Chat model setup failed ({exc}); using plain endpoint")
            return HuggingFaceEndpoint(
                repo_id=repo_id,
                huggingfacehub_api_token=token,
                temperature=temperature,
                max_new_tokens=1500,
            )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ResearcherAgent:
    """Enhanced Researcher with automatic web search retry.

    Asks the LLM to answer the question (optionally via one tool call),
    synthesises the tool output into an answer and stores a ``ResearchOutput``
    in the state. On any pass after the first, web search is enforced.
    """

    def __init__(self, llm, tool_executor):
        self.llm = llm
        self.tool_executor = tool_executor

    def _ask_llm(self, prompt: str) -> str:
        """Send *prompt* to the LLM and return the reply text."""
        if hasattr(self.llm, 'invoke'):
            reply = self.llm.invoke([HumanMessage(content=prompt)])
            return reply.content if hasattr(reply, 'content') else str(reply)
        return self.llm(prompt)

    @staticmethod
    def _build_prompt(question: str, force_web_search: bool) -> str:
        """Return the research prompt; the forced variant demands web search."""
        if force_web_search:
            return f"""IMPORTANT: Previous answer was insufficient. Use web search to find current information.

Question: {question}

You MUST use web search for this query.

To use web search: USE_TOOL: web_search({question})

Your response:"""

        return f"""You are a research assistant. Answer: {question}

Available tools:
1. calculator(expression) - Math operations
2. search_knowledge(topic) - Internal knowledge base (for general facts, not current events)
3. web_search(query) - Real-time web search (USE THIS for current events, recent news, 2025 info, "who won", "latest")

CRITICAL: Use web_search for:
- Questions with "2025", "current", "recent", "latest", "today", "who won"
- Elections, news, prices, events
- Anything that requires up-to-date information

To use tool: USE_TOOL: tool_name(arguments)

Your response:"""

    @staticmethod
    def _extract_web_sources(tool_result: str, limit: int = 3) -> List[dict]:
        """Pair result titles with URLs from formatted web_search output.

        A title is a line starting with ``N. `` at column 0; its URL is the
        next ``Source: http...`` line. Walking line by line keeps each title
        with its own URL even when result content contains text such as
        "2024. The ..." or when an entry has no usable URL.
        """
        sources = []
        current_title = "No title"
        for line in tool_result.splitlines():
            heading = re.match(r'\d+\.\s+(.+)', line)
            if heading:
                current_title = heading.group(1).strip()
                continue
            link = re.match(r'\s*Source: (https?://\S+)', line)
            if link:
                sources.append({"title": current_title, "url": link.group(1)})
                current_title = "No title"
                if len(sources) == limit:
                    break
        return sources

    def __call__(self, state: "AgentState") -> "AgentState":
        """Run one research pass and record the result in *state*."""
        print("\n🔍 RESEARCHER AGENT")

        question = state["question"]
        retry_count = state.get("research_iterations", 0)

        # Any pass after the first is a retry and must go through web search.
        force_web_search = retry_count > 0
        if force_web_search:
            print(f" 🔄 RETRY #{retry_count} - Forcing web search")

        prompt = self._build_prompt(question, force_web_search)

        try:
            response = self._ask_llm(prompt)
        except Exception as e:
            print(f" ⚠️ Error: {e}")
            response = f"Error processing: {question}"

        print(f" LLM: {response[:150]}...")

        tool_call = self.tool_executor.detect_tool_call(response)

        # "Forcing" must not depend on the LLM echoing the tool call back:
        # if it did not request web_search, issue the search ourselves.
        if (
            force_web_search
            and (tool_call is None or tool_call[0] != "web_search")
            and "web_search" in getattr(self.tool_executor, "tools", {})
        ):
            tool_call = ("web_search", question)

        web_sources = []
        needs_web_search = False

        if tool_call:
            tool_name, arguments = tool_call
            print(f" 🔧 Tool: {tool_name}({arguments})")

            tool_result = self.tool_executor.execute(tool_name, arguments)
            print(f" ✅ Result: {tool_result[:200]}...")

            # search_knowledge signals a miss with a "needs web search" hint.
            if tool_name == "search_knowledge" and "needs web search" in tool_result.lower():
                print(" ⚠️ Knowledge base insufficient - flagging for web search")
                needs_web_search = True

            if tool_name == "web_search":
                web_sources = self._extract_web_sources(tool_result)

            synthesis_prompt = f"""Based on this information, provide a comprehensive answer to: {question}

Tool: {tool_name}
Information:
{tool_result}

Provide clear answer:"""

            try:
                answer = self._ask_llm(synthesis_prompt)
            except Exception as e:
                # Fall back to the raw tool output rather than failing the pass.
                print(f" ⚠️ Synthesis failed: {e}")
                answer = f"From {tool_name}: {tool_result[:500]}"

            sources = [tool_name]
            confidence = 0.9 if tool_name == "web_search" else 0.85
        else:
            answer = response
            sources = ["LLM Knowledge"]
            confidence = 0.7
            print(" ℹ️ Using LLM knowledge only")

        # Applies to tool-backed and LLM-only answers alike.
        if detect_insufficient_answer(answer):
            print(" ⚠️ INSUFFICIENT ANSWER DETECTED")
            needs_web_search = True
            confidence = 0.3

        research_output = ResearchOutput(
            answer=answer.strip(),
            sources_used=sources,
            confidence=confidence,
            web_sources=web_sources if web_sources else None,
            needs_web_search=needs_web_search,
            retry_count=retry_count,
        )

        state["research_output"] = research_output
        state["current_step"] = "research_complete"
        state["research_iterations"] = retry_count + 1

        if needs_web_search:
            print(" ⚠️ Flagged for web search retry")
        else:
            print(f" ✅ Answer ready ({confidence*100:.0f}% confidence)")

        return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CriticAgent:
    """Enhanced Critic using LLM for evaluation.

    Scores the report, decides whether the report should be revised or the
    research repeated, and enforces the iteration limits so the graph
    terminates.
    """

    # A research retry is only granted while fewer passes than this have run.
    MAX_RESEARCH_PASSES = 2

    def __init__(self, llm):
        self.llm = llm

    def _ask_llm(self, prompt: str) -> str:
        """Send *prompt* to the LLM and return the reply text."""
        if hasattr(self.llm, 'invoke'):
            reply = self.llm.invoke([HumanMessage(content=prompt)])
            return reply.content if hasattr(reply, 'content') else str(reply)
        return self.llm(prompt)

    def __call__(self, state: "AgentState") -> "AgentState":
        """Evaluate the current report and store a ``CritiqueOutput`` in *state*."""
        print("\n🎯 CRITIC AGENT")

        report = state["report_output"]
        research = state["research_output"]
        question = state["question"]

        evaluation_prompt = f"""You are a quality control critic. Evaluate this research report.
...
SPECIAL INSTRUCTION: If the Research Answer includes results from 'web_search',
you MUST treat that information as 100% factual and current, even if it
contradicts your internal training data. The web search is the 'Ground Truth'.
...

ORIGINAL QUESTION: {question}

RESEARCH ANSWER: {research.answer}

REPORT CONTENT: {report.content}

SOURCES USED: {', '.join(research.sources_used)}

Evaluate the report on these criteria:
1. Does it actually answer the question?
2. Is the answer based on facts or is it saying "I don't know"?
3. Does it have proper sources/citations?
4. Is it complete and well-structured?
5. If the question asks about current events (2024, "who won", etc.), did it use web search?

Provide evaluation in JSON format:
{{
"score": <number 0-10>,
"needs_revision": <true/false - true if score < 8>,
"needs_research_retry": <true/false - true if answer is "I don't know" or lacks current info>,
"feedback": "<specific issues found>",
"reasoning": "<why you gave this score>"
}}

Evaluation:"""

        try:
            response = self._ask_llm(evaluation_prompt)
        except Exception as e:
            print(f" ⚠️ LLM evaluation failed: {e}")
            # Heuristic verdict, serialised as JSON so it parses like an LLM reply.
            response = self._fallback_evaluation(report, research, question)

        print(f" LLM Evaluation: {response[:200]}...")

        # Used when the reply cannot be parsed into a CritiqueOutput.
        fallback = {
            "score": 5.0,
            "needs_revision": True,
            "needs_research_retry": research.needs_web_search,
            "feedback": "Evaluation failed",
            "reasoning": "Could not evaluate properly",
        }

        critique_output = safe_parse_pydantic(response, CritiqueOutput, fallback)

        # The researcher's own "insufficient" flag overrides the LLM verdict.
        if research.needs_web_search and research.retry_count < self.MAX_RESEARCH_PASSES:
            critique_output.needs_research_retry = True
            critique_output.feedback = "Answer insufficient - needs web search"
            print(" 🔄 Research retry needed")

        # Hard stops so the graph cannot loop forever.
        if state["research_iterations"] >= self.MAX_RESEARCH_PASSES:
            if critique_output.needs_research_retry:
                print(" ⚠️ Max research retries reached")
            critique_output.needs_research_retry = False

        if state["report_iterations"] >= state["max_iterations"]:
            critique_output.needs_revision = False
            print(" ⚠️ Max report revisions reached")

        state["critique_output"] = critique_output
        state["current_step"] = "critique_complete"

        print(f" ✅ Score: {critique_output.score:.1f}/10")
        print(f" 📝 Feedback: {critique_output.feedback[:100]}")

        return state

    def _fallback_evaluation(self, report, research, question):
        """Fallback heuristic evaluation if LLM fails.

        Scoring: 3.0 for an insufficient answer, otherwise 7.0; +1.0 when web
        sources exist; +0.5 when the report exceeds 200 characters; clamped
        to 0-10.

        Returns:
            A JSON string with the same keys as ``CritiqueOutput``.
        """
        feedback = []

        insufficient = detect_insufficient_answer(research.answer)
        if insufficient:
            score = 3.0
            feedback.append("Answer is insufficient or says 'I don't know'")
        else:
            score = 7.0

        if research.web_sources:
            score += 1.0

        if len(report.content) > 200:
            score += 0.5

        score = min(10.0, max(0.0, score))

        needs_retry = insufficient or research.needs_web_search

        return json.dumps({
            "score": score,
            "needs_revision": score < 8.0,
            "needs_research_retry": needs_retry,
            "feedback": " | ".join(feedback) if feedback else "Heuristic evaluation",
            "reasoning": "Fallback evaluation used",
        })
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnalystAgent:
    """Extracts key insights and implications from the research answer."""

    def __init__(self, llm):
        self.llm = llm

    def _ask_llm(self, prompt: str) -> str:
        """Send *prompt* to the LLM and return the reply text."""
        if hasattr(self.llm, 'invoke'):
            reply = self.llm.invoke([HumanMessage(content=prompt)])
            return reply.content if hasattr(reply, 'content') else str(reply)
        return self.llm(prompt)

    def __call__(self, state: "AgentState") -> "AgentState":
        """Analyse ``state["research_output"]`` and store an ``AnalysisOutput``."""
        print("\n📊 ANALYST AGENT")

        research = state["research_output"]

        prompt = f"""Extract key insights from this research.

Question: {state['question']}
Answer: {research.answer}

Provide analysis in JSON:
{{
"key_points": ["insight 1", "insight 2", "insight 3"],
"implications": "why this matters"
}}"""

        try:
            response = self._ask_llm(prompt)
        except Exception as e:
            # An empty object makes the parser fall through to `fallback`.
            print(f" ⚠️ Analysis LLM call failed: {e}")
            response = '{}'

        # Minimal analysis used when the reply cannot be parsed.
        fallback = {
            "key_points": [research.answer[:100]],
            "implications": "Research findings provided",
        }

        analysis_output = safe_parse_pydantic(response, AnalysisOutput, fallback)
        state["analysis_output"] = analysis_output
        state["current_step"] = "analysis_complete"
        print(f" ✅ {len(analysis_output.key_points)} insights extracted")

        return state
|
|
|
|
|
|
|
|
|
class WriterAgent:
    """Writes the research report, revising it when the critic asks."""

    def __init__(self, llm):
        self.llm = llm

    def _ask_llm(self, prompt: str) -> str:
        """Send *prompt* to the LLM and return the reply text."""
        if hasattr(self.llm, 'invoke'):
            reply = self.llm.invoke([HumanMessage(content=prompt)])
            return reply.content if hasattr(reply, 'content') else str(reply)
        return self.llm(prompt)

    def __call__(self, state: "AgentState") -> "AgentState":
        """Produce a ``ReportOutput`` and increment ``report_iterations``."""
        print(f"\n✍️ WRITER AGENT (Iteration {state['report_iterations'] + 1})")

        research = state["research_output"]
        analysis = state["analysis_output"]

        sources_text = ""
        if research.web_sources:
            sources_text = "\n\nWeb Sources:\n" + "\n".join(
                f"- {s['title']}: {s['url']}" for s in research.web_sources
            )

        # On a revision pass, show the critic's feedback to the LLM; without it
        # the "revise" loop would just resend an identical prompt.
        revision_note = ""
        critique = state.get("critique_output")
        if critique is not None and state["report_iterations"] > 0 and critique.feedback:
            revision_note = (
                "\n\nReviewer feedback on the previous draft (address it): "
                f"{critique.feedback}"
            )

        prompt = f"""Create professional research report.

Question: {state['question']}
Answer: {research.answer}
Insights: {', '.join(analysis.key_points)}
Sources: {', '.join(research.sources_used)}{sources_text}{revision_note}

JSON format:
{{
"title": "clear title",
"content": "executive summary + findings + insights + implications + sources"
}}"""

        try:
            response = self._ask_llm(prompt)
        except Exception as e:
            # An empty reply makes the parser fall through to `fallback`.
            print(f" ⚠️ Report LLM call failed: {e}")
            response = ""

        # Template report used when the reply cannot be parsed.
        insight_lines = "\n".join(f"• {p}" for p in analysis.key_points)
        fallback_content = f"""# {state['question']}

## Answer
{research.answer}

## Key Insights
{insight_lines}

## Implications
{analysis.implications}

## Sources
{', '.join(research.sources_used)}"""

        if research.web_sources:
            fallback_content += "\n\n## References\n" + "\n".join(
                f"• [{s['title']}]({s['url']})" for s in research.web_sources
            )

        fallback = {"title": state['question'], "content": fallback_content}

        report_output = safe_parse_pydantic(response, ReportOutput, fallback)
        state["report_output"] = report_output
        state["report_iterations"] += 1
        state["current_step"] = "report_complete"
        print(f" ✅ Report: {len(report_output.content)} chars")

        return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def route_critique(state: "AgentState") -> Literal["retry_research", "revise", "finish"]:
    """Enhanced routing with research retry.

    Reads ``state["critique_output"]`` and picks the next graph edge. A
    research retry takes priority over a report revision.

    Returns:
        ``"retry_research"``, ``"revise"`` or ``"finish"``.
    """
    critique = state["critique_output"]

    if critique.needs_research_retry:
        print("\n🔄 ROUTING: Retry research with web search")
        return "retry_research"

    if critique.needs_revision:
        print(f"\n📝 ROUTING: Revise report (Score: {critique.score:.1f}/10)")
        return "revise"

    print(f"\n✅ ROUTING: Approve (Score: {critique.score:.1f}/10)")
    return "finish"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MultiAgentSystem:
    """Enhanced Multi-Agent System with Intelligent Retry.

    Wires researcher -> analyst -> writer -> critic into a LangGraph workflow.
    The critic's verdict routes back to the researcher, back to the writer,
    or to the end.
    """

    def __init__(self, hf_token: str, tavily_key: str, max_iterations: int = 2):
        """Store credentials in ``Config``, then build tools, LLM, agents and graph.

        Args:
            hf_token: Hugging Face API token.
            tavily_key: Tavily API key (read by ``web_search`` via ``Config``).
            max_iterations: Maximum number of report iterations.
        """
        Config.HF_TOKEN = hf_token
        Config.TAVILY_API_KEY = tavily_key
        self.max_iterations = max_iterations

        print("\n" + "=" * 70)
        print("🤖 ENHANCED AGENTIC AI SYSTEM V3")
        print("=" * 70)
        print("NEW: Intelligent retry with web search")
        print("NEW: LLM-based critic evaluation")
        print("=" * 70)

        tools = [calculator, search_knowledge, web_search]
        self.tool_executor = ToolExecutor(tools)
        print(f"🛠️ Tools: {[t.name for t in tools]}")

        print("📡 Initializing LLM...")
        self.llm = LLMFactory.create_llm(hf_token)

        print("🤖 Creating agents...")
        self.researcher = ResearcherAgent(self.llm, self.tool_executor)
        self.analyst = AnalystAgent(self.llm)
        self.writer = WriterAgent(self.llm)
        self.critic = CriticAgent(self.llm)

        self.graph = self._build_graph()

        print("\n✅ System Ready with Enhanced Features!")

    def _build_graph(self):
        """Assemble and compile the agent workflow graph."""
        workflow = StateGraph(AgentState)

        workflow.add_node("researcher", self.researcher)
        workflow.add_node("analyst", self.analyst)
        workflow.add_node("writer", self.writer)
        workflow.add_node("critic", self.critic)

        workflow.set_entry_point("researcher")
        workflow.add_edge("researcher", "analyst")
        workflow.add_edge("analyst", "writer")
        workflow.add_edge("writer", "critic")

        # The critic's verdict decides where the flow goes next.
        workflow.add_conditional_edges(
            "critic",
            route_critique,
            {
                "retry_research": "researcher",
                "revise": "writer",
                "finish": END,
            },
        )
        return workflow.compile()

    def research(self, question: str) -> Optional[dict]:
        """Run the full workflow for *question*.

        Returns:
            The final state dictionary, or ``None`` if the workflow raised
            (the error and traceback are printed).
        """
        print("=" * 70)
        print(f"📋 RESEARCH QUESTION: {question}")
        print("=" * 70)

        initial_state = AgentState(
            question=question,
            research_output=None,
            analysis_output=None,
            report_output=None,
            critique_output=None,
            report_iterations=0,
            research_iterations=0,
            max_iterations=self.max_iterations,
            current_step="start",
        )

        try:
            final_state = self.graph.invoke(initial_state)

            print("\n" + "=" * 70)
            print("✅ RESEARCH COMPLETE")
            print("=" * 70)

            if final_state.get("critique_output"):
                critique = final_state["critique_output"]
                # research_iterations counts passes; retries are passes after the first.
                retries = max(0, final_state.get('research_iterations', 0) - 1)
                print(f"Final Score: {critique.score:.1f}/10")
                print(f"Research Retries: {retries}")
                print(f"Report Revisions: {final_state['report_iterations']}")

            return final_state

        except Exception as e:
            print(f"\n❌ Error: {e}")
            import traceback
            traceback.print_exc()
            return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cli_demo():
    """Interactive command-line loop around ``MultiAgentSystem``.

    Prompts for both API credentials, then repeatedly reads a question, runs
    the research workflow and prints the report, its metadata and the critic's
    score. Typing ``quit``, ``exit`` or ``q`` (or sending EOF / Ctrl-C) ends
    the loop.
    """
    width = 70
    print("\n╔" + "═" * width + "╗")
    for banner_line in ("ENHANCED AGENTIC AI SYSTEM",
                        "WITH LANGGRAPH AND TAVILY- AI SEARCH"):
        print("║" + banner_line.center(width) + "║")
    print("╚" + "═" * width + "╝\n")

    try:
        hf_token = input("Hugging Face Token: ").strip()
        tavily_key = input("Tavily API Key: ").strip()
    except (EOFError, KeyboardInterrupt):
        print("\n👋 Goodbye!")
        return

    if not hf_token or not tavily_key:
        print("❌ Both tokens required!")
        return

    try:
        system = MultiAgentSystem(hf_token, tavily_key, max_iterations=2)
    except Exception as e:
        print(f"❌ Init failed: {e}")
        return

    print("\n💡 Try these queries to test retry logic:")
    print(" • who won 2024 elections (will retry with web search)")
    print(" • latest AI news December 2024 (uses web search first)")
    print(" • explain machine learning (uses knowledge base)")
    print(" • what is 25*4+10 (uses calculator)")

    while True:
        print("\n" + "=" * 70)
        try:
            question = input("\n👤 Your question (or 'quit'): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C ends the session like "quit".
            print("\n👋 Goodbye!")
            break

        if question.lower() in ['quit', 'exit', 'q']:
            print("\n👋 Goodbye!")
            break

        if not question:
            continue

        final_state = system.research(question)

        # research() returns None on failure; nothing to display then.
        if not (final_state and final_state.get("report_output")):
            continue

        print("\n" + "=" * 70)
        print("📄 RESEARCH REPORT")
        print("=" * 70)

        report = final_state["report_output"]
        print(f"\n📌 {report.title}\n")
        print(report.content)

        research = final_state.get("research_output")
        if research:
            print("\n" + "-" * 70)
            print("📊 METADATA")
            print("-" * 70)
            print(f"Sources: {', '.join(research.sources_used)}")
            print(f"Confidence: {research.confidence*100:.0f}%")
            print(f"Research Retries: {research.retry_count}")

            if research.web_sources:
                print("\n🔗 Web References:")
                for i, source in enumerate(research.web_sources, 1):
                    print(f" {i}. {source['title']}")
                    print(f" {source['url']}")

        critique = final_state.get("critique_output")
        if critique:
            print(f"\n🎯 Quality Score: {critique.score:.1f}/10")
            print(f"📝 Feedback: {critique.feedback}")
|
|
|
|
|
|
|
|
|
# Script entry point: start the interactive demo when run directly.
if __name__ == "__main__":
    cli_demo()