# Initial deployment (commit a60c0af, by pikamomo)
"""Task summarization utilities."""
from __future__ import annotations
from collections.abc import Iterator
from typing import Tuple, Callable
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from models import SummaryState, TodoItem
from config import Configuration
from utils import strip_thinking_tokens
from services.notes import build_note_guidance
from services.text_processing import strip_tool_calls
from prompts import task_summarizer_instructions
def summarize_task(
    llm: ChatOpenAI,
    state: SummaryState,
    task: TodoItem,
    context: str,
    config: Configuration,
) -> str:
    """Run a single blocking LLM call and return the cleaned task summary.

    Builds the shared summarization prompt, invokes the model once, then
    optionally strips ``<think>`` blocks (per ``config``) and always strips
    tool-call artifacts before returning the text.
    """
    user_prompt = _build_prompt(state, task, context)
    conversation = [
        SystemMessage(content=task_summarizer_instructions.strip()),
        HumanMessage(content=user_prompt),
    ]
    reply = llm.invoke(conversation)
    text = reply.content.strip()
    # Thinking-token removal is opt-in via configuration.
    if config.strip_thinking_tokens:
        text = strip_thinking_tokens(text)
    cleaned = strip_tool_calls(text).strip()
    # Fall back to a fixed placeholder when the model produced nothing usable.
    return cleaned if cleaned else "No information available"
def stream_task_summary(
    llm: ChatOpenAI,
    state: SummaryState,
    task: TodoItem,
    context: str,
    config: Configuration,
) -> Tuple[Iterator[str], Callable[[], str]]:
    """Stream the summary text for a task while collecting full output.

    Returns a pair ``(generator, get_summary)``: iterating ``generator``
    yields visible text chunks as the LLM streams them (with ``<think>``
    blocks filtered out when ``config.strip_thinking_tokens`` is set), and
    calling ``get_summary()`` afterwards returns the full cleaned summary.
    """
    prompt = _build_prompt(state, task, context)
    remove_thinking = config.strip_thinking_tokens
    raw_buffer = ""
    visible_output = ""
    emit_index = 0
    messages = [
        SystemMessage(content=task_summarizer_instructions.strip()),
        HumanMessage(content=prompt),
    ]

    OPEN_TAG = "<think>"
    CLOSE_TAG = "</think>"

    def flush_visible(final: bool = False) -> Iterator[str]:
        """Yield text outside <think>...</think> spans accumulated so far.

        While streaming (``final=False``), hold back any buffer suffix that
        could be the start of a split ``<think>`` tag, so a tag arriving
        across chunk boundaries (e.g. "<thi" + "nk>") is never leaked as
        visible text. On the final drain (``final=True``) everything pending
        is flushed.
        """
        nonlocal emit_index
        while True:
            start = raw_buffer.find(OPEN_TAG, emit_index)
            if start == -1:
                limit = len(raw_buffer)
                if not final:
                    # Retain the longest suffix that is a proper prefix of
                    # OPEN_TAG; it may complete into a tag on the next chunk.
                    max_hold = min(len(OPEN_TAG) - 1, limit - emit_index)
                    for k in range(max_hold, 0, -1):
                        if raw_buffer.endswith(OPEN_TAG[:k]):
                            limit -= k
                            break
                if emit_index < limit:
                    segment = raw_buffer[emit_index:limit]
                    emit_index = limit
                    if segment:
                        yield segment
                break
            if start > emit_index:
                segment = raw_buffer[emit_index:start]
                emit_index = start
                if segment:
                    yield segment
            end = raw_buffer.find(CLOSE_TAG, start)
            if end == -1:
                # Open tag without its close yet: wait for more chunks.
                # (If the stream ends here, the unterminated span is dropped.)
                break
            emit_index = end + len(CLOSE_TAG)

    def generator() -> Iterator[str]:
        """Yield visible chunks and accumulate them into ``visible_output``."""
        nonlocal raw_buffer, visible_output, emit_index
        try:
            for chunk in llm.stream(messages):
                chunk_text = chunk.content
                if not chunk_text:
                    continue
                raw_buffer += chunk_text
                if remove_thinking:
                    for segment in flush_visible():
                        visible_output += segment
                        if segment:
                            yield segment
                else:
                    visible_output += chunk_text
                    if chunk_text:
                        yield chunk_text
        finally:
            # Drain anything held back (including a partial-tag suffix that
            # never completed into a real <think> tag).
            if remove_thinking:
                for segment in flush_visible(final=True):
                    visible_output += segment
                    if segment:
                        yield segment

    def get_summary() -> str:
        """Return the full cleaned summary collected during streaming."""
        if remove_thinking:
            # Second pass catches any tag residue that survived streaming.
            cleaned = strip_thinking_tokens(visible_output)
        else:
            cleaned = visible_output
        return strip_tool_calls(cleaned).strip()

    return generator(), get_summary
def _build_prompt(state: SummaryState, task: TodoItem, context: str) -> str:
    """Assemble the summarization prompt used by both sync and streaming paths.

    Combines the research topic, task metadata, retrieved context, and the
    per-task note guidance into a single newline-joined prompt string.
    """
    sections = [
        f"Task topic: {state.research_topic}",
        f"Task name: {task.title}",
        f"Task objective: {task.intent}",
        f"Search query: {task.query}",
        f"Task context:\n{context}",
        build_note_guidance(task),
        "Please follow the above collaboration requirements to sync notes first, then return a user-facing Markdown summary (still following the task summary template).",
    ]
    return "\n".join(sections)