# cveval/utils/chunking.py
# GitLab CI
# Deploy from GitLab CI - 6509512f
# 3f637a5
"""
Découpage dynamique intelligent pour le contenu des CV.
Stratégie : découpage hybride par section + prise en compte des tokens
───────────────────────────────────────────────────────────────────────
1. DÉTECTION DES SECTIONS → expressions régulières (FR + EN) pour localiser
les limites sémantiques
2. ESTIMATION DES TOKENS → heuristique ~4 caractères/token, sans librairie externe
3. DÉCOUPAGE ADAPTATIF → les sections qui dépassent le budget de tokens sont
sous-découpées par paragraphe / bloc de dates afin
que le LLM ne reçoive jamais un mur de texte tronqué
en pleine phrase
4. INJECTION DE CONTEXTE → chaque fragment de dépassement reçoit un « en‑tête »
léger résumant ce qui précède (continuité sémantique)
5. SOLUTION DE SECOURS → si aucune section n’est trouvée, le texte complet
est divisé en fenêtres avec chevauchement paramétrable
Budget de tokens par défaut
───────────────────────────
MAX_TOKENS_PER_CHUNK = 3 000 (sûr pour les modèles avec contexte 4k)
OVERLAP_TOKENS = 200 (préservation du contexte entre fragments)
CHARS_PER_TOKEN = 4 (heuristique prudente pour le français/anglais)
API rétrocompatible
───────────────────
chunk_cv_by_sections() → interface dict héritée (utilisée par l’orchestrateur actuel)
get_section_or_full() → fonction utilitaire héritée (utilisée par l’orchestrateur actuel)
Nouvelle API
────────────
chunk_cv() → renvoie un dataclass CVSections
get_best_chunks_for_agent() → chaîne de caractères adaptée au budget de tokens
pour l’agent
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ── Tunable constants ────────────────────────────────────────────────────────
# Upper bound, in estimated tokens, for a single chunk.
MAX_TOKENS_PER_CHUNK: int = 3_000
# Estimated tokens of trailing text carried over between consecutive chunks.
OVERLAP_TOKENS: int = 200
# Characters-per-token ratio used by the length-based token estimate.
CHARS_PER_TOKEN: float = 4.0
# Character equivalents of the two token budgets above
# (12 000 and 800 characters with the default values).
MAX_CHARS: int = int(MAX_TOKENS_PER_CHUNK * CHARS_PER_TOKEN)
OVERLAP_CHARS: int = int(OVERLAP_TOKENS * CHARS_PER_TOKEN)
# ── Section vocabulary (FR + EN) ─────────────────────────────────────────────
# Maps a canonical section name to the regular expressions that identify a
# header line for that section. Every pattern is case-insensitive ("(?i)").
# _detect_boundaries() applies them with re.search(), so a pattern matches
# anywhere inside a line unless it is explicitly anchored (e.g. "profil$").
# The dict order matters: sections are tested in this order for every line.
SECTION_PATTERNS: dict[str, list[str]] = {
"resume": [
r"(?i)(profil\s*pro|profil\s*candidat|résumé\s*pro|summary|about\s*me"
r"|à\s*propos|objectif(\s*(pro|career))?|présentation|introduction"
r"|accroche|profil$|executive\s*summary)",
],
"experiences": [
r"(?i)(expérience[s]?\s*(professionnelle[s]?)?|professional\s*experience"
r"|work\s*experience|employment|parcours\s*professionnel"
r"|postes?\s*occupés?|carrière|career\s*history)",
],
"competences": [
r"(?i)(compétence[s]?|skills?|savoir[s]?\s*faire|technical\s*skills?"
r"|compétences?\s*techniques?|hard\s*skills?|soft\s*skills?"
r"|outils?|technologies?|stack\s*technique|expertise)",
],
"formations": [
r"(?i)(formation[s]?|education|diplôme[s]?|cursus|études"
r"|certifications?|parcours\s*académique|academic|qualifications?)",
],
"langues": [
r"(?i)(langue[s]?|languages?|linguistic)",
],
"centres_interet": [
r"(?i)(centre[s]?\s*d'intérêt|hobbies?|loisirs?|interests?"
r"|activités?\s*extra|passions?)",
],
"projets": [
r"(?i)(projet[s]?|projects?|réalisations?|portfolio|open.?source)",
],
"references": [
r"(?i)(référence[s]?|references?|recommendations?)",
],
"publications": [
r"(?i)(publications?|articles?|recherche[s]?|research|papers?)",
],
}
# Sections that chunk_cv() always provides: when one of them is not detected,
# a fallback chunk built from the full CV text is injected in its place.
REQUIRED_SECTIONS = {"resume", "experiences", "competences", "formations"}
# ── Core data structures ──────────────────────────────────────────────────────
@dataclass
class Chunk:
    """One slice of CV text together with its bookkeeping metadata."""

    # Name of the section this chunk belongs to.
    section: str
    # Zero-based position of the chunk inside its section.
    index: int
    # Number of chunks the section was split into.
    total_chunks: int
    # The chunk's own text, without any context header.
    text: str
    # Estimated token count stored for ``text``.
    token_estimate: int
    # Optional short reminder of what came just before this chunk.
    preceding_context: str = ""
    # True when the chunk comes from a section that had to be split.
    is_overflow: bool = False

    @property
    def full_text(self) -> str:
        """Return the text to send downstream.

        When a preceding context is set, it is emitted first under a
        ``[CONTEXTE PRÉCÉDENT]`` label, followed by the chunk text under a
        ``[CONTENU PRINCIPAL]`` label. Otherwise the bare text is returned.
        """
        if not self.preceding_context:
            return self.text
        context_part = f"[CONTEXTE PRÉCÉDENT]\n{self.preceding_context}"
        main_part = f"\n\n[CONTENU PRINCIPAL]\n{self.text}"
        return context_part + main_part

    def __repr__(self) -> str:
        """Return a compact one-line description (index shown as ``i/last``)."""
        pieces = [
            f"Chunk(section={self.section!r}, ",
            f"idx={self.index}/{self.total_chunks - 1}, ",
            f"~{self.token_estimate} tokens, overflow={self.is_overflow})",
        ]
        return "".join(pieces)
@dataclass
class CVSections:
    """Container returned by chunk_cv()."""

    # Chunks grouped by section name.
    chunks_by_section: dict[str, list[Chunk]] = field(default_factory=dict)
    # The complete CV text; used as the fallback source by get_section_text().
    full_text: str = ""
    # Names of the sections found in the text (assigned by chunk_cv()).
    detected_sections: list[str] = field(default_factory=list)

    def get_section_text(
        self,
        section: str,
        max_tokens: int = MAX_TOKENS_PER_CHUNK,
        join_sep: str = "\n\n",
    ) -> str:
        """Return the text of ``section`` limited to roughly ``max_tokens``.

        Args:
            section: Name of the section to fetch.
            max_tokens: Token budget for the returned text.
            join_sep: Separator inserted between consecutive chunks.

        Returns:
            The section's chunks joined with ``join_sep`` and truncated to the
            budget. When the section has no chunks, or its chunks total fewer
            than 20 estimated tokens, a window of ``full_text`` is returned
            instead.
        """
        chunks = self.chunks_by_section.get(section, [])
        if not chunks or sum(c.token_estimate for c in chunks) < 20:
            logger.warning(
                "[CVSections] Section '%s' absent. Using full_text window.", section
            )
            return _window(self.full_text, max_tokens)
        budget = max_tokens
        parts: list[str] = []
        for chunk in chunks:
            if budget <= 0:
                break
            parts.append(chunk.full_text)
            budget -= chunk.token_estimate
        # ``token_estimate`` only covers the chunk body, whereas ``full_text``
        # also carries the context header, and the separators add characters
        # too. The joined text can therefore exceed the budget even when
        # ``budget`` never went negative, so always truncate; this is a no-op
        # when the text already fits.
        return _truncate(join_sep.join(parts), max_tokens)

    def get_first_chunk(self, section: str) -> Chunk | None:
        """Return the first chunk of ``section``, or None if it has no chunks."""
        chunks = self.chunks_by_section.get(section, [])
        return chunks[0] if chunks else None

    def section_token_count(self, section: str) -> int:
        """Return the summed token estimate of ``section`` (0 if unknown)."""
        return sum(c.token_estimate for c in self.chunks_by_section.get(section, []))

    def summary_report(self) -> str:
        """Return a multi-line report: one line per section with its chunk
        count, total token estimate and an overflow marker when it was split."""
        lines = ["=== CV Chunking Report ==="]
        for sec, chunks in self.chunks_by_section.items():
            total_tok = sum(c.token_estimate for c in chunks)
            overflow_tag = (
                " [OVERFLOW → SPLIT]" if any(c.is_overflow for c in chunks) else ""
            )
            lines.append(
                f" {sec:<20} {len(chunks)} chunk(s) ~{total_tok} tokens{overflow_tag}"
            )
        return "\n".join(lines)
# ── Public API ────────────────────────────────────────────────────────────────
def chunk_cv(full_text: str) -> CVSections:
    """Split a CV into per-section chunks and return them as a CVSections.

    Steps:
        1. Locate section header lines with the section regexes.
        2. Slice the raw text between consecutive headers.
        3. Turn every slice into one chunk when it fits in MAX_CHARS, or into
           several chunks through the adaptive splitter when it does not.
        4. Guarantee that every name in REQUIRED_SECTIONS is present, using a
           window of the full text as a fallback chunk when it is missing.

    Args:
        full_text: The complete CV text.

    Returns:
        A CVSections holding the chunks, the original text and the list of
        detected section names.
    """
    cv = CVSections(full_text=full_text)
    text_lines = full_text.splitlines()

    boundaries = _detect_boundaries(text_lines)
    logger.info("[Chunking] Detected %d section boundaries.", len(boundaries))

    raw_sections = _slice_sections(text_lines, boundaries)
    cv.detected_sections = [name for name in raw_sections]

    by_section = cv.chunks_by_section
    for name, raw_text in raw_sections.items():
        fresh = _adaptive_chunk(name, raw_text)
        already = by_section.get(name)
        if already is None:
            by_section[name] = fresh
            continue
        # Section seen before: renumber the new chunks so indices continue
        # after the ones already stored, then append them.
        shift = len(already)
        for chunk in fresh:
            chunk.index += shift
        by_section[name] = already + fresh

    # Fix total_chunks after potential merging of duplicate sections
    for chunks in by_section.values():
        count = len(chunks)
        for chunk in chunks:
            chunk.total_chunks = count

    # Fallback for required but absent sections
    for required in REQUIRED_SECTIONS:
        if required in by_section:
            continue
        logger.warning(
            "[Chunking] Required section '%s' not found. Injecting fallback.",
            required,
        )
        fallback_text = (
            f"[Section '{required}' non détectée — contenu complet du CV]\n\n"
            + _window(full_text, MAX_TOKENS_PER_CHUNK)
        )
        fallback_chunk = Chunk(
            section=required,
            index=0,
            total_chunks=1,
            text=fallback_text,
            token_estimate=_tokens(fallback_text),
            is_overflow=False,
        )
        by_section[required] = [fallback_chunk]

    logger.info("[Chunking]\n%s", cv.summary_report())
    return cv
def get_best_chunks_for_agent(
    cv: CVSections,
    primary_section: str,
    context_sections: list[str] | None = None,
    agent_token_budget: int = MAX_TOKENS_PER_CHUNK * 2,
) -> str:
    """Build the input string for an agent under a token budget.

    The primary section is fetched first with the whole budget. Context
    sections are then appended in the given order, each capped at
    MAX_TOKENS_PER_CHUNK, until 100 or fewer estimated tokens remain.

    Args:
        cv: The chunked CV.
        primary_section: Section the agent mainly works on.
        context_sections: Extra sections to append, in priority order.
        agent_token_budget: Total token budget for the composed string.

    Returns:
        The primary text followed by labelled context blocks.
    """
    primary_text = cv.get_section_text(primary_section, max_tokens=agent_token_budget)
    pieces: list[str] = [primary_text]
    tokens_left = agent_token_budget - _tokens(primary_text)

    extra_sections = context_sections if context_sections else []
    for name in extra_sections:
        # Stop once too little budget remains to add a useful block.
        if tokens_left <= 100:
            break
        allowance = min(tokens_left, MAX_TOKENS_PER_CHUNK)
        ctx_text = cv.get_section_text(name, max_tokens=allowance)
        pieces.append(f"\n\n--- [CONTEXTE : {name.upper()}] ---\n{ctx_text}")
        tokens_left -= _tokens(ctx_text)

    return "\n\n".join(pieces)
# ── Backward-compatible interfaces ────────────────────────────────────────────
def chunk_cv_by_sections(full_text: str) -> dict[str, str]:
    """Legacy dict interface used by the current orchestrator.

    Args:
        full_text: The complete CV text.

    Returns:
        A dict whose ``'full_text'`` key holds the input unchanged and whose
        other keys are section names mapped to that section's chunks joined
        with a blank line.
    """
    cv = chunk_cv(full_text)
    legacy: dict[str, str] = {"full_text": full_text}
    legacy.update(
        {
            sec: "\n\n".join(chunk.full_text for chunk in chunks)
            for sec, chunks in cv.chunks_by_section.items()
        }
    )
    return legacy
def get_section_or_full(
    sections: dict[str, str],
    section_name: str,
    max_chars: int = MAX_CHARS,
) -> str:
    """Legacy helper used by the current orchestrator.

    Args:
        sections: Mapping of section name to text, with a ``'full_text'`` key.
        section_name: Section to look up.
        max_chars: Maximum number of characters kept before truncation.

    Returns:
        The section text, or the ``'full_text'`` entry when the section is
        missing or shorter than 100 characters, truncated to ``max_chars``.
    """
    section_text = sections.get(section_name, "")
    usable = len(section_text) >= 100
    chosen = section_text if usable else sections.get("full_text", "")
    return _truncate_chars(chosen, max_chars)
# ── Internal helpers ──────────────────────────────────────────────────────────
def _tokens(text: str) -> int:
    """Estimate the token count of ``text`` from its length.

    The estimate is ``len(text) / CHARS_PER_TOKEN`` truncated to an integer,
    with a floor of 1, so even an empty string counts as one token.
    """
    estimate = int(len(text) / CHARS_PER_TOKEN)
    return estimate if estimate > 1 else 1
def _truncate(text: str, max_tokens: int) -> str:
    """Truncate ``text`` to a token budget.

    The budget is converted to characters with CHARS_PER_TOKEN and the cut
    itself is delegated to _truncate_chars().
    """
    char_limit = int(max_tokens * CHARS_PER_TOKEN)
    return _truncate_chars(text, char_limit)
def _truncate_chars(text: str, max_chars: int) -> str:
if len(text) <= max_chars:
return text
return text[:max_chars] + "\n\n[… TRONQUÉ — dépasse la fenêtre de contexte …]"
def _window(text: str, max_tokens: int) -> str:
    """Return the leading window of ``text`` that fits in ``max_tokens``.

    Currently identical to _truncate(): the window always starts at the
    beginning of the text.
    """
    windowed = _truncate(text, max_tokens)
    return windowed
def _detect_boundaries(lines: list[str]) -> list[tuple[int, str]]:
    """Find the lines that look like section headers.

    Blank lines and lines longer than 80 characters (after stripping) are
    never treated as headers. Every remaining line is tested against each
    section's patterns; a match registers a boundary unless the same section
    was already registered within the previous 5 lines.

    Note that one line may register boundaries for several sections when it
    matches more than one vocabulary.

    Args:
        lines: The CV text split into lines.

    Returns:
        ``(line_index, section_name)`` tuples sorted by line index.
    """
    found: list[tuple[int, str]] = []
    last_seen: dict[str, int] = {}
    for line_no, raw_line in enumerate(lines):
        candidate = raw_line.strip()
        if not candidate or len(candidate) > 80:
            continue
        for section_name, patterns in SECTION_PATTERNS.items():
            if not any(re.search(pattern, candidate) for pattern in patterns):
                continue
            # Ignore a repeat of the same section too close to the last one.
            previous = last_seen.get(section_name, -999)
            if line_no - previous > 5:
                found.append((line_no, section_name))
                last_seen[section_name] = line_no
    found.sort(key=lambda boundary: boundary[0])
    return found
def _slice_sections(
lines: list[str],
boundaries: list[tuple[int, str]],
) -> dict[str, str]:
raw: dict[str, str] = {}
n = len(boundaries)
for idx, (start_line, section_name) in enumerate(boundaries):
end_line = boundaries[idx + 1][0] if idx + 1 < n else len(lines)
content = "\n".join(lines[start_line:end_line]).strip()
if not content:
continue
if section_name in raw:
raw[section_name] += "\n\n" + content
else:
raw[section_name] = content
return raw
def _adaptive_chunk(section_name: str, raw_text: str) -> list[Chunk]:
    """Turn the raw text of one section into chunks of at most MAX_CHARS.

    Text that fits is returned as a single non-overflow chunk. Longer text is
    split into blocks (experience anchors for the ``"experiences"`` section,
    paragraphs otherwise), the blocks are normalised to the size limit, and
    every resulting chunk is flagged as overflow. Each chunk after the first
    receives a context header built from the tail of the previous block.

    Args:
        section_name: Name of the section the text belongs to.
        raw_text: The section's raw text.

    Returns:
        The chunks, indexed from 0.
    """
    size = len(raw_text)
    if size <= MAX_CHARS:
        single = Chunk(
            section=section_name,
            index=0,
            total_chunks=1,
            text=raw_text,
            token_estimate=_tokens(raw_text),
            is_overflow=False,
        )
        return [single]

    logger.info(
        "[Chunking] Section '%s' (%d chars). Splitting adaptively.",
        section_name,
        size,
    )
    splitter = (
        _split_by_experience_blocks
        if section_name == "experiences"
        else _split_by_paragraphs
    )
    pieces = _normalise_blocks(splitter(raw_text))
    total = len(pieces)

    produced: list[Chunk] = []
    tail = ""
    for position, piece in enumerate(pieces):
        header = _make_context_header(tail) if tail else ""
        produced.append(
            Chunk(
                section=section_name,
                index=position,
                total_chunks=total,
                text=piece,
                token_estimate=_tokens(piece),
                preceding_context=header,
                is_overflow=True,
            )
        )
        # Remember the end of this block to introduce the next chunk.
        tail = piece[-OVERLAP_CHARS:] if len(piece) > OVERLAP_CHARS else piece
    return produced
def _split_by_experience_blocks(text: str) -> list[str]:
"""Split on lines that look like experience anchors (caps title or year)."""
ANCHOR = re.compile(
r"(?m)^(?:"
r"[A-ZÁÀÂÉÈÊÎÏÔÙÛÜ][^\n]{5,60}(?:[-–|@•]|chez|at)\s*\S"
r"|.*\b(19|20)\d{2}\b.*"
r")$"
)
positions = [m.start() for m in ANCHOR.finditer(text)]
if len(positions) < 2:
return _split_by_paragraphs(text)
blocks: list[str] = []
if positions[0] > 0:
blocks.append(text[: positions[0]].strip())
for i, pos in enumerate(positions):
end = positions[i + 1] if i + 1 < len(positions) else len(text)
blocks.append(text[pos:end].strip())
return [b for b in blocks if b]
def _split_by_paragraphs(text: str) -> list[str]:
paragraphs = re.split(r"\n{2,}", text)
return [p.strip() for p in paragraphs if p.strip()]
def _normalise_blocks(blocks: list[str]) -> list[str]:
    """Bring blocks to a usable size: merge small ones, split oversized ones.

    First pass: consecutive blocks are packed together, separated by a blank
    line, as long as the packed text stays within MAX_CHARS. Second pass: any
    packed block still longer than MAX_CHARS is cut by _hard_split().

    Args:
        blocks: Text blocks in document order.

    Returns:
        The normalised blocks, in the same order.
    """
    packed: list[str] = []
    pending = ""
    for block in blocks:
        # The "+ 2" accounts for the blank-line separator.
        fits = len(pending) + len(block) + 2 <= MAX_CHARS
        if fits and pending:
            pending = (pending + "\n\n" + block).strip()
        elif fits:
            pending = block
        else:
            if pending:
                packed.append(pending)
            pending = block
    if pending:
        packed.append(pending)

    final: list[str] = []
    for block in packed:
        final.extend([block] if len(block) <= MAX_CHARS else _hard_split(block))
    return final
def _hard_split(text: str) -> list[str]:
"""Last-resort split on character count with newline-aware boundary."""
chunks: list[str] = []
start = 0
while start < len(text):
end = min(start + MAX_CHARS, len(text))
if end < len(text):
search_start = end - MAX_CHARS // 5
nl = text.rfind("\n", search_start, end)
if nl > search_start:
end = nl
chunks.append(text[start:end].strip())
start = max(start + 1, end - OVERLAP_CHARS)
return [c for c in chunks if c]
def _make_context_header(prev_tail: str) -> str:
lines = [l.strip() for l in prev_tail.splitlines() if l.strip()]
summary = " | ".join(lines[-3:]) if lines else prev_tail[:120]
return f"(Suite — contexte fin du bloc précédent) : {summary}"