| """ |
| Context management for conversation history |
| """ |
|
|
| import logging |
| import time |
| import zoneinfo |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any |
|
|
| import yaml |
| from jinja2 import Template |
| from litellm import Message, acompletion |
|
|
| from agent.core.prompt_caching import with_prompt_caching |
|
|
logger = logging.getLogger(__name__)


# Endpoint queried by _get_hf_username to resolve a token to its username.
_HF_WHOAMI_URL = "https://huggingface.co/api/whoami-v2"
# Seconds allowed for the whoami request: handed to curl as its -m limit; the
# surrounding subprocess call is given this value plus a 2-second grace period.
_HF_WHOAMI_TIMEOUT = 5
|
|
|
|
def _get_hf_username(hf_token: str | None = None) -> str:
    """Return the HF username for the given token.

    Uses subprocess + curl (forced to IPv4 with ``-4``) to avoid Python HTTP
    client IPv6 issues that cause 40+ second hangs (httpx/urllib try IPv6
    first, which times out at OS level before falling back to IPv4 — the
    "Happy Eyeballs" problem).

    The Authorization header is fed to curl on stdin (``-H @-``, curl >= 7.55)
    rather than on the command line, so the token is not exposed through the
    process list.

    Args:
        hf_token: Hugging Face access token. ``None`` or an empty string
            skips the lookup entirely.

    Returns:
        The ``name`` field of the whoami response, or ``"unknown"`` when no
        token was given, curl failed or timed out, or the response did not
        contain a non-empty string ``name``. Failures are logged, never raised.
    """
    import json
    import subprocess

    if not hf_token:
        logger.warning("No hf_token provided, using 'unknown' as username")
        return "unknown"

    started = time.monotonic()
    try:
        result = subprocess.run(
            [
                "curl",
                "-s",
                "-4",
                "-m",
                str(_HF_WHOAMI_TIMEOUT),
                # "@-" makes curl read the header line(s) from stdin.
                "-H",
                "@-",
                _HF_WHOAMI_URL,
            ],
            input=f"Authorization: Bearer {hf_token}\n",
            capture_output=True,
            text=True,
            # Slightly longer than curl's own -m limit so curl normally gets
            # to report its timeout itself before the subprocess is killed.
            timeout=_HF_WHOAMI_TIMEOUT + 2,
        )
        elapsed = time.monotonic() - started

        if result.returncode != 0 or not result.stdout:
            logger.warning(
                "curl whoami failed (rc=%s) in %.2fs", result.returncode, elapsed
            )
            return "unknown"

        data = json.loads(result.stdout)
        # Without curl's --fail flag an HTTP error still exits 0 with a JSON
        # error body, so a missing/invalid "name" must not be reported as a
        # successful resolution.
        username = data.get("name") if isinstance(data, dict) else None
        if isinstance(username, str) and username:
            logger.info(
                "HF username resolved to '%s' in %.2fs", username, elapsed
            )
            return username

        logger.warning(
            "HF whoami response had no usable 'name' field (%.2fs)", elapsed
        )
        return "unknown"
    except Exception as e:
        # Deliberate best-effort boundary: a missing curl binary, a timeout or
        # malformed JSON must never prevent the caller from starting up.
        elapsed = time.monotonic() - started
        logger.warning("HF whoami failed in %.2fs: %s", elapsed, e)
        return "unknown"
|
|
|
|
# Default instruction appended as the final user message by
# summarize_messages(); asks for a terse, decision-focused summary.
_COMPACT_PROMPT = (
    "Please provide a concise summary of the conversation above, focusing on "
    "key decisions, the 'why' behind the decisions, problems solved, and "
    "important context needed for developing further. Your summary will be "
    "given to someone who has never worked on this project before and they "
    "will have to be filled in."
)
|
|
| |
| |
| |
| |
| |
# Per-message token ceiling applied by ContextManager._truncate_oversized:
# any non-system message measured above this is replaced by a short
# placeholder before the compacted history is rebuilt.
_MAX_TOKENS_PER_MESSAGE = 50_000
|
|
|
|
class CompactionFailedError(Exception):
    """Signal that compaction could not bring the context under the threshold.

    The usual cause is a single preserved message — the system prompt, the
    first user message, or something in the untouched tail — that is larger
    than one truncation pass can fix.

    Callers must end the session when they see this. Retrying only loops
    forever and burns Bedrock budget for nothing (~$3 per re-attempt on Opus).
    """
|
|
|
|
| |
| |
| |
| |
# Alternative prompt for summarize_messages(), meant for seeding a new session
# after a restart: unlike _COMPACT_PROMPT it asks for the full tool-call trail
# and produced artifacts so the restored agent can answer follow-up questions
# about what it did.
_RESTORE_PROMPT = (
    "You're about to be restored into a fresh session with no memory of the "
    "conversation above. Write a first-person note to your future self so "
    "you can continue right where you left off. Include:\n"
    "  • What the user originally asked for and what progress you've made.\n"
    "  • Every tool you called, with arguments and a one-line result summary.\n"
    "  • Any code, files, scripts, or artifacts you produced (with paths).\n"
    "  • Key decisions and the reasoning behind them.\n"
    "  • What you were planning to do next.\n\n"
    "Don't be cute. Be specific. This is the only context you'll have."
)
|
|
|
|
async def summarize_messages(
    messages: list[Message],
    model_name: str,
    hf_token: str | None = None,
    max_tokens: int = 2000,
    tool_specs: list[dict] | None = None,
    prompt: str = _COMPACT_PROMPT,
    session: Any = None,
    kind: str = "compaction",
) -> tuple[str, int]:
    """Run a summarization prompt against a list of messages.

    ``prompt`` is appended as a final user message. It defaults to the
    compaction prompt (terse, decision-focused). Callers seeding a new session
    after a restart should pass ``_RESTORE_PROMPT`` instead — it preserves the
    tool-call trail so the agent can answer follow-up questions about what it
    did.

    Args:
        messages: Conversation slice to summarize; the list is not mutated.
        model_name: Model identifier handed to ``_resolve_llm_params``.
        hf_token: Token forwarded to ``_resolve_llm_params``.
        max_tokens: Sent to the completion call as ``max_completion_tokens``.
        tool_specs: Tool definitions forwarded (after prompt-caching
            decoration) to the completion call.
        prompt: Instruction appended as the last user message.
        session: Optional; when provided, the call is recorded via
            ``telemetry.record_llm_call`` so its cost lands in the session's
            ``total_cost_usd``. Without it, the call still happens but is
            invisible in telemetry — which used to be the case for every
            compaction call until 2026-04-29 (~30-50% of Bedrock spend was
            attributed to this single source of dark cost).
        kind: Label passed through to the telemetry record.

    Returns:
        ``(summary_text, completion_tokens)``. ``summary_text`` is ``""`` when
        the response carries no choices or the first choice has no content;
        ``completion_tokens`` is ``0`` when the response has no usage block.
    """
    from agent.core.llm_params import _resolve_llm_params

    prompt_messages = [*messages, Message(role="user", content=prompt)]
    llm_params = _resolve_llm_params(model_name, hf_token, reasoning_effort="high")
    prompt_messages, tool_specs = with_prompt_caching(
        prompt_messages, tool_specs, llm_params.get("model")
    )

    started = time.monotonic()
    response = await acompletion(
        messages=prompt_messages,
        max_completion_tokens=max_tokens,
        tools=tool_specs,
        **llm_params,
    )
    latency_ms = int((time.monotonic() - started) * 1000)

    # Resolve the first choice once so telemetry and summary extraction agree
    # on how an empty ``choices`` list is treated (previously the summary
    # lookup indexed ``choices[0]`` unguarded and raised IndexError).
    first_choice = response.choices[0] if response.choices else None

    if session is not None:
        from agent.core import telemetry

        await telemetry.record_llm_call(
            session,
            model=model_name,
            response=response,
            latency_ms=latency_ms,
            finish_reason=(
                first_choice.finish_reason if first_choice is not None else None
            ),
            kind=kind,
        )

    if first_choice is None:
        logger.warning(
            "Summarization call (%s) returned no choices; using empty summary",
            kind,
        )
        summary = ""
    else:
        summary = first_choice.message.content or ""

    completion_tokens = response.usage.completion_tokens if response.usage else 0
    return summary, completion_tokens
|
|
|
|
class ContextManager:
    """Manages conversation context and message history for the agent.

    ``items`` is the ordered message list; it is initialised with the rendered
    system prompt at index 0. ``running_context_usage`` holds the latest known
    token count for that history and is compared against
    ``compaction_threshold`` to decide when ``compact()`` has to summarise
    older messages.
    """

    # Fraction of ``model_max_tokens`` above which compaction is required.
    _COMPACT_THRESHOLD_RATIO = 0.9

    def __init__(
        self,
        model_max_tokens: int = 180_000,
        compact_size: float = 0.1,
        untouched_messages: int = 5,
        tool_specs: list[dict[str, Any]] | None = None,
        prompt_file_suffix: str = "system_prompt_v3.yaml",
        hf_token: str | None = None,
        local_mode: bool = False,
    ):
        """Render the system prompt and start a history containing only it.

        Args:
            model_max_tokens: Size of the model's context window in tokens.
            compact_size: Fraction of ``model_max_tokens`` granted to the
                compaction summary; stored as an absolute token count.
            untouched_messages: Number of trailing messages ``compact()``
                aims to keep verbatim.
            tool_specs: Tool definitions exposed to the prompt template.
            prompt_file_suffix: File name of the prompt YAML inside the
                ``prompts`` directory.
            hf_token: Token used to resolve the username shown in the prompt.
            local_mode: Append the local-CLI section to the system prompt.
        """
        self.system_prompt = self._load_system_prompt(
            tool_specs or [],
            # Forward the caller's choice; this used to be hard-coded to the
            # default file name, silently ignoring the parameter.
            prompt_file_suffix=prompt_file_suffix,
            hf_token=hf_token,
            local_mode=local_mode,
        )

        self.model_max_tokens = model_max_tokens
        # Absolute token budget, later passed as ``max_tokens`` to the
        # summarization call in ``compact()``.
        self.compact_size = int(model_max_tokens * compact_size)

        # Last known token count of the history; updated by ``add_message``
        # and recomputed after compaction.
        self.running_context_usage = 0
        self.untouched_messages = untouched_messages
        self.items: list[Message] = [Message(role="system", content=self.system_prompt)]
        # Optional callback invoked with every message passed to add_message.
        self.on_message_added = None

    def _load_system_prompt(
        self,
        tool_specs: list[dict[str, Any]],
        prompt_file_suffix: str = "system_prompt.yaml",
        hf_token: str | None = None,
        local_mode: bool = False,
    ) -> str:
        """Load and render the system prompt from a YAML file with Jinja2.

        Args:
            tool_specs: Exposed to the template as ``tools`` (and their count
                as ``num_tools``).
            prompt_file_suffix: File name under ``<package parent>/prompts``.
            hf_token: Token resolved to a username via ``_get_hf_username``.
            local_mode: When true, a section describing local (non-sandbox)
                execution and the current working directory is appended.

        Returns:
            The rendered prompt followed by a ``[Session context: ...]`` line
            carrying date, time, timezone, username and tool count.
        """
        prompt_file = Path(__file__).parent.parent / "prompts" / f"{prompt_file_suffix}"

        # Explicit encoding so the prompt reads the same regardless of the
        # platform's locale default.
        with open(prompt_file, "r", encoding="utf-8") as f:
            prompt_data = yaml.safe_load(f)
            template_str = prompt_data.get("system_prompt", "")

        # Session timestamp, always expressed in Europe/Paris time.
        now = datetime.now(zoneinfo.ZoneInfo("Europe/Paris"))
        current_date = now.strftime("%d-%m-%Y")
        # %f yields microseconds; dropping the last three digits leaves ms.
        current_time = now.strftime("%H:%M:%S.%f")[:-3]
        utc_offset = now.strftime("%z")  # e.g. "+0200"
        current_timezone = (
            f"{now.strftime('%Z')} (UTC{utc_offset[:3]}:{utc_offset[3:]})"
        )

        hf_user_info = _get_hf_username(hf_token)

        static_prompt = Template(template_str).render(
            tools=tool_specs,
            num_tools=len(tool_specs),
        )

        if local_mode:
            import os

            cwd = os.getcwd()
            local_context = (
                f"\n\n# CLI / Local mode\n\n"
                f"You are running as a local CLI tool on the user's machine. "
                f"There is NO sandbox — bash, read, write, and edit operate directly "
                f"on the local filesystem.\n\n"
                f"Working directory: {cwd}\n"
                f"Use absolute paths or paths relative to the working directory. "
                f"Do NOT use /app/ paths — that is a sandbox convention that does not apply here.\n"
                f"The sandbox_create tool is NOT available. Run code directly with bash."
            )
            static_prompt += local_context

        return (
            f"{static_prompt}\n\n"
            f"[Session context: Date={current_date}, Time={current_time}, "
            f"Timezone={current_timezone}, User={hf_user_info}, "
            f"Tools={len(tool_specs)}]"
        )

    def add_message(self, message: Message, token_count: int | None = None) -> None:
        """Append ``message`` to the history.

        Args:
            message: Message to append.
            token_count: When truthy, replaces ``running_context_usage``;
                ``None`` and ``0`` leave the current value untouched.
        """
        if token_count:
            self.running_context_usage = token_count
        self.items.append(message)
        if self.on_message_added:
            self.on_message_added(message)

    def get_messages(self) -> list[Message]:
        """Get all messages for sending to the LLM.

        Patches any dangling tool_calls (assistant messages with tool_calls
        that have no matching tool-result message) so the LLM API doesn't
        reject the request. Returns the internal list itself, not a copy.
        """
        self._patch_dangling_tool_calls()
        return self.items

    @staticmethod
    def _normalize_tool_calls(msg: Message) -> None:
        """Ensure ``msg.tool_calls`` contains proper ToolCall objects, not dicts.

        litellm's Message has validate_assignment=False (Pydantic v2 default),
        so direct attribute assignment (e.g. inside litellm's streaming
        handler) can leave raw dicts. Rebuilding those entries through the
        ToolCall constructor fixes this. Mutates ``msg`` in place.
        """
        from litellm import ChatCompletionMessageToolCall as ToolCall

        tool_calls = getattr(msg, "tool_calls", None)
        if not tool_calls:
            return
        if not any(isinstance(tc, dict) for tc in tool_calls):
            return
        msg.tool_calls = [
            ToolCall(**tc) if isinstance(tc, dict) else tc for tc in tool_calls
        ]

    def _patch_dangling_tool_calls(self) -> None:
        """Add stub tool results for any tool_calls that lack a matching result.

        Ensures each assistant message's tool_calls are followed immediately
        by matching tool-result messages. This has to work across the whole
        history, not just the most recent turn, because a cancelled tool use
        in an earlier turn can still poison the next provider request.
        """
        if not self.items:
            return

        i = 0
        while i < len(self.items):
            msg = self.items[i]
            if getattr(msg, "role", None) != "assistant" or not getattr(
                msg, "tool_calls", None
            ):
                i += 1
                continue

            self._normalize_tool_calls(msg)

            # Collect the ids answered by the run of tool messages that sits
            # directly behind this assistant message; ``j`` ends up just past
            # that run.
            j = i + 1
            immediate_ids: set[str | None] = set()
            while (
                j < len(self.items) and getattr(self.items[j], "role", None) == "tool"
            ):
                immediate_ids.add(getattr(self.items[j], "tool_call_id", None))
                j += 1

            missing: list[Message] = [
                Message(
                    role="tool",
                    content="Tool was not executed (interrupted or error).",
                    tool_call_id=tc.id,
                    name=tc.function.name,
                )
                for tc in msg.tool_calls
                if tc.id not in immediate_ids
            ]

            if missing:
                # Insert the stubs right after the existing tool results.
                self.items[j:j] = missing
                j += len(missing)

            # Resume scanning after this assistant message's tool block.
            i = j

    def undo_last_turn(self) -> bool:
        """Remove the last complete turn (user msg + all assistant/tool msgs that follow).

        Pops from the end until the last user message is removed, keeping the
        tool_use/tool_result pairing valid. Never removes the message at
        index 0 (the system message).

        Returns True if a user message was found and removed. When none
        exists, everything after index 0 has been popped and False is
        returned.
        """
        if len(self.items) <= 1:
            return False

        while len(self.items) > 1:
            msg = self.items.pop()
            if getattr(msg, "role", None) == "user":
                return True

        return False

    def truncate_to_user_message(self, user_message_index: int) -> bool:
        """Truncate history to just before the Nth user message (0-indexed).

        Removes that user message and everything after it.
        The message at index 0 (system) is never counted or removed.

        Returns True if the target user message was found and removed.
        """
        count = 0
        for i, msg in enumerate(self.items):
            if i == 0:
                continue
            if getattr(msg, "role", None) == "user":
                if count == user_message_index:
                    self.items = self.items[:i]
                    return True
                count += 1
        return False

    @property
    def compaction_threshold(self) -> int:
        """Token count at which `compact()` kicks in."""
        return int(self.model_max_tokens * self._COMPACT_THRESHOLD_RATIO)

    @property
    def needs_compaction(self) -> bool:
        """True when usage exceeds the threshold and there is any history."""
        return self.running_context_usage > self.compaction_threshold and bool(
            self.items
        )

    def _truncate_oversized(
        self, messages: list[Message], model_name: str
    ) -> list[Message]:
        """Replace any message > _MAX_TOKENS_PER_MESSAGE with a placeholder.

        These are typically tool outputs (CSV dumps, file contents) sitting in
        the untouched tail or first-user position that compaction can't shrink
        — they pass through verbatim, keeping context above threshold and
        triggering an infinite compaction retry loop.

        Args:
            messages: Messages to inspect; the list is not mutated.
            model_name: Model whose tokenizer is used for counting.

        Returns:
            A new list in the same order. System messages and messages whose
            size cannot be measured are passed through unchanged.
        """
        from litellm import token_counter

        out: list[Message] = []
        for msg in messages:
            # System messages are never replaced, whatever their size.
            if msg.role == "system":
                out.append(msg)
                continue
            try:
                n = token_counter(model=model_name, messages=[msg.model_dump()])
            except Exception:
                # Size unknown: keep the message rather than drop content on
                # the strength of a failed measurement.
                out.append(msg)
                continue
            if n <= _MAX_TOKENS_PER_MESSAGE:
                out.append(msg)
                continue
            placeholder = (
                f"[truncated for compaction — original was {n} tokens, "
                f"removed to keep context under {self.compaction_threshold} tokens]"
            )
            logger.warning(
                "Truncating %s message: %d -> %d tokens for compaction",
                msg.role,
                n,
                len(placeholder) // 4,  # rough chars-per-token estimate
            )
            # Only the content is replaced; the non-None linkage/metadata
            # fields are carried over to the replacement message.
            kept = {
                k: getattr(msg, k, None)
                for k in (
                    "tool_call_id",
                    "tool_calls",
                    "name",
                    "thinking_blocks",
                    "reasoning_content",
                    "provider_specific_fields",
                )
                if getattr(msg, k, None) is not None
            }
            out.append(Message(role=msg.role, content=placeholder, **kept))
        return out

    def _recompute_usage(self, model_name: str) -> None:
        """Refresh ``running_context_usage`` from current items via real tokenizer.

        Falls back to a rough ``characters // 4`` estimate when the tokenizer
        call fails.
        """
        from litellm import token_counter

        try:
            self.running_context_usage = token_counter(
                model=model_name,
                messages=[m.model_dump() for m in self.items],
            )
        except Exception as e:
            logger.warning("token_counter failed (%s); rough estimate", e)
            # ``str()`` so that non-string content (e.g. a list of parts) is
            # sized by its text form instead of by its element count.
            self.running_context_usage = (
                sum(len(str(getattr(m, "content", "") or "")) for m in self.items)
                // 4
            )

    async def compact(
        self,
        model_name: str,
        tool_specs: list[dict] | None = None,
        hf_token: str | None = None,
        session: Any = None,
    ) -> None:
        """Remove old messages to keep history under target size.

        The system message, the first user message and a tail of recent
        messages are kept (oversized ones replaced by placeholders);
        everything in between is replaced by a single assistant message
        holding an LLM-written summary. No-op unless ``needs_compaction``.

        ``session`` is optional — if passed, the underlying summarization
        LLM call is recorded via ``telemetry.record_llm_call(kind=
        "compaction")`` so its cost shows up in ``total_cost_usd``.

        Raises ``CompactionFailedError`` if the post-compact context is still
        over the threshold. This happens when a preserved message (typically
        a giant tool output stuck in the untouched tail) is too large for
        truncation to fix. The caller must terminate the session — retrying
        is what caused the 2026-05-03 infinite-compaction-loop pattern that
        burned Bedrock budget invisibly.
        """
        if not self.needs_compaction:
            return

        system_msg = (
            self.items[0] if self.items and self.items[0].role == "system" else None
        )

        # The first user message is kept outside the summarized range.
        first_user_msg = None
        first_user_idx = 1
        for i, item in enumerate(self.items[1:], start=1):
            if getattr(item, "role", None) == "user":
                first_user_msg = item
                first_user_idx = i
                break

        # Start from the configured tail size, then move the cut point back
        # until it lands on a user message so the preserved tail begins at a
        # user message. The upper bound keeps the scan inside the list when
        # ``untouched_messages`` is 0 or negative (previously an IndexError).
        total = len(self.items)
        idx = min(total - self.untouched_messages, total)
        while 1 < idx < total and self.items[idx].role != "user":
            idx -= 1

        # Never let the tail swallow the first user message, which is kept
        # separately in the head.
        if idx <= first_user_idx:
            idx = first_user_idx + 1

        recent_messages = self.items[idx:]
        messages_to_summarize = self.items[first_user_idx + 1 : idx]

        # Preserved messages bypass summarization, so oversized ones must be
        # cut down here or the context stays above the threshold.
        if first_user_msg is not None:
            first_user_msg = self._truncate_oversized([first_user_msg], model_name)[0]
        recent_messages = self._truncate_oversized(recent_messages, model_name)

        head = [system_msg] if system_msg else []
        if first_user_msg:
            head.append(first_user_msg)

        # Nothing between the first user message and the tail: truncation is
        # the only lever, so skip the LLM call entirely.
        if not messages_to_summarize:
            self.items = head + recent_messages
            self._recompute_usage(model_name)
            if self.running_context_usage > self.compaction_threshold:
                raise CompactionFailedError(
                    f"Nothing to summarize but context ({self.running_context_usage}) "
                    f"still over threshold ({self.compaction_threshold}) after truncation. "
                    f"System prompt or first user message likely exceeds the budget."
                )
            return

        summary, completion_tokens = await summarize_messages(
            messages_to_summarize,
            model_name=model_name,
            hf_token=hf_token,
            max_tokens=self.compact_size,
            tool_specs=tool_specs,
            prompt=_COMPACT_PROMPT,
            session=session,
            kind="compaction",
        )
        summarized_message = Message(
            role="assistant",
            content=summary,
        )

        # Rebuild: head, then the summary, then the preserved tail.
        self.items = head + [summarized_message] + recent_messages

        self._recompute_usage(model_name)

        # Still over budget after summarizing and truncating: fail loudly so
        # the caller stops instead of re-entering compaction.
        if self.running_context_usage > self.compaction_threshold:
            raise CompactionFailedError(
                f"Compaction ineffective: {self.running_context_usage} tokens "
                f"still over threshold {self.compaction_threshold} after summarize "
                f"and truncation. Likely the system prompt + first user + summary "
                f"+ truncated tail still exceeds budget."
            )
|
|