Spaces:
Running
Running
| """ | |
| Production-Level LangGraph Pipelines for Perplexity Clone | |
| ========================================================== | |
| Each mode has its own graph with proper node structure. | |
| """ | |
| from langgraph.graph import StateGraph, END | |
| from rag.rag_state import ( | |
| RAGState, | |
| WebSearchState, | |
| RAGOnlyState, | |
| AgenticState, | |
| AnalysisState, | |
| SummarizeState | |
| ) | |
| from rag.agents import ( | |
| # Deep Research agents | |
| PlannerAgent, | |
| ResearchAgent, | |
| AggregatorAgent, | |
| WriterAgent, | |
| ValidatorAgent, | |
| # Web Search agents | |
| WebSearchNode, | |
| WebFetchNode, | |
| WebContextNode, | |
| WebAnswerNode, | |
| # RAG agents | |
| RAGRetrieveNode, | |
| RAGContextNode, | |
| RAGAnswerNode, | |
| # Agentic agents | |
| AgenticPlannerNode, | |
| AgenticFileNode, | |
| AgenticWebNode, | |
| AgenticKnowledgeNode, | |
| AgenticImageNode, | |
| AgenticSynthesizerNode, | |
| # Analysis agents | |
| AnalysisSearchNode, | |
| AnalysisProcessNode, | |
| # Summarize agents | |
| SummarizeInputNode, | |
| SummarizeProcessNode, | |
| ) | |
| from vectorstore.store import VectorStore | |
class DeepResearchGraph:
    """
    Deep Research Mode Graph
    ========================
    Pipeline: Planner -> Research -> Aggregate -> Write -> Validate

    Used for complex queries requiring multi-step analysis. The five
    stages are wired as a strictly linear ``StateGraph`` over ``RAGState``;
    the compiled graph is stored on ``self.graph``.
    """

    def __init__(self, vector_store: VectorStore) -> None:
        """Create one agent per stage; only the researcher receives the vector store."""
        self.vs = vector_store
        self.planner = PlannerAgent()
        self.researcher = ResearchAgent(self.vs)
        self.aggregator = AggregatorAgent()
        self.writer = WriterAgent()
        self.validator = ValidatorAgent()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the linear pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(RAGState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("plan", self.planner.plan),
            ("research", self.researcher.research),
            ("aggregate", self.aggregator.aggregate),
            ("write", self.writer.write),
            ("validate", self.validator.validate_and_attach),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Connect each stage to its successor; the final stage ends the graph.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, question: str) -> RAGState:
        """Run the pipeline for ``question`` (building the graph first if needed) and return the result of ``invoke``."""
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbols at the start of this banner look like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = question[:50]
        print(f"\nπ§ DEEP RESEARCH GRAPH: {preview}...")

        initial_state = {"question": question}
        return compiled.invoke(initial_state)
class WebSearchGraph:
    """
    Web Search Mode Graph
    =====================
    Pipeline: Search -> Fetch -> Context -> Answer

    Used for real-time web queries with citations. The four nodes are
    chained linearly in a ``StateGraph`` over ``WebSearchState``.
    """

    def __init__(self):
        """Instantiate the four pipeline nodes; the graph itself is compiled later."""
        self.search_node = WebSearchNode()
        self.fetch_node = WebFetchNode()
        self.context_node = WebContextNode()
        self.answer_node = WebAnswerNode()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the linear pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(WebSearchState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("do_search", self.search_node.search),
            ("do_fetch", self.fetch_node.fetch),
            ("build_context", self.context_node.build_context),
            ("generate_answer", self.answer_node.answer),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Connect each node to its successor; the final node ends the graph.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str) -> WebSearchState:
        """Run the pipeline for ``query`` (building the graph first if needed) and return the result of ``invoke``."""
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbol at the start of this banner looks like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = query[:50]
        print(f"\nπ WEB SEARCH GRAPH: {preview}...")

        initial_state = {"query": query}
        return compiled.invoke(initial_state)
class RAGOnlyGraph:
    """
    RAG-Only Mode Graph
    ===================
    Pipeline: Retrieve -> Context -> Answer

    Used for searching uploaded documents only. The three nodes are
    chained linearly in a ``StateGraph`` over ``RAGOnlyState``.
    """

    def __init__(self, file_manager):
        """Instantiate the pipeline nodes; ``file_manager`` is handed to the retrieve node."""
        self.retrieve_node = RAGRetrieveNode(file_manager)
        self.context_node = RAGContextNode()
        self.answer_node = RAGAnswerNode()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the linear pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(RAGOnlyState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("do_retrieve", self.retrieve_node.retrieve),
            ("build_context", self.context_node.build_context),
            ("generate_answer", self.answer_node.answer),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Connect each node to its successor; the final node ends the graph.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str, workspace_id: str = "default") -> RAGOnlyState:
        """Run the pipeline for ``query`` within ``workspace_id`` and return the result of ``invoke``.

        The graph is built first if it has not been compiled yet.
        """
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbol at the start of this banner looks like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = query[:50]
        print(f"\nπ RAG ONLY GRAPH: {preview}...")

        initial_state = {"query": query, "workspace_id": workspace_id}
        return compiled.invoke(initial_state)
class AgenticRAGGraph:
    """
    Agentic RAG Mode Graph
    ======================
    Pipeline: Planner -> [File, Web, Knowledge, Image] -> Synthesizer

    Multi-agent collaboration for comprehensive answers.
    Planner decides which agents to activate.

    As wired here, the four agents are chained one after another (file,
    web, knowledge, image) rather than run as parallel branches.
    """

    def __init__(self, file_manager, vector_store: VectorStore, image_search):
        """Instantiate the planner, the four agents and the synthesizer.

        ``file_manager`` goes to the file agent, ``vector_store`` to the
        knowledge agent and ``image_search`` to the image agent.
        """
        self.planner_node = AgenticPlannerNode()
        self.file_node = AgenticFileNode(file_manager)
        self.web_node = AgenticWebNode()
        self.knowledge_node = AgenticKnowledgeNode(vector_store)
        self.image_node = AgenticImageNode(image_search)
        self.synthesizer_node = AgenticSynthesizerNode()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the sequential pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(AgenticState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("planner", self.planner_node.plan),
            ("file_agent", self.file_node.retrieve),
            ("web_agent", self.web_node.search),
            ("knowledge_agent", self.knowledge_node.retrieve),
            ("image_agent", self.image_node.search),
            ("synthesizer", self.synthesizer_node.synthesize),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Every agent node is always visited after the planner. Per the
        # original author's note the agents check their activation flags
        # internally - NOTE(review): confirm against the node implementations.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str, workspace_id: str = "default") -> AgenticState:
        """Run the pipeline for ``query`` within ``workspace_id`` and return the result of ``invoke``.

        The graph is built first if it has not been compiled yet.
        """
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbols at the start of this banner look like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = query[:50]
        print(f"\nπ€ AGENTIC RAG GRAPH: {preview}...")

        initial_state = {"query": query, "workspace_id": workspace_id}
        return compiled.invoke(initial_state)
class AnalysisGraph:
    """
    Analysis Mode Graph
    ===================
    Pipeline: Search -> Analyze

    Deep analysis with structured output format. Two nodes are chained
    in a ``StateGraph`` over ``AnalysisState``.
    """

    def __init__(self):
        """Instantiate the search and analysis nodes; the graph is compiled later."""
        self.search_node = AnalysisSearchNode()
        self.process_node = AnalysisProcessNode()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the two-step pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(AnalysisState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("do_search", self.search_node.search),
            ("do_analyze", self.process_node.analyze),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Connect search to analysis, then analysis to the end of the graph.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str) -> AnalysisState:
        """Run the pipeline for ``query`` (building the graph first if needed) and return the result of ``invoke``."""
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbol at the start of this banner looks like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = query[:50]
        print(f"\nπ ANALYSIS GRAPH: {preview}...")

        initial_state = {"query": query}
        return compiled.invoke(initial_state)
class SummarizeGraph:
    """
    Summarize Mode Graph
    ====================
    Pipeline: Input -> Summarize

    Handles URL or search-based summarization. Two nodes are chained in
    a ``StateGraph`` over ``SummarizeState``.
    """

    def __init__(self):
        """Instantiate the input and summarize nodes; the graph is compiled later."""
        self.input_node = SummarizeInputNode()
        self.process_node = SummarizeProcessNode()
        # Filled in by build(); run() builds on demand while this is None.
        self.graph = None

    def build(self):
        """Assemble the two-step pipeline, compile it, store it on ``self.graph`` and return it."""
        workflow = StateGraph(SummarizeState)

        # (node name, handler) pairs, listed in execution order.
        stages = (
            ("process_input", self.input_node.process_input),
            ("do_summarize", self.process_node.summarize),
        )
        for label, handler in stages:
            workflow.add_node(label, handler)

        labels = [label for label, _ in stages]
        workflow.set_entry_point(labels[0])

        # Connect input handling to summarization, then to the end of the graph.
        for source, target in zip(labels, labels[1:] + [END]):
            workflow.add_edge(source, target)

        self.graph = workflow.compile()
        return self.graph

    def run(self, query: str) -> SummarizeState:
        """Run the pipeline for ``query`` (building the graph first if needed) and return the result of ``invoke``."""
        compiled = self.build() if self.graph is None else self.graph

        # NOTE(review): the symbol at the start of this banner looks like a
        # mis-encoded emoji; the text is kept verbatim - confirm file encoding.
        preview = query[:50]
        print(f"\nπ SUMMARIZE GRAPH: {preview}...")

        initial_state = {"query": query}
        return compiled.invoke(initial_state)