# utils.py
"""
Utilitaires pour l'application de traduction
"""
import re
import json
import time
import random
import hashlib
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from datetime import datetime
class TranslationCache:
    """Disk-backed translation cache.

    Keeps an in-memory dict of translations mirrored to a JSON file so
    results persist across runs. Entries are keyed by an MD5 digest of
    the text plus language pair plus engine.
    """

    def __init__(self, cache_dir: str = ".translation_cache"):
        # The cache lives in its own directory, created on demand.
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.memory_cache = {}
        self.cache_file = self.cache_dir / "translations.json"
        self.load_cache()

    def load_cache(self):
        """Populate the in-memory cache from disk, if a cache file exists."""
        if not self.cache_file.exists():
            return
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                self.memory_cache = json.load(f)
        except Exception as e:
            # Corrupt or unreadable cache file: log and start fresh.
            logging.warning(f"Erreur chargement cache: {e}")
            self.memory_cache = {}

    def save_cache(self):
        """Write the in-memory cache to disk (best effort, logged on failure)."""
        try:
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(self.memory_cache, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logging.error(f"Erreur sauvegarde cache: {e}")

    def get_cache_key(self, text: str, source_lang: str, target_lang: str, engine: str) -> str:
        """Return a unique cache key for this text / language pair / engine."""
        content = "_".join((text, source_lang, target_lang, engine))
        return hashlib.md5(content.encode()).hexdigest()

    def get(self, text: str, source_lang: str, target_lang: str, engine: str) -> Optional[str]:
        """Return the cached translation, or None when absent."""
        return self.memory_cache.get(
            self.get_cache_key(text, source_lang, target_lang, engine)
        )

    def set(self, text: str, translation: str, source_lang: str, target_lang: str, engine: str):
        """Store a translation and persist the cache immediately."""
        key = self.get_cache_key(text, source_lang, target_lang, engine)
        self.memory_cache[key] = translation
        self.save_cache()
class TextChunker:
    """Smart splitting of text into translation-sized chunks."""

    @staticmethod
    def split_text(text: str, max_chars: int = 3000, preserve_formatting: bool = True) -> List[str]:
        """Split *text* into chunks of roughly ``max_chars`` characters.

        Splitting prefers paragraph boundaries (blank lines), then sentence
        boundaries, then word boundaries as a last resort. A single word
        longer than ``max_chars`` is kept whole.
        """
        if not text or not text.strip():
            return []

        text = text.strip()
        # Short input: nothing to split.
        if len(text) <= max_chars:
            return [text]

        pieces: List[str] = []
        pending = ""  # chunk currently being assembled
        for paragraph in re.split(r'\n\n+', text):
            if len(paragraph) > max_chars:
                # Paragraph alone exceeds the limit: flush what we have,
                # then break the paragraph down sentence by sentence.
                if pending:
                    pieces.append(pending.strip())
                pending = ""
                for sentence in re.split(r'(?<=[.!?])\s+', paragraph):
                    if len(sentence) > max_chars:
                        # Oversized sentence: fall back to word packing.
                        buf = ""
                        for word in sentence.split():
                            if len(buf) + len(word) + 1 <= max_chars:
                                buf = f"{buf} {word}".strip()
                            else:
                                if buf:
                                    pieces.append(buf)
                                buf = word
                        if buf:
                            pieces.append(buf)
                    elif len(pending) + len(sentence) + 1 <= max_chars:
                        pending = f"{pending} {sentence}".strip()
                    else:
                        if pending:
                            pieces.append(pending)
                        pending = sentence
            elif len(pending) + len(paragraph) + 2 <= max_chars:
                # Paragraph fits: append it (with the blank line separator).
                pending = f"{pending}\n\n{paragraph}" if pending else paragraph
            else:
                # Start a new chunk with this paragraph.
                if pending:
                    pieces.append(pending.strip())
                pending = paragraph

        if pending:
            pieces.append(pending.strip())
        return pieces
class ProgressTracker:
    """Progress tracking with checkpoint persistence.

    Progress is written to ``.checkpoints/<task_id>.json`` after every
    update so an interrupted task can resume where it left off.
    """

    def __init__(self, total_items: int, task_id: str):
        self.total_items = total_items
        self.task_id = task_id
        self.completed_items = 0
        self.start_time = datetime.now()
        self.checkpoint_file = Path(f".checkpoints/{task_id}.json")
        # parents=True so nested checkpoint dirs are created if needed.
        self.checkpoint_file.parent.mkdir(parents=True, exist_ok=True)
        self.load_checkpoint()

    def load_checkpoint(self):
        """Restore the completed-item count from a previous checkpoint, if any."""
        if self.checkpoint_file.exists():
            try:
                with open(self.checkpoint_file, 'r') as f:
                    data = json.load(f)
                self.completed_items = data.get('completed', 0)
            except (OSError, ValueError) as e:
                # Unreadable or corrupt checkpoint (JSONDecodeError is a
                # ValueError): log and start from zero instead of crashing.
                # Fix: the original bare `except:` silently swallowed
                # everything, including KeyboardInterrupt.
                logging.warning(f"Erreur chargement checkpoint: {e}")

    def update(self, increment: int = 1):
        """Advance progress by *increment* items and persist a checkpoint."""
        self.completed_items += increment
        self.save_checkpoint()

    def save_checkpoint(self):
        """Write the current progress state to the checkpoint file."""
        data = {
            'task_id': self.task_id,
            'total': self.total_items,
            'completed': self.completed_items,
            'timestamp': datetime.now().isoformat()
        }
        with open(self.checkpoint_file, 'w') as f:
            json.dump(data, f)

    def get_progress(self) -> float:
        """Return completion as a percentage (100.0 for an empty task)."""
        if self.total_items == 0:
            return 100.0
        return (self.completed_items / self.total_items) * 100

    def get_eta(self) -> str:
        """Return a human-readable estimate of the remaining time."""
        if self.completed_items == 0:
            return "Calcul en cours..."
        elapsed = (datetime.now() - self.start_time).total_seconds()
        # Fix: guard against elapsed == 0 (ZeroDivisionError in the original
        # when update() is called within the clock's resolution of start).
        if elapsed <= 0:
            return "Calcul en cours..."
        rate = self.completed_items / elapsed
        remaining = self.total_items - self.completed_items
        eta_seconds = remaining / rate if rate > 0 else 0
        hours = int(eta_seconds // 3600)
        minutes = int((eta_seconds % 3600) // 60)
        if hours > 0:
            return f"{hours}h {minutes}min"
        return f"{minutes}min"
class RateLimiter:
    """Adaptive delay between API requests.

    Normally waits a random delay in [min_delay, max_delay]; after errors
    the delay grows exponentially (capped at 10 s) until reset.
    """

    def __init__(self, min_delay: float = 0.5, max_delay: float = 2.0):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0
        self.request_count = 0
        self.error_count = 0

    def wait(self):
        """Sleep just long enough to honour the (adaptive) delay, then record the request."""
        if self.error_count:
            # Exponential back-off after errors, capped at 10 seconds.
            delay = min(self.max_delay * (1.5 ** self.error_count), 10.0)
        else:
            # Jittered delay in the normal range.
            delay = random.uniform(self.min_delay, self.max_delay)
        remaining = delay - (time.time() - self.last_request_time)
        if remaining > 0:
            time.sleep(remaining)
        self.last_request_time = time.time()
        self.request_count += 1

    def register_error(self):
        """Record one error (lengthens subsequent delays)."""
        self.error_count += 1

    def reset_errors(self):
        """Clear the error counter, returning to normal delays."""
        self.error_count = 0
def setup_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Configure and return a named logger.

    Console output at INFO level; optional file output at DEBUG level.

    Args:
        name: Logger name (passed to ``logging.getLogger``).
        log_file: Optional path; when given, a UTF-8 file handler is added.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # Fix: getLogger returns the same cached object for a given name, so the
    # original duplicated handlers (and thus log lines) on repeated calls.
    # Drop any handlers from a previous setup to make this idempotent.
    logger.handlers.clear()
    # Detailed timestamped format shared by all handlers.
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Console handler (INFO and above).
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    # Optional file handler (full DEBUG detail).
    if log_file:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger
def sanitize_filename(filename: str) -> str:
    """Return *filename* with forbidden characters replaced and the stem capped at 200 chars."""
    # Map each character that is illegal on common filesystems to '_',
    # in a single translate() pass.
    table = str.maketrans({c: '_' for c in '<>:"/\\|?*'})
    filename = filename.translate(table)
    # Split off the extension (last dot only) so truncation keeps it intact.
    if '.' in filename:
        name, ext = filename.rsplit('.', 1)
    else:
        name, ext = filename, ''
    name = name[:200]
    return f"{name}.{ext}" if ext else name
def format_file_size(size_bytes: int) -> str:
    """Return a human-readable file size using binary units and one decimal."""
    size = float(size_bytes)
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    # Anything past GB is reported in terabytes.
    return f"{size:.1f} TB"
def estimate_translation_time(char_count: int, chars_per_second: float = 50) -> str:
    """Return a rough human-readable estimate of the time to translate *char_count* characters."""
    total_seconds = char_count / chars_per_second
    if total_seconds < 60:
        return f"{int(total_seconds)} secondes"
    if total_seconds < 3600:
        return f"{int(total_seconds / 60)} minutes"
    hours = int(total_seconds / 3600)
    minutes = int((total_seconds % 3600) / 60)
    return f"{hours}h {minutes}min"