Spaces:
Sleeping
Sleeping
| """ | |
| Build bounded conversation history for unified chat sessions. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Any, Awaitable, Callable | |
| from deeptutor.agents.base_agent import BaseAgent | |
| from deeptutor.core.stream import StreamEvent, StreamEventType | |
| from deeptutor.core.trace import build_trace_metadata, merge_trace_metadata, new_call_id | |
| from deeptutor.services.llm.config import LLMConfig | |
| from .sqlite_store import SQLiteSessionStore | |
| def count_tokens(text: str) -> int: | |
| """Estimate token count with tiktoken when available.""" | |
| if not text: | |
| return 0 | |
| try: | |
| import tiktoken | |
| encoding = tiktoken.get_encoding("cl100k_base") | |
| return len(encoding.encode(text)) | |
| except Exception: | |
| return max(1, len(text) // 4) | |
| def format_messages_as_transcript(messages: list[dict[str, Any]]) -> str: | |
| lines: list[str] = [] | |
| role_map = { | |
| "user": "User", | |
| "assistant": "Assistant", | |
| "system": "System", | |
| } | |
| for item in messages: | |
| content = str(item.get("content", "") or "").strip() | |
| if not content: | |
| continue | |
| role = role_map.get(str(item.get("role", "user")), "User") | |
| lines.append(f"{role}: {content}") | |
| return "\n\n".join(lines) | |
| def build_history_text(history: list[dict[str, Any]]) -> str: | |
| lines: list[str] = [] | |
| for item in history: | |
| role = str(item.get("role", "user")) | |
| content = str(item.get("content", "") or "").strip() | |
| if not content: | |
| continue | |
| if role == "system": | |
| lines.append(f"Conversation summary:\n{content}") | |
| elif role == "assistant": | |
| lines.append(f"Assistant: {content}") | |
| else: | |
| lines.append(f"User: {content}") | |
| return "\n\n".join(lines) | |
| class ContextBuildResult: | |
| conversation_history: list[dict[str, Any]] | |
| conversation_summary: str | |
| context_text: str | |
| events: list[StreamEvent] | |
| token_count: int | |
| budget: int | |
| class _ContextSummaryAgent(BaseAgent): | |
| """Small helper agent for compressing older conversation turns.""" | |
| def __init__(self, language: str = "en") -> None: | |
| super().__init__( | |
| module_name="chat", | |
| agent_name="context_summary_agent", | |
| language=language, | |
| ) | |
| async def process(self, *_args, **_kwargs) -> dict[str, Any]: | |
| raise NotImplementedError | |
| class ContextBuilder: | |
| """Construct a bounded conversation history plus optional summary trace.""" | |
| def __init__( | |
| self, | |
| store: SQLiteSessionStore, | |
| history_budget_ratio: float = 0.35, | |
| summary_target_ratio: float = 0.40, | |
| ) -> None: | |
| self.store = store | |
| self.history_budget_ratio = history_budget_ratio | |
| self.summary_target_ratio = summary_target_ratio | |
| def _history_budget(self, llm_config: LLMConfig) -> int: | |
| configured = int(getattr(llm_config, "max_tokens", 4096) or 4096) | |
| return max(256, int(configured * self.history_budget_ratio)) | |
| def _summary_budget(self, budget: int) -> int: | |
| return max(96, int(budget * self.summary_target_ratio)) | |
| def _recent_budget(self, budget: int) -> int: | |
| return max(128, budget - self._summary_budget(budget)) | |
| def _build_history(self, summary: str, messages: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| history: list[dict[str, Any]] = [] | |
| cleaned_summary = summary.strip() | |
| if cleaned_summary: | |
| history.append({"role": "system", "content": cleaned_summary}) | |
| history.extend( | |
| { | |
| "role": item.get("role", "user"), | |
| "content": str(item.get("content", "") or ""), | |
| } | |
| for item in messages | |
| if item.get("role") in {"user", "assistant", "system"} | |
| and str(item.get("content", "") or "").strip() | |
| ) | |
| return history | |
| async def _append_event( | |
| self, | |
| events: list[StreamEvent], | |
| event: StreamEvent, | |
| on_event: Callable[[StreamEvent], Awaitable[None]] | None = None, | |
| ) -> None: | |
| events.append(event) | |
| if on_event is not None: | |
| await on_event(event) | |
| def _select_recent_messages( | |
| self, | |
| messages: list[dict[str, Any]], | |
| recent_budget: int, | |
| ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: | |
| selected: list[dict[str, Any]] = [] | |
| total = 0 | |
| for item in reversed(messages): | |
| content = str(item.get("content", "") or "") | |
| tokens = count_tokens(content) | |
| if selected and total + tokens > recent_budget: | |
| break | |
| selected.insert(0, item) | |
| total += tokens | |
| cutoff = len(messages) - len(selected) | |
| return messages[:cutoff], selected | |
| async def _summarize( | |
| self, | |
| *, | |
| session_id: str, | |
| language: str, | |
| source_text: str, | |
| summary_budget: int, | |
| on_event: Callable[[StreamEvent], Awaitable[None]] | None = None, | |
| ) -> tuple[str, list[StreamEvent]]: | |
| events: list[StreamEvent] = [] | |
| if not source_text.strip(): | |
| return "", events | |
| agent = _ContextSummaryAgent(language=language) | |
| trace_meta = build_trace_metadata( | |
| call_id=new_call_id("context-summary"), | |
| phase="summarize_context", | |
| label="Summarize context", | |
| call_kind="llm_summarization", | |
| trace_id=session_id, | |
| ) | |
| async def _trace_bridge(update: dict[str, Any]) -> None: | |
| if str(update.get("event", "")) != "llm_call": | |
| return | |
| state = str(update.get("state", "running")) | |
| metadata = { | |
| key: value | |
| for key, value in update.items() | |
| if key not in {"event", "state", "response", "chunk"} | |
| } | |
| if state == "running": | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.PROGRESS, | |
| source="context_builder", | |
| stage="summarize_context", | |
| content="Compressing conversation history...", | |
| metadata=merge_trace_metadata( | |
| metadata, | |
| {"trace_kind": "call_status", "call_state": "running"}, | |
| ), | |
| ), | |
| on_event, | |
| ) | |
| elif state == "complete": | |
| response = str(update.get("response", "") or "") | |
| if response: | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.CONTENT, | |
| source="context_builder", | |
| stage="summarize_context", | |
| content=response, | |
| metadata=merge_trace_metadata( | |
| metadata, | |
| {"trace_kind": "llm_output"}, | |
| ), | |
| ), | |
| on_event, | |
| ) | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.PROGRESS, | |
| source="context_builder", | |
| stage="summarize_context", | |
| content="", | |
| metadata=merge_trace_metadata( | |
| metadata, | |
| {"trace_kind": "call_status", "call_state": "complete"}, | |
| ), | |
| ), | |
| on_event, | |
| ) | |
| elif state == "error": | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.ERROR, | |
| source="context_builder", | |
| stage="summarize_context", | |
| content=str(update.get("response", "") or "Context summarization failed."), | |
| metadata=merge_trace_metadata(metadata, {"call_state": "error"}), | |
| ), | |
| on_event, | |
| ) | |
| agent.set_trace_callback(_trace_bridge) | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.STAGE_START, | |
| source="context_builder", | |
| stage="summarize_context", | |
| metadata=trace_meta, | |
| ), | |
| on_event, | |
| ) | |
| system_prompt = ( | |
| "You compress conversation history for future turns. Preserve factual context, " | |
| "user goals, constraints, decisions, unresolved questions, and any capability " | |
| "switches. Keep the summary concise and faithful. Use bullet points only if useful." | |
| ) | |
| if language.startswith("zh"): | |
| system_prompt = ( | |
| "你负责把对话历史压缩成后续轮次可直接使用的上下文。保留用户目标、约束、已做决定、" | |
| "未解决问题,以及能力切换带来的关键信息。总结要忠实、紧凑,不要虚构。" | |
| ) | |
| user_prompt = ( | |
| f"Compress the following conversation history into <= {summary_budget} tokens.\n\n" | |
| f"{source_text}" | |
| ) | |
| if language.startswith("zh"): | |
| user_prompt = ( | |
| f"请把下面的对话历史压缩到不超过 {summary_budget} tokens 的长度," | |
| "供后续对话直接继承上下文。\n\n" | |
| f"{source_text}" | |
| ) | |
| try: | |
| _chunks: list[str] = [] | |
| async for _c in agent.stream_llm( | |
| user_prompt=user_prompt, | |
| system_prompt=system_prompt, | |
| max_tokens=summary_budget, | |
| stage="summarize_context", | |
| trace_meta=trace_meta, | |
| ): | |
| _chunks.append(_c) | |
| summary = "".join(_chunks) | |
| return summary.strip(), events | |
| finally: | |
| await self._append_event( | |
| events, | |
| StreamEvent( | |
| type=StreamEventType.STAGE_END, | |
| source="context_builder", | |
| stage="summarize_context", | |
| metadata=trace_meta, | |
| ), | |
| on_event, | |
| ) | |
| async def build( | |
| self, | |
| *, | |
| session_id: str, | |
| llm_config: LLMConfig, | |
| language: str = "en", | |
| on_event: Callable[[StreamEvent], Awaitable[None]] | None = None, | |
| ) -> ContextBuildResult: | |
| session = await self.store.get_session(session_id) | |
| messages = await self.store.get_messages_for_context(session_id) | |
| if session is None: | |
| return ContextBuildResult([], "", "", [], 0, self._history_budget(llm_config)) | |
| budget = self._history_budget(llm_config) | |
| summary_budget = self._summary_budget(budget) | |
| recent_budget = self._recent_budget(budget) | |
| stored_summary = str(session.get("compressed_summary", "") or "").strip() | |
| summary_up_to_msg_id = int(session.get("summary_up_to_msg_id", 0) or 0) | |
| unsummarized = [item for item in messages if int(item.get("id", 0) or 0) > summary_up_to_msg_id] | |
| current_history = self._build_history(stored_summary, unsummarized) | |
| current_tokens = count_tokens(build_history_text(current_history)) | |
| if current_tokens <= budget: | |
| return ContextBuildResult( | |
| conversation_history=current_history, | |
| conversation_summary=stored_summary, | |
| context_text=build_history_text(current_history), | |
| events=[], | |
| token_count=current_tokens, | |
| budget=budget, | |
| ) | |
| older_unsummarized, recent_messages = self._select_recent_messages(unsummarized, recent_budget) | |
| merge_parts: list[str] = [] | |
| if stored_summary: | |
| merge_parts.append(f"Existing summary:\n{stored_summary}") | |
| older_transcript = format_messages_as_transcript(older_unsummarized) | |
| if older_transcript: | |
| merge_parts.append(f"Older turns to fold in:\n{older_transcript}") | |
| if not merge_parts and recent_messages: | |
| merge_parts.append(format_messages_as_transcript(recent_messages)) | |
| try: | |
| new_summary, events = await self._summarize( | |
| session_id=session_id, | |
| language=language, | |
| source_text="\n\n".join(part for part in merge_parts if part.strip()), | |
| summary_budget=summary_budget, | |
| on_event=on_event, | |
| ) | |
| except Exception: | |
| new_summary = stored_summary | |
| events = [] | |
| up_to_msg_id = summary_up_to_msg_id | |
| if older_unsummarized: | |
| up_to_msg_id = int(older_unsummarized[-1]["id"]) | |
| elif stored_summary: | |
| up_to_msg_id = summary_up_to_msg_id | |
| if new_summary: | |
| await self.store.update_summary(session_id, new_summary, up_to_msg_id) | |
| stored_summary = new_summary | |
| final_history = self._build_history(stored_summary, recent_messages) | |
| while len(final_history) > 1 and count_tokens(build_history_text(final_history)) > budget: | |
| summary_prefix = 1 if final_history and final_history[0].get("role") == "system" else 0 | |
| if len(final_history) <= summary_prefix + 1: | |
| break | |
| final_history.pop(summary_prefix) | |
| final_text = build_history_text(final_history) | |
| return ContextBuildResult( | |
| conversation_history=final_history, | |
| conversation_summary=stored_summary, | |
| context_text=final_text, | |
| events=events, | |
| token_count=count_tokens(final_text), | |
| budget=budget, | |
| ) | |
| __all__ = [ | |
| "ContextBuildResult", | |
| "ContextBuilder", | |
| "build_history_text", | |
| "count_tokens", | |
| "format_messages_as_transcript", | |
| ] | |