| """ |
| Découpage dynamique intelligent pour le contenu des CV. |
| |
| Stratégie : découpage hybride par section + prise en compte des tokens |
| ─────────────────────────────────────────────────────────────────────── |
| 1. DÉTECTION DES SECTIONS → expressions régulières (FR + EN) pour localiser |
| les limites sémantiques |
| 2. ESTIMATION DES TOKENS → heuristique ~4 caractères/token, sans librairie externe |
| 3. DÉCOUPAGE ADAPTATIF → les sections qui dépassent le budget de tokens sont |
| sous-découpées par paragraphe / bloc de dates afin |
| que le LLM ne reçoive jamais un mur de texte tronqué |
| en pleine phrase |
| 4. INJECTION DE CONTEXTE → chaque fragment de dépassement reçoit un « en‑tête » |
| léger résumant ce qui précède (continuité sémantique) |
| 5. SOLUTION DE SECOURS → si une section requise n’est pas détectée, elle reçoit |
| le texte complet du CV, tronqué à MAX_TOKENS_PER_CHUNK |
| |
| Budget de tokens par défaut |
| ─────────────────────────── |
| MAX_TOKENS_PER_CHUNK = 3 000 (sûr pour les modèles avec contexte 4k) |
| OVERLAP_TOKENS = 200 (préservation du contexte entre fragments) |
| CHARS_PER_TOKEN = 4 (heuristique prudente pour le français/anglais) |
| |
| API rétrocompatible |
| ─────────────────── |
| chunk_cv_by_sections() → interface dict héritée (utilisée par l’orchestrateur actuel) |
| get_section_or_full() → fonction utilitaire héritée (utilisée par l’orchestrateur actuel) |
| |
| Nouvelle API |
| ──────────── |
| chunk_cv() → renvoie un dataclass CVSections |
| get_best_chunks_for_agent() → chaîne de caractères adaptée au budget de tokens |
| pour l’agent |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import re |
| from dataclasses import dataclass, field |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# Token budget. The character limits are derived from the token figures with
# the CHARS_PER_TOKEN heuristic, so no tokenizer library is needed.
MAX_TOKENS_PER_CHUNK: int = 3_000
OVERLAP_TOKENS: int = 200
CHARS_PER_TOKEN: float = 4.0
MAX_CHARS: int = int(MAX_TOKENS_PER_CHUNK * CHARS_PER_TOKEN)
OVERLAP_CHARS: int = int(OVERLAP_TOKENS * CHARS_PER_TOKEN)


# Canonical section name -> case-insensitive regexes (French + English) that
# identify a header line for that section. Patterns are searched anywhere in
# a candidate line, so they are intentionally not anchored.
SECTION_PATTERNS: dict[str, list[str]] = {
    "resume": [
        r"(?i)(profil\s*pro|profil\s*candidat|résumé\s*pro|summary|about\s*me"
        r"|à\s*propos|objectif(\s*(pro|career))?|présentation|introduction"
        r"|accroche|profil$|executive\s*summary)",
    ],
    "experiences": [
        r"(?i)(expérience[s]?\s*(professionnelle[s]?)?|professional\s*experience"
        r"|work\s*experience|employment|parcours\s*professionnel"
        r"|postes?\s*occupés?|carrière|career\s*history)",
    ],
    "competences": [
        r"(?i)(compétence[s]?|skills?|savoir[s]?\s*faire|technical\s*skills?"
        r"|compétences?\s*techniques?|hard\s*skills?|soft\s*skills?"
        r"|outils?|technologies?|stack\s*technique|expertise)",
    ],
    "formations": [
        r"(?i)(formation[s]?|education|diplôme[s]?|cursus|études"
        r"|certifications?|parcours\s*académique|academic|qualifications?)",
    ],
    "langues": [
        r"(?i)(langue[s]?|languages?|linguistic)",
    ],
    "centres_interet": [
        # Accept both the ASCII apostrophe and the typographic one (U+2019),
        # which is what word processors and PDF exports usually produce.
        r"(?i)(centre[s]?\s*d['’]intérêt|hobbies?|loisirs?|interests?"
        r"|activités?\s*extra|passions?)",
    ],
    "projets": [
        r"(?i)(projet[s]?|projects?|réalisations?|portfolio|open.?source)",
    ],
    "references": [
        r"(?i)(référence[s]?|references?|recommendations?)",
    ],
    "publications": [
        r"(?i)(publications?|articles?|recherche[s]?|research|papers?)",
    ],
}


# Sections that chunk_cv() guarantees to exist in its result (a full-text
# fallback is injected when one of them is not detected).
REQUIRED_SECTIONS = {"resume", "experiences", "competences", "formations"}
|
|
|
|
| |
|
|
|
|
@dataclass
class Chunk:
    """One piece of a CV section together with its bookkeeping metadata."""

    section: str  # canonical section name this chunk belongs to
    index: int  # 0-based position of the chunk inside its section
    total_chunks: int  # number of chunks the section was split into
    text: str  # the chunk's own content
    token_estimate: int  # estimated token count of `text` only
    preceding_context: str = ""  # optional recap of the previous chunk
    is_overflow: bool = False  # True when the section had to be split

    @property
    def full_text(self) -> str:
        """Return the chunk text, prefixed by the context recap when present."""
        if not self.preceding_context:
            return self.text
        header = f"[CONTEXTE PRÉCÉDENT]\n{self.preceding_context}"
        body = f"[CONTENU PRINCIPAL]\n{self.text}"
        return f"{header}\n\n{body}"

    def __repr__(self) -> str:
        """Compact debug representation: section, position, size, overflow."""
        last_index = self.total_chunks - 1
        fields = (
            f"section={self.section!r}",
            f"idx={self.index}/{last_index}",
            f"~{self.token_estimate} tokens",
            f"overflow={self.is_overflow}",
        )
        return "Chunk(" + ", ".join(fields) + ")"
|
|
|
|
@dataclass
class CVSections:
    """Container returned by chunk_cv(): chunks grouped by section name."""

    # section name -> ordered chunks of that section
    chunks_by_section: dict[str, list[Chunk]] = field(default_factory=dict)
    # the complete, unsplit CV text (used as a fallback)
    full_text: str = ""
    # names of the sections that were actually detected in the text
    detected_sections: list[str] = field(default_factory=list)

    def get_section_text(
        self,
        section: str,
        max_tokens: int = MAX_TOKENS_PER_CHUNK,
        join_sep: str = "\n\n",
    ) -> str:
        """Return the text of *section*, limited to roughly *max_tokens*.

        Chunks are appended in order until the budget (counted with each
        chunk's ``token_estimate``) is used up. When the section is missing
        or holds fewer than 20 estimated tokens, a truncated window of the
        full CV text is returned instead.

        Args:
            section: Canonical section name to look up.
            max_tokens: Token budget for the returned text.
            join_sep: Separator placed between consecutive chunks.

        Returns:
            The joined chunk texts, truncated to the budget when necessary.
        """
        chunks = self.chunks_by_section.get(section, [])
        if not chunks or self.section_token_count(section) < 20:
            logger.warning(
                "[CVSections] Section '%s' absent. Using full_text window.", section
            )
            return _window(self.full_text, max_tokens)

        budget = max_tokens
        parts: list[str] = []
        for chunk in chunks:
            if budget <= 0:
                break
            parts.append(chunk.full_text)
            budget -= chunk.token_estimate

        # token_estimate covers only chunk.text; the context headers carried
        # by full_text and the separators are not counted, so the joined text
        # can exceed the budget even when `budget` never went negative.
        # _truncate is a no-op for text that already fits.
        return _truncate(join_sep.join(parts), max_tokens)

    def get_first_chunk(self, section: str) -> Chunk | None:
        """Return the first chunk of *section*, or None when it has none."""
        chunks = self.chunks_by_section.get(section, [])
        return chunks[0] if chunks else None

    def section_token_count(self, section: str) -> int:
        """Return the summed token estimates of all chunks of *section*."""
        return sum(c.token_estimate for c in self.chunks_by_section.get(section, []))

    def summary_report(self) -> str:
        """Return a multi-line, human-readable summary of every section."""
        lines = ["=== CV Chunking Report ==="]
        for sec, chunks in self.chunks_by_section.items():
            total_tok = sum(c.token_estimate for c in chunks)
            overflow_tag = (
                " [OVERFLOW → SPLIT]" if any(c.is_overflow for c in chunks) else ""
            )
            lines.append(
                f" {sec:<20} {len(chunks)} chunk(s) ~{total_tok} tokens{overflow_tag}"
            )
        return "\n".join(lines)
|
|
|
|
| |
|
|
|
|
def chunk_cv(full_text: str) -> CVSections:
    """Split a CV into per-section chunks and return them as a CVSections.

    Steps:
        1. Detect section header lines with the SECTION_PATTERNS regexes.
        2. Slice the text between consecutive headers; repeated occurrences
           of a section are concatenated by _slice_sections.
        3. Turn every slice into one chunk, or several when it exceeds
           MAX_CHARS (see _adaptive_chunk).
        4. For each name in REQUIRED_SECTIONS that was not detected, inject a
           single fallback chunk holding a truncated window of the full text.

    Args:
        full_text: The complete CV text.

    Returns:
        A CVSections whose ``chunks_by_section`` contains every detected
        section plus all required sections.
    """
    result = CVSections(full_text=full_text)
    lines = full_text.splitlines()

    boundaries = _detect_boundaries(lines)
    logger.info("[Chunking] Detected %d section boundaries.", len(boundaries))

    raw_sections = _slice_sections(lines, boundaries)
    result.detected_sections = list(raw_sections)

    # raw_sections is a dict, so each section name occurs exactly once here
    # and _adaptive_chunk already sets consistent index/total_chunks values.
    for section_name, raw_text in raw_sections.items():
        result.chunks_by_section[section_name] = _adaptive_chunk(
            section_name, raw_text
        )

    # Sorted so the order of injected fallback sections (and therefore of the
    # report and of the legacy dict) does not depend on set iteration order.
    for sec in sorted(REQUIRED_SECTIONS):
        if sec in result.chunks_by_section:
            continue
        logger.warning(
            "[Chunking] Required section '%s' not found. Injecting fallback.", sec
        )
        fallback_text = (
            f"[Section '{sec}' non détectée — contenu complet du CV]\n\n"
            + _window(full_text, MAX_TOKENS_PER_CHUNK)
        )
        result.chunks_by_section[sec] = [
            Chunk(
                section=sec,
                index=0,
                total_chunks=1,
                text=fallback_text,
                token_estimate=_tokens(fallback_text),
                is_overflow=False,
            )
        ]

    logger.info("[Chunking]\n%s", result.summary_report())
    return result
|
|
|
|
def get_best_chunks_for_agent(
    cv: CVSections,
    primary_section: str,
    context_sections: list[str] | None = None,
    agent_token_budget: int = MAX_TOKENS_PER_CHUNK * 2,
) -> str:
    """Build the input string for an agent under a token budget.

    The primary section is fetched first with the whole budget. Each context
    section is then appended, in the given order and under a labelled
    separator, for as long as more than 100 tokens remain; a single context
    section is capped at MAX_TOKENS_PER_CHUNK.

    Args:
        cv: Chunked CV to read the sections from.
        primary_section: Section the agent mainly works on.
        context_sections: Optional extra sections, by decreasing priority.
        agent_token_budget: Overall token budget for the composed string.

    Returns:
        The composed text, pieces separated by blank lines.
    """
    budget_left = agent_token_budget

    head = cv.get_section_text(primary_section, max_tokens=budget_left)
    budget_left -= _tokens(head)
    pieces: list[str] = [head]

    for name in context_sections or []:
        # Not worth adding another section for a hundred tokens or less.
        if budget_left <= 100:
            break
        cap = min(budget_left, MAX_TOKENS_PER_CHUNK)
        extra = cv.get_section_text(name, max_tokens=cap)
        pieces.append(f"\n\n--- [CONTEXTE : {name.upper()}] ---\n{extra}")
        budget_left -= _tokens(extra)

    return "\n\n".join(pieces)
|
|
|
|
| |
|
|
|
|
def chunk_cv_by_sections(full_text: str) -> dict[str, str]:
    """Legacy dict interface around chunk_cv().

    Args:
        full_text: The complete CV text.

    Returns:
        A dict with the key ``'full_text'`` (the untouched input) plus one
        entry per section, whose value is that section's chunks joined with
        blank lines.
    """
    parsed = chunk_cv(full_text)
    legacy: dict[str, str] = {"full_text": full_text}
    legacy.update(
        {
            name: "\n\n".join(piece.full_text for piece in pieces)
            for name, pieces in parsed.chunks_by_section.items()
        }
    )
    return legacy
|
|
|
|
def get_section_or_full(
    sections: dict[str, str],
    section_name: str,
    max_chars: int = MAX_CHARS,
) -> str:
    """Legacy helper: return a section's text, or the full text as fallback.

    Args:
        sections: Mapping as produced by chunk_cv_by_sections().
        section_name: Key of the wanted section.
        max_chars: Maximum number of characters kept before truncation.

    Returns:
        The section text when it has at least 100 characters, otherwise the
        ``'full_text'`` entry; either way truncated to *max_chars*.
    """
    text = sections.get(section_name, "")
    if len(text) >= 100:
        return _truncate_chars(text, max_chars)
    # Missing or too short to be useful: fall back to the whole CV.
    return _truncate_chars(sections.get("full_text", ""), max_chars)
|
|
|
|
| |
|
|
|
|
def _tokens(text: str) -> int:
    """Estimate the token count of *text* from its length; never below 1."""
    estimate = int(len(text) / CHARS_PER_TOKEN)
    return estimate if estimate > 1 else 1
|
|
|
|
def _truncate(text: str, max_tokens: int) -> str:
    """Truncate *text* to the character equivalent of *max_tokens*."""
    char_budget = int(max_tokens * CHARS_PER_TOKEN)
    return _truncate_chars(text, char_budget)
|
|
|
|
| def _truncate_chars(text: str, max_chars: int) -> str: |
| if len(text) <= max_chars: |
| return text |
| return text[:max_chars] + "\n\n[… TRONQUÉ — dépasse la fenêtre de contexte …]" |
|
|
|
|
def _window(text: str, max_tokens: int) -> str:
    """Return the leading window of *text* that fits in *max_tokens*.

    Currently a plain alias of _truncate().
    """
    return _truncate(text=text, max_tokens=max_tokens)
|
|
|
|
def _detect_boundaries(lines: list[str]) -> list[tuple[int, str]]:
    """Locate the lines that look like section headers.

    A line is a candidate when it is non-empty and at most 80 characters
    long once stripped. It opens a section when one of that section's
    patterns matches at the start of a word. A boundary for the same section
    is ignored when it occurs within 5 lines of the previously recorded one.

    Args:
        lines: The CV text split into lines.

    Returns:
        ``(line_index, section_name)`` pairs sorted by line index. One line
        may yield several pairs when it matches several sections.
    """
    boundaries: list[tuple[int, str]] = []
    seen_at: dict[str, int] = {}

    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped or len(stripped) > 80:
            continue
        for section_name, patterns in SECTION_PATTERNS.items():
            if not any(_matches_at_word_start(p, stripped) for p in patterns):
                continue
            last = seen_at.get(section_name, -999)
            if i - last > 5:
                boundaries.append((i, section_name))
                seen_at[section_name] = i

    boundaries.sort(key=lambda x: x[0])
    return boundaries


def _matches_at_word_start(pattern: str, text: str) -> bool:
    """Return True when *pattern* matches *text* at the start of a word.

    A plain ``re.search`` also fires inside longer words ("information"
    contains "formation", "particles" contains "articles"), which turns
    ordinary content lines into section headers. Only matches that begin at
    the start of the text or right after a non-alphanumeric character count.
    """
    compiled = re.compile(pattern)
    pos = 0
    while True:
        match = compiled.search(text, pos)
        if match is None:
            return False
        start = match.start()
        if start == 0 or not text[start - 1].isalnum():
            return True
        # Retry one character further so an overlapping, well-placed match
        # is not hidden by the rejected one.
        pos = start + 1
|
|
|
|
| def _slice_sections( |
| lines: list[str], |
| boundaries: list[tuple[int, str]], |
| ) -> dict[str, str]: |
| raw: dict[str, str] = {} |
| n = len(boundaries) |
|
|
| for idx, (start_line, section_name) in enumerate(boundaries): |
| end_line = boundaries[idx + 1][0] if idx + 1 < n else len(lines) |
| content = "\n".join(lines[start_line:end_line]).strip() |
| if not content: |
| continue |
| if section_name in raw: |
| raw[section_name] += "\n\n" + content |
| else: |
| raw[section_name] = content |
|
|
| return raw |
|
|
|
|
def _adaptive_chunk(section_name: str, raw_text: str) -> list[Chunk]:
    """Turn a section's raw text into one or more Chunk objects.

    Text that fits in MAX_CHARS becomes a single, non-overflow chunk.
    Longer text is split (by experience anchors for the "experiences"
    section, by paragraphs otherwise), normalised to the size limit, and
    every chunk after the first carries a short recap of the end of the
    previous one.

    Args:
        section_name: Canonical name of the section being chunked.
        raw_text: Full text of that section.

    Returns:
        The chunks in reading order.
    """
    if len(raw_text) <= MAX_CHARS:
        single = Chunk(
            section=section_name,
            index=0,
            total_chunks=1,
            text=raw_text,
            token_estimate=_tokens(raw_text),
            is_overflow=False,
        )
        return [single]

    logger.info(
        "[Chunking] Section '%s' (%d chars). Splitting adaptively.",
        section_name,
        len(raw_text),
    )

    splitter = (
        _split_by_experience_blocks
        if section_name == "experiences"
        else _split_by_paragraphs
    )
    pieces = _normalise_blocks(splitter(raw_text))
    piece_count = len(pieces)

    result: list[Chunk] = []
    tail = ""  # end of the previously emitted piece, used for the recap
    for position, piece in enumerate(pieces):
        recap = _make_context_header(tail) if tail else ""
        result.append(
            Chunk(
                section=section_name,
                index=position,
                total_chunks=piece_count,
                text=piece,
                token_estimate=_tokens(piece),
                preceding_context=recap,
                is_overflow=True,
            )
        )
        tail = piece if len(piece) <= OVERLAP_CHARS else piece[-OVERLAP_CHARS:]

    return result
|
|
|
|
| def _split_by_experience_blocks(text: str) -> list[str]: |
| """Split on lines that look like experience anchors (caps title or year).""" |
| ANCHOR = re.compile( |
| r"(?m)^(?:" |
| r"[A-ZÁÀÂÉÈÊÎÏÔÙÛÜ][^\n]{5,60}(?:[-–|@•]|chez|at)\s*\S" |
| r"|.*\b(19|20)\d{2}\b.*" |
| r")$" |
| ) |
| positions = [m.start() for m in ANCHOR.finditer(text)] |
|
|
| if len(positions) < 2: |
| return _split_by_paragraphs(text) |
|
|
| blocks: list[str] = [] |
| if positions[0] > 0: |
| blocks.append(text[: positions[0]].strip()) |
| for i, pos in enumerate(positions): |
| end = positions[i + 1] if i + 1 < len(positions) else len(text) |
| blocks.append(text[pos:end].strip()) |
|
|
| return [b for b in blocks if b] |
|
|
|
|
| def _split_by_paragraphs(text: str) -> list[str]: |
| paragraphs = re.split(r"\n{2,}", text) |
| return [p.strip() for p in paragraphs if p.strip()] |
|
|
|
|
def _normalise_blocks(blocks: list[str]) -> list[str]:
    """Bring blocks to a usable size: pack small ones, split huge ones.

    Consecutive blocks are greedily packed together (separated by a blank
    line) while the packed text stays within MAX_CHARS. Any resulting block
    that is still longer than MAX_CHARS is cut by _hard_split().

    Args:
        blocks: Blocks in reading order.

    Returns:
        The normalised blocks, in the same order.
    """
    packed: list[str] = []
    pending = ""
    for piece in blocks:
        # +2 accounts for the blank-line separator.
        fits = len(pending) + len(piece) + 2 <= MAX_CHARS
        if fits and pending:
            pending = f"{pending}\n\n{piece}".strip()
        elif fits:
            pending = piece
        else:
            if pending:
                packed.append(pending)
            pending = piece
    if pending:
        packed.append(pending)

    final: list[str] = []
    for piece in packed:
        final.extend([piece] if len(piece) <= MAX_CHARS else _hard_split(piece))
    return final
|
|
|
|
def _hard_split(text: str) -> list[str]:
    """Last-resort split of *text* into windows of at most MAX_CHARS.

    Each window preferably ends on a newline found in its last fifth.
    Consecutive windows overlap by OVERLAP_CHARS characters.

    Args:
        text: Text to cut.

    Returns:
        The stripped, non-empty windows in reading order.
    """
    chunks: list[str] = []
    length = len(text)
    start = 0
    while start < length:
        end = min(start + MAX_CHARS, length)
        if end < length:
            # Prefer to cut on a newline located in the last fifth of the window.
            search_start = end - MAX_CHARS // 5
            nl = text.rfind("\n", search_start, end)
            if nl > search_start:
                end = nl
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        if end >= length:
            # The whole text is covered. Stepping back by the overlap here
            # would only re-emit ever-shorter copies of the tail.
            break
        # Step back for the overlap, but always advance by at least one char.
        start = max(start + 1, end - OVERLAP_CHARS)
    return chunks
|
|
|
|
| def _make_context_header(prev_tail: str) -> str: |
| lines = [l.strip() for l in prev_tail.splitlines() if l.strip()] |
| summary = " | ".join(lines[-3:]) if lines else prev_tail[:120] |
| return f"(Suite — contexte fin du bloc précédent) : {summary}" |
|
|