"""Task summarization utilities.""" from __future__ import annotations from collections.abc import Iterator from typing import Tuple, Callable from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage from models import SummaryState, TodoItem from config import Configuration from utils import strip_thinking_tokens from services.notes import build_note_guidance from services.text_processing import strip_tool_calls from prompts import task_summarizer_instructions def summarize_task( llm: ChatOpenAI, state: SummaryState, task: TodoItem, context: str, config: Configuration, ) -> str: """Generate a task-specific summary using the LLM.""" prompt = _build_prompt(state, task, context) messages = [ SystemMessage(content=task_summarizer_instructions.strip()), HumanMessage(content=prompt), ] response = llm.invoke(messages) summary_text = response.content.strip() if config.strip_thinking_tokens: summary_text = strip_thinking_tokens(summary_text) summary_text = strip_tool_calls(summary_text).strip() return summary_text or "No information available" def stream_task_summary( llm: ChatOpenAI, state: SummaryState, task: TodoItem, context: str, config: Configuration, ) -> Tuple[Iterator[str], Callable[[], str]]: """Stream the summary text for a task while collecting full output.""" prompt = _build_prompt(state, task, context) remove_thinking = config.strip_thinking_tokens raw_buffer = "" visible_output = "" emit_index = 0 messages = [ SystemMessage(content=task_summarizer_instructions.strip()), HumanMessage(content=prompt), ] def flush_visible() -> Iterator[str]: nonlocal emit_index, raw_buffer while True: start = raw_buffer.find("", emit_index) if start == -1: if emit_index < len(raw_buffer): segment = raw_buffer[emit_index:] emit_index = len(raw_buffer) if segment: yield segment break if start > emit_index: segment = raw_buffer[emit_index:start] emit_index = start if segment: yield segment end = raw_buffer.find("", start) if end == -1: break emit_index = end + len("") def generator() -> Iterator[str]: nonlocal raw_buffer, visible_output, emit_index try: for chunk in llm.stream(messages): chunk_text = chunk.content if not chunk_text: continue raw_buffer += chunk_text if remove_thinking: for segment in flush_visible(): visible_output += segment if segment: yield segment else: visible_output += chunk_text if chunk_text: yield chunk_text finally: if remove_thinking: for segment in flush_visible(): visible_output += segment if segment: yield segment def get_summary() -> str: if remove_thinking: cleaned = strip_thinking_tokens(visible_output) else: cleaned = visible_output return strip_tool_calls(cleaned).strip() return generator(), get_summary def _build_prompt(state: SummaryState, task: TodoItem, context: str) -> str: """Construct the summarization prompt shared by both modes.""" return ( f"Task topic: {state.research_topic}\n" f"Task name: {task.title}\n" f"Task objective: {task.intent}\n" f"Search query: {task.query}\n" f"Task context:\n{context}\n" f"{build_note_guidance(task)}\n" "Please follow the above collaboration requirements to sync notes first, then return a user-facing Markdown summary (still following the task summary template)." )