| """LangGraph-based orchestrator implementation. |
| |
| NOTE: This orchestrator is deprecated in favor of the shared memory layer |
| integrated into Simple and Advanced modes (SPEC-08). It remains as a reference |
| implementation for LangGraph patterns. |
| """ |
|
|
| import os |
| import uuid |
| from collections.abc import AsyncGenerator, AsyncIterator |
| from typing import Any, Literal |
|
|
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
| from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver |
|
|
| from src.agents.graph.state import ResearchState |
| from src.agents.graph.workflow import create_research_graph |
| from src.orchestrators.base import OrchestratorProtocol |
| from src.utils.config import settings |
| from src.utils.models import AgentEvent |
| from src.utils.service_loader import get_embedding_service |
|
|
|
|
class LangGraphOrchestrator(OrchestratorProtocol):
    """State-driven research orchestrator using LangGraph.

    DEPRECATED: Memory features are now integrated into Simple and Advanced modes.
    This class is kept for reference and potential future use.

    Each ``run`` call builds a research graph, streams the graph's per-node
    updates and translates them into ``AgentEvent`` objects.
    """

    # Hugging Face model repository used when the caller does not pick one.
    DEFAULT_REPO_ID: str = "meta-llama/Llama-3.1-70B-Instruct"

    def __init__(
        self,
        max_iterations: int = 10,
        checkpoint_path: str | None = None,
        repo_id: str = DEFAULT_REPO_ID,
    ) -> None:
        """Configure the orchestrator and its Hugging Face chat model.

        Args:
            max_iterations: Value placed in the initial graph state under
                ``"max_iterations"``.
            checkpoint_path: Path of the SQLite checkpoint database. When it is
                ``None`` or empty the graph runs without a checkpointer.
            repo_id: Hugging Face model repository id handed to
                ``HuggingFaceEndpoint``.

        Raises:
            ValueError: If ``settings.hf_token`` is not set.
        """
        self._max_iterations = max_iterations
        self._checkpoint_path = checkpoint_path

        api_key = settings.hf_token
        if not api_key:
            raise ValueError(
                "HF_TOKEN (Hugging Face API Token) is required for LangGraph orchestrator."
            )

        self.llm_endpoint = HuggingFaceEndpoint(
            repo_id=repo_id,
            task="text-generation",
            max_new_tokens=1024,
            temperature=0.1,
            huggingfacehub_api_token=api_key,
        )
        self.chat_model = ChatHuggingFace(llm=self.llm_endpoint)

    def _open_saver(self) -> Any:
        """Return the SQLite checkpointer context manager, or ``None`` if disabled.

        The parent directory of the checkpoint file is created when the path
        has a directory component. The returned object is not entered here;
        the caller is expected to use it with ``async with``.
        """
        if not self._checkpoint_path:
            return None

        # A bare file name has no directory part, and makedirs("") would fail.
        dir_name = os.path.dirname(self._checkpoint_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        return AsyncSqliteSaver.from_conn_string(self._checkpoint_path)

    @staticmethod
    def _event_from_update(node_name: str, update: Any) -> AgentEvent | None:
        """Translate one streamed node update into an ``AgentEvent``.

        Returns ``None`` when the update carries nothing to report, including
        when it is not a dict at all.
        """
        # NOTE(review): a streamed payload may not be a dict (e.g. a node that
        # returns nothing); skip it instead of failing on `.get` - confirm
        # against the graph's stream mode.
        if not isinstance(update, dict):
            return None

        messages = update.get("messages")
        if messages:
            last_msg = messages[-1]
            event_type: Literal["progress", "thinking", "searching"] = "progress"
            if node_name == "supervisor":
                event_type = "thinking"
            elif node_name == "search":
                event_type = "searching"
            # Fall back to the message itself when it has no `.content`.
            content = getattr(last_msg, "content", last_msg)
            return AgentEvent(type=event_type, message=str(content), data={"node": node_name})

        if node_name == "supervisor":
            return AgentEvent(
                type="thinking",
                message=f"Supervisor decided: {update.get('next_step')}",
                data={"node": node_name},
            )
        return None

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """Execute research workflow with structured state.

        Yields a ``"started"`` event, then one event per reportable node
        update streamed by the graph, and finally a ``"complete"`` event.

        Args:
            query: The research question placed in the initial graph state.
        """
        # Function-scope import: only this method needs it.
        from contextlib import asynccontextmanager

        embedding_service = get_embedding_service()
        saver = self._open_saver()

        def build_graph(checkpointer: Any) -> Any:
            return create_research_graph(
                llm=self.chat_model,
                checkpointer=checkpointer,
                embedding_service=embedding_service,
            )

        @asynccontextmanager
        async def get_graph_context(saver_instance: Any) -> AsyncIterator[Any]:
            # Keep the checkpointer open for exactly as long as the graph is in use.
            if saver_instance is None:
                yield build_graph(None)
            else:
                async with saver_instance as checkpointer:
                    yield build_graph(checkpointer)

        async with get_graph_context(saver) as graph:
            initial_state: ResearchState = {
                "query": query,
                "hypotheses": [],
                "conflicts": [],
                "evidence_ids": [],
                "messages": [],
                "next_step": "search",
                "iteration_count": 0,
                "max_iterations": self._max_iterations,
            }

            yield AgentEvent(type="started", message=f"Starting LangGraph research: {query}")

            # A thread id is only supplied when a checkpointer is attached.
            config: dict[str, Any] = {}
            if saver is not None:
                config = {"configurable": {"thread_id": str(uuid.uuid4())}}

            # Each streamed event is iterated as a mapping of node name -> update.
            async for event in graph.astream(initial_state, config=config):
                for node_name, update in event.items():
                    agent_event = self._event_from_update(node_name, update)
                    if agent_event is not None:
                        yield agent_event

            yield AgentEvent(type="complete", message="Research complete.")
|
|