Spaces:
Sleeping
Sleeping
| """ | |
| Découpage dynamique intelligent pour le contenu des CV. | |
| Stratégie : découpage hybride par section + prise en compte des tokens | |
| ─────────────────────────────────────────────────────────────────────── | |
| 1. DÉTECTION DES SECTIONS → expressions régulières (FR + EN) pour localiser | |
| les limites sémantiques | |
| 2. ESTIMATION DES TOKENS → heuristique ~4 caractères/token, sans librairie externe | |
| 3. DÉCOUPAGE ADAPTATIF → les sections qui dépassent le budget de tokens sont | |
| sous-découpées par paragraphe / bloc de dates afin | |
| que le LLM ne reçoive jamais un mur de texte tronqué | |
| en pleine phrase | |
| 4. INJECTION DE CONTEXTE → chaque fragment de dépassement reçoit un « en‑tête » | |
| léger résumant ce qui précède (continuité sémantique) | |
| 5. SOLUTION DE SECOURS → si aucune section n’est trouvée, le texte complet | |
| est divisé en fenêtres avec chevauchement paramétrable | |
| Budget de tokens par défaut | |
| ─────────────────────────── | |
| MAX_TOKENS_PER_CHUNK = 3 000 (sûr pour les modèles avec contexte 4k) | |
| OVERLAP_TOKENS = 200 (préservation du contexte entre fragments) | |
| CHARS_PER_TOKEN = 4 (heuristique conservative pour le français/anglais) | |
| API rétrocompatible | |
| ─────────────────── | |
| chunk_cv_by_sections() → interface dict héritée (utilisée par l’orchestrateur actuel) | |
| get_section_or_full() → fonction utilitaire héritée (utilisée par l’orchestrateur actuel) | |
| Nouvelle API | |
| ──────────── | |
| chunk_cv() → renvoie un dataclass CVSections | |
| get_best_chunks_for_agent() → chaîne de caractères adaptée au budget de tokens | |
| pour l’agent | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from dataclasses import dataclass, field | |
| logger = logging.getLogger(__name__) | |
| # ── Tunable constants ──────────────────────────────────────────────────────── | |
# Token budget for a single chunk handed to the LLM.
MAX_TOKENS_PER_CHUNK: int = 3_000
# Tokens of trailing text carried over between consecutive chunks.
OVERLAP_TOKENS: int = 200
# Heuristic ratio used to convert between characters and tokens.
CHARS_PER_TOKEN: float = 4.0

# Character-based equivalents of the two budgets above, derived once so the
# splitting helpers can work on plain string lengths.
MAX_CHARS: int = int(CHARS_PER_TOKEN * MAX_TOKENS_PER_CHUNK)
OVERLAP_CHARS: int = int(CHARS_PER_TOKEN * OVERLAP_TOKENS)
| # ── Section vocabulary (FR + EN) ───────────────────────────────────────────── | |
# Maps a canonical section name to the regular expressions that recognise its
# header line, in French and English. Every pattern is case-insensitive and is
# applied with ``re.search`` to a single stripped line.
SECTION_PATTERNS: dict[str, list[str]] = {
    "resume": [
        r"(?i)(profil\s*pro|profil\s*candidat|résumé\s*pro|summary|about\s*me"
        r"|à\s*propos|objectif(\s*(pro|career))?|présentation|introduction"
        r"|accroche|profil$|executive\s*summary)",
    ],
    "experiences": [
        r"(?i)(expérience[s]?\s*(professionnelle[s]?)?|professional\s*experience"
        r"|work\s*experience|employment|parcours\s*professionnel"
        r"|postes?\s*occupés?|carrière|career\s*history)",
    ],
    "competences": [
        r"(?i)(compétence[s]?|skills?|savoir[s]?\s*faire|technical\s*skills?"
        r"|compétences?\s*techniques?|hard\s*skills?|soft\s*skills?"
        r"|outils?|technologies?|stack\s*technique|expertise)",
    ],
    "formations": [
        r"(?i)(formation[s]?|education|diplôme[s]?|cursus|études"
        r"|certifications?|parcours\s*académique|academic|qualifications?)",
    ],
    "langues": [
        r"(?i)(langue[s]?|languages?|linguistic)",
    ],
    "centres_interet": [
        # Accept both the ASCII apostrophe and the typographic one (U+2019):
        # text extracted from PDF/Word CVs very often contains the latter.
        r"(?i)(centre[s]?\s*d['’]intérêt|hobbies?|loisirs?|interests?"
        r"|activités?\s*extra|passions?)",
    ],
    "projets": [
        r"(?i)(projet[s]?|projects?|réalisations?|portfolio|open.?source)",
    ],
    "references": [
        r"(?i)(référence[s]?|references?|recommendations?)",
    ],
    "publications": [
        r"(?i)(publications?|articles?|recherche[s]?|research|papers?)",
    ],
}

# Sections that must always be present in the chunking result; when one is not
# detected, a fallback built from the full CV text is injected instead.
REQUIRED_SECTIONS = {"resume", "experiences", "competences", "formations"}
| # ── Core data structures ────────────────────────────────────────────────────── | |
@dataclass
class Chunk:
    """A single text chunk of one CV section, with its metadata.

    Instances are created with keyword arguments, so the class must be a
    dataclass. ``full_text`` is exposed as a read-only property because the
    rest of the module reads it as an attribute (``chunk.full_text``).
    """

    # Canonical name of the section this chunk belongs to.
    section: str
    # Zero-based position of the chunk inside its section.
    index: int
    # Number of chunks the section was split into.
    total_chunks: int
    # The chunk's own text, without any context header.
    text: str
    # Estimated token count of ``text`` only.
    token_estimate: int
    # Optional header summarising the end of the previous chunk.
    preceding_context: str = ""
    # True when the chunk results from splitting an oversized section.
    is_overflow: bool = False

    @property
    def full_text(self) -> str:
        """Return the text to send to the model.

        When a context header is present it is prepended under a labelled
        heading, followed by the chunk's own text; otherwise the bare text is
        returned unchanged.
        """
        if self.preceding_context:
            return (
                f"[CONTEXTE PRÉCÉDENT]\n{self.preceding_context}"
                f"\n\n[CONTENU PRINCIPAL]\n{self.text}"
            )
        return self.text

    def __repr__(self) -> str:
        """Return a compact debug representation (the text is omitted)."""
        return (
            f"Chunk(section={self.section!r}, "
            f"idx={self.index}/{self.total_chunks - 1}, "
            f"~{self.token_estimate} tokens, overflow={self.is_overflow})"
        )
@dataclass
class CVSections:
    """Container returned by chunk_cv().

    The fields use ``field(default_factory=...)`` and the class is built with
    ``CVSections(full_text=...)``, both of which require the ``@dataclass``
    decorator.
    """

    # Chunks of each section, keyed by canonical section name.
    chunks_by_section: dict[str, list[Chunk]] = field(default_factory=dict)
    # The complete, unsplit CV text (used as a fallback source).
    full_text: str = ""
    # Names of the sections found in the text.
    detected_sections: list[str] = field(default_factory=list)

    def get_section_text(
        self,
        section: str,
        max_tokens: int = MAX_TOKENS_PER_CHUNK,
        join_sep: str = "\n\n",
    ) -> str:
        """Return the text of ``section`` limited to roughly ``max_tokens``.

        Chunks are appended in order until the budget is used up; when the
        last appended chunk overshoots the budget, the joined text is
        truncated. If the section is missing or holds fewer than 20 estimated
        tokens, a window over the full CV text is returned instead.
        """
        chunks = self.chunks_by_section.get(section, [])
        if not chunks or sum(c.token_estimate for c in chunks) < 20:
            logger.warning(
                "[CVSections] Section '%s' absent. Using full_text window.", section
            )
            return _window(self.full_text, max_tokens)

        budget = max_tokens
        parts: list[str] = []
        for chunk in chunks:
            if budget <= 0:
                break
            parts.append(chunk.full_text)
            # The budget is tracked on the chunk's own text estimate.
            budget -= chunk.token_estimate

        result = join_sep.join(parts)
        if budget < 0:
            result = _truncate(result, max_tokens)
        return result

    def get_first_chunk(self, section: str) -> Chunk | None:
        """Return the first chunk of ``section``, or None when it has none."""
        chunks = self.chunks_by_section.get(section, [])
        return chunks[0] if chunks else None

    def section_token_count(self, section: str) -> int:
        """Return the summed token estimate of ``section`` (0 when absent)."""
        return sum(c.token_estimate for c in self.chunks_by_section.get(section, []))

    def summary_report(self) -> str:
        """Return a multi-line report with one line per section."""
        lines = ["=== CV Chunking Report ==="]
        for sec, chunks in self.chunks_by_section.items():
            total_tok = sum(c.token_estimate for c in chunks)
            overflow_tag = (
                " [OVERFLOW → SPLIT]" if any(c.is_overflow for c in chunks) else ""
            )
            lines.append(
                f" {sec:<20} {len(chunks)} chunk(s) ~{total_tok} tokens{overflow_tag}"
            )
        return "\n".join(lines)
| # ── Public API ──────────────────────────────────────────────────────────────── | |
def chunk_cv(full_text: str) -> CVSections:
    """Split a CV into per-section chunks and return a CVSections object.

    Algorithm:
      1. Detect section header lines via regex.
      2. Slice the raw text between consecutive headers.
      3. Turn each slice into chunks: a single chunk when it fits within
         MAX_CHARS, an adaptive split otherwise.
      4. Make sure every name in REQUIRED_SECTIONS has an entry, injecting a
         fallback built from the full text when the section was not found.

    Args:
        full_text: The complete CV text.

    Returns:
        A CVSections whose ``chunks_by_section`` has an entry for every
        detected section and every required section.
    """
    result = CVSections(full_text=full_text)

    lines = full_text.splitlines()
    boundaries = _detect_boundaries(lines)
    logger.info("[Chunking] Detected %d section boundaries.", len(boundaries))

    raw_sections = _slice_sections(lines, boundaries)
    result.detected_sections = list(raw_sections)

    # raw_sections is a dict, so each section name occurs exactly once and the
    # chunk list produced for it is already complete and correctly indexed.
    for section_name, raw_text in raw_sections.items():
        result.chunks_by_section[section_name] = _adaptive_chunk(
            section_name, raw_text
        )

    # Fallback for required but absent sections. The names are sorted so the
    # insertion order (and therefore reports and the legacy dict) does not
    # depend on set iteration order, which varies between interpreter runs.
    for sec in sorted(REQUIRED_SECTIONS):
        if sec in result.chunks_by_section:
            continue
        logger.warning(
            "[Chunking] Required section '%s' not found. Injecting fallback.", sec
        )
        fallback_text = (
            f"[Section '{sec}' non détectée — contenu complet du CV]\n\n"
            + _window(full_text, MAX_TOKENS_PER_CHUNK)
        )
        result.chunks_by_section[sec] = [
            Chunk(
                section=sec,
                index=0,
                total_chunks=1,
                text=fallback_text,
                token_estimate=_tokens(fallback_text),
                is_overflow=False,
            )
        ]

    logger.info("[Chunking]\n%s", result.summary_report())
    return result
def get_best_chunks_for_agent(
    cv: CVSections,
    primary_section: str,
    context_sections: list[str] | None = None,
    agent_token_budget: int = MAX_TOKENS_PER_CHUNK * 2,
) -> str:
    """Build the input string for an agent within a token budget.

    The primary section is fetched first with the whole budget. Each context
    section is then appended, in the given order and under a labelled
    separator, capped at MAX_TOKENS_PER_CHUNK, until 100 tokens or fewer
    remain.
    """
    budget_left = agent_token_budget

    main_text = cv.get_section_text(primary_section, max_tokens=budget_left)
    budget_left -= _tokens(main_text)
    pieces: list[str] = [main_text]

    for name in context_sections or []:
        # Stop once the leftover budget is too small to be useful.
        if budget_left <= 100:
            break
        allowance = min(budget_left, MAX_TOKENS_PER_CHUNK)
        extra = cv.get_section_text(name, max_tokens=allowance)
        pieces.append(f"\n\n--- [CONTEXTE : {name.upper()}] ---\n{extra}")
        budget_left -= _tokens(extra)

    return "\n\n".join(pieces)
| # ── Backward-compatible interfaces ──────────────────────────────────────────── | |
def chunk_cv_by_sections(full_text: str) -> dict[str, str]:
    """Legacy dict interface used by the current orchestrator.

    Returns a mapping with the key ``'full_text'`` holding the untouched input
    plus one key per section, whose value is that section's chunks joined by a
    blank line.
    """
    sections = chunk_cv(full_text).chunks_by_section
    legacy: dict[str, str] = {"full_text": full_text}
    legacy.update(
        {
            name: "\n\n".join(piece.full_text for piece in pieces)
            for name, pieces in sections.items()
        }
    )
    return legacy
def get_section_or_full(
    sections: dict[str, str],
    section_name: str,
    max_chars: int = MAX_CHARS,
) -> str:
    """Legacy helper used by the current orchestrator.

    Looks up ``section_name``; when its text is shorter than 100 characters
    (or missing), the ``'full_text'`` entry is used instead. The chosen text
    is truncated to ``max_chars``.
    """
    chosen = sections.get(section_name, "")
    too_short = len(chosen) < 100
    if too_short:
        chosen = sections.get("full_text", "")
    return _truncate_chars(chosen, max_chars)
| # ── Internal helpers ────────────────────────────────────────────────────────── | |
def _tokens(text: str) -> int:
    """Estimate the token count of ``text`` from its length (never below 1)."""
    approx = int(len(text) / CHARS_PER_TOKEN)
    return approx if approx > 1 else 1
def _truncate(text: str, max_tokens: int) -> str:
    """Truncate ``text`` to the character equivalent of ``max_tokens``."""
    char_limit = int(max_tokens * CHARS_PER_TOKEN)
    return _truncate_chars(text, char_limit)
| def _truncate_chars(text: str, max_chars: int) -> str: | |
| if len(text) <= max_chars: | |
| return text | |
| return text[:max_chars] + "\n\n[… TRONQUÉ — dépasse la fenêtre de contexte …]" | |
def _window(text: str, max_tokens: int) -> str:
    """Return the leading part of ``text`` that fits in ``max_tokens``."""
    head = _truncate(text, max_tokens)
    return head
def _detect_boundaries(lines: list[str]) -> list[tuple[int, str]]:
    """Locate section header lines.

    Blank lines and lines longer than 80 characters (after stripping) are
    ignored. A line is a boundary for a section when one of that section's
    patterns is found in it and the previous boundary recorded for the same
    section is more than 5 lines above.

    Returns:
        ``(line_index, section_name)`` pairs ordered by line index.
    """
    found: list[tuple[int, str]] = []
    last_hit: dict[str, int] = {}

    for line_no, raw_line in enumerate(lines):
        candidate = raw_line.strip()
        if len(candidate) == 0 or len(candidate) > 80:
            continue

        for name, regexes in SECTION_PATTERNS.items():
            if not any(re.search(rx, candidate) for rx in regexes):
                continue
            # Ignore a repeated hit that sits too close to the previous one.
            if line_no - last_hit.get(name, -999) > 5:
                found.append((line_no, name))
                last_hit[name] = line_no

    return sorted(found, key=lambda pair: pair[0])
| def _slice_sections( | |
| lines: list[str], | |
| boundaries: list[tuple[int, str]], | |
| ) -> dict[str, str]: | |
| raw: dict[str, str] = {} | |
| n = len(boundaries) | |
| for idx, (start_line, section_name) in enumerate(boundaries): | |
| end_line = boundaries[idx + 1][0] if idx + 1 < n else len(lines) | |
| content = "\n".join(lines[start_line:end_line]).strip() | |
| if not content: | |
| continue | |
| if section_name in raw: | |
| raw[section_name] += "\n\n" + content | |
| else: | |
| raw[section_name] = content | |
| return raw | |
def _adaptive_chunk(section_name: str, raw_text: str) -> list[Chunk]:
    """Turn the raw text of one section into chunks no longer than MAX_CHARS.

    Text that fits is returned as a single non-overflow chunk. Longer text is
    split (by experience blocks for the "experiences" section, by paragraphs
    otherwise), normalised, and every chunk after the first receives a context
    header built from the tail of the previous one.
    """
    if len(raw_text) <= MAX_CHARS:
        only = Chunk(
            section=section_name,
            index=0,
            total_chunks=1,
            text=raw_text,
            token_estimate=_tokens(raw_text),
            is_overflow=False,
        )
        return [only]

    logger.info(
        "[Chunking] Section '%s' (%d chars). Splitting adaptively.",
        section_name,
        len(raw_text),
    )

    splitter = (
        _split_by_experience_blocks
        if section_name == "experiences"
        else _split_by_paragraphs
    )
    pieces = _normalise_blocks(splitter(raw_text))
    piece_count = len(pieces)

    produced: list[Chunk] = []
    tail = ""
    for position, piece in enumerate(pieces):
        header = _make_context_header(tail) if tail else ""
        produced.append(
            Chunk(
                section=section_name,
                index=position,
                total_chunks=piece_count,
                text=piece,
                token_estimate=_tokens(piece),
                preceding_context=header,
                is_overflow=True,
            )
        )
        # Keep the end of this piece to build the next chunk's header.
        tail = piece[-OVERLAP_CHARS:] if len(piece) > OVERLAP_CHARS else piece
    return produced
| def _split_by_experience_blocks(text: str) -> list[str]: | |
| """Split on lines that look like experience anchors (caps title or year).""" | |
| ANCHOR = re.compile( | |
| r"(?m)^(?:" | |
| r"[A-ZÁÀÂÉÈÊÎÏÔÙÛÜ][^\n]{5,60}(?:[-–|@•]|chez|at)\s*\S" | |
| r"|.*\b(19|20)\d{2}\b.*" | |
| r")$" | |
| ) | |
| positions = [m.start() for m in ANCHOR.finditer(text)] | |
| if len(positions) < 2: | |
| return _split_by_paragraphs(text) | |
| blocks: list[str] = [] | |
| if positions[0] > 0: | |
| blocks.append(text[: positions[0]].strip()) | |
| for i, pos in enumerate(positions): | |
| end = positions[i + 1] if i + 1 < len(positions) else len(text) | |
| blocks.append(text[pos:end].strip()) | |
| return [b for b in blocks if b] | |
| def _split_by_paragraphs(text: str) -> list[str]: | |
| paragraphs = re.split(r"\n{2,}", text) | |
| return [p.strip() for p in paragraphs if p.strip()] | |
def _normalise_blocks(blocks: list[str]) -> list[str]:
    """Merge small blocks together, then hard-split the oversized ones.

    Consecutive blocks are packed greedily, joined by a blank line, as long as
    the packed text stays within MAX_CHARS. Any packed block still longer than
    MAX_CHARS is handed to ``_hard_split``.
    """
    packed: list[str] = []
    pending = ""
    for piece in blocks:
        # The +2 accounts for the blank-line separator.
        fits = len(pending) + len(piece) + 2 <= MAX_CHARS
        if fits:
            pending = f"{pending}\n\n{piece}".strip() if pending else piece
            continue
        if pending:
            packed.append(pending)
        pending = piece
    if pending:
        packed.append(pending)

    final: list[str] = []
    for piece in packed:
        if len(piece) > MAX_CHARS:
            final.extend(_hard_split(piece))
        else:
            final.append(piece)
    return final
| def _hard_split(text: str) -> list[str]: | |
| """Last-resort split on character count with newline-aware boundary.""" | |
| chunks: list[str] = [] | |
| start = 0 | |
| while start < len(text): | |
| end = min(start + MAX_CHARS, len(text)) | |
| if end < len(text): | |
| search_start = end - MAX_CHARS // 5 | |
| nl = text.rfind("\n", search_start, end) | |
| if nl > search_start: | |
| end = nl | |
| chunks.append(text[start:end].strip()) | |
| start = max(start + 1, end - OVERLAP_CHARS) | |
| return [c for c in chunks if c] | |
| def _make_context_header(prev_tail: str) -> str: | |
| lines = [l.strip() for l in prev_tail.splitlines() if l.strip()] | |
| summary = " | ".join(lines[-3:]) if lines else prev_tail[:120] | |
| return f"(Suite — contexte fin du bloc précédent) : {summary}" | |