"""Automatic context window compression for long conversations.

Self-contained class; summarization requests go through the auxiliary LLM
client (``call_llm``). Uses an auxiliary model (cheap/fast) to summarize
middle turns while protecting head and tail context.

Improvements over v1:
- Structured summary template (Goal, Progress, Decisions, Files, Next Steps)
- Iterative summary updates (preserves info across multiple compactions)
- Token-budget tail protection instead of fixed message count
- Tool output pruning before LLM summarization (cheap pre-pass)
- Scaled summary budget (proportional to compressed content)
- Richer tool call/result detail in summarizer input
"""
|
|
| import logging |
| import os |
| from typing import Any, Dict, List, Optional |
|
|
| from agent.auxiliary_client import call_llm |
| from agent.model_metadata import ( |
| get_model_context_length, |
| estimate_messages_tokens_rough, |
| ) |
|
|
# Module-level logger; every diagnostic emitted by this module goes through it.
logger = logging.getLogger(__name__)

# Header placed in front of every generated summary so the model reading the
# compacted conversation knows the earlier turns were replaced by a handoff.
SUMMARY_PREFIX = (
    "[CONTEXT COMPACTION] Earlier turns in this conversation were "
    "compacted to save context space. "
    "The summary below describes work that was already completed, "
    "and the current session state may still reflect that work "
    "(for example, files may already be changed). "
    "Use the summary and the current state to continue from where "
    "things left off, and avoid repeating work:"
)
# Older header format; still recognised so it can be stripped and replaced
# with SUMMARY_PREFIX when a summary is normalised.
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"

# Floor for the summary token budget, however little content is compressed.
_MIN_SUMMARY_TOKENS = 2000
# Fraction of the (estimated) compressed content granted to the summary.
_SUMMARY_RATIO = 0.20
# Absolute upper bound for the summary token budget.
_SUMMARY_TOKENS_CEILING = 12_000

# Replacement text for the content of old tool results that get pruned.
_PRUNED_TOOL_PLACEHOLDER = "[Old tool output cleared to save context space]"

# Rough characters-per-token divisor used for cheap local token estimates.
_CHARS_PER_TOKEN = 4
|
|
|
|
class ContextCompressor:
    """Compress conversation context when it approaches the model's context limit.

    Messages are chat-style dicts carrying ``role`` and ``content`` and,
    optionally, ``tool_calls`` (assistant) or ``tool_call_id`` (tool result).

    Algorithm:
    1. Prune old tool results (cheap, no LLM call)
    2. Protect head messages (the first ``protect_first_n`` messages:
       system prompt + first exchange with the default of 3)
    3. Protect tail messages by token budget (``tail_token_budget``), never
       fewer than ``protect_last_n`` messages
    4. Summarize middle turns with a structured LLM prompt
    5. On subsequent compactions, iteratively update the previous summary
    """

    def __init__(
        self,
        model: str,
        threshold_percent: float = 0.50,
        protect_first_n: int = 3,
        protect_last_n: int = 20,
        summary_target_ratio: float = 0.20,
        quiet_mode: bool = False,
        summary_model_override: Optional[str] = None,
        base_url: str = "",
        api_key: str = "",
        config_context_length: Optional[int] = None,
        provider: str = "",
    ):
        """Configure thresholds and budgets for the given model.

        Args:
            model: Name of the main conversation model; used to look up the
                context length.
            threshold_percent: Fraction of the context length at which
                compression is triggered.
            protect_first_n: Number of leading messages never summarized.
            protect_last_n: Minimum number of trailing messages never
                summarized.
            summary_target_ratio: Fraction of ``threshold_tokens`` used as the
                tail token budget. Clamped to the range 0.10 - 0.80.
            quiet_mode: Suppress this class's informational log output.
            summary_model_override: Model name forwarded to ``call_llm`` for
                summarization. When empty/None no ``model`` argument is sent.
            base_url: Forwarded to ``get_model_context_length``.
            api_key: Forwarded to ``get_model_context_length``.
            config_context_length: Forwarded to ``get_model_context_length``.
            provider: Forwarded to ``get_model_context_length``.
        """
        self.model = model
        self.base_url = base_url
        self.api_key = api_key
        self.provider = provider
        self.threshold_percent = threshold_percent
        self.protect_first_n = protect_first_n
        self.protect_last_n = protect_last_n
        # Clamp so a misconfigured ratio can neither starve nor swallow the tail.
        self.summary_target_ratio = max(0.10, min(summary_target_ratio, 0.80))
        self.quiet_mode = quiet_mode

        self.context_length = get_model_context_length(
            model, base_url=base_url, api_key=api_key,
            config_context_length=config_context_length,
            provider=provider,
        )
        self.threshold_tokens = int(self.context_length * threshold_percent)
        self.compression_count = 0

        # Tail protection budget: a share of the compression threshold.
        self.tail_token_budget = int(self.threshold_tokens * self.summary_target_ratio)
        # Summary size cap: 5% of the context window, bounded by the ceiling.
        self.max_summary_tokens = min(
            int(self.context_length * 0.05), _SUMMARY_TOKENS_CEILING,
        )

        if not quiet_mode:
            logger.info(
                "Context compressor initialized: model=%s context_length=%d "
                "threshold=%d (%.0f%%) target_ratio=%.0f%% tail_budget=%d "
                "provider=%s base_url=%s",
                model, self.context_length, self.threshold_tokens,
                threshold_percent * 100, self.summary_target_ratio * 100,
                self.tail_token_budget,
                provider or "none", base_url or "none",
            )
        # Not read anywhere in this class; presumably maintained by the
        # caller - NOTE(review): confirm against the call sites.
        self._context_probed = False

        # Token usage reported by the most recent API response.
        self.last_prompt_tokens = 0
        self.last_completion_tokens = 0
        self.last_total_tokens = 0

        self.summary_model = summary_model_override or ""

        # Raw (un-prefixed) summary from the last successful compaction; fed
        # back to the summarizer so information survives repeated compactions.
        self._previous_summary: Optional[str] = None

    def update_from_response(self, usage: Dict[str, Any]):
        """Record token usage from an API response's ``usage`` mapping.

        Missing keys are recorded as 0.
        """
        self.last_prompt_tokens = usage.get("prompt_tokens", 0)
        self.last_completion_tokens = usage.get("completion_tokens", 0)
        self.last_total_tokens = usage.get("total_tokens", 0)

    def should_compress(self, prompt_tokens: Optional[int] = None) -> bool:
        """Return True when the prompt size has reached the compression threshold.

        Args:
            prompt_tokens: Token count to test. When None, the count recorded
                by the last ``update_from_response`` call is used.
        """
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        return tokens >= self.threshold_tokens

    def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
        """Quick pre-flight check using a rough token estimate (before any API call)."""
        rough_estimate = estimate_messages_tokens_rough(messages)
        return rough_estimate >= self.threshold_tokens

    def get_status(self) -> Dict[str, Any]:
        """Return current compression status for display/logging.

        ``usage_percent`` is 0 when the context length is 0/unknown.
        """
        return {
            "last_prompt_tokens": self.last_prompt_tokens,
            "threshold_tokens": self.threshold_tokens,
            "context_length": self.context_length,
            "usage_percent": (self.last_prompt_tokens / self.context_length * 100) if self.context_length else 0,
            "compression_count": self.compression_count,
        }

    # ------------------------------------------------------------------
    # Tool output pruning (cheap pre-pass, no LLM call)
    # ------------------------------------------------------------------

    def _prune_old_tool_results(
        self, messages: List[Dict[str, Any]], protect_tail_count: int,
    ) -> tuple[List[Dict[str, Any]], int]:
        """Replace old tool result contents with a short placeholder.

        The most recent ``protect_tail_count`` messages are left alone. In
        the older messages, every ``tool`` message whose content is longer
        than 200 characters has its content replaced with
        ``_PRUNED_TOOL_PLACEHOLDER``. The input list and its dicts are not
        mutated; shallow copies are returned.

        Returns:
            ``(pruned_messages, pruned_count)``.
        """
        if not messages:
            return messages, 0

        result = [m.copy() for m in messages]
        pruned = 0
        # A non-positive boundary simply yields an empty range below.
        prune_boundary = len(result) - protect_tail_count

        for i in range(prune_boundary):
            msg = result[i]
            if msg.get("role") != "tool":
                continue
            content = msg.get("content", "")
            if not content or content == _PRUNED_TOOL_PLACEHOLDER:
                continue
            # Short outputs are cheap to keep and often informative.
            if len(content) > 200:
                result[i] = {**msg, "content": _PRUNED_TOOL_PLACEHOLDER}
                pruned += 1

        return result, pruned

    # ------------------------------------------------------------------
    # Summarization
    # ------------------------------------------------------------------

    def _compute_summary_budget(self, turns_to_summarize: List[Dict[str, Any]]) -> int:
        """Scale the summary token budget with the amount of content compressed.

        The budget is ``_SUMMARY_RATIO`` of the roughly estimated token count
        of ``turns_to_summarize``, capped at ``self.max_summary_tokens`` (5%
        of the context window, at most ``_SUMMARY_TOKENS_CEILING``) and never
        below ``_MIN_SUMMARY_TOKENS``. The floor wins if the cap is smaller.
        """
        content_tokens = estimate_messages_tokens_rough(turns_to_summarize)
        budget = int(content_tokens * _SUMMARY_RATIO)
        return max(_MIN_SUMMARY_TOKENS, min(budget, self.max_summary_tokens))

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce a message field to ``str`` (falsy values become ``""``).

        Content and tool-call arguments are normally strings, but may be
        None or structured (e.g. a list of content parts); the serializer
        needs plain text it can slice and concatenate.
        """
        if not value:
            return ""
        return value if isinstance(value, str) else str(value)

    @staticmethod
    def _clip_middle(text: str) -> str:
        """Trim text longer than 3000 chars to its first 2000 and last 800."""
        if len(text) <= 3000:
            return text
        return text[:2000] + "\n...[truncated]...\n" + text[-800:]

    def _serialize_for_summary(self, turns: List[Dict[str, Any]]) -> str:
        """Serialize conversation turns into labeled text for the summarizer.

        Includes tool call arguments (up to 500 chars each) and message /
        tool-result content (up to 3000 chars per message) so the summarizer
        can preserve specific details like file paths, commands, and outputs.
        Non-string content or arguments are stringified first, so unusual
        payloads are truncated like any other text instead of raising.

        Returns:
            One ``[LABEL]: text`` section per message, joined by blank lines.
        """
        parts = []
        for msg in turns:
            role = msg.get("role", "unknown")
            content = self._clip_middle(self._as_text(msg.get("content")))

            # Tool results: keep the call id so results can be matched to calls.
            if role == "tool":
                tool_id = msg.get("tool_call_id", "")
                parts.append(f"[TOOL RESULT {tool_id}]: {content}")
                continue

            # Assistant messages: include tool call names AND arguments.
            if role == "assistant":
                tool_calls = msg.get("tool_calls", [])
                if tool_calls:
                    tc_parts = []
                    for tc in tool_calls:
                        if isinstance(tc, dict):
                            fn = tc.get("function") or {}
                            name = fn.get("name", "?")
                            args = self._as_text(fn.get("arguments"))
                            if len(args) > 500:
                                args = args[:400] + "..."
                            tc_parts.append(f" {name}({args})")
                        else:
                            # Object-style tool call: only the name is shown.
                            fn = getattr(tc, "function", None)
                            name = getattr(fn, "name", "?") if fn else "?"
                            tc_parts.append(f" {name}(...)")
                    content += "\n[Tool calls:\n" + "\n".join(tc_parts) + "\n]"
                parts.append(f"[ASSISTANT]: {content}")
                continue

            # Everything else (user, system, ...): label with the role.
            parts.append(f"[{role.upper()}]: {content}")

        return "\n\n".join(parts)

    def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
        """Generate a structured summary of conversation turns.

        Uses a structured template (Goal, Progress, Decisions, Files, Next
        Steps). When a previous summary exists, the prompt asks for an
        iterative update of it instead of a summary from scratch. On success
        the raw summary is remembered in ``self._previous_summary``.

        Returns:
            The summary with ``SUMMARY_PREFIX`` prepended, or None when the
            LLM call fails. In that case the caller drops the middle turns
            without a summary rather than injecting a useless placeholder.
        """
        summary_budget = self._compute_summary_budget(turns_to_summarize)
        content_to_summarize = self._serialize_for_summary(turns_to_summarize)

        if self._previous_summary:
            # Iterative update: preserve existing info, add new progress.
            prompt = f"""You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated.

PREVIOUS SUMMARY:
{self._previous_summary}

NEW TURNS TO INCORPORATE:
{content_to_summarize}

Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new progress. Move items from "In Progress" to "Done" when completed. Remove information only if it is clearly obsolete.

## Goal
[What the user is trying to accomplish — preserve from previous summary, update if goal evolved]

## Constraints & Preferences
[User preferences, coding style, constraints, important decisions — accumulate across compactions]

## Progress
### Done
[Completed work — include specific file paths, commands run, results obtained]
### In Progress
[Work currently underway]
### Blocked
[Any blockers or issues encountered]

## Key Decisions
[Important technical decisions and why they were made]

## Relevant Files
[Files read, modified, or created — with brief note on each. Accumulate across compactions.]

## Next Steps
[What needs to happen next to continue the work]

## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]

Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions.

Write only the summary body. Do not include any preamble or prefix."""
        else:
            # First compaction: summarize from scratch.
            prompt = f"""Create a structured handoff summary for a later assistant that will continue this conversation after earlier turns are compacted.

TURNS TO SUMMARIZE:
{content_to_summarize}

Use this exact structure:

## Goal
[What the user is trying to accomplish]

## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]

## Progress
### Done
[Completed work — include specific file paths, commands run, results obtained]
### In Progress
[Work currently underway]
### Blocked
[Any blockers or issues encountered]

## Key Decisions
[Important technical decisions and why they were made]

## Relevant Files
[Files read, modified, or created — with brief note on each]

## Next Steps
[What needs to happen next to continue the work]

## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]

Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions. The goal is to prevent the next assistant from repeating work or losing important details.

Write only the summary body. Do not include any preamble or prefix."""

        try:
            call_kwargs = {
                "task": "compression",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,
                # Headroom above the target so the reply is not cut mid-section.
                "max_tokens": summary_budget * 2,
                "timeout": 45.0,
            }
            if self.summary_model:
                call_kwargs["model"] = self.summary_model
            response = call_llm(**call_kwargs)
            content = response.choices[0].message.content
            # Some backends return non-string content; normalise to text.
            if not isinstance(content, str):
                content = str(content) if content else ""
            summary = content.strip()
            # Keep the raw summary for the next iterative update.
            self._previous_summary = summary
            return self._with_summary_prefix(summary)
        except RuntimeError:
            # Handled separately from other failures with a "no provider"
            # message - NOTE(review): confirm call_llm raises RuntimeError
            # specifically for that case.
            logger.warning("Context compression: no provider available for "
                           "summary. Middle turns will be dropped without summary.")
            return None
        except Exception as e:
            # Best-effort: compaction must not abort the conversation.
            logger.warning("Failed to generate context summary: %s", e)
            return None

    @staticmethod
    def _with_summary_prefix(summary: str) -> str:
        """Normalize summary text to the current compaction handoff format.

        Strips one leading legacy or current prefix (if present) and returns
        the text under ``SUMMARY_PREFIX``. Empty text yields the prefix alone.
        """
        text = (summary or "").strip()
        for prefix in (LEGACY_SUMMARY_PREFIX, SUMMARY_PREFIX):
            if text.startswith(prefix):
                text = text[len(prefix):].lstrip()
                break
        return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIX

    # ------------------------------------------------------------------
    # Tool-call / tool-result pair integrity helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_tool_call_id(tc) -> str:
        """Extract the call ID from a tool_call entry (dict or attribute object)."""
        if isinstance(tc, dict):
            return tc.get("id", "")
        return getattr(tc, "id", "") or ""

    def _sanitize_tool_pairs(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fix orphaned tool_call / tool_result pairs after compression.

        Two failure modes:
        1. A tool *result* references a call_id whose assistant tool_call was
           removed (summarized/truncated). The API rejects this with
           "No tool call found for function call output with call_id ...".
        2. An assistant message has tool_calls whose results were dropped.
           The API rejects this because every tool_call must be followed by
           a tool result with the matching call_id.

        Orphaned results are removed, and a stub result is inserted directly
        after the assistant message for every call that lost its result, so
        the returned list is always well-formed. The input list is not
        mutated.
        """
        surviving_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "assistant":
                for tc in msg.get("tool_calls") or []:
                    cid = self._get_tool_call_id(tc)
                    if cid:
                        surviving_call_ids.add(cid)

        result_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "tool":
                cid = msg.get("tool_call_id")
                if cid:
                    result_call_ids.add(cid)

        # 1. Drop tool results whose originating call no longer exists.
        orphaned_results = result_call_ids - surviving_call_ids
        if orphaned_results:
            messages = [
                m for m in messages
                if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
            ]
            if not self.quiet_mode:
                logger.info("Compression sanitizer: removed %d orphaned tool result(s)", len(orphaned_results))

        # 2. Add stub results for calls whose results were dropped.
        missing_results = surviving_call_ids - result_call_ids
        if missing_results:
            patched: List[Dict[str, Any]] = []
            for msg in messages:
                patched.append(msg)
                if msg.get("role") == "assistant":
                    for tc in msg.get("tool_calls") or []:
                        cid = self._get_tool_call_id(tc)
                        if cid in missing_results:
                            patched.append({
                                "role": "tool",
                                "content": "[Result from earlier conversation — see context summary above]",
                                "tool_call_id": cid,
                            })
            messages = patched
            if not self.quiet_mode:
                logger.info("Compression sanitizer: added %d stub tool result(s)", len(missing_results))

        return messages

    def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
        """Push a compress-start boundary forward past any orphan tool results.

        If ``messages[idx]`` is a tool result, slide forward until we hit a
        non-tool message (or the end of the list) so the summarised region
        does not start mid-group.
        """
        while idx < len(messages) and messages[idx].get("role") == "tool":
            idx += 1
        return idx

    def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
        """Pull a compress-end boundary backward to avoid splitting a
        tool_call / result group.

        If the messages directly before ``idx`` are tool results, walk
        backward past all of them. If the message found there is an assistant
        message with tool_calls, the boundary moves to that assistant, so the
        whole assistant + tool_results group lands on the same side of the
        cut instead of being split (a split would cause silent data loss
        when ``_sanitize_tool_pairs`` removes the orphaned results).
        Out-of-range indices are returned unchanged.
        """
        if idx <= 0 or idx >= len(messages):
            return idx

        check = idx - 1
        while check >= 0 and messages[check].get("role") == "tool":
            check -= 1

        if check >= 0 and messages[check].get("role") == "assistant" and messages[check].get("tool_calls"):
            idx = check
        return idx

    # ------------------------------------------------------------------
    # Tail protection by token budget
    # ------------------------------------------------------------------

    def _find_tail_cut_by_tokens(
        self, messages: List[Dict[str, Any]], head_end: int,
        token_budget: Optional[int] = None,
    ) -> int:
        """Walk backward from the end of messages, accumulating tokens until
        the budget is reached. Returns the index where the tail starts.

        ``token_budget`` defaults to ``self.tail_token_budget``, which is
        ``threshold_tokens * summary_target_ratio`` and therefore scales with
        the model's context window. Token counts are a local rough estimate
        (characters / ``_CHARS_PER_TOKEN`` plus a fixed per-message overhead).

        Never cuts inside a tool_call/result group, always protects at least
        ``protect_last_n`` messages, and never returns less than
        ``head_end + 1``.
        """
        if token_budget is None:
            token_budget = self.tail_token_budget
        n = len(messages)
        min_tail = self.protect_last_n
        accumulated = 0
        cut_idx = n

        for i in range(n - 1, head_end - 1, -1):
            msg = messages[i]
            content = msg.get("content") or ""
            msg_tokens = len(content) // _CHARS_PER_TOKEN + 10
            # Tool call arguments count toward the message's size as well.
            for tc in msg.get("tool_calls") or []:
                if isinstance(tc, dict):
                    # Arguments may be missing/None or non-string; never crash.
                    args = self._as_text((tc.get("function") or {}).get("arguments"))
                    msg_tokens += len(args) // _CHARS_PER_TOKEN
            # Stop once over budget, but only after the minimum tail is covered.
            if accumulated + msg_tokens > token_budget and (n - i) >= min_tail:
                break
            accumulated += msg_tokens
            cut_idx = i

        # Guarantee at least ``protect_last_n`` protected messages.
        fallback_cut = n - min_tail
        if cut_idx > fallback_cut:
            cut_idx = fallback_cut

        # The budget covered everything after the head, which would leave
        # nothing to compress: fall back to the fixed-count tail instead.
        if cut_idx <= head_end:
            cut_idx = fallback_cut

        # Keep assistant tool_calls together with their results.
        cut_idx = self._align_boundary_backward(messages, cut_idx)

        return max(cut_idx, head_end + 1)

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def compress(self, messages: List[Dict[str, Any]], current_tokens: Optional[int] = None) -> List[Dict[str, Any]]:
        """Compress conversation messages by summarizing middle turns.

        Algorithm:
        1. Prune old tool results (cheap pre-pass, no LLM call)
        2. Protect head messages (the first ``protect_first_n``)
        3. Find the tail boundary by token budget (``tail_token_budget``)
        4. Summarize middle turns with a structured LLM prompt
        5. On re-compression, iteratively update the previous summary

        After compression, orphaned tool_call / tool_result pairs are cleaned
        up so the API never receives mismatched IDs.

        Args:
            messages: Full conversation; not mutated.
            current_tokens: Current token count, used for log output only.
                Falls back to the last recorded prompt tokens, then to a
                rough estimate.

        Returns:
            The compressed message list. The input is returned unchanged when
            there are too few messages to compress; if summary generation
            fails, the middle turns are dropped without a summary.
        """
        n_messages = len(messages)
        if n_messages <= self.protect_first_n + self.protect_last_n + 1:
            if not self.quiet_mode:
                logger.warning(
                    "Cannot compress: only %d messages (need > %d)",
                    n_messages,
                    self.protect_first_n + self.protect_last_n + 1,
                )
            return messages

        display_tokens = (
            current_tokens
            or self.last_prompt_tokens
            or estimate_messages_tokens_rough(messages)
        )

        # Phase 1: prune old tool results (cheap, no LLM call).
        messages, pruned_count = self._prune_old_tool_results(
            messages, protect_tail_count=self.protect_last_n * 3,
        )
        if pruned_count and not self.quiet_mode:
            logger.info("Pre-compression: pruned %d old tool result(s)", pruned_count)

        # Phase 2: determine the region to summarize.
        compress_start = self._align_boundary_forward(messages, self.protect_first_n)
        compress_end = self._find_tail_cut_by_tokens(messages, compress_start)

        if compress_start >= compress_end:
            return messages

        turns_to_summarize = messages[compress_start:compress_end]

        if not self.quiet_mode:
            logger.info(
                "Context compression triggered (%d tokens >= %d threshold)",
                display_tokens,
                self.threshold_tokens,
            )
            logger.info(
                "Model context limit: %d tokens (%.0f%% = %d)",
                self.context_length,
                self.threshold_percent * 100,
                self.threshold_tokens,
            )
            tail_msgs = n_messages - compress_end
            logger.info(
                "Summarizing turns %d-%d (%d turns), protecting %d head + %d tail messages",
                compress_start + 1,
                compress_end,
                len(turns_to_summarize),
                compress_start,
                tail_msgs,
            )

        # Phase 3: generate the structured summary (None on failure).
        summary = self._generate_summary(turns_to_summarize)

        # Phase 4: assemble head + summary + tail.
        compressed = []
        for i in range(compress_start):
            msg = messages[i].copy()
            # Annotate the system prompt once, on the first compaction only.
            if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
                msg["content"] = (
                    (msg.get("content") or "")
                    + "\n\n[Note: Some earlier conversation turns have been compacted into a handoff summary to preserve context space. The current session state may still reflect earlier work, so build on that summary and state rather than re-doing work.]"
                )
            compressed.append(msg)

        _merge_summary_into_tail = False
        if summary:
            last_head_role = messages[compress_start - 1].get("role", "user") if compress_start > 0 else "user"
            first_tail_role = messages[compress_end].get("role", "user") if compress_end < n_messages else "user"

            # Pick a role that does not repeat the last head message's role.
            if last_head_role in ("assistant", "tool"):
                summary_role = "user"
            else:
                summary_role = "assistant"

            # If that collides with the first tail message, try the other
            # role; when both roles would produce consecutive same-role
            # messages, fold the summary into the first tail message instead.
            if summary_role == first_tail_role:
                flipped = "assistant" if summary_role == "user" else "user"
                if flipped != last_head_role:
                    summary_role = flipped
                else:
                    _merge_summary_into_tail = True
            if not _merge_summary_into_tail:
                compressed.append({"role": summary_role, "content": summary})
        else:
            if not self.quiet_mode:
                logger.warning("No summary model available — middle turns dropped without summary")

        for i in range(compress_end, n_messages):
            msg = messages[i].copy()
            if _merge_summary_into_tail and i == compress_end:
                original = msg.get("content") or ""
                if isinstance(original, list):
                    # Content given as a list of parts: prepend the summary
                    # as a text part rather than concatenating str + list,
                    # which would raise TypeError. Assumes the
                    # {"type": "text", "text": ...} part schema -
                    # NOTE(review): confirm for the providers in use.
                    msg["content"] = [{"type": "text", "text": summary}, *original]
                else:
                    msg["content"] = summary + "\n\n" + self._as_text(original)
                _merge_summary_into_tail = False
            compressed.append(msg)

        self.compression_count += 1

        compressed = self._sanitize_tool_pairs(compressed)

        if not self.quiet_mode:
            new_estimate = estimate_messages_tokens_rough(compressed)
            saved_estimate = display_tokens - new_estimate
            logger.info(
                "Compressed: %d -> %d messages (~%d tokens saved)",
                n_messages,
                len(compressed),
                saved_estimate,
            )
            logger.info("Compression #%d complete", self.compression_count)

        return compressed
|
|