Spaces:
Sleeping
Sleeping
| """Orchestrator coordinating the deep research workflow using LangGraph.""" | |
| from __future__ import annotations | |
import json
import logging
import operator
import re
import time
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Any, Annotated, Iterator, TypedDict, Optional, Callable

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
| from config import Configuration | |
| from prompts import ( | |
| report_writer_instructions, | |
| task_summarizer_instructions, | |
| todo_planner_system_prompt, | |
| todo_planner_instructions, | |
| get_current_date, | |
| ) | |
| from models import SummaryState, SummaryStateOutput, TodoItem | |
| from services.search import dispatch_search, prepare_research_context | |
| from utils import strip_thinking_tokens | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================ | |
| # State Schema | |
| # ============================================================================ | |
class ResearchState(TypedDict, total=False):
    """State schema for the research workflow graph.

    ``total=False`` makes every key optional so each node can return a
    partial update; LangGraph merges the returned dict into the running
    state.
    """
    # The user-supplied topic that drives the whole workflow.
    research_topic: str
    # Tasks produced by the planner node.
    todo_items: list[TodoItem]
    current_task_index: int
    # Annotated with operator.add so lists returned by nodes are
    # concatenated onto the accumulated state rather than replacing it.
    web_research_results: Annotated[list[str], operator.add]
    sources_gathered: Annotated[list[str], operator.add]
    research_loop_count: int
    # Final report text plus optional note bookkeeping for it.
    structured_report: Optional[str]
    report_note_id: Optional[str]
    report_note_path: Optional[str]
    # Internal tracking
    messages: list[Any]
    config: Configuration
| # ============================================================================ | |
| # Note Tool Implementation | |
| # ============================================================================ | |
class NoteTool:
    """Simple file-based note tool for persisting task notes.

    Each note is a Markdown file under ``workspace`` with a small
    YAML-style frontmatter header carrying its metadata. Thread-safe id
    generation allows concurrent worker threads to create notes.
    """

    def __init__(self, workspace: str = "./notes"):
        """Create the tool, ensuring ``workspace`` exists on disk."""
        self.workspace = Path(workspace)
        self.workspace.mkdir(parents=True, exist_ok=True)
        self._id_counter = 0
        # Protects the counter when multiple threads create notes at once.
        self._lock = Lock()

    def _generate_id(self) -> str:
        """Return a unique note id built from a timestamp and a counter."""
        with self._lock:
            self._id_counter += 1
            return f"note_{int(time.time())}_{self._id_counter}"

    def run(self, params: dict[str, Any]) -> str:
        """Execute note action: create, read, update, list.

        Returns a human-readable status string; errors are reported as
        "❌ ..." strings rather than raised.
        """
        action = params.get("action", "read")
        if action == "create":
            return self._create_note(params)
        if action == "read":
            return self._read_note(params)
        if action == "update":
            return self._update_note(params)
        if action == "list":
            return self._list_notes(params)
        return f"❌ Unknown action: {action}"

    def _create_note(self, params: dict[str, Any]) -> str:
        """Write a new note file with frontmatter and return its id/path."""
        note_id = self._generate_id()
        title = params.get("title", "Untitled")
        note_type = params.get("note_type", "general")
        tags = params.get("tags", [])
        content = params.get("content", "")
        task_id = params.get("task_id")
        note_path = self.workspace / f"{note_id}.md"
        # NOTE: tags/task_id are written as their Python repr, which is
        # good enough for the simple "ID:"-based parsing done by callers.
        frontmatter = f"""---
id: {note_id}
title: {title}
type: {note_type}
tags: {tags}
task_id: {task_id}
---
"""
        note_path.write_text(frontmatter + content, encoding="utf-8")
        return f"✅ Note created\nID: {note_id}\nPath: {note_path}"

    def _read_note(self, params: dict[str, Any]) -> str:
        """Return the full text of the note identified by ``note_id``."""
        note_id = params.get("note_id")
        if not note_id:
            return "❌ Missing note_id parameter"
        note_path = self.workspace / f"{note_id}.md"
        if not note_path.exists():
            return f"❌ Note does not exist: {note_id}"
        content = note_path.read_text(encoding="utf-8")
        return f"✅ Note content:\n{content}"

    def _update_note(self, params: dict[str, Any]) -> str:
        """Append ``content`` to an existing note (simple append strategy)."""
        note_id = params.get("note_id")
        if not note_id:
            return "❌ Missing note_id parameter"
        note_path = self.workspace / f"{note_id}.md"
        if not note_path.exists():
            return f"❌ Note does not exist: {note_id}"
        content = params.get("content", "")
        if content:
            existing = note_path.read_text(encoding="utf-8")
            updated = existing + "\n\n---\nUpdate:\n" + content
            note_path.write_text(updated, encoding="utf-8")
        return f"✅ Note updated\nID: {note_id}"

    def _list_notes(self, params: dict[str, Any]) -> str:
        """List the stems (ids) of all notes in the workspace."""
        notes = list(self.workspace.glob("*.md"))
        if not notes:
            return "📝 No notes yet"
        result = "📝 Note list:\n"
        for note in notes:
            result += f"- {note.stem}\n"
        return result
| # ============================================================================ | |
| # Tool Call Tracker | |
| # ============================================================================ | |
class ToolCallTracker:
    """Collects tool call events for SSE streaming.

    Events accumulate in an in-memory log; ``drain`` hands out only the
    entries added since the previous drain. An optional sink receives each
    event immediately as it is recorded.
    """

    def __init__(self, notes_workspace: Optional[str] = None):
        self._notes_workspace = notes_workspace
        self._log: list[dict[str, Any]] = []
        self._drained = 0
        self._guard = Lock()
        self._sink: Optional[Callable[[dict[str, Any]], None]] = None

    def record(self, event: dict[str, Any]) -> None:
        """Append ``event`` (stamping a 1-based id onto it) and notify the sink."""
        with self._guard:
            event["id"] = len(self._log) + 1
            self._log.append(event)
            live_sink = self._sink
            if live_sink:
                live_sink({"type": "tool_call", **event})

    def drain(self, step: Optional[int] = None) -> list[dict[str, Any]]:
        """Return events recorded since the last drain, wrapped for transport."""
        with self._guard:
            if self._drained >= len(self._log):
                return []
            fresh = self._log[self._drained:]
            self._drained = len(self._log)
            extra = {} if step is None else {"step": step}
            return [{"type": "tool_call", **entry, **extra} for entry in fresh]

    def set_event_sink(self, sink: Optional[Callable[[dict[str, Any]], None]]) -> None:
        """Install (or clear, with None) the immediate per-event callback."""
        self._sink = sink

    def as_dicts(self) -> list[dict[str, Any]]:
        """Return a snapshot list of every recorded event."""
        with self._guard:
            return list(self._log)

    def reset(self) -> None:
        """Forget all events and rewind the drain cursor."""
        with self._guard:
            self._log.clear()
            self._drained = 0
| # ============================================================================ | |
| # Deep Research Agent using LangGraph | |
| # ============================================================================ | |
class DeepResearchAgent:
    """Coordinator orchestrating TODO-based research workflow using LangGraph.

    The workflow has three phases: plan (break the topic into tasks),
    execute (search + summarize each task), and report (synthesize a final
    structured report). ``run`` drives the phases through a compiled
    LangGraph; ``run_stream`` re-implements the same phases while emitting
    incremental events, with one worker thread per task.
    """

    def __init__(self, config: Configuration | None = None) -> None:
        """Initialize the coordinator with configuration and LangGraph components."""
        self.config = config or Configuration.from_env()
        self.llm = self._init_llm()
        # Note tool setup (None when note persistence is disabled)
        self.note_tool = (
            NoteTool(workspace=self.config.notes_workspace)
            if self.config.enable_notes
            else None
        )
        # Tool call tracking for streaming tool_call events
        self._tool_tracker = ToolCallTracker(
            self.config.notes_workspace if self.config.enable_notes else None
        )
        self._tool_event_sink_enabled = False
        # Guards the SummaryState shared by concurrent workers in run_stream
        self._state_lock = Lock()
        # Build the graph
        self.graph = self._build_graph()
        self._last_search_notices: list[str] = []

    def _init_llm(self) -> ChatOpenAI:
        """Initialize ChatOpenAI with provider-specific connection settings."""
        llm_kwargs: dict[str, Any] = {
            "temperature": 0.0,
            "streaming": True,
        }
        model_id = self.config.llm_model_id or self.config.local_llm
        if model_id:
            llm_kwargs["model"] = model_id
        provider = (self.config.llm_provider or "").strip()
        if provider == "ollama":
            # Ollama's OpenAI-compatible endpoint requires some key but ignores it.
            llm_kwargs["base_url"] = self.config.sanitized_ollama_url()
            llm_kwargs["api_key"] = self.config.llm_api_key or "ollama"
        elif provider == "lmstudio":
            llm_kwargs["base_url"] = self.config.lmstudio_base_url
            llm_kwargs["api_key"] = self.config.llm_api_key or "lm-studio"
        else:
            # Generic OpenAI-compatible endpoint (or OpenAI itself).
            if self.config.llm_base_url:
                llm_kwargs["base_url"] = self.config.llm_base_url
            if self.config.llm_api_key:
                llm_kwargs["api_key"] = self.config.llm_api_key
        return ChatOpenAI(**llm_kwargs)

    def _build_graph(self) -> StateGraph:
        """Build and compile the linear plan -> execute -> report workflow."""
        workflow = StateGraph(ResearchState)
        # Add nodes
        workflow.add_node("plan_research", self._plan_research_node)
        workflow.add_node("execute_tasks", self._execute_tasks_node)
        workflow.add_node("generate_report", self._generate_report_node)
        # Define edges (straight pipeline, no branching)
        workflow.set_entry_point("plan_research")
        workflow.add_edge("plan_research", "execute_tasks")
        workflow.add_edge("execute_tasks", "generate_report")
        workflow.add_edge("generate_report", END)
        return workflow.compile()

    # -------------------------------------------------------------------------
    # Graph Nodes
    # -------------------------------------------------------------------------
    def _plan_research_node(self, state: ResearchState) -> dict[str, Any]:
        """Planning node: break research topic into actionable tasks."""
        topic = state.get("research_topic", "")
        todo_items = self._invoke_planner(topic)
        # Create a per-task note if enabled, recording the tool calls.
        self._create_task_notes(todo_items, record=True)
        titles = [task.title for task in todo_items]
        logger.info("Planner produced %d tasks: %s", len(todo_items), titles)
        return {
            "todo_items": todo_items,
            "current_task_index": 0,
            "research_loop_count": 0,
        }

    def _execute_tasks_node(self, state: ResearchState) -> dict[str, Any]:
        """Execute research tasks sequentially: search and summarize each task."""
        todo_items = state.get("todo_items", [])
        topic = state.get("research_topic", "")
        loop_count = state.get("research_loop_count", 0)
        web_results: list[str] = []
        sources: list[str] = []
        for task in todo_items:
            task.status = "in_progress"
            # Execute search for this task's query
            search_result, notices, answer_text, backend = dispatch_search(
                task.query,
                self.config,
                loop_count,
            )
            self._last_search_notices = notices
            task.notices = notices
            if not search_result or not search_result.get("results"):
                # No usable results: mark the task skipped and move on
                # (note: skipped tasks do not advance loop_count).
                task.status = "skipped"
                continue
            # Prepare context
            sources_summary, context = prepare_research_context(
                search_result, answer_text, self.config
            )
            task.sources_summary = sources_summary
            web_results.append(context)
            sources.append(sources_summary)
            # Summarize task
            summary = self._summarize_task(topic, task, context)
            task.summary = summary
            task.status = "completed"
            # Update note if enabled
            if self.note_tool and task.note_id:
                result = self.note_tool.run({
                    "action": "update",
                    "note_id": task.note_id,
                    "task_id": task.id,
                    "content": f"## Task Summary\n{summary}\n\n## Sources\n{sources_summary}",
                })
                self._tool_tracker.record({
                    "agent": "Task Summary Expert",
                    "tool": "note",
                    "parameters": {"action": "update", "note_id": task.note_id},
                    "result": result,
                    "task_id": task.id,
                    "note_id": task.note_id,
                })
            loop_count += 1
        return {
            "todo_items": todo_items,
            "web_research_results": web_results,
            "sources_gathered": sources,
            "research_loop_count": loop_count,
        }

    def _generate_report_node(self, state: ResearchState) -> dict[str, Any]:
        """Generate the final structured report from per-task summaries."""
        topic = state.get("research_topic", "")
        todo_items = state.get("todo_items", [])
        messages = [
            SystemMessage(content=report_writer_instructions.strip()),
            HumanMessage(content=self._build_report_prompt(topic, todo_items)),
        ]
        response = self.llm.invoke(messages)
        report_text = response.content
        if self.config.strip_thinking_tokens:
            report_text = strip_thinking_tokens(report_text)
        report_text = report_text.strip() or "Report generation failed, please check input."
        # Persist the report as a conclusion note if enabled
        report_note_id = None
        report_note_path = None
        if self.note_tool and report_text:
            result = self.note_tool.run({
                "action": "create",
                "title": f"Research Report: {topic}",
                "note_type": "conclusion",
                "tags": ["deep_research", "report"],
                "content": report_text,
            })
            report_note_id = self._extract_note_id(result)
            if report_note_id:
                report_note_path = str(Path(self.config.notes_workspace) / f"{report_note_id}.md")
            self._tool_tracker.record({
                "agent": "Report Writing Expert",
                "tool": "note",
                "parameters": {"action": "create", "note_type": "conclusion"},
                "result": result,
                "note_id": report_note_id,
            })
        return {
            "structured_report": report_text,
            "report_note_id": report_note_id,
            "report_note_path": report_note_path,
        }

    # -------------------------------------------------------------------------
    # Helper Methods
    # -------------------------------------------------------------------------
    def _invoke_planner(self, topic: str) -> list[TodoItem]:
        """Run the planner LLM on ``topic`` and parse its output into tasks.

        Shared by the graph planning node and the streaming path.
        """
        messages = [
            SystemMessage(content=todo_planner_system_prompt.strip()),
            HumanMessage(content=todo_planner_instructions.format(
                current_date=get_current_date(),
                research_topic=topic,
            )),
        ]
        response_text = self.llm.invoke(messages).content
        if self.config.strip_thinking_tokens:
            response_text = strip_thinking_tokens(response_text)
        logger.info("Planner raw output (truncated): %s", response_text[:500])
        return self._parse_todo_items(response_text, topic)

    def _create_task_notes(self, todo_items: list[TodoItem], *, record: bool = False) -> None:
        """Create a state note per task and attach note id/path to each task.

        When ``record`` is True the tool calls are also logged to the
        tracker (used by the graph node; the stream path forwards tool
        events through its sink instead).
        """
        if not self.note_tool:
            return
        for task in todo_items:
            result = self.note_tool.run({
                "action": "create",
                "task_id": task.id,
                "title": f"Task {task.id}: {task.title}",
                "note_type": "task_state",
                "tags": ["deep_research", f"task_{task.id}"],
                "content": f"Task objective: {task.intent}\nSearch query: {task.query}",
            })
            note_id = self._extract_note_id(result)
            if note_id:
                task.note_id = note_id
                task.note_path = str(Path(self.config.notes_workspace) / f"{note_id}.md")
            if record:
                self._tool_tracker.record({
                    "agent": "Research Planning Expert",
                    "tool": "note",
                    "parameters": {"action": "create", "task_id": task.id},
                    "result": result,
                    "task_id": task.id,
                    "note_id": note_id,
                })

    @staticmethod
    def _build_task_prompt(topic: str, task: TodoItem, context: str) -> str:
        """Assemble the summarizer user prompt for one task."""
        return (
            f"Task topic: {topic}\n"
            f"Task name: {task.title}\n"
            f"Task objective: {task.intent}\n"
            f"Search query: {task.query}\n"
            f"Task context:\n{context}\n"
            "Please generate a detailed task summary."
        )

    def _build_report_prompt(self, topic: str, todo_items: list[TodoItem]) -> str:
        """Assemble the report-writer user prompt from per-task results."""
        tasks_block = []
        for task in todo_items:
            summary_block = task.summary or "No information available"
            sources_block = task.sources_summary or "No sources available"
            tasks_block.append(
                f"### Task {task.id}: {task.title}\n"
                f"- Objective: {task.intent}\n"
                f"- Search query: {task.query}\n"
                f"- Status: {task.status}\n"
                f"- Summary:\n{summary_block}\n"
                f"- Sources:\n{sources_block}\n"
            )
        return (
            f"Research topic: {topic}\n"
            f"Task overview:\n{''.join(tasks_block)}\n"
            "Based on the above task summaries, please write a structured research report."
        )

    def _summarize_task(self, topic: str, task: TodoItem, context: str) -> str:
        """Generate the summary text for a single task via the LLM."""
        messages = [
            SystemMessage(content=task_summarizer_instructions.strip()),
            HumanMessage(content=self._build_task_prompt(topic, task, context)),
        ]
        summary_text = self.llm.invoke(messages).content
        if self.config.strip_thinking_tokens:
            summary_text = strip_thinking_tokens(summary_text)
        return summary_text.strip() or "No information available"

    def _parse_todo_items(self, response: str, topic: str) -> list[TodoItem]:
        """Parse planner output into a TodoItem list.

        Accepts either a JSON object with a "tasks" array or a bare JSON
        array embedded anywhere in the text; falls back to a single generic
        task when nothing parses.
        """
        text = response.strip()
        tasks_payload: list[dict[str, Any]] = []
        # Try to extract a JSON object first ({"tasks": [...]})
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end != -1 and end > start:
            try:
                json_obj = json.loads(text[start:end + 1])
                if isinstance(json_obj, dict) and "tasks" in json_obj:
                    tasks_payload = json_obj["tasks"]
            except json.JSONDecodeError:
                pass
        # Then try a bare JSON array ([...])
        if not tasks_payload:
            start = text.find("[")
            end = text.rfind("]")
            if start != -1 and end != -1 and end > start:
                try:
                    tasks_payload = json.loads(text[start:end + 1])
                except json.JSONDecodeError:
                    pass
        # Create TodoItems, tolerating missing fields
        todo_items: list[TodoItem] = []
        for idx, item in enumerate(tasks_payload, start=1):
            if not isinstance(item, dict):
                continue
            title = str(item.get("title") or f"Task{idx}").strip()
            intent = str(item.get("intent") or "Focus on key issues of the topic").strip()
            query = str(item.get("query") or topic).strip() or topic
            todo_items.append(TodoItem(
                id=idx,
                title=title,
                intent=intent,
                query=query,
            ))
        # Fallback if no tasks parsed
        if not todo_items:
            todo_items.append(TodoItem(
                id=1,
                title="Basic Background Overview",
                intent="Collect core background and latest developments on the topic",
                query=f"{topic} latest developments" if topic else "Basic background overview",
            ))
        return todo_items

    @staticmethod
    def _extract_note_id(response: str) -> Optional[str]:
        """Extract the note ID from a NoteTool response string.

        Bug fix: this method was previously defined without ``self`` or
        ``@staticmethod``, so every ``self._extract_note_id(...)`` call
        raised TypeError (two positional args passed to a one-arg function).
        """
        if not response:
            return None
        match = re.search(r"ID:\s*([^\n]+)", response)
        return match.group(1).strip() if match else None

    def _set_tool_event_sink(self, sink: Callable[[dict[str, Any]], None] | None) -> None:
        """Enable or disable immediate tool event callbacks."""
        self._tool_event_sink_enabled = sink is not None
        self._tool_tracker.set_event_sink(sink)

    # -------------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------------
    def run(self, topic: str) -> SummaryStateOutput:
        """Execute the research workflow synchronously and return the final report."""
        initial_state: ResearchState = {
            "research_topic": topic,
            "todo_items": [],
            "current_task_index": 0,
            "web_research_results": [],
            "sources_gathered": [],
            "research_loop_count": 0,
            "structured_report": None,
            "report_note_id": None,
            "report_note_path": None,
            "messages": [],
            "config": self.config,
        }
        # Run the graph to completion
        final_state = self.graph.invoke(initial_state)
        report = final_state.get("structured_report", "")
        todo_items = final_state.get("todo_items", [])
        return SummaryStateOutput(
            running_summary=report,
            report_markdown=report,
            todo_items=todo_items,
        )

    def run_stream(self, topic: str) -> Iterator[dict[str, Any]]:
        """Execute the workflow yielding incremental progress events.

        Yields dicts keyed by ``type`` (status, todo_list, task_status,
        sources, task_summary_chunk, tool_call, final_report, done). Tasks
        run concurrently on daemon threads; their events funnel through a
        queue so the generator stays single-threaded for the caller.
        """
        logger.debug("Starting streaming research: topic=%s", topic)
        yield {"type": "status", "message": "Initializing research workflow"}
        # Plan phase
        yield {"type": "status", "message": "Planning research tasks..."}
        todo_items = self._invoke_planner(topic)
        # Create notes for tasks (no tracker recording here; live tool
        # events flow through the sink installed below instead)
        self._create_task_notes(todo_items)
        # Setup channel mapping for streaming: each task gets a stable step
        # number and stream token so the frontend can route its events
        channel_map: dict[int, dict[str, Any]] = {}
        for index, task in enumerate(todo_items, start=1):
            token = f"task_{task.id}"
            task.stream_token = token
            channel_map[task.id] = {"step": index, "token": token}
        yield {
            "type": "todo_list",
            "tasks": [self._serialize_task(t) for t in todo_items],
            "step": 0,
        }
        # Execute tasks with streaming
        event_queue: Queue[dict[str, Any]] = Queue()

        def enqueue(event: dict[str, Any], task: Optional[TodoItem] = None, step_override: Optional[int] = None) -> None:
            # Stamp the event with its task's step/stream token, then queue it.
            payload = dict(event)
            target_task_id = payload.get("task_id")
            if task is not None:
                target_task_id = task.id
                payload["task_id"] = task.id
            channel = channel_map.get(target_task_id) if target_task_id else None
            if channel:
                payload.setdefault("step", channel["step"])
                payload["stream_token"] = channel["token"]
            if step_override is not None:
                payload["step"] = step_override
            event_queue.put(payload)

        def tool_event_sink(event: dict[str, Any]) -> None:
            enqueue(event)

        self._set_tool_event_sink(tool_event_sink)
        threads: list[Thread] = []
        state = SummaryState(research_topic=topic)
        state.todo_items = todo_items

        def worker(task: TodoItem, step: int) -> None:
            # One daemon thread per task: search, summarize, persist, report.
            try:
                enqueue({
                    "type": "task_status",
                    "task_id": task.id,
                    "status": "in_progress",
                    "title": task.title,
                    "intent": task.intent,
                    "note_id": task.note_id,
                    "note_path": task.note_path,
                }, task=task)
                # Execute search
                search_result, notices, answer_text, backend = dispatch_search(
                    task.query, self.config, state.research_loop_count
                )
                task.notices = notices
                for notice in notices:
                    if notice:
                        enqueue({
                            "type": "status",
                            "message": notice,
                            "task_id": task.id,
                        }, task=task)
                if not search_result or not search_result.get("results"):
                    task.status = "skipped"
                    enqueue({
                        "type": "task_status",
                        "task_id": task.id,
                        "status": "skipped",
                        "title": task.title,
                        "intent": task.intent,
                        "note_id": task.note_id,
                        "note_path": task.note_path,
                    }, task=task)
                    return
                # Prepare context
                sources_summary, context = prepare_research_context(
                    search_result, answer_text, self.config
                )
                task.sources_summary = sources_summary
                with self._state_lock:
                    state.web_research_results.append(context)
                    state.sources_gathered.append(sources_summary)
                    state.research_loop_count += 1
                enqueue({
                    "type": "sources",
                    "task_id": task.id,
                    "latest_sources": sources_summary,
                    "raw_context": context,
                    "backend": backend,
                    "note_id": task.note_id,
                    "note_path": task.note_path,
                }, task=task)
                # Stream summarization chunk-by-chunk
                summary_messages = [
                    SystemMessage(content=task_summarizer_instructions.strip()),
                    HumanMessage(content=self._build_task_prompt(topic, task, context)),
                ]
                summary_chunks: list[str] = []
                for chunk in self.llm.stream(summary_messages):
                    chunk_text = chunk.content
                    if not chunk_text:
                        continue
                    summary_chunks.append(chunk_text)
                    # Bug fix: chunks were previously forwarded only when
                    # strip_thinking_tokens was enabled, so nothing streamed
                    # with stripping off. Forward every chunk, suppressing
                    # ones carrying a thinking marker when stripping is on.
                    # NOTE(review): coarse filter -- <think> content split
                    # across chunk boundaries may still leak to the client.
                    if not self.config.strip_thinking_tokens or "<think>" not in chunk_text:
                        enqueue({
                            "type": "task_summary_chunk",
                            "task_id": task.id,
                            "content": chunk_text,
                            "note_id": task.note_id,
                        }, task=task)
                full_summary = "".join(summary_chunks)
                if self.config.strip_thinking_tokens:
                    full_summary = strip_thinking_tokens(full_summary)
                task.summary = full_summary.strip() or "No information available"
                task.status = "completed"
                # Update note
                if self.note_tool and task.note_id:
                    self.note_tool.run({
                        "action": "update",
                        "note_id": task.note_id,
                        "task_id": task.id,
                        "content": f"## Task Summary\n{task.summary}\n\n## Sources\n{sources_summary}",
                    })
                enqueue({
                    "type": "task_status",
                    "task_id": task.id,
                    "status": "completed",
                    "summary": task.summary,
                    "sources_summary": task.sources_summary,
                    "note_id": task.note_id,
                    "note_path": task.note_path,
                }, task=task)
            except Exception as exc:
                logger.exception("Task execution failed", exc_info=exc)
                enqueue({
                    "type": "task_status",
                    "task_id": task.id,
                    "status": "failed",
                    "detail": str(exc),
                    "title": task.title,
                    "intent": task.intent,
                    "note_id": task.note_id,
                    "note_path": task.note_path,
                }, task=task)
            finally:
                # Sentinel lets the consumer loop count finished workers.
                enqueue({"type": "__task_done__", "task_id": task.id})

        # Start worker threads
        for task in todo_items:
            step = channel_map.get(task.id, {}).get("step", 0)
            thread = Thread(target=worker, args=(task, step), daemon=True)
            threads.append(thread)
            thread.start()
        # Yield events from the queue until every worker reported done
        active_workers = len(todo_items)
        finished_workers = 0
        try:
            while finished_workers < active_workers:
                event = event_queue.get()
                if event.get("type") == "__task_done__":
                    finished_workers += 1
                    continue
                yield event
            # Drain events enqueued after the final sentinel
            while True:
                try:
                    event = event_queue.get_nowait()
                except Empty:
                    break
                if event.get("type") != "__task_done__":
                    yield event
        finally:
            self._set_tool_event_sink(None)
            for thread in threads:
                thread.join()
        # Generate final report
        yield {"type": "status", "message": "Generating research report..."}
        report_messages = [
            SystemMessage(content=report_writer_instructions.strip()),
            HumanMessage(content=self._build_report_prompt(topic, todo_items)),
        ]
        report = self.llm.invoke(report_messages).content
        if self.config.strip_thinking_tokens:
            report = strip_thinking_tokens(report)
        report = report.strip() or "Report generation failed"
        # Create conclusion note
        report_note_id = None
        report_note_path = None
        if self.note_tool:
            result = self.note_tool.run({
                "action": "create",
                "title": f"Research Report: {topic}",
                "note_type": "conclusion",
                "tags": ["deep_research", "report"],
                "content": report,
            })
            report_note_id = self._extract_note_id(result)
            if report_note_id:
                report_note_path = str(Path(self.config.notes_workspace) / f"{report_note_id}.md")
        yield {
            "type": "final_report",
            "report": report,
            "note_id": report_note_id,
            "note_path": report_note_path,
        }
        yield {"type": "done"}

    def _serialize_task(self, task: TodoItem) -> dict[str, Any]:
        """Convert task dataclass to serializable dict for frontend."""
        return {
            "id": task.id,
            "title": task.title,
            "intent": task.intent,
            "query": task.query,
            "status": task.status,
            "summary": task.summary,
            "sources_summary": task.sources_summary,
            "note_id": task.note_id,
            "note_path": task.note_path,
            "stream_token": task.stream_token,
        }
def run_deep_research(topic: str, config: Configuration | None = None) -> SummaryStateOutput:
    """Convenience function mirroring the class-based API.

    Builds a one-shot ``DeepResearchAgent`` (with ``config`` or the
    environment defaults) and runs the full workflow on ``topic``.
    """
    return DeepResearchAgent(config=config).run(topic)