Spaces:
Running
Running
File size: 3,050 Bytes
"""Token counting and budget tracking for LLM prompts.

Stage 1 (current): measure-only — counts tokens at known prompt injection
points and logs them. No truncation is performed.

Token counts are estimates: tiktoken's cl100k tokenizer is used as a
provider-agnostic baseline, multiplied by a safety factor because non-OpenAI
multilingual tokenizers (Llama, Gemma, Mistral) typically produce 20-40% more
tokens than cl100k for Danish / mixed-language text.

Provider-specific tokenizers (Ollama's /api/tokenize, HuggingFace AutoTokenizer)
are intentionally not used here, to keep this module process-local and free of
provider-specific dependencies. When real usage data exposes the gap, swap in
a provider-aware backend.
"""
from __future__ import annotations

import logging
import math
# Safety multiplier applied to raw counts. cl100k under-counts multilingual /
# Danish text, so 1.5× keeps budget decisions on the conservative side.
_DEFAULT_SAFETY_FACTOR: float = 1.5

# Characters-per-token ratio used when tiktoken cannot be loaded. Roughly four
# characters per token is the usual rule of thumb for English; once scaled by
# the safety factor it serves as a coarse upper bound.
_CHARS_PER_TOKEN_FALLBACK: int = 4

logger = logging.getLogger(__name__)

# Resolve the encoder once at import time. Both the import and the encoding
# lookup sit inside the ``try`` so that any failure in either step leaves
# ``_ENCODER`` as None and routes callers to the character-based heuristic.
try:
    import tiktoken

    _ENCODER = tiktoken.get_encoding("cl100k_base")
except Exception:  # noqa: BLE001 — any tiktoken failure → heuristic
    _ENCODER = None
    logger.warning("tiktoken unavailable; falling back to character-based token estimation")
def count_tokens(text: str, *, safety_factor: float = _DEFAULT_SAFETY_FACTOR) -> int:
    """Estimate the token count of ``text``.

    The raw count comes from the module-level tiktoken encoder when one was
    loaded at import time, otherwise from a characters-per-token heuristic.
    The raw count is then scaled by ``safety_factor`` and rounded up.

    Args:
        text: Text to measure. Empty / None-ish input returns 0.
        safety_factor: Multiplier applied to the raw count to compensate
            for non-OpenAI tokenizers being more aggressive on multilingual
            text. Defaults to 1.5×.

    Returns:
        Estimated token count, rounded up to the next whole token.
    """
    if not text:
        return 0

    if _ENCODER is not None:
        # disallowed_special=() lets special-token literals that happen to
        # appear in the prompt be encoded as ordinary text instead of raising.
        raw = len(_ENCODER.encode(text, disallowed_special=()))
    else:
        # Heuristic path: non-empty text always counts as at least one token.
        raw = max(1, len(text) // _CHARS_PER_TOKEN_FALLBACK)

    # True ceiling, as the contract states. The previous ``int(x + 0.5)`` was
    # round-half-up and could under-count for factors other than 1.5 (e.g.
    # raw=1, factor=1.2 gave 1). For the default 1.5 factor the result is
    # unchanged, because raw * 1.5 is always a whole number or ends in .5.
    return math.ceil(raw * safety_factor)
def measure(
    prompt_name: str,
    text: str,
    *,
    enabled: bool = True,
    safety_factor: float = _DEFAULT_SAFETY_FACTOR,
) -> int:
    """Estimate the token count of a rendered prompt and log it.

    Args:
        prompt_name: Logical name of the prompt being measured; emitted in
            the log line so individual injection points are easy to grep.
        text: The fully-rendered prompt string.
        enabled: Switch for the whole measurement. When False nothing is
            counted or logged and 0 is returned, so callers can pass the
            ``TOKEN_BUDGET_ENABLED`` flag straight through instead of
            checking it themselves.
        safety_factor: Forwarded to :func:`count_tokens`.

    Returns:
        Estimated token count, or 0 when ``enabled`` is False.
    """
    if enabled:
        estimated = count_tokens(text, safety_factor=safety_factor)
        logger.info("token_budget prompt=%s tokens~=%d", prompt_name, estimated)
        return estimated

    # Measurement disabled: skip both the counting work and the log line.
    return 0
|