| """
|
| context_compactor.py
|
|
|
| Auto-compacts conversation history when approaching context window limits.
|
| Summarizes older messages via the "utility" endpoint when one is configured (otherwise the same LLM), preserving key context.
|
| """
|
|
|
| import json
|
| import logging
|
| from typing import Any, Dict, List, Optional
|
|
|
| from src.model_context import get_context_length, estimate_tokens
|
| from src.llm_core import llm_call_async
|
| from src.endpoint_resolver import resolve_endpoint
|
| from core.models import ChatMessage
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
| def _content_as_text(content: Any) -> str:
|
| """Flatten a message's content to plain text.
|
|
|
| Handles the three shapes that flow through history: a plain string, a
|
| multimodal list of content blocks (vision/image attachments), and None
|
| (assistant turns that carried only native tool_calls persist content as
|
| None). Returns "" for anything without text so callers can safely slice
|
| the result.
|
| """
|
| if isinstance(content, str):
|
| return content
|
| if isinstance(content, list):
|
| return " ".join(
|
| b.get("text", "") for b in content
|
| if isinstance(b, dict) and b.get("text")
|
| )
|
| return ""
|
|
|
|
|
# Fraction of the context window that must be in use before maybe_compact
# summarizes older messages.
COMPACT_THRESHOLD = 0.85
# max_tokens passed to the LLM call that writes the compaction summary.
SUMMARY_MAX_TOKENS = 1024
# NOTE(review): not referenced in the code visible here; presumably the context
# size (in tokens) at or below which callers treat a model as small-context —
# confirm against callers.
SMALL_CONTEXT_LIMIT = 8192
|
|
|
|
|
# System prompt for the summarization call made by maybe_compact. The "{count}"
# and "{n}" placeholders are filled in with str.replace (not str.format), so
# any other braces in this text need no escaping.
SELF_SUMMARY_SYSTEM_PROMPT = """You are summarizing a conversation to preserve context after compaction. Produce a structured summary that lets the conversation continue seamlessly.

Use this format:

## Conversation Summary
**Turns summarized:** {count} | **Compactions so far:** {n}

### User Goal
One sentence describing what the user is trying to accomplish.

### What Was Done
- Bullet points of completed actions, decisions made, and key outputs
- Include specific file paths, function names, variable names, URLs, and config values
- Note any errors encountered and how they were resolved

### Current State
What is the system/code/task state right now? What was the last thing discussed?

### Pending / Next Steps
- What remains to be done
- Any open questions or blockers

### Key Context
- Important constraints, preferences, or decisions that must not be forgotten
- Specific values: model names, ports, paths, credentials references, versions

Keep the summary under 1000 tokens. Be dense — every token should carry information. Do not include pleasantries or meta-commentary."""
|
|
|
|
|
| def _sanitize_tool_messages(msgs: List[Dict]) -> List[Dict]:
|
| """Drop orphaned `tool` messages and dangling assistant `tool_calls`.
|
|
|
| OpenAI's API requires every `role:"tool"` message to immediately
|
| follow an assistant message that carries `tool_calls` (or another
|
| tool message in the same batch). Front-trimming the history can cut
|
| the assistant `tool_calls` parent while keeping its tool responses,
|
| which triggers: "messages with role 'tool' must be a response to a
|
| preceding message with 'tool_calls'". This pass repairs that:
|
| - drops `tool` messages with no valid preceding tool_calls
|
| - drops assistant `tool_calls` messages whose tool responses were
|
| all trimmed away (some providers reject unanswered tool_calls)
|
| """
|
|
|
| cleaned: List[Dict] = []
|
| in_batch = False
|
| for m in msgs:
|
| role = m.get("role")
|
| if role == "tool":
|
| if in_batch:
|
| cleaned.append(m)
|
|
|
| continue
|
| if role == "assistant" and m.get("tool_calls"):
|
| in_batch = True
|
| else:
|
| in_batch = False
|
| cleaned.append(m)
|
|
|
|
|
|
|
| out: List[Dict] = []
|
| for i, m in enumerate(cleaned):
|
| if m.get("role") == "assistant" and m.get("tool_calls"):
|
| nxt = cleaned[i + 1] if i + 1 < len(cleaned) else None
|
| if not (nxt and nxt.get("role") == "tool"):
|
|
|
|
|
|
|
| m = {k: v for k, v in m.items() if k != "tool_calls"}
|
| if not (m.get("content") or "").strip():
|
| continue
|
| out.append(m)
|
| return out
|
|
|
|
|
| def _message_text_token_estimate(text: str) -> int:
|
| if not isinstance(text, str):
|
| return 4
|
| return int(len(text) * 0.3) + 4
|
|
|
|
|
| def _truncate_text_to_token_budget(text: str, token_budget: int) -> str:
|
| """Trim a too-large current user message instead of dropping it entirely."""
|
| if token_budget <= 32:
|
| return "[Current user message omitted: it exceeded the model context window.]"
|
|
|
| if not isinstance(text, str):
|
|
|
|
|
|
|
| return ""
|
|
|
| max_chars = max(200, int((token_budget - 16) / 0.3))
|
| if len(text) <= max_chars:
|
| return text
|
|
|
| notice = (
|
| "\n\n[Notice: the pasted message was too large for this model's context "
|
| "window, so Odysseus kept the beginning and end.]"
|
| )
|
| keep_chars = max(200, max_chars - len(notice))
|
| head_len = max(100, int(keep_chars * 0.7))
|
| tail_len = max(80, keep_chars - head_len)
|
| return text[:head_len].rstrip() + notice + "\n\n" + text[-tail_len:].lstrip()
|
|
|
|
|
| def _truncate_tool_call_args(msg: Dict[str, Any], token_budget: int) -> Dict[str, Any]:
|
| """Shrink oversized assistant ``tool_calls`` arguments to fit ``token_budget``.
|
|
|
| A tool-only turn persists ``content=None`` with its whole payload in
|
| ``tool_calls[].function.arguments`` (e.g. a large create_document body), which
|
| the text-content truncation can't reach — so the message could stay over
|
| budget and the upstream call would 400. Replace each argument string that
|
| overflows its share of the budget with a small valid-JSON placeholder,
|
| preserving ``id``/``type``/``function.name`` so tool/result pairing and
|
| provider validation are unaffected. Returns msg unchanged when there is
|
| nothing oversized.
|
| """
|
| tool_calls = msg.get("tool_calls")
|
| if not isinstance(tool_calls, list) or not tool_calls:
|
| return msg
|
|
|
|
|
| content_tokens = estimate_tokens([{"role": msg.get("role", "assistant"), "content": msg.get("content")}])
|
| per_call = max(16, (max(0, token_budget - content_tokens)) // len(tool_calls))
|
| new_calls = []
|
| changed = False
|
| for tc in tool_calls:
|
| fn = tc.get("function") if isinstance(tc, dict) else None
|
| args = fn.get("arguments") if isinstance(fn, dict) else None
|
| if isinstance(args, str) and int(len(args) * 0.3) > per_call:
|
| new_fn = dict(fn)
|
| new_fn["arguments"] = json.dumps({"_truncated_for_context": len(args)})
|
| new_tc = dict(tc)
|
| new_tc["function"] = new_fn
|
| new_calls.append(new_tc)
|
| changed = True
|
| else:
|
| new_calls.append(tc)
|
| if not changed:
|
| return msg
|
| out = dict(msg)
|
| out["tool_calls"] = new_calls
|
| return out
|
|
|
|
|
def _truncate_message_to_token_budget(msg: Dict[str, Any], token_budget: int) -> Dict[str, Any]:
    """Build a copy of ``msg`` trimmed toward ``token_budget``.

    String content is truncated directly. For list content, each
    ``type == "text"`` block is truncated against whatever budget the earlier
    text blocks left over, while every other item is passed through as-is.
    Content of any other shape (e.g. None) is left alone. The result is then
    run through ``_truncate_tool_call_args`` so oversized tool-call arguments
    are shrunk as well.
    """
    trimmed = dict(msg)
    body = trimmed.get("content", "")

    if isinstance(body, str):
        trimmed["content"] = _truncate_text_to_token_budget(body, token_budget)
    elif isinstance(body, list):
        budget_left = token_budget
        blocks = []
        for block in body:
            is_text_block = isinstance(block, dict) and block.get("type") == "text"
            if is_text_block:
                shortened = _truncate_text_to_token_budget(block.get("text", ""), budget_left)
                budget_left -= _message_text_token_estimate(shortened)
                block = {**block, "text": shortened}
            blocks.append(block)
        trimmed["content"] = blocks

    return _truncate_tool_call_args(trimmed, token_budget)
|
|
|
|
|
def trim_for_context(messages: List[Dict], context_length: int, reserve_tokens: int = 512) -> List[Dict]:
    """Trim a message list so it fits within ``context_length``.

    ``reserve_tokens`` is held back for the model's response. Messages flagged
    ``_protected`` are never dropped; their estimated cost is taken off the
    budget up front. When the input is over budget the function tries, in
    order:

    1. Keep the first system message plus the whole conversation, re-adding
       later system messages (RAG/memory) in order while they still fit.
    2. Cut the first system message down to 2000 characters.
    3. Drop the oldest conversation turns. When at least 10 turns precede the
       current (last) message, the 9 most recent of them are never dropped.
    4. Truncate the current message to whatever budget is left (at least 64
       tokens).

    Args:
        messages: Chat messages as dicts.
        context_length: Context window size, in tokens.
        reserve_tokens: Tokens reserved for the response.

    Returns:
        ``messages`` itself when it already fits. Otherwise a new list ordered
        system, protected, conversation and passed through
        ``_sanitize_tool_messages``. The result can still exceed the budget
        when the protected and recent messages alone are too large.
    """
    budget = context_length - reserve_tokens
    used = estimate_tokens(messages)
    if used <= budget:
        return messages

    logger.info(f"Trimming messages: {used} tokens > {budget} budget (ctx={context_length})")

    # Partition into protected, system, and conversation messages.
    system_msgs = []
    protected_msgs = []
    convo_msgs = []
    for msg in messages:
        if msg.get("_protected"):
            protected_msgs.append(msg)
        elif msg.get("role") == "system":
            system_msgs.append(msg)
        else:
            convo_msgs.append(msg)

    # From here on `budget` is net of the protected messages, so every
    # estimate compared against it must leave protected_msgs out.
    budget -= estimate_tokens(protected_msgs)

    essential_system = system_msgs[:1]
    extra_system = system_msgs[1:]

    # Step 1: first system message + conversation fits; re-add extras greedily.
    if estimate_tokens(essential_system + convo_msgs) <= budget:
        result = list(essential_system)
        for msg in extra_system:
            if estimate_tokens(result + [msg] + convo_msgs) > budget:
                break
            result.append(msg)
        return _sanitize_tool_messages(result + protected_msgs + convo_msgs)

    # Step 2: shorten an oversized system prompt.
    if essential_system:
        sys_text = essential_system[0].get("content", "")
        # content can be None or a multimodal list; only a string can be
        # measured and sliced here.
        if isinstance(sys_text, str) and len(sys_text) > 2000:
            essential_system[0] = {
                "role": "system",
                "content": sys_text[:2000] + "\n[System prompt truncated for context limits]",
            }
            if estimate_tokens(essential_system + convo_msgs) <= budget:
                return _sanitize_tool_messages(essential_system + protected_msgs + convo_msgs)

    # Step 3: drop the oldest turns, keeping the most recent ones and the
    # current message intact.
    PROTECT_RECENT = 10
    current_msg = convo_msgs[-1:]
    prior_convo = convo_msgs[:-1]
    if len(prior_convo) >= PROTECT_RECENT:
        old_msgs = prior_convo[:-(PROTECT_RECENT - 1)]
        recent_msgs = prior_convo[-(PROTECT_RECENT - 1):] + current_msg
        while old_msgs and estimate_tokens(essential_system + old_msgs + recent_msgs) > budget:
            old_msgs.pop(0)
        convo_msgs = old_msgs + recent_msgs
    else:
        while prior_convo and estimate_tokens(essential_system + prior_convo + current_msg) > budget:
            prior_convo.pop(0)
        convo_msgs = prior_convo + current_msg

    # Step 4: truncate the current message if still over budget. The protected
    # messages are deliberately excluded from both estimates: their cost was
    # already subtracted from `budget`, and counting them again would shrink
    # (or needlessly truncate) the current message.
    if current_msg and estimate_tokens(essential_system + convo_msgs) > budget:
        prefix = essential_system + convo_msgs[:-1]
        available_for_current = max(64, budget - estimate_tokens(prefix))
        convo_msgs[-1] = _truncate_message_to_token_budget(convo_msgs[-1], available_for_current)

    result = _sanitize_tool_messages(essential_system + protected_msgs + convo_msgs)
    logger.info(f"Trimmed to {estimate_tokens(result)} tokens ({len(result)} messages)")
    return result
|
|
|
|
|
async def maybe_compact(
    session,
    endpoint_url: str,
    model: str,
    messages: List[Dict],
    headers: Optional[Dict] = None,
    owner: Optional[str] = None,
) -> tuple:
    """Check context usage and compact if above threshold.

    When estimated usage reaches ``COMPACT_THRESHOLD`` of the model's context
    length and there are at least 4 non-system messages, the older half of
    the conversation is summarized by an LLM call and replaced with a single
    system summary message. The summary call uses the endpoint returned by
    ``resolve_endpoint("utility", owner=owner)`` when it yields a URL, and
    falls back to ``endpoint_url`` / ``model`` / ``headers`` otherwise.

    Args:
        session: Session whose in-memory history is updated on success (via
            ``_update_session_history``); may be falsy.
        endpoint_url: Chat endpoint, used for the context-length lookup and
            as the summarization fallback.
        model: Chat model name.
        messages: Full message list about to be sent.
        headers: Headers for ``endpoint_url``.
        owner: Passed through to ``resolve_endpoint``.

    Returns:
        ``(messages, context_length, was_compacted)``. On any skip or failure
        the original ``messages`` object is returned with ``False``.
    """
    context_length = get_context_length(endpoint_url, model)
    used = estimate_tokens(messages)
    pct = (used / context_length) * 100 if context_length else 0

    if pct < COMPACT_THRESHOLD * 100:
        return messages, context_length, False

    logger.info(
        f"Context at {pct:.1f}% ({used}/{context_length} tokens) — compacting"
    )

    system_msgs = [m for m in messages if m.get("role") == "system"]
    convo_msgs = [m for m in messages if m.get("role") != "system"]

    if len(convo_msgs) < 4:
        return messages, context_length, False

    split_point = len(convo_msgs) // 2
    # Never let the kept half start on a tool result: its assistant
    # `tool_calls` parent would be summarized away, leaving an orphaned tool
    # message that providers reject. Walk back so the whole batch stays in
    # `recent`.
    while split_point > 0 and convo_msgs[split_point].get("role") == "tool":
        split_point -= 1
    if split_point == 0:
        # Nothing left to summarize without splitting a tool batch.
        return messages, context_length, False
    older = convo_msgs[:split_point]
    recent = convo_msgs[split_point:]

    # Each message contributes at most 2000 characters to the summarizer input.
    convo_text = "\n".join(
        f"{msg.get('role', 'user').upper()}: {_content_as_text(msg.get('content'))[:2000]}"
        for msg in older
    )

    # Count earlier compactions by their summary marker. Content may be None
    # or a block list, so flatten it before the substring test.
    compaction_count = sum(
        1 for m in system_msgs
        if "[Conversation summary" in _content_as_text(m.get("content"))
    )

    util_url, util_model, util_headers = resolve_endpoint("utility", owner=owner)
    compact_url = util_url or endpoint_url
    compact_model = util_model or model
    compact_headers = util_headers if util_url else headers

    prompt = SELF_SUMMARY_SYSTEM_PROMPT.replace(
        "{count}", str(len(older))
    ).replace(
        "{n}", str(compaction_count + 1)
    )
    summary_messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": convo_text},
    ]

    try:
        summary = await llm_call_async(
            compact_url,
            compact_model,
            summary_messages,
            temperature=0.2,
            max_tokens=SUMMARY_MAX_TOKENS,
            headers=compact_headers,
            timeout=30,
        )
    except Exception as e:
        logger.error(f"Compaction summary failed: {e}")
        # Best effort: keep the uncompacted history rather than fail the turn.
        return messages, context_length, False

    # An empty summary would replace the older half with nothing, silently
    # discarding that context; treat it like a failed call.
    if not summary or not str(summary).strip():
        logger.error("Compaction summary was empty; leaving history unchanged")
        return messages, context_length, False

    summary_msg = {
        "role": "system",
        "content": f"[Conversation summary — earlier messages were compacted]\n{summary}",
    }

    compacted = system_msgs + [summary_msg] + recent

    _update_session_history(session, split_point, summary, system_msg_count=len(system_msgs))

    new_used = estimate_tokens(compacted)
    logger.info(
        f"Compacted: {used} -> {new_used} tokens "
        f"({len(older)} messages summarized, {len(recent)} kept)"
    )

    return compacted, context_length, True
|
|
|
|
|
| def _update_session_history(session, split_point: int, summary: str,
|
| system_msg_count: int = 0):
|
| """Update the in-memory session history after compaction.
|
|
|
| `split_point` is the index in `convo_msgs` (system-stripped). The
|
| in-memory `session.history` includes leading system messages, so the
|
| actual recent-history slice starts at `system_msg_count + split_point`.
|
| Prepending `session.history[:system_msg_count]` to the new history
|
| preserves persona, preset, and RAG system messages that would
|
| otherwise be dropped.
|
| """
|
| if not session or not hasattr(session, "history"):
|
| return
|
|
|
| effective_split = system_msg_count + split_point
|
| if effective_split >= len(session.history):
|
| return
|
|
|
|
|
|
|
| system_prefix = list(session.history[:system_msg_count])
|
| recent_history = session.history[effective_split:]
|
| summary_msg = ChatMessage(
|
| role="system",
|
| content=f"[Conversation summary]\n{summary}",
|
| metadata={"compacted": True, "summarized_count": split_point},
|
| )
|
| new_history = system_prefix + [summary_msg] + recent_history
|
| try:
|
| from core.models import get_session_manager_instance
|
| manager = get_session_manager_instance()
|
| except Exception:
|
| manager = None
|
| if manager and getattr(session, "id", None):
|
| if manager.replace_messages(session.id, new_history):
|
| return
|
| session.history = new_history
|
|
|