# TranslateMyBook / utils.py
# Origin: Hugging Face upload by Adjoumani ("Create utils.py", commit 8992d48, verified)
# 3. utils.py
"""
Utilitaires pour l'application de traduction
"""
import hashlib
import json
import logging
import os
import random
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
class TranslationCache:
    """Translation cache persisted as a JSON file on disk."""

    def __init__(self, cache_dir: str = ".translation_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.cache_file = self.cache_dir / "translations.json"
        self.memory_cache = {}
        self.load_cache()

    def load_cache(self):
        """Populate the in-memory cache from disk; fall back to empty on failure."""
        if not self.cache_file.exists():
            return
        try:
            raw = self.cache_file.read_text(encoding='utf-8')
            self.memory_cache = json.loads(raw)
        except Exception as e:
            logging.warning(f"Erreur chargement cache: {e}")
            self.memory_cache = {}

    def save_cache(self):
        """Flush the in-memory cache to disk (best effort, errors are logged)."""
        try:
            payload = json.dumps(self.memory_cache, ensure_ascii=False, indent=2)
            self.cache_file.write_text(payload, encoding='utf-8')
        except Exception as e:
            logging.error(f"Erreur sauvegarde cache: {e}")

    def get_cache_key(self, text: str, source_lang: str, target_lang: str, engine: str) -> str:
        """Derive a stable MD5 key from the text and translation parameters."""
        joined = "_".join((text, source_lang, target_lang, engine))
        return hashlib.md5(joined.encode()).hexdigest()

    def get(self, text: str, source_lang: str, target_lang: str, engine: str) -> Optional[str]:
        """Return the cached translation, or None on a cache miss."""
        return self.memory_cache.get(
            self.get_cache_key(text, source_lang, target_lang, engine)
        )

    def set(self, text: str, translation: str, source_lang: str, target_lang: str, engine: str):
        """Store a translation and persist the whole cache immediately."""
        key = self.get_cache_key(text, source_lang, target_lang, engine)
        self.memory_cache[key] = translation
        self.save_cache()
class TextChunker:
    """Intelligent splitting of text into translation-sized chunks."""

    @staticmethod
    def split_text(text: str, max_chars: int = 3000, preserve_formatting: bool = True) -> List[str]:
        """
        Split *text* into chunks of at most ``max_chars`` characters.

        Strategy, coarsest to finest: keep whole paragraphs together, then
        split an oversized paragraph by sentences, then by words, and as a
        last resort hard-slice a single word longer than ``max_chars``.

        BUGFIX vs. original: pending sentence text is now flushed *before*
        emitting word-level chunks, so chunks always come out in document
        order (previously a short sentence could end up after the chunks of
        a long sentence that followed it).  Oversized single words are now
        sliced so no chunk ever exceeds ``max_chars``.

        Args:
            text: Input text; returns [] for empty/whitespace-only input.
            max_chars: Maximum length of each chunk.
            preserve_formatting: Kept for interface compatibility; currently
                unused (paragraph separators are always normalized to "\n\n").

        Returns:
            Ordered list of non-empty chunks.
        """
        if not text or not text.strip():
            return []
        text = text.strip()
        if len(text) <= max_chars:
            return [text]

        chunks: List[str] = []
        current_chunk = ""
        # Paragraphs are delimited by one or more blank lines.
        for para in re.split(r'\n\n+', text):
            if len(para) > max_chars:
                # Paragraph alone is too long: flush what we have, then
                # fall back to sentence-level splitting.
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = ""
                for sentence in re.split(r'(?<=[.!?])\s+', para):
                    if len(sentence) > max_chars:
                        # Flush pending sentences first to preserve order.
                        if current_chunk:
                            chunks.append(current_chunk.strip())
                            current_chunk = ""
                        temp_chunk = ""
                        for word in sentence.split():
                            if len(word) > max_chars:
                                # Hard-slice a word that cannot fit in any chunk.
                                if temp_chunk:
                                    chunks.append(temp_chunk)
                                    temp_chunk = ""
                                chunks.extend(
                                    word[i:i + max_chars]
                                    for i in range(0, len(word), max_chars)
                                )
                            elif len(temp_chunk) + len(word) + 1 <= max_chars:
                                temp_chunk = f"{temp_chunk} {word}".strip()
                            else:
                                if temp_chunk:
                                    chunks.append(temp_chunk)
                                temp_chunk = word
                        if temp_chunk:
                            chunks.append(temp_chunk)
                    elif len(current_chunk) + len(sentence) + 1 <= max_chars:
                        current_chunk = f"{current_chunk} {sentence}".strip()
                    else:
                        if current_chunk:
                            chunks.append(current_chunk)
                        current_chunk = sentence
            elif len(current_chunk) + len(para) + 2 <= max_chars:
                # Paragraph fits: append it, keeping the blank-line separator.
                current_chunk = f"{current_chunk}\n\n{para}" if current_chunk else para
            else:
                # Start a fresh chunk with this paragraph.
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks
class ProgressTracker:
    """Progress tracking with on-disk checkpoints so an interrupted task can resume."""

    def __init__(self, total_items: int, task_id: str):
        self.total_items = total_items
        self.task_id = task_id
        self.completed_items = 0
        # start_time is reset on every construction, so an ETA computed after
        # resuming from a checkpoint only reflects the current session's rate.
        self.start_time = datetime.now()
        self.checkpoint_file = Path(f".checkpoints/{task_id}.json")
        # parents=True: robust even if intermediate directories are missing.
        self.checkpoint_file.parent.mkdir(parents=True, exist_ok=True)
        self.load_checkpoint()

    def load_checkpoint(self):
        """Restore the completed count from a previous run, if a checkpoint exists."""
        if not self.checkpoint_file.exists():
            return
        try:
            with open(self.checkpoint_file, 'r') as f:
                data = json.load(f)
            self.completed_items = data.get('completed', 0)
        except (OSError, ValueError) as e:
            # Narrowed from a bare ``except:``; JSONDecodeError is a ValueError.
            # A corrupt checkpoint is logged instead of silently ignored.
            logging.warning(f"Erreur chargement checkpoint: {e}")

    def update(self, increment: int = 1):
        """Advance progress by *increment* and persist it immediately."""
        self.completed_items += increment
        self.save_checkpoint()

    def save_checkpoint(self):
        """Write the current progress to the checkpoint file (best effort)."""
        data = {
            'task_id': self.task_id,
            'total': self.total_items,
            'completed': self.completed_items,
            'timestamp': datetime.now().isoformat()
        }
        try:
            with open(self.checkpoint_file, 'w') as f:
                json.dump(data, f)
        except OSError as e:
            # A disk error must not crash update(); log and continue.
            logging.error(f"Erreur sauvegarde checkpoint: {e}")

    def get_progress(self) -> float:
        """Return the completion percentage (100.0 when there is nothing to do)."""
        if self.total_items == 0:
            return 100.0
        return (self.completed_items / self.total_items) * 100

    def get_eta(self) -> str:
        """Estimate the remaining time as a human-readable string."""
        if self.completed_items == 0:
            return "Calcul en cours..."
        elapsed = (datetime.now() - self.start_time).total_seconds()
        if elapsed <= 0:
            # Guard against a zero-resolution clock (would divide by zero below).
            return "Calcul en cours..."
        rate = self.completed_items / elapsed
        remaining = self.total_items - self.completed_items
        eta_seconds = remaining / rate if rate > 0 else 0
        hours = int(eta_seconds // 3600)
        minutes = int((eta_seconds % 3600) // 60)
        if hours > 0:
            return f"{hours}h {minutes}min"
        return f"{minutes}min"
class RateLimiter:
    """Adaptive rate limiting for API requests (backs off after errors)."""

    def __init__(self, min_delay: float = 0.5, max_delay: float = 2.0):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0
        self.request_count = 0
        self.error_count = 0

    def wait(self):
        """Sleep just long enough to honour the adaptive delay, then record the request."""
        if self.error_count:
            # Exponential back-off after errors, capped at 10 seconds.
            delay = min(self.max_delay * 1.5 ** self.error_count, 10.0)
        else:
            # Normal case: a small randomized delay to avoid bursts.
            delay = random.uniform(self.min_delay, self.max_delay)
        remaining = delay - (time.time() - self.last_request_time)
        if remaining > 0:
            time.sleep(remaining)
        self.last_request_time = time.time()
        self.request_count += 1

    def register_error(self):
        """Record one more API error (increases future delays)."""
        self.error_count += 1

    def reset_errors(self):
        """Forget past errors, restoring the normal randomized delay."""
        self.error_count = 0
def setup_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """
    Configure and return a logger with a console handler (INFO) and an
    optional UTF-8 file handler (DEBUG).

    BUGFIX: the original attached a fresh console handler on every call, so
    calling setup_logger twice with the same name duplicated every log line.
    Handlers are now attached only when an equivalent one is not already
    present on the logger.

    Args:
        name: Logger name (passed to ``logging.getLogger``).
        log_file: Optional path; when given, a file handler is added once.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Console handler: FileHandler subclasses StreamHandler, so exclude it.
    has_console = any(
        isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
        for h in logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    if log_file:
        # FileHandler stores the absolute path in ``baseFilename``; compare
        # against it so the same file is never attached twice.
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
    return logger
def sanitize_filename(filename: str) -> str:
    """Return *filename* with forbidden characters replaced by '_' and the stem capped at 200 chars."""
    # One C-level pass replacing characters invalid on common filesystems.
    table = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})
    cleaned = filename.translate(table)
    # Split off the extension (last dot only), if any.
    if '.' in cleaned:
        stem, ext = cleaned.rsplit('.', 1)
    else:
        stem, ext = cleaned, ''
    stem = stem[:200]
    return f"{stem}.{ext}" if ext else stem
def format_file_size(size_bytes: int) -> str:
    """Render a byte count as a human-readable string (B, KB, MB, GB or TB)."""
    size = float(size_bytes)
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    # Anything at or above 1024 GB is reported in terabytes.
    return f"{size:.1f} TB"
def estimate_translation_time(char_count: int, chars_per_second: float = 50) -> str:
    """Estimate translation duration for *char_count* characters at the given throughput."""
    total_seconds = char_count / chars_per_second
    if total_seconds < 60:
        return f"{int(total_seconds)} secondes"
    if total_seconds < 3600:
        return f"{int(total_seconds / 60)} minutes"
    hours, remainder = divmod(total_seconds, 3600)
    return f"{int(hours)}h {int(remainder / 60)}min"