"""LangGraph-based orchestrator implementation.

NOTE: This orchestrator is deprecated in favor of the shared memory layer
integrated into Simple and Advanced modes (SPEC-08). It remains as a reference
implementation for LangGraph patterns.
"""

import os
import uuid
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Any, Literal

from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

from src.agents.graph.state import ResearchState
from src.agents.graph.workflow import create_research_graph
from src.orchestrators.base import OrchestratorProtocol
from src.utils.config import settings
from src.utils.models import AgentEvent
from src.utils.service_loader import get_embedding_service


class LangGraphOrchestrator(OrchestratorProtocol):
    """State-driven research orchestrator using LangGraph.

    DEPRECATED: Memory features are now integrated into Simple and Advanced modes.
    This class is kept for reference and potential future use.
    """

    def __init__(
        self,
        max_iterations: int = 10,
        checkpoint_path: str | None = None,
    ):
        self._max_iterations = max_iterations
        self._checkpoint_path = checkpoint_path
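
        # Chat model served through the Hugging Face Inference API.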
        repo_id = "meta-llama/Llama-3.1-70B-Instruct"
        api_key = settings.hf_token
        if not api_key:
            raise ValueError(
                "HF_TOKEN (Hugging Face API Token) is required for LangGraph orchestrator."
            )
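
        # Wrap the text-generation endpoint in a chat interface for the graph nodes.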
        self.llm_endpoint = HuggingFaceEndpoint(
            repo_id=repo_id,
            task="text-generation",
            max_new_tokens=1024,
            temperature=0.1,
            huggingfacehub_api_token=api_key,
        )
        self.chat_model = ChatHuggingFace(llm=self.llm_endpoint)

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """Execute research workflow with structured state."""

        embedding_service = get_embedding_service()
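
        # Persist graph state to SQLite when a checkpoint path is configured;
        # otherwise run without a checkpointer.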
        if self._checkpoint_path:
            dir_name = os.path.dirname(self._checkpoint_path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)
            saver = AsyncSqliteSaver.from_conn_string(self._checkpoint_path)
        else:
            saver = None

        from contextlib import asynccontextmanager
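
        # The async SQLite saver is itself a context manager, so wrap graph
        # construction so both the checkpointed and non-checkpointed paths
        # yield a ready-to-use graph.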
        @asynccontextmanager
        async def get_graph_context(saver_instance: Any) -> AsyncIterator[Any]:
            if saver_instance:
                async with saver_instance as s:
                    yield create_research_graph(
                        llm=self.chat_model,
                        checkpointer=s,
                        embedding_service=embedding_service,
                    )
            else:
                yield create_research_graph(
                    llm=self.chat_model,
                    checkpointer=None,
                    embedding_service=embedding_service,
                )

        async with get_graph_context(saver) as graph:
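            # Fresh state for this query, starting from the "search" step.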
            initial_state: ResearchState = {
                "query": query,
                "hypotheses": [],
                "conflicts": [],
                "evidence_ids": [],
                "messages": [],
                "next_step": "search",
                "iteration_count": 0,
                "max_iterations": self._max_iterations,
            }

            yield AgentEvent(type="started", message=f"Starting LangGraph research: {query}")
            thread_id = str(uuid.uuid4())
            config = {"configurable": {"thread_id": thread_id}} if saver else {}
            async for event in graph.astream(initial_state, config=config):
                for node_name, update in event.items():
                    if update.get("messages"):
                        last_msg = update["messages"][-1]
                        event_type: Literal["progress", "thinking", "searching"] = "progress"
                        if node_name == "supervisor":
                            event_type = "thinking"
                        elif node_name == "search":
                            event_type = "searching"

                        yield AgentEvent(
                            type=event_type, message=str(last_msg.content), data={"node": node_name}
                        )
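                    # Supervisor decisions can arrive without messages; surface
                    # the chosen next step instead.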
                    elif node_name == "supervisor":
                        yield AgentEvent(
                            type="thinking",
                            message=f"Supervisor decided: {update.get('next_step')}",
                            data={"node": node_name},
                        )

            yield AgentEvent(type="complete", message="Research complete.")