| from __future__ import annotations |
|
|
| from typing import Any |
|
|
|
|
| def estimate_tokens_text(text: str) -> int: |
| |
| return max(1, len(text) // 4) |
|
|
|
|
| def estimate_tokens_messages(messages: list[dict[str, Any]]) -> int: |
| total = 0 |
| for m in messages: |
| total += 4 |
| total += estimate_tokens_text(str(m.get("role", ""))) |
| total += estimate_tokens_text(str(m.get("content", ""))) |
| return total + 2 |
|
|
|
|
| def clip_text_to_token_budget(text: str, max_tokens_for_text: int) -> str: |
| if max_tokens_for_text <= 0: |
| return "" |
| approx_chars = max_tokens_for_text * 4 |
| if len(text) <= approx_chars: |
| return text |
| return text[-approx_chars:] |
|
|
|
|
| def truncate_messages_for_ctx( |
| messages: list[dict[str, Any]], |
| n_ctx: int, |
| max_output_tokens: int = 256, |
| safety_margin: int = 128, |
| ) -> tuple[list[dict[str, Any]], dict[str, int]]: |
| """ |
| Keep: |
| - first system message if present |
| - newest messages that fit into token budget |
| - clipped latest message fallback if nothing fits |
| """ |
| if not messages: |
| return messages, {"before_tokens": 0, "after_tokens": 0, "dropped_messages": 0} |
|
|
| before = estimate_tokens_messages(messages) |
| prompt_budget = max(256, n_ctx - max_output_tokens - safety_margin) |
|
|
| if before <= prompt_budget: |
| return messages, {"before_tokens": before, "after_tokens": before, "dropped_messages": 0} |
|
|
| sys_msg = None |
| rest = messages |
| if messages[0].get("role") == "system": |
| sys_msg = messages[0] |
| rest = messages[1:] |
|
|
| kept_rev: list[dict[str, Any]] = [] |
| running = [sys_msg] if sys_msg else [] |
| running_tokens = estimate_tokens_messages(running) |
|
|
| for m in reversed(rest): |
| trial = running + list(reversed(kept_rev)) + [m] |
| if estimate_tokens_messages(trial) <= prompt_budget: |
| kept_rev.append(m) |
| continue |
|
|
| if not kept_rev: |
| allowed = max(32, prompt_budget - running_tokens - 16) |
| clipped = dict(m) |
| clipped["content"] = clip_text_to_token_budget(str(m.get("content", "")), allowed) |
| trial2 = running + [clipped] |
| if estimate_tokens_messages(trial2) <= prompt_budget: |
| kept_rev.append(clipped) |
| break |
|
|
| kept = ([sys_msg] if sys_msg else []) + list(reversed(kept_rev)) |
|
|
| after = estimate_tokens_messages(kept) |
| dropped = max(0, len(messages) - len(kept)) |
| return kept, {"before_tokens": before, "after_tokens": after, "dropped_messages": dropped} |
|
|
|
|