# perplexity-clone / rag / agents.py
# Naveen-2007's picture
# Fix model, improve error handling, add startup diagnostics for all modes
# d76cab0
"""
Production-Level Agents for Perplexity Clone
=============================================
Each agent is a node in LangGraph pipelines.
Agents handle specific tasks and pass state to next nodes.
"""
from typing import List, Dict, Any
from rag.rag_state import (
RAGState,
WebSearchState,
RAGOnlyState,
AgenticState,
AnalysisState,
SummarizeState
)
from config.config import Config
from config.system_prompt import PPLX_SYSTEM_PROMPT
from vectorstore.store import VectorStore
from tools.search_tool import SearchTool
from tools.browse_tool import BrowseTool
from tools.reranker_tool import Reranker
from tools.citation_tool import CitationTool
from tools.summarizer_tool import SummarizerTool
from tools.followup_tool import FollowUpGenerator
# =============================================================================
# DEEP RESEARCH AGENTS (Original)
# =============================================================================
class PlannerAgent:
    """Decomposes question into sub-questions.

    The LLM returned by ``Config.get_llm()`` is asked for a numbered list;
    the reply is parsed line by line and at most five entries are stored
    in ``state["sub_questions"]``.
    """

    # Characters trimmed from both ends of every reply line.  The leading
    # part is the literal this file has always used (it looks like a
    # mis-encoded bullet -- TODO confirm the file encoding); the explicit
    # escape guarantees that a real "\u2022" bullet is trimmed as well.
    _STRIP_CHARS = "-β€’ \u2022"

    def __init__(self) -> None:
        self.llm = Config.get_llm()

    def plan(self, state: RAGState) -> RAGState:
        """Ask the LLM for sub-questions and store them on the state.

        Args:
            state: Pipeline state; ``state["question"]`` is read.

        Returns:
            The same state object with ``state["sub_questions"]`` set to a
            list of at most five strings.
        """
        prompt = (
            "Break the following question into 3-5 clear sub-questions.\n"
            "Return them as a numbered list.\n\n"
            f"{state['question']}"
        )
        resp = self.llm.invoke([
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ])

        # Clean each line first and drop the ones that end up empty (for
        # example a line consisting only of "-"); indexing such a line with
        # [0] used to raise IndexError.
        candidates: List[str] = []
        for raw_line in resp.content.splitlines():
            cleaned = raw_line.strip(self._STRIP_CHARS).strip()
            if cleaned:
                candidates.append(cleaned)

        # Short replies are taken wholesale; longer ones are filtered down
        # to the lines that start with a digit (the numbered items).
        keep_all = len(candidates) <= 5
        subqs: List[str] = [
            line for line in candidates if keep_all or line[0].isdigit()
        ]
        state["sub_questions"] = subqs[:5]
        return state
class ResearchAgent:
    """Collects evidence from local RAG + web search with Tavily fallback."""

    def __init__(self, vector_store: VectorStore) -> None:
        self.vs = vector_store
        try:
            self.search_tool = SearchTool()
        except Exception as e:
            # Web search is optional: report the failure (as WebSearchNode
            # does) and carry on with local retrieval only.
            print(f"⚠️ ResearchAgent: SearchTool init failed: {e}")
            self.search_tool = None
        self.browse_tool = BrowseTool()
        self.reranker = Reranker()

    def research(self, state: RAGState) -> RAGState:
        """Gather evidence for every sub-question.

        For each entry of ``state["sub_questions"]`` the vector store is
        queried and reranked, then (when a search tool is available) the web
        is searched and each hit is fetched, falling back to the snippet
        carried in the search result when fetching fails or returns nothing.

        Args:
            state: Pipeline state; ``sub_questions`` is read.

        Returns:
            The same state with ``web_pages`` (list of title/url/content
            dicts) and ``evidence`` (list of strings) set.
        """
        evidence: List[str] = []
        pages_all: List[Dict] = []

        for sq in state.get("sub_questions", []):
            # Local RAG
            docs = self.vs.retrieve(sq, k=8)
            docs = self.reranker.rerank(sq, docs, top_k=4)
            evidence.extend(d.page_content for d in docs)

            # Web search + browse with Tavily fallback
            results = (
                self.search_tool.search(sq, num_results=4)
                if self.search_tool
                else []
            )

            # The AI answer is read from the first result only.
            if results and results[0].get("tavily_answer"):
                evidence.append(f"[AI Summary]: {results[0]['tavily_answer']}")

            for r in results:
                url = r.get("url")
                if not url:
                    continue
                title = r.get("title", "Web result")
                snippet = r.get("content", "")

                # Only ordinary errors trigger the fallback; KeyboardInterrupt
                # and SystemExit must not be swallowed.
                try:
                    content = self.browse_tool.fetch_clean(url) or snippet
                except Exception:
                    content = snippet

                if not content:
                    continue
                pages_all.append({"title": title, "url": url, "content": content})
                evidence.append(content[:1500])

        state["web_pages"] = pages_all
        state["evidence"] = evidence
        print(f" πŸ“š ResearchAgent: Collected {len(evidence)} evidence pieces, {len(pages_all)} pages")
        return state
class AggregatorAgent:
    """Writes draft answers per sub-question from evidence."""

    def __init__(self) -> None:
        self.llm = Config.get_llm()

    def aggregate(self, state: RAGState) -> RAGState:
        """Produce one short draft answer for each sub-question.

        Args:
            state: Pipeline state; ``sub_questions`` and ``evidence`` are read.

        Returns:
            The same state with ``draft_answers`` set to a list of strings,
            each of the form ``"Sub-question: <q>\\n<llm reply>"``.
        """
        # Every sub-question sees the same first twelve evidence pieces, so
        # the context is built once instead of once per iteration.
        context = "\n\n".join(state.get("evidence", [])[:12])

        drafts: List[str] = []
        for sq in state.get("sub_questions", []):
            prompt = (
                "Using the evidence below, answer the sub-question briefly and clearly.\n\n"
                f"Evidence:\n{context}\n\nSub-question: {sq}"
            )
            resp = self.llm.invoke([
                {"role": "system", "content": PPLX_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ])
            drafts.append(f"Sub-question: {sq}\n{resp.content}")

        state["draft_answers"] = drafts
        return state
class WriterAgent:
    """Turns the per-sub-question drafts into the final deep-research report."""

    def __init__(self) -> None:
        self.llm = Config.get_llm()

    def write(self, state: RAGState) -> RAGState:
        """Compose the final answer and store it in ``state["final_answer"]``.

        Reads ``state["question"]`` and ``state["draft_answers"]``; the
        drafts are joined with blank lines and handed to the LLM together
        with the section/citation instructions.
        """
        draft_blocks = state.get("draft_answers", [])
        findings = "\n\n".join(draft_blocks)

        instructions = (
            "You are Perplexity in deep research mode.\n"
            "Write a structured answer with sections: Overview, Key Points, Details, Conclusion.\n"
            "Use inline citations like [1], [2] where appropriate.\n\n"
            f"Original question:\n{state['question']}\n\nFindings:\n{findings}"
        )
        messages = [
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": instructions},
        ]

        reply = self.llm.invoke(messages)
        state["final_answer"] = reply.content
        return state
class ValidatorAgent:
    """Derives the source list from fetched pages and attaches it to the answer."""

    def __init__(self) -> None:
        self.citation_tool = CitationTool()

    def validate_and_attach(self, state: RAGState) -> RAGState:
        """Store the sources returned by the citation tool in ``state["sources"]``.

        Only the first ten entries of ``state["web_pages"]`` are considered;
        each is reduced to its ``title`` and ``url`` before being passed,
        with ``state["final_answer"]``, to ``CitationTool.attach_sources``.
        """
        first_pages = state.get("web_pages", [])[:10]
        candidates: List[Dict] = [
            {"title": page["title"], "url": page["url"]} for page in first_pages
        ]
        final_answer = state.get("final_answer", "")
        state["sources"] = self.citation_tool.attach_sources(final_answer, candidates)
        return state
# =============================================================================
# WEB SEARCH AGENTS
# =============================================================================
class WebSearchNode:
    """Node 1: runs the web search for the user's query."""

    def __init__(self):
        try:
            self.search_tool = SearchTool()
        except Exception as e:
            print(f"⚠️ WebSearchNode: SearchTool init failed: {e}")
            self.search_tool = None

    def search(self, state: WebSearchState) -> WebSearchState:
        """Populate ``state["search_results"]`` from ``state["query"]``.

        The result list is empty when no search tool could be created or
        when the search call raises.
        """
        query = state.get("query", "")
        print(f" πŸ” WebSearchNode: Searching for '{query[:50]}...'")

        tool = self.search_tool
        if not tool:
            print(" ❌ WebSearchNode: No search tool available")
            state["search_results"] = []
            return state

        try:
            hits = tool.search(query, num_results=6)
            state["search_results"] = hits
            print(f" βœ… WebSearchNode: Got {len(hits)} results")
        except Exception as e:
            print(f" ❌ WebSearchNode error: {e}")
            state["search_results"] = []
        return state
class WebFetchNode:
    """Node 2: Fetch and parse web pages. Uses Tavily content as fallback."""

    def __init__(self):
        self.browse_tool = BrowseTool()

    def fetch(self, state: WebSearchState) -> WebSearchState:
        """Fetch every search hit and record pages and link previews.

        Args:
            state: Pipeline state; ``search_results`` is read.

        Returns:
            The same state with ``web_pages`` (title/url/content, content
            capped at 2500 characters) and ``links`` (title/url/snippet,
            snippet capped at 200 characters) set.  ``tavily_answer`` is
            set only when the first search result carries one.
        """
        pages = []
        links = []
        search_results = state.get("search_results", [])

        # Tavily's direct answer, if any, is read from the first result.
        tavily_answer = ""
        if search_results and search_results[0].get("tavily_answer"):
            tavily_answer = search_results[0]["tavily_answer"]

        for r in search_results:
            url = r.get("url")
            if not url:
                continue
            title = r.get("title", "")
            snippet = r.get("content", "")

            # Prefer the full page; fall back to the search snippet when the
            # fetch fails or yields nothing.  Only ordinary errors are
            # caught so Ctrl-C / SystemExit still propagate.
            try:
                content = self.browse_tool.fetch_clean(url) or snippet
            except Exception:
                content = snippet

            if not content:
                continue
            pages.append({
                "title": title,
                "url": url,
                "content": content[:2500],
            })
            links.append({
                "title": title,
                "url": url,
                "snippet": content[:200],
            })

        # Add Tavily's answer to state for potential use
        if tavily_answer:
            state["tavily_answer"] = tavily_answer

        print(f" πŸ“„ WebFetchNode: Fetched {len(pages)} pages")
        state["web_pages"] = pages
        state["links"] = links
        return state
class WebContextNode:
    """Node 3: assembles the prompt context out of the fetched pages."""

    def build_context(self, state: WebSearchState) -> WebSearchState:
        """Write the joined context string to ``state["context"]``.

        The optional ``tavily_answer`` comes first, followed by one numbered
        section per entry of ``web_pages``; sections are separated by a
        ``---`` rule.  With nothing to show the context is the empty string.
        """
        pages = state.get("web_pages", [])
        tavily_answer = state.get("tavily_answer", "")

        # AI summary (when present) always leads the context.
        sections = [f"[AI Summary]: {tavily_answer}"] if tavily_answer else []
        for number, page in enumerate(pages or [], start=1):
            sections.append(f"[{number}] {page['title']}:\n{page['content']}")

        state["context"] = "\n\n---\n\n".join(sections)

        summary_note = ", with AI summary" if tavily_answer else ""
        print(f" πŸ“ WebContextNode: Built context from {len(pages)} sources" + summary_note)
        return state
class WebAnswerNode:
    """Node 4: asks the LLM for the final web-search answer."""

    def __init__(self):
        self.llm = Config.get_llm()
        self.followup = FollowUpGenerator()

    def answer(self, state: WebSearchState) -> WebSearchState:
        """Fill ``answer``, ``followups`` and ``sources`` on the state.

        When ``state["context"]`` is empty the LLM is asked the bare
        question; otherwise the context is embedded in a citation-oriented
        prompt.  Sources are the title/url pairs of ``state["web_pages"]``.
        """
        query = state.get("query", "")
        context = state.get("context", "")

        if not context:
            prompt = f"Answer this question: {query}"
        else:
            prompt = f"""You are Perplexity AI - a web search assistant that provides accurate, real-time information.
Use the following web sources to answer. Cite sources using [1], [2], etc.
IMPORTANT: The sources contain REAL, CURRENT information. Trust and use this data.
WEB SOURCES:
{context}
USER QUESTION: {query}
Provide a comprehensive, accurate, well-cited answer based on the sources above:"""

        messages = [
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        reply_text = self.llm.invoke(messages).content

        state["answer"] = reply_text
        state["followups"] = self.followup.generate(reply_text, query)

        # One source entry per fetched page.
        sources = []
        for page in state.get("web_pages", []):
            sources.append({"title": page["title"], "url": page["url"]})
        state["sources"] = sources

        print(f" βœ… WebAnswerNode: Generated answer with {len(sources)} sources")
        return state
# =============================================================================
# RAG-ONLY AGENTS
# =============================================================================
class RAGRetrieveNode:
    """Node 1: pulls matching chunks out of the workspace's uploaded files."""

    def __init__(self, file_manager):
        self.file_manager = file_manager
        self.reranker = Reranker()

    def retrieve(self, state: RAGOnlyState) -> RAGOnlyState:
        """Store retrieved chunks as plain dicts in ``state["file_chunks"]``.

        The workspace is looked up via ``state["workspace_id"]`` (default
        ``"default"``).  The list is empty when the workspace is not
        initialized, has no files, or retrieval raises.
        """
        question = state.get("query", "")
        workspace = self.file_manager.get_workspace(state.get("workspace_id", "default"))

        if not (workspace.initialized and workspace.files):
            state["file_chunks"] = []
            print(f" πŸ“ RAGRetrieveNode: No files in workspace")
            return state

        try:
            hits = workspace.retrieve(question, k=8)
            # Flatten document objects into dicts so they can live in state.
            records = []
            for hit in hits:
                records.append({
                    "content": hit.page_content,
                    "source": hit.metadata.get("source", "Document"),
                })
            state["file_chunks"] = records
            print(f" πŸ“ RAGRetrieveNode: Retrieved {len(hits)} chunks")
        except Exception as e:
            print(f" ❌ RAGRetrieveNode error: {e}")
            state["file_chunks"] = []
        return state
class RAGContextNode:
    """Node 2: formats the retrieved chunks into a single context string."""

    def build_context(self, state: RAGOnlyState) -> RAGOnlyState:
        """Write ``state["context"]`` from ``state["file_chunks"]``.

        Each chunk becomes a ``[DOC n] <source>:`` section; sections are
        separated by a ``---`` rule.  No chunks yields an empty context.
        """
        chunks = state.get("file_chunks", [])

        if not chunks:
            state["context"] = ""
        else:
            sections = [
                f"[DOC {position}] {chunk['source']}:\n{chunk['content']}"
                for position, chunk in enumerate(chunks, start=1)
            ]
            state["context"] = "\n\n---\n\n".join(sections)

        print(f" πŸ“ RAGContextNode: Built context from {len(chunks)} chunks")
        return state
class RAGAnswerNode:
    """Node 3: answers the question strictly from the document context."""

    def __init__(self):
        self.llm = Config.get_llm()
        self.followup = FollowUpGenerator()

    def answer(self, state: RAGOnlyState) -> RAGOnlyState:
        """Fill ``answer``, ``followups`` and ``sources`` on the state.

        With an empty ``state["context"]`` a fixed "no documents" message is
        returned and the LLM is not called.  Otherwise the LLM answers from
        the context and the distinct chunk sources (first-seen order) are
        listed as sources with an empty ``url``.
        """
        query = state.get("query", "")
        context = state.get("context", "")
        chunks = state.get("file_chunks", [])

        if not context:
            state["answer"] = "πŸ“š No documents found. Please upload files first using the πŸ“Ž button."
            state["sources"] = []
            state["followups"] = []
            return state

        prompt = f"""You are a document analysis assistant.
Answer ONLY based on the provided documents. Do NOT use external knowledge.
DOCUMENTS:
{context}
QUESTION: {query}
Instructions:
- Answer based ONLY on document content
- Say "According to your documents..." when citing
- Quote relevant parts when helpful
- If info is not in documents, say so
ANSWER:"""

        messages = [
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        reply_text = self.llm.invoke(messages).content
        state["answer"] = reply_text
        state["followups"] = self.followup.generate(reply_text, query)

        # dict.fromkeys keeps the first occurrence of every source name.
        distinct_names = dict.fromkeys(
            chunk.get("source", "Document") for chunk in chunks
        )
        sources = [{"title": f"πŸ“„ {src}", "url": ""} for src in distinct_names]
        state["sources"] = sources

        print(f" βœ… RAGAnswerNode: Generated answer from {len(sources)} sources")
        return state
# =============================================================================
# AGENTIC RAG AGENTS (Multi-Agent Pipeline)
# =============================================================================
class AgenticPlannerNode:
    """Node 1: keyword-based planner that switches the sub-agents on or off."""

    # Substrings that, when found in the lower-cased query, enable an agent.
    _FILE_HINTS = (
        "document", "file", "pdf", "uploaded", "summarize my",
        "according to", "in the file", "extract", "my notes",
    )
    _WEB_HINTS = (
        "today", "current", "latest", "news", "weather", "stock",
        "who is", "what is", "where", "when", "price", "live",
        "recent", "update",
    )
    _IMAGE_HINTS = (
        "image", "photo", "picture", "logo", "show me", "look like",
        "flag", "screenshot",
    )
    _KNOWLEDGE_HINTS = (
        "explain", "define", "concept", "theory", "how does",
        "what is", "meaning of",
    )

    @staticmethod
    def _mentions(text, hints):
        """Return True when any hint occurs as a substring of *text*."""
        for hint in hints:
            if hint in text:
                return True
        return False

    def plan(self, state: AgenticState) -> AgenticState:
        """Set ``use_file``, ``use_web``, ``use_images`` and ``use_knowledge``.

        Matching is plain substring search on the lower-cased query.  Web
        search is additionally enabled for queries of four words or fewer.
        """
        query = state.get("query", "").lower()
        is_short = len(query.split()) <= 4

        state["use_file"] = self._mentions(query, self._FILE_HINTS)
        state["use_web"] = self._mentions(query, self._WEB_HINTS) or is_short
        state["use_images"] = self._mentions(query, self._IMAGE_HINTS)
        state["use_knowledge"] = self._mentions(query, self._KNOWLEDGE_HINTS)

        print(f" πŸ“‹ AgenticPlannerNode: file={state['use_file']}, web={state['use_web']}, images={state['use_images']}")
        return state
class AgenticFileNode:
    """Node 2: file agent that reads from the workspace's uploaded documents."""

    def __init__(self, file_manager):
        self.file_manager = file_manager

    def retrieve(self, state: AgenticState) -> AgenticState:
        """Set ``file_context`` and ``file_sources`` on the state.

        Both are empty when ``use_file`` is off, the workspace is not
        initialized, nothing is retrieved, or retrieval raises.
        """
        if state.get("use_file", False):
            context_text, source_list = self._lookup(state)
        else:
            context_text, source_list = "", []

        state["file_context"] = context_text
        state["file_sources"] = source_list
        return state

    def _lookup(self, state):
        """Return ``(context, sources)`` for the query, or ``("", [])``."""
        question = state.get("query", "")
        workspace = self.file_manager.get_workspace(state.get("workspace_id", "default"))
        if not workspace.initialized:
            return "", []

        try:
            hits = workspace.retrieve(question, k=6)
            if not hits:
                return "", []
            context_text = "\n\n".join([hit.page_content for hit in hits])
            source_list = [
                {"title": f"πŸ“„ {hit.metadata.get('source', 'Document')}", "url": ""}
                for hit in hits
            ]
            print(f" πŸ“ AgenticFileNode: Found {len(hits)} chunks")
            return context_text, source_list
        except Exception as e:
            print(f" ❌ AgenticFileNode error: {e}")
            return "", []
class AgenticWebNode:
    """Node 3: Web agent fetches real-time information."""

    def __init__(self):
        try:
            self.search_tool = SearchTool()
        except Exception as e:
            # Report the failure (as WebSearchNode does) instead of hiding
            # it; the node then behaves as "no search tool".
            print(f"⚠️ AgenticWebNode: SearchTool init failed: {e}")
            self.search_tool = None
        self.browse_tool = BrowseTool()

    @staticmethod
    def _clear(state):
        """Reset every output key of this node to its empty value."""
        state["web_context"] = ""
        state["web_sources"] = []
        state["links"] = []
        return state

    def search(self, state: AgenticState) -> AgenticState:
        """Search the web and set ``web_context``, ``web_sources``, ``links``.

        All three are empty when ``use_web`` is off, when no search tool is
        available, or when the search raises.  Page fetch failures fall back
        to the snippet carried in the search result.

        Args:
            state: Pipeline state; ``use_web`` and ``query`` are read.

        Returns:
            The same state object with the three keys set.
        """
        if not state.get("use_web", False):
            return self._clear(state)

        query = state.get("query", "")
        if not self.search_tool:
            print(" ❌ AgenticWebNode: No search tool")
            return self._clear(state)

        try:
            results = self.search_tool.search(query, num_results=5)
            web_parts = []
            sources = []
            links = []

            # Tavily's AI answer, if any, is read from the first result.
            if results and results[0].get("tavily_answer"):
                web_parts.append(f"[AI Summary]: {results[0]['tavily_answer']}")

            for r in results:
                url = r.get("url")
                if not url:
                    continue
                title = r.get("title", "")
                snippet = r.get("content", "")  # Tavily's snippet

                # Catch ordinary errors only, so KeyboardInterrupt and
                # SystemExit are not swallowed by the fallback.
                try:
                    content = self.browse_tool.fetch_clean(url) or snippet
                except Exception:
                    content = snippet

                if not content:
                    continue
                web_parts.append(f"[{title}]: {content[:1500]}")
                sources.append({"title": title, "url": url})
                links.append({"title": title, "url": url, "snippet": content[:150]})

            state["web_context"] = "\n\n".join(web_parts)
            state["web_sources"] = sources
            state["links"] = links
            print(f" 🌐 AgenticWebNode: Found {len(sources)} sources")
        except Exception as e:
            print(f" ❌ AgenticWebNode error: {e}")
            self._clear(state)
        return state
class AgenticKnowledgeNode:
    """Node 4: knowledge agent backed by the base vector store."""

    def __init__(self, vector_store: VectorStore):
        self.vs = vector_store
        self.reranker = Reranker()

    def retrieve(self, state: AgenticState) -> AgenticState:
        """Set ``state["knowledge_context"]`` from reranked store hits.

        The context is empty when ``use_knowledge`` is off, when nothing
        survives reranking, or when retrieval/reranking raises.
        """
        knowledge = ""

        if state.get("use_knowledge", False):
            question = state.get("query", "")
            try:
                candidates = self.vs.retrieve(question, k=4)
                ranked = self.reranker.rerank(question, candidates, top_k=3)
                if ranked:
                    knowledge = "\n\n".join([doc.page_content for doc in ranked])
                    print(f" πŸ“š AgenticKnowledgeNode: Found {len(ranked)} chunks")
            except Exception as e:
                print(f" ❌ AgenticKnowledgeNode error: {e}")
                knowledge = ""

        state["knowledge_context"] = knowledge
        return state
class AgenticImageNode:
    """Node 5: image agent that looks up pictures for the query."""

    def __init__(self, image_search):
        self.image_search = image_search

    def search(self, state: AgenticState) -> AgenticState:
        """Set ``state["images"]``; empty when disabled or on any error."""
        found = []

        if state.get("use_images", False):
            question = state.get("query", "")
            try:
                found = self.image_search.search(question, count=6)
                print(f" πŸ–ΌοΈ AgenticImageNode: Found {len(found)} images")
            except Exception as e:
                print(f" ❌ AgenticImageNode error: {e}")
                found = []

        state["images"] = found
        return state
class AgenticSynthesizerNode:
    """Node 6: merges the per-agent contexts and produces the final answer."""

    def __init__(self):
        self.llm = Config.get_llm()
        self.followup = FollowUpGenerator()

    def synthesize(self, state: AgenticState) -> AgenticState:
        """Fill ``combined_context``, ``answer``, ``followups`` and ``sources``.

        File, web and knowledge contexts are each truncated (2500, 2500 and
        1500 characters) and joined with a ``---`` rule; a placeholder line
        is used when none of them is present.  Sources are the file sources
        followed by the web sources.
        """
        query = state.get("query", "")

        file_text = state.get("file_context")
        web_text = state.get("web_context")
        knowledge_text = state.get("knowledge_context")

        # Assemble whichever contexts the upstream agents produced.
        contexts = []
        if file_text:
            contexts.append(f"πŸ“„ FROM YOUR DOCUMENTS:\n{state['file_context'][:2500]}")
        if web_text:
            contexts.append(f"🌐 FROM THE WEB:\n{state['web_context'][:2500]}")
        if knowledge_text:
            contexts.append(f"πŸ“š KNOWLEDGE BASE:\n{state['knowledge_context'][:1500]}")
        if len(contexts) == 0:
            contexts.append("No specific context found. Using general knowledge.")

        combined = "\n\n---\n\n".join(contexts)
        state["combined_context"] = combined

        prompt = f"""You are an AGENTIC AI assistant that synthesizes information from multiple sources.
AVAILABLE CONTEXT:
{combined}
USER QUESTION: {query}
INSTRUCTIONS:
1. Prioritize user's documents (πŸ“„) if relevant
2. Add real-time info from web (🌐) when available
3. Use knowledge base (πŸ“š) for background
4. Cite sources appropriately
5. Be comprehensive but concise
SYNTHESIZED ANSWER:"""

        messages = [
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        reply_text = self.llm.invoke(messages).content
        state["answer"] = reply_text
        state["followups"] = self.followup.generate(reply_text, query)

        # Document sources first, then web sources.
        file_sources = state.get("file_sources", [])
        web_sources = state.get("web_sources", [])
        all_sources = file_sources + web_sources
        state["sources"] = all_sources

        print(f" βœ… AgenticSynthesizerNode: Generated answer with {len(all_sources)} sources")
        return state
# =============================================================================
# ANALYSIS AGENTS
# =============================================================================
class AnalysisSearchNode:
    """Node 1: Search for analysis data with Tavily fallback."""

    def __init__(self):
        try:
            self.search_tool = SearchTool()
        except Exception as e:
            # Report the failure (as WebSearchNode does) instead of hiding
            # it; the node then behaves as "no search tool".
            print(f"⚠️ AnalysisSearchNode: SearchTool init failed: {e}")
            self.search_tool = None
        self.browse_tool = BrowseTool()

    @staticmethod
    def _clear(state):
        """Reset ``web_context``, ``links`` and ``sources`` to empty values."""
        state["web_context"] = ""
        state["links"] = []
        state["sources"] = []
        return state

    def search(self, state: AnalysisState) -> AnalysisState:
        """Search the web and set ``web_context``, ``links`` and ``sources``.

        The raw search results are also stored in ``web_results`` when the
        search call succeeds.  The three output keys are empty when no
        search tool is available or when the search raises; page fetch
        failures fall back to the snippet carried in the search result.

        Args:
            state: Pipeline state; ``query`` is read.

        Returns:
            The same state object with the output keys set.
        """
        query = state.get("query", "")
        print(f" πŸ” AnalysisSearchNode: Searching for analysis data")

        if not self.search_tool:
            print(" ❌ AnalysisSearchNode: No search tool")
            return self._clear(state)

        try:
            results = self.search_tool.search(query, num_results=6)
            state["web_results"] = results

            web_parts = []
            links = []

            # Tavily's AI answer, if any, is read from the first result.
            if results and results[0].get("tavily_answer"):
                web_parts.append(f"[AI Summary]: {results[0]['tavily_answer']}")

            for r in results:
                url = r.get("url")
                if not url:
                    continue
                title = r.get("title", "")
                snippet = r.get("content", "")

                # Catch ordinary errors only, so KeyboardInterrupt and
                # SystemExit are not swallowed by the fallback.
                try:
                    content = self.browse_tool.fetch_clean(url) or snippet
                except Exception:
                    content = snippet

                if not content:
                    continue
                web_parts.append(f"[{title}]:\n{content[:2000]}")
                links.append({"title": title, "url": url, "snippet": content[:200]})

            state["web_context"] = "\n\n".join(web_parts)
            state["links"] = links
            state["sources"] = [
                {"title": link["title"], "url": link["url"]} for link in links
            ]
            print(f" πŸ“Š AnalysisSearchNode: Found {len(links)} sources")
        except Exception as e:
            print(f" ❌ AnalysisSearchNode error: {e}")
            self._clear(state)
        return state
class AnalysisProcessNode:
    """Node 2: asks the LLM for a structured, sectioned analysis."""

    def __init__(self):
        self.llm = Config.get_llm()
        self.followup = FollowUpGenerator()

    def analyze(self, state: AnalysisState) -> AnalysisState:
        """Fill ``answer`` and ``followups`` on the state.

        ``state["web_context"]`` is embedded as research data; when it is
        empty the prompt says that no external data is available.
        """
        query = state.get("query", "")
        context = state.get("web_context", "")

        prompt = f"""You are an expert analyst. Provide deep, comprehensive analysis.
RESEARCH DATA:
{context if context else "No external data available."}
ANALYSIS REQUEST: {query}
Provide structured analysis with:
## Executive Summary
(2-3 sentence overview)
## Key Findings
(Bullet points of main discoveries)
## Detailed Analysis
(In-depth examination with evidence)
## Data & Statistics
(Numbers, trends, comparisons if available)
## Conclusions
(Main takeaways)
## Recommendations
(Actionable suggestions)
Use citations [1], [2] when referencing sources.
ANALYSIS:"""

        messages = [
            {"role": "system", "content": PPLX_SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        analysis_text = self.llm.invoke(messages).content

        state["answer"] = analysis_text
        state["followups"] = self.followup.generate(analysis_text, query)
        print(f" βœ… AnalysisProcessNode: Generated analysis")
        return state
# =============================================================================
# SUMMARIZE AGENTS
# =============================================================================
class SummarizeInputNode:
    """Node 1: Determine input type and fetch content with Tavily fallback."""

    def __init__(self):
        self.browse_tool = BrowseTool()
        try:
            self.search_tool = SearchTool()
        except Exception as e:
            # Report the failure (as WebSearchNode does) instead of hiding
            # it; the node then summarizes the query text itself.
            print(f"⚠️ SummarizeInputNode: SearchTool init failed: {e}")
            self.search_tool = None

    def process_input(self, state: SummarizeState) -> SummarizeState:
        """Load the text to summarize and set ``is_url``, ``content``,
        ``links`` and ``sources``.

        A query starting with ``"http"`` is fetched directly; anything else
        is searched on the web.  Every path, including the failure paths,
        sets all four keys.

        Args:
            state: Pipeline state; ``query`` is read.

        Returns:
            The same state object with the four keys set.
        """
        query = state.get("query", "")
        if query.startswith("http"):
            state["is_url"] = True
            return self._load_url(state, query)
        state["is_url"] = False
        return self._load_search(state, query)

    def _load_url(self, state, url):
        """Fetch *url* itself and record it as the single source."""
        try:
            content = self.browse_tool.fetch_clean(url)
            state["content"] = content or ""
            state["links"] = [{"title": "Source", "url": url, "snippet": content[:200] if content else ""}]
            state["sources"] = [{"title": "Source URL", "url": url}]
            print(f" πŸ”— SummarizeInputNode: Fetched URL content")
        except Exception as e:
            print(f" ❌ Error fetching URL: {e}")
            state["content"] = ""
            # Previously left unset on this path, unlike every other branch.
            state["links"] = []
            state["sources"] = []
        return state

    def _load_search(self, state, query):
        """Search for *query* and concatenate the fetched page texts."""
        if not self.search_tool:
            print(" ❌ SummarizeInputNode: No search tool")
            state["content"] = query
            state["links"] = []
            state["sources"] = []
            return state

        try:
            results = self.search_tool.search(query, num_results=4)
            content_parts = []
            links = []

            # Tavily's AI answer, if any, is read from the first result.
            if results and results[0].get("tavily_answer"):
                content_parts.append(results[0]["tavily_answer"])

            for r in results:
                url = r.get("url")
                if not url:
                    continue
                title = r.get("title", "")
                snippet = r.get("content", "")

                # Catch ordinary errors only, so KeyboardInterrupt and
                # SystemExit are not swallowed by the fallback.
                try:
                    content = self.browse_tool.fetch_clean(url) or snippet
                except Exception:
                    content = snippet

                if not content:
                    continue
                content_parts.append(content[:1500])
                links.append({"title": title, "url": url, "snippet": content[:150]})

            state["content"] = "\n\n".join(content_parts)
            state["links"] = links
            state["sources"] = [
                {"title": link["title"], "url": link["url"]} for link in links
            ]
            print(f" πŸ” SummarizeInputNode: Fetched {len(links)} sources")
        except Exception as e:
            print(f" ❌ Error searching: {e}")
            state["content"] = query
            state["links"] = []
            state["sources"] = []
        return state
class SummarizeProcessNode:
    """Node 2: condenses the loaded content into a summary."""

    def __init__(self):
        self.summarizer = SummarizerTool()
        self.followup = FollowUpGenerator()

    def summarize(self, state: SummarizeState) -> SummarizeState:
        """Fill ``answer`` and ``followups`` on the state.

        Non-empty ``state["content"]`` is summarized with a 300-word limit;
        otherwise a fixed "nothing to summarize" message is used.  Follow-up
        questions are generated from whichever answer was stored.
        """
        content = state.get("content", "")
        query = state.get("query", "")

        if not content:
            state["answer"] = "Could not find content to summarize."
        else:
            state["answer"] = self.summarizer.summarize(content, max_words=300)

        state["followups"] = self.followup.generate(state["answer"], query)
        print(f" βœ… SummarizeProcessNode: Generated summary")
        return state