# 3. utils.py """ Utilitaires pour l'application de traduction """ import re import json import time import random import hashlib import logging from typing import List, Dict, Any, Optional from pathlib import Path from datetime import datetime class TranslationCache: """Gestion du cache de traduction avec persistance""" def __init__(self, cache_dir: str = ".translation_cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) self.memory_cache = {} self.cache_file = self.cache_dir / "translations.json" self.load_cache() def load_cache(self): """Charge le cache depuis le disque""" if self.cache_file.exists(): try: with open(self.cache_file, 'r', encoding='utf-8') as f: self.memory_cache = json.load(f) except Exception as e: logging.warning(f"Erreur chargement cache: {e}") self.memory_cache = {} def save_cache(self): """Sauvegarde le cache sur disque""" try: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.memory_cache, f, ensure_ascii=False, indent=2) except Exception as e: logging.error(f"Erreur sauvegarde cache: {e}") def get_cache_key(self, text: str, source_lang: str, target_lang: str, engine: str) -> str: """Génère une clé unique pour le cache""" content = f"{text}_{source_lang}_{target_lang}_{engine}" return hashlib.md5(content.encode()).hexdigest() def get(self, text: str, source_lang: str, target_lang: str, engine: str) -> Optional[str]: """Récupère une traduction du cache""" key = self.get_cache_key(text, source_lang, target_lang, engine) return self.memory_cache.get(key) def set(self, text: str, translation: str, source_lang: str, target_lang: str, engine: str): """Ajoute une traduction au cache""" key = self.get_cache_key(text, source_lang, target_lang, engine) self.memory_cache[key] = translation self.save_cache() class TextChunker: """Découpe intelligente du texte en chunks""" @staticmethod def split_text(text: str, max_chars: int = 3000, preserve_formatting: bool = True) -> List[str]: """ Découpe le texte en chunks intelligents - Respecte les paragraphes - Préserve la ponctuation - Optimise pour la traduction """ if not text or not text.strip(): return [] # Nettoyer le texte text = text.strip() # Si le texte est court, retourner tel quel if len(text) <= max_chars: return [text] chunks = [] # Séparer par paragraphes (double saut de ligne) paragraphs = re.split(r'\n\n+', text) current_chunk = "" for para in paragraphs: # Si le paragraphe seul est trop long if len(para) > max_chars: # Sauvegarder le chunk en cours if current_chunk: chunks.append(current_chunk.strip()) current_chunk = "" # Découper le paragraphe par phrases sentences = re.split(r'(?<=[.!?])\s+', para) for sentence in sentences: if len(sentence) > max_chars: # Découper par mots si la phrase est trop longue words = sentence.split() temp_chunk = "" for word in words: if len(temp_chunk) + len(word) + 1 <= max_chars: temp_chunk = f"{temp_chunk} {word}".strip() else: if temp_chunk: chunks.append(temp_chunk) temp_chunk = word if temp_chunk: chunks.append(temp_chunk) elif len(current_chunk) + len(sentence) + 1 <= max_chars: current_chunk = f"{current_chunk} {sentence}".strip() else: if current_chunk: chunks.append(current_chunk) current_chunk = sentence # Si ajouter le paragraphe ne dépasse pas la limite elif len(current_chunk) + len(para) + 2 <= max_chars: if current_chunk: current_chunk = f"{current_chunk}\n\n{para}" else: current_chunk = para else: # Sauvegarder le chunk actuel et commencer un nouveau if current_chunk: chunks.append(current_chunk.strip()) current_chunk = para # Ajouter le dernier chunk if current_chunk: chunks.append(current_chunk.strip()) return chunks class ProgressTracker: """Suivi de progression avec sauvegarde""" def __init__(self, total_items: int, task_id: str): self.total_items = total_items self.task_id = task_id self.completed_items = 0 self.start_time = datetime.now() self.checkpoint_file = Path(f".checkpoints/{task_id}.json") self.checkpoint_file.parent.mkdir(exist_ok=True) self.load_checkpoint() def load_checkpoint(self): """Charge le point de sauvegarde""" if self.checkpoint_file.exists(): try: with open(self.checkpoint_file, 'r') as f: data = json.load(f) self.completed_items = data.get('completed', 0) except: pass def update(self, increment: int = 1): """Met à jour la progression""" self.completed_items += increment self.save_checkpoint() def save_checkpoint(self): """Sauvegarde la progression""" data = { 'task_id': self.task_id, 'total': self.total_items, 'completed': self.completed_items, 'timestamp': datetime.now().isoformat() } with open(self.checkpoint_file, 'w') as f: json.dump(data, f) def get_progress(self) -> float: """Retourne le pourcentage de progression""" if self.total_items == 0: return 100.0 return (self.completed_items / self.total_items) * 100 def get_eta(self) -> str: """Estime le temps restant""" if self.completed_items == 0: return "Calcul en cours..." elapsed = (datetime.now() - self.start_time).total_seconds() rate = self.completed_items / elapsed remaining = self.total_items - self.completed_items eta_seconds = remaining / rate if rate > 0 else 0 hours = int(eta_seconds // 3600) minutes = int((eta_seconds % 3600) // 60) if hours > 0: return f"{hours}h {minutes}min" return f"{minutes}min" class RateLimiter: """Gestion des limites de taux d'API""" def __init__(self, min_delay: float = 0.5, max_delay: float = 2.0): self.min_delay = min_delay self.max_delay = max_delay self.last_request_time = 0 self.request_count = 0 self.error_count = 0 def wait(self): """Attend avant la prochaine requête""" # Calcul du délai adaptatif if self.error_count > 0: # Augmenter le délai en cas d'erreurs delay = min(self.max_delay * (1.5 ** self.error_count), 10.0) else: # Délai aléatoire normal delay = random.uniform(self.min_delay, self.max_delay) # Attendre si nécessaire elapsed = time.time() - self.last_request_time if elapsed < delay: time.sleep(delay - elapsed) self.last_request_time = time.time() self.request_count += 1 def register_error(self): """Enregistre une erreur""" self.error_count += 1 def reset_errors(self): """Réinitialise le compteur d'erreurs""" self.error_count = 0 def setup_logger(name: str, log_file: str = None) -> logging.Logger: """Configure un logger personnalisé""" logger = logging.getLogger(name) logger.setLevel(logging.DEBUG) # Format détaillé formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Handler console console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(formatter) logger.addHandler(console_handler) # Handler fichier si spécifié if log_file: file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def sanitize_filename(filename: str) -> str: """Nettoie un nom de fichier""" # Remplacer les caractères interdits invalid_chars = '<>:"/\\|?*' for char in invalid_chars: filename = filename.replace(char, '_') # Limiter la longueur name, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '') if len(name) > 200: name = name[:200] return f"{name}.{ext}" if ext else name def format_file_size(size_bytes: int) -> str: """Formate une taille de fichier""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} TB" def estimate_translation_time(char_count: int, chars_per_second: float = 50) -> str: """Estime le temps de traduction""" seconds = char_count / chars_per_second if seconds < 60: return f"{int(seconds)} secondes" elif seconds < 3600: minutes = int(seconds / 60) return f"{minutes} minutes" else: hours = int(seconds / 3600) minutes = int((seconds % 3600) / 60) return f"{hours}h {minutes}min"