Dmitry Beresnev commited on
Commit
0c86899
·
1 Parent(s): 27533f4

cntx truncation

Browse files
Files changed (1) hide show
  1. snippets/llm_truncation.py +80 -0
snippets/llm_truncation.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ from typing import Any
4
+
5
+
6
+ def estimate_tokens_text(text: str) -> int:
7
+ # Simple approximation when tokenizer is not available.
8
+ return max(1, len(text) // 4)
9
+
10
+
11
+ def estimate_tokens_messages(messages: list[dict[str, Any]]) -> int:
12
+ total = 0
13
+ for m in messages:
14
+ total += 4 # per-message overhead
15
+ total += estimate_tokens_text(str(m.get("role", "")))
16
+ total += estimate_tokens_text(str(m.get("content", "")))
17
+ return total + 2 # assistant priming overhead
18
+
19
+
20
+ def clip_text_to_token_budget(text: str, max_tokens_for_text: int) -> str:
21
+ if max_tokens_for_text <= 0:
22
+ return ""
23
+ approx_chars = max_tokens_for_text * 4
24
+ if len(text) <= approx_chars:
25
+ return text
26
+ return text[-approx_chars:]
27
+
28
+
29
+ def truncate_messages_for_ctx(
30
+ messages: list[dict[str, Any]],
31
+ n_ctx: int,
32
+ max_output_tokens: int = 256,
33
+ safety_margin: int = 128,
34
+ ) -> tuple[list[dict[str, Any]], dict[str, int]]:
35
+ """
36
+ Keep:
37
+ - first system message if present
38
+ - newest messages that fit into token budget
39
+ - clipped latest message fallback if nothing fits
40
+ """
41
+ if not messages:
42
+ return messages, {"before_tokens": 0, "after_tokens": 0, "dropped_messages": 0}
43
+
44
+ before = estimate_tokens_messages(messages)
45
+ prompt_budget = max(256, n_ctx - max_output_tokens - safety_margin)
46
+
47
+ if before <= prompt_budget:
48
+ return messages, {"before_tokens": before, "after_tokens": before, "dropped_messages": 0}
49
+
50
+ sys_msg = None
51
+ rest = messages
52
+ if messages[0].get("role") == "system":
53
+ sys_msg = messages[0]
54
+ rest = messages[1:]
55
+
56
+ kept_rev: list[dict[str, Any]] = []
57
+ running = [sys_msg] if sys_msg else []
58
+ running_tokens = estimate_tokens_messages(running)
59
+
60
+ for m in reversed(rest):
61
+ trial = running + list(reversed(kept_rev)) + [m]
62
+ if estimate_tokens_messages(trial) <= prompt_budget:
63
+ kept_rev.append(m)
64
+ continue
65
+
66
+ if not kept_rev:
67
+ allowed = max(32, prompt_budget - running_tokens - 16)
68
+ clipped = dict(m)
69
+ clipped["content"] = clip_text_to_token_budget(str(m.get("content", "")), allowed)
70
+ trial2 = running + [clipped]
71
+ if estimate_tokens_messages(trial2) <= prompt_budget:
72
+ kept_rev.append(clipped)
73
+ break
74
+
75
+ kept = ([sys_msg] if sys_msg else []) + list(reversed(kept_rev))
76
+
77
+ after = estimate_tokens_messages(kept)
78
+ dropped = max(0, len(messages) - len(kept))
79
+ return kept, {"before_tokens": before, "after_tokens": after, "dropped_messages": dropped}
80
+