AGI_Assistant / snippets /llm_truncation.py
Dmitry Beresnev
cntx truncation
0c86899
from __future__ import annotations
from typing import Any
def estimate_tokens_text(text: str) -> int:
# Simple approximation when tokenizer is not available.
return max(1, len(text) // 4)
def estimate_tokens_messages(messages: list[dict[str, Any]]) -> int:
total = 0
for m in messages:
total += 4 # per-message overhead
total += estimate_tokens_text(str(m.get("role", "")))
total += estimate_tokens_text(str(m.get("content", "")))
return total + 2 # assistant priming overhead
def clip_text_to_token_budget(text: str, max_tokens_for_text: int) -> str:
if max_tokens_for_text <= 0:
return ""
approx_chars = max_tokens_for_text * 4
if len(text) <= approx_chars:
return text
return text[-approx_chars:]
def truncate_messages_for_ctx(
messages: list[dict[str, Any]],
n_ctx: int,
max_output_tokens: int = 256,
safety_margin: int = 128,
) -> tuple[list[dict[str, Any]], dict[str, int]]:
"""
Keep:
- first system message if present
- newest messages that fit into token budget
- clipped latest message fallback if nothing fits
"""
if not messages:
return messages, {"before_tokens": 0, "after_tokens": 0, "dropped_messages": 0}
before = estimate_tokens_messages(messages)
prompt_budget = max(256, n_ctx - max_output_tokens - safety_margin)
if before <= prompt_budget:
return messages, {"before_tokens": before, "after_tokens": before, "dropped_messages": 0}
sys_msg = None
rest = messages
if messages[0].get("role") == "system":
sys_msg = messages[0]
rest = messages[1:]
kept_rev: list[dict[str, Any]] = []
running = [sys_msg] if sys_msg else []
running_tokens = estimate_tokens_messages(running)
for m in reversed(rest):
trial = running + list(reversed(kept_rev)) + [m]
if estimate_tokens_messages(trial) <= prompt_budget:
kept_rev.append(m)
continue
if not kept_rev:
allowed = max(32, prompt_budget - running_tokens - 16)
clipped = dict(m)
clipped["content"] = clip_text_to_token_budget(str(m.get("content", "")), allowed)
trial2 = running + [clipped]
if estimate_tokens_messages(trial2) <= prompt_budget:
kept_rev.append(clipped)
break
kept = ([sys_msg] if sys_msg else []) + list(reversed(kept_rev))
after = estimate_tokens_messages(kept)
dropped = max(0, len(messages) - len(kept))
return kept, {"before_tokens": before, "after_tokens": after, "dropped_messages": dropped}