Spaces:
Sleeping
Sleeping
| # 3. utils.py | |
| """ | |
| Utilitaires pour l'application de traduction | |
| """ | |
| import re | |
| import json | |
| import time | |
| import random | |
| import hashlib | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| from datetime import datetime | |
class TranslationCache:
    """In-memory translation cache persisted to a JSON file on disk.

    Entries are keyed by the MD5 hex digest of the source text, the language
    pair and the engine name (see ``get_cache_key``). The whole cache is
    written back to ``<cache_dir>/translations.json`` after every ``set``.
    """

    def __init__(self, cache_dir: str = ".translation_cache"):
        """Create the cache directory if needed and load any existing cache.

        Args:
            cache_dir: Directory holding ``translations.json``. Missing parent
                directories are created as well.
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so that a nested location such as "data/cache" works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.memory_cache = {}
        self.cache_file = self.cache_dir / "translations.json"
        self.load_cache()

    def load_cache(self):
        """Load the cache from disk; fall back to an empty cache on any error."""
        if not self.cache_file.exists():
            return
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            logging.warning(f"Erreur chargement cache: {e}")
            self.memory_cache = {}
            return
        if isinstance(loaded, dict):
            self.memory_cache = loaded
        else:
            # Valid JSON that is not an object (e.g. a list) would break the
            # dict-based get()/set() later on, so it is treated as corrupt.
            logging.warning("Erreur chargement cache: format inattendu (objet JSON attendu)")
            self.memory_cache = {}

    def save_cache(self):
        """Write the cache to disk; failures are logged, never raised."""
        # Write to a sibling temp file and rename it over the real file so an
        # interrupted write cannot leave a truncated translations.json behind.
        tmp_file = self.cache_file.with_name(self.cache_file.name + ".tmp")
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.memory_cache, f, ensure_ascii=False, indent=2)
            tmp_file.replace(self.cache_file)
        except Exception as e:
            logging.error(f"Erreur sauvegarde cache: {e}")
            try:
                tmp_file.unlink()
            except OSError:
                pass  # nothing to clean up, or cleanup itself failed

    def get_cache_key(self, text: str, source_lang: str, target_lang: str, engine: str) -> str:
        """Return the MD5 hex digest identifying this translation request.

        The key layout is kept unchanged so that existing cache files stay
        readable.
        """
        content = f"{text}_{source_lang}_{target_lang}_{engine}"
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def get(self, text: str, source_lang: str, target_lang: str, engine: str) -> Optional[str]:
        """Return the cached translation, or ``None`` when there is none."""
        key = self.get_cache_key(text, source_lang, target_lang, engine)
        return self.memory_cache.get(key)

    def set(self, text: str, translation: str, source_lang: str, target_lang: str, engine: str):
        """Store a translation and immediately persist the cache to disk."""
        key = self.get_cache_key(text, source_lang, target_lang, engine)
        self.memory_cache[key] = translation
        self.save_cache()
class TextChunker:
    """Split text into chunks that fit a per-request character budget."""

    @staticmethod
    def split_text(text: str, max_chars: int = 3000, preserve_formatting: bool = True) -> List[str]:
        """Split ``text`` into chunks of at most ``max_chars`` characters.

        Splitting happens at the coarsest boundary that fits: paragraphs
        (blank lines) first, then sentences (after ``.``, ``!`` or ``?``),
        then words, and finally a hard cut for a single word longer than
        ``max_chars``. Chunks are returned in source order.

        Args:
            text: Text to split.
            max_chars: Maximum length of a chunk.
            preserve_formatting: Accepted for interface compatibility; it is
                not consulted by the current implementation.

        Returns:
            The list of chunks; empty when ``text`` is empty or whitespace.
        """
        if not text or not text.strip():
            return []

        text = text.strip()

        # Short text needs no splitting at all.
        if len(text) <= max_chars:
            return [text]

        chunks = []
        current_chunk = ""

        for para in re.split(r'\n\n+', text):
            if len(para) > max_chars:
                # Flush what was accumulated so far, then split by sentence.
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = ""

                for sentence in re.split(r'(?<=[.!?])\s+', para):
                    if len(sentence) > max_chars:
                        # Flush the pending sentences first; otherwise the
                        # word-level pieces would be emitted before text
                        # that precedes them in the source.
                        if current_chunk:
                            chunks.append(current_chunk)
                            current_chunk = ""
                        chunks.extend(TextChunker._split_long_sentence(sentence, max_chars))
                    elif len(current_chunk) + len(sentence) + 1 <= max_chars:
                        current_chunk = f"{current_chunk} {sentence}".strip()
                    else:
                        if current_chunk:
                            chunks.append(current_chunk)
                        current_chunk = sentence
            elif len(current_chunk) + len(para) + 2 <= max_chars:
                # The paragraph still fits; keep the blank-line separator.
                current_chunk = f"{current_chunk}\n\n{para}" if current_chunk else para
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para

        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks

    @staticmethod
    def _split_long_sentence(sentence: str, max_chars: int) -> List[str]:
        """Pack the words of an over-long sentence into chunks of <= max_chars."""
        pieces = []
        buffer = ""
        for word in sentence.split():
            if len(word) > max_chars:
                # A single word that can never fit: cut it at fixed offsets
                # so that no chunk exceeds the limit.
                if buffer:
                    pieces.append(buffer)
                step = max(max_chars, 1)
                segments = [word[i:i + step] for i in range(0, len(word), step)]
                pieces.extend(segments[:-1])
                buffer = segments[-1]
            elif not buffer:
                buffer = word
            elif len(buffer) + len(word) + 1 <= max_chars:
                buffer = f"{buffer} {word}"
            else:
                pieces.append(buffer)
                buffer = word
        if buffer:
            pieces.append(buffer)
        return pieces
class ProgressTracker:
    """Track task progress and persist it to a JSON checkpoint file.

    The checkpoint lives at ``.checkpoints/<task_id>.json`` relative to the
    current working directory and is rewritten on every ``update``.
    """

    def __init__(self, total_items: int, task_id: str):
        """Start tracking and resume from an existing checkpoint if present.

        Args:
            total_items: Number of items the task consists of.
            task_id: Identifier used to name the checkpoint file.
        """
        self.total_items = total_items
        self.task_id = task_id
        self.completed_items = 0
        # Items restored from a checkpoint. They were not processed during
        # this session, so they must not count towards the throughput used
        # by get_eta().
        self._resumed_items = 0
        self.start_time = datetime.now()
        self.checkpoint_file = Path(f".checkpoints/{task_id}.json")
        self.checkpoint_file.parent.mkdir(parents=True, exist_ok=True)
        self.load_checkpoint()

    def load_checkpoint(self):
        """Restore ``completed_items`` from the checkpoint file, if usable.

        An unreadable or malformed checkpoint is logged and ignored so the
        task simply starts from its current state.
        """
        if not self.checkpoint_file.exists():
            return
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            logging.warning(f"Erreur chargement checkpoint: {e}")
            return

        completed = data.get('completed', 0) if isinstance(data, dict) else None
        if isinstance(completed, (int, float)) and not isinstance(completed, bool):
            self.completed_items = completed
            self._resumed_items = completed
        else:
            logging.warning("Erreur chargement checkpoint: format inattendu")

    def update(self, increment: int = 1):
        """Record ``increment`` more completed items and save a checkpoint."""
        self.completed_items += increment
        self.save_checkpoint()

    def save_checkpoint(self):
        """Write the current progress to the checkpoint file."""
        data = {
            'task_id': self.task_id,
            'total': self.total_items,
            'completed': self.completed_items,
            'timestamp': datetime.now().isoformat()
        }
        with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def get_progress(self) -> float:
        """Return the completion percentage (100.0 when there are no items)."""
        if self.total_items == 0:
            return 100.0
        return (self.completed_items / self.total_items) * 100

    def get_eta(self) -> str:
        """Return a human-readable estimate of the remaining time.

        The rate is computed only from items completed since this tracker
        was created; counting checkpoint-restored items against the new
        session's elapsed time would make the estimate far too optimistic.
        """
        done_this_session = self.completed_items - self._resumed_items
        elapsed = (datetime.now() - self.start_time).total_seconds()
        if done_this_session <= 0 or elapsed <= 0:
            return "Calcul en cours..."

        rate = done_this_session / elapsed
        # Clamp so an over-counted task reports 0 rather than a negative time.
        remaining = max(self.total_items - self.completed_items, 0)
        eta_seconds = remaining / rate

        hours = int(eta_seconds // 3600)
        minutes = int((eta_seconds % 3600) // 60)
        if hours > 0:
            return f"{hours}h {minutes}min"
        return f"{minutes}min"
class RateLimiter:
    """Space out API requests, slowing down further after errors."""

    def __init__(self, min_delay: float = 0.5, max_delay: float = 2.0):
        """Store the delay bounds (in seconds) and reset all counters."""
        self.min_delay, self.max_delay = min_delay, max_delay
        self.last_request_time = 0
        self.request_count = 0
        self.error_count = 0

    def wait(self):
        """Block until enough time has passed since the previous request."""
        if self.error_count > 0:
            # Back-off: grow the upper delay by 1.5x per recorded error,
            # capped at 10 seconds.
            backoff = self.max_delay * (1.5 ** self.error_count)
            required_gap = min(backoff, 10.0)
        else:
            # Normal operation: a random delay inside the configured bounds.
            required_gap = random.uniform(self.min_delay, self.max_delay)

        since_last = time.time() - self.last_request_time
        if since_last < required_gap:
            time.sleep(required_gap - since_last)

        self.last_request_time = time.time()
        self.request_count += 1

    def register_error(self):
        """Count one more error, lengthening subsequent waits."""
        self.error_count += 1

    def reset_errors(self):
        """Clear the error counter, returning to the normal random delay."""
        self.error_count = 0
def setup_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Return the logger ``name`` configured with console and optional file output.

    The function is safe to call repeatedly for the same name: a handler is
    only attached when an equivalent one is not already present, so messages
    are not emitted several times.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        log_file: Optional path of a log file that receives DEBUG and above.

    Returns:
        The configured logger (level DEBUG; console handler at INFO).
    """
    import os  # local import: only needed to normalise the log file path

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # FileHandler subclasses StreamHandler, so it is excluded explicitly when
    # looking for an existing console handler.
    has_console = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    if log_file:
        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger
def sanitize_filename(filename: str) -> str:
    """Return ``filename`` with characters unsafe for file names replaced.

    Each of ``< > : " / \\ | ? *`` and every ASCII control character
    (code points 0-31, including NUL and newlines) is replaced by ``_``.
    The part before the last dot is then truncated to 200 characters; the
    extension after the last dot is kept as is.

    Args:
        filename: Proposed file name (not a path: separators are replaced).

    Returns:
        The cleaned name. A trailing dot with no extension is dropped.
    """
    # One translate() pass instead of one replace() call per character.
    forbidden = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
    filename = filename.translate({ord(char): '_' for char in forbidden})

    name, dot, ext = filename.rpartition('.')
    if not dot:
        # No dot at all: rpartition puts the whole string in the last slot.
        name, ext = filename, ''

    if len(name) > 200:
        name = name[:200]
    return f"{name}.{ext}" if ext else name
def format_file_size(size_bytes: int) -> str:
    """Render a byte count with one decimal and a B/KB/MB/GB/TB unit.

    The value is divided by 1024 until it drops below 1024 or the units
    run out; anything at or beyond 1024 GB is reported in TB.
    """
    units = ('B', 'KB', 'MB', 'GB')
    value = size_bytes
    index = 0
    while index < len(units):
        if value < 1024.0:
            return f"{value:.1f} {units[index]}"
        value = value / 1024.0
        index += 1
    return f"{value:.1f} TB"
def estimate_translation_time(char_count: int, chars_per_second: float = 50) -> str:
    """Estimate how long translating ``char_count`` characters will take.

    The duration is ``char_count / chars_per_second`` and is rendered in
    seconds, minutes, or hours plus minutes depending on its size.
    """
    total_seconds = char_count / chars_per_second

    # Under one minute: whole seconds.
    if total_seconds < 60:
        return f"{int(total_seconds)} secondes"

    # Under one hour: whole minutes.
    if total_seconds < 3600:
        whole_minutes = int(total_seconds / 60)
        return f"{whole_minutes} minutes"

    whole_hours = int(total_seconds / 3600)
    leftover_minutes = int((total_seconds % 3600) / 60)
    return f"{whole_hours}h {leftover_minutes}min"