# perplexity-clone/rag/graph_deep.py
# Author: Naveen-2007
# Commit efe010f: "Fix: LangGraph node name conflicts and port binding"
"""
Production-Level LangGraph Pipelines for Perplexity Clone
==========================================================
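Each answer mode (deep research, web search, RAG-only, agentic RAG, analysis,
summarize) is built as its own LangGraph ``StateGraph`` pipeline, compiled
lazily on first use.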
"""
from langgraph.graph import StateGraph, END
from rag.rag_state import (
RAGState,
WebSearchState,
RAGOnlyState,
AgenticState,
AnalysisState,
SummarizeState
)
from rag.agents import (
# Deep Research agents
PlannerAgent,
ResearchAgent,
AggregatorAgent,
WriterAgent,
ValidatorAgent,
# Web Search agents
WebSearchNode,
WebFetchNode,
WebContextNode,
WebAnswerNode,
# RAG agents
RAGRetrieveNode,
RAGContextNode,
RAGAnswerNode,
# Agentic agents
AgenticPlannerNode,
AgenticFileNode,
AgenticWebNode,
AgenticKnowledgeNode,
AgenticImageNode,
AgenticSynthesizerNode,
# Analysis agents
AnalysisSearchNode,
AnalysisProcessNode,
# Summarize agents
SummarizeInputNode,
SummarizeProcessNode,
)
from vectorstore.store import VectorStore
class DeepResearchGraph:
    """
    Deep Research Mode Graph
    ========================
    Pipeline: Planner -> Research -> Aggregate -> Write -> Validate
    Used for complex queries requiring multi-step analysis.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self, vector_store: VectorStore) -> None:
        """
        Instantiate the five agents backing the pipeline.

        Args:
            vector_store: Kept as ``self.vs`` and handed to ``ResearchAgent``.
        """
        self.vs = vector_store
        # Agents are created in pipeline order.
        self.planner = PlannerAgent()
        self.researcher = ResearchAgent(self.vs)
        self.aggregator = AggregatorAgent()
        self.writer = WriterAgent()
        self.validator = ValidatorAgent()
        self.graph = None

    def build(self):
        """
        Wire the agents into a linear ``StateGraph(RAGState)``, compile it,
        cache it on ``self.graph`` and return it.
        """
        builder = StateGraph(RAGState)
        pipeline = [
            ("plan", self.planner.plan),
            ("research", self.researcher.research),
            ("aggregate", self.aggregator.aggregate),
            ("write", self.writer.write),
            ("validate", self.validator.validate_and_attach),
        ]
        for node_name, handler in pipeline:
            builder.add_node(node_name, handler)
        builder.set_entry_point(pipeline[0][0])
        # Connect consecutive stages, finishing at END after "validate".
        hops = [node_name for node_name, _ in pipeline] + [END]
        for src, dst in zip(hops, hops[1:]):
            builder.add_edge(src, dst)
        self.graph = builder.compile()
        return self.graph

    def run(self, question: str) -> RAGState:
        """
        Execute the deep-research pipeline for ``question``.

        The graph is built on first use, then invoked with the initial state
        ``{"question": question}``; whatever ``invoke`` returns is passed back.
        """
        graph = self.build() if self.graph is None else self.graph
        print(f"\n🧠 DEEP RESEARCH GRAPH: {question[:50]}...")
        return graph.invoke({"question": question})
class WebSearchGraph:
    """
    Web Search Mode Graph
    =====================
    Pipeline: Search -> Fetch -> Context -> Answer
    Used for real-time web queries with citations.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self):
        # One node object per pipeline stage, created in pipeline order.
        self.search_node = WebSearchNode()
        self.fetch_node = WebFetchNode()
        self.context_node = WebContextNode()
        self.answer_node = WebAnswerNode()
        self.graph = None

    def build(self):
        """
        Wire the stages into a linear ``StateGraph(WebSearchState)``, compile
        it, cache it on ``self.graph`` and return it.
        """
        workflow = StateGraph(WebSearchState)
        # Node names use verb prefixes ("do_", "build_", "generate_"),
        # presumably so they never collide with state-key names, which
        # LangGraph refuses as node names.
        stages = (
            ("do_search", self.search_node.search),
            ("do_fetch", self.fetch_node.fetch),
            ("build_context", self.context_node.build_context),
            ("generate_answer", self.answer_node.answer),
        )
        for label, step in stages:
            workflow.add_node(label, step)
        workflow.set_entry_point(stages[0][0])
        chain = [label for label, _ in stages]
        chain.append(END)
        for upstream, downstream in zip(chain, chain[1:]):
            workflow.add_edge(upstream, downstream)
        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str) -> WebSearchState:
        """
        Execute the web-search pipeline for ``query``.

        The graph is built on first use, then invoked with the initial state
        ``{"query": query}``; whatever ``invoke`` returns is passed back.
        """
        graph = self.build() if self.graph is None else self.graph
        print(f"\n🌐 WEB SEARCH GRAPH: {query[:50]}...")
        return graph.invoke({"query": query})
class RAGOnlyGraph:
    """
    RAG-Only Mode Graph
    ===================
    Pipeline: Retrieve -> Context -> Answer
    Used for searching uploaded documents only.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self, file_manager):
        """
        Create the pipeline's node objects.

        Args:
            file_manager: Passed to ``RAGRetrieveNode``; presumably the store
                of user-uploaded files (TODO confirm against caller).
        """
        self.retrieve_node = RAGRetrieveNode(file_manager)
        self.context_node = RAGContextNode()
        self.answer_node = RAGAnswerNode()
        self.graph = None

    def build(self):
        """
        Wire retrieve -> context -> answer into ``StateGraph(RAGOnlyState)``,
        compile it, cache it on ``self.graph`` and return it.
        """
        g = StateGraph(RAGOnlyState)
        # Verb-prefixed node names, presumably to avoid clashing with
        # state-key names (LangGraph refuses node names equal to state keys).
        g.add_node("do_retrieve", self.retrieve_node.retrieve)
        g.add_node("build_context", self.context_node.build_context)
        g.add_node("generate_answer", self.answer_node.answer)
        g.set_entry_point("do_retrieve")
        g.add_edge("do_retrieve", "build_context")
        g.add_edge("build_context", "generate_answer")
        g.add_edge("generate_answer", END)
        self.graph = g.compile()
        return self.graph

    def run(self, query: str, workspace_id: str = "default") -> RAGOnlyState:
        """
        Answer ``query`` using the uploaded documents of ``workspace_id``.

        Args:
            query: The user question.
            workspace_id: Workspace whose files are searched.

        Returns:
            Whatever the compiled graph's ``invoke`` returns for the initial
            state ``{"query": query, "workspace_id": workspace_id}``.
        """
        if self.graph is None:
            self.build()
        print(f"\n📚 RAG ONLY GRAPH: {query[:50]}...")
        return self.graph.invoke({"query": query, "workspace_id": workspace_id})
class AgenticRAGGraph:
    """
    Agentic RAG Mode Graph
    ======================
    Pipeline: Planner -> File -> Web -> Knowledge -> Image -> Synthesizer
    Multi-agent collaboration for comprehensive answers.

    Every agent node is executed, one after another, on each invocation.
    The graph itself has no conditional routing: each agent checks flags
    internally (presumably set by the planner -- TODO confirm) to decide
    whether it actually does any work before the synthesizer combines
    the results.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self, file_manager, vector_store: VectorStore, image_search):
        """
        Create the planner, the four worker agents and the synthesizer.

        Args:
            file_manager: Passed to ``AgenticFileNode``.
            vector_store: Passed to ``AgenticKnowledgeNode``.
            image_search: Passed to ``AgenticImageNode``.
        """
        self.planner_node = AgenticPlannerNode()
        self.file_node = AgenticFileNode(file_manager)
        self.web_node = AgenticWebNode()
        self.knowledge_node = AgenticKnowledgeNode(vector_store)
        self.image_node = AgenticImageNode(image_search)
        self.synthesizer_node = AgenticSynthesizerNode()
        self.graph = None

    def build(self):
        """
        Wire the agents into a sequential ``StateGraph(AgenticState)``,
        compile it, cache it on ``self.graph`` and return it.
        """
        g = StateGraph(AgenticState)
        # Add all nodes
        g.add_node("planner", self.planner_node.plan)
        g.add_node("file_agent", self.file_node.retrieve)
        g.add_node("web_agent", self.web_node.search)
        g.add_node("knowledge_agent", self.knowledge_node.retrieve)
        g.add_node("image_agent", self.image_node.search)
        g.add_node("synthesizer", self.synthesizer_node.synthesize)
        # Define flow
        g.set_entry_point("planner")
        # After planner, run all agents in a fixed order (they check flags
        # internally, so a skipped agent is a cheap pass-through).
        g.add_edge("planner", "file_agent")
        g.add_edge("file_agent", "web_agent")
        g.add_edge("web_agent", "knowledge_agent")
        g.add_edge("knowledge_agent", "image_agent")
        g.add_edge("image_agent", "synthesizer")
        g.add_edge("synthesizer", END)
        self.graph = g.compile()
        return self.graph

    def run(self, query: str, workspace_id: str = "default") -> AgenticState:
        """
        Answer ``query`` with the multi-agent pipeline.

        Args:
            query: The user question.
            workspace_id: Workspace whose files the file agent may search.

        Returns:
            Whatever the compiled graph's ``invoke`` returns for the initial
            state ``{"query": query, "workspace_id": workspace_id}``.
        """
        if self.graph is None:
            self.build()
        print(f"\n🤖 AGENTIC RAG GRAPH: {query[:50]}...")
        return self.graph.invoke({"query": query, "workspace_id": workspace_id})
class AnalysisGraph:
    """
    Analysis Mode Graph
    ===================
    Pipeline: Search -> Analyze
    Deep analysis with structured output format.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self):
        # Node objects for the two stages; the graph is compiled later.
        self.search_node = AnalysisSearchNode()
        self.process_node = AnalysisProcessNode()
        self.graph = None

    def build(self):
        """
        Wire search -> analyze into ``StateGraph(AnalysisState)``, compile it,
        cache it on ``self.graph`` and return it.
        """
        g = StateGraph(AnalysisState)
        g.add_node("do_search", self.search_node.search)
        g.add_node("do_analyze", self.process_node.analyze)
        g.set_entry_point("do_search")
        g.add_edge("do_search", "do_analyze")
        g.add_edge("do_analyze", END)
        self.graph = g.compile()
        return self.graph

    def run(self, query: str) -> AnalysisState:
        """
        Run the analysis pipeline for ``query``.

        Returns:
            Whatever the compiled graph's ``invoke`` returns for the initial
            state ``{"query": query}``.
        """
        if self.graph is None:
            self.build()
        print(f"\n📊 ANALYSIS GRAPH: {query[:50]}...")
        return self.graph.invoke({"query": query})
class SummarizeGraph:
    """
    Summarize Mode Graph
    ====================
    Pipeline: Input -> Summarize
    Handles URL or search-based summarization.

    The compiled graph is created lazily by ``build`` (called from ``run``
    on first use) and cached in ``self.graph``.
    """

    def __init__(self):
        # Node objects for the two stages; the graph is compiled later.
        self.input_node = SummarizeInputNode()
        self.process_node = SummarizeProcessNode()
        self.graph = None

    def build(self):
        """
        Wire input -> summarize into ``StateGraph(SummarizeState)``, compile
        it, cache it on ``self.graph`` and return it.
        """
        g = StateGraph(SummarizeState)
        g.add_node("process_input", self.input_node.process_input)
        g.add_node("do_summarize", self.process_node.summarize)
        g.set_entry_point("process_input")
        g.add_edge("process_input", "do_summarize")
        g.add_edge("do_summarize", END)
        self.graph = g.compile()
        return self.graph

    def run(self, query: str) -> SummarizeState:
        """
        Run the summarization pipeline for ``query``.

        ``query`` is handed to the input node unchanged; per the class
        description it may be a URL or a search query.

        Returns:
            Whatever the compiled graph's ``invoke`` returns for the initial
            state ``{"query": query}``.
        """
        if self.graph is None:
            self.build()
        print(f"\n📝 SUMMARIZE GRAPH: {query[:50]}...")
        return self.graph.invoke({"query": query})