# ==========================================
# game_engine.py - Calcul OCR v3.0 CLEAN
# ==========================================
"""
Math game engine with parallel OCR processing and backend auto-detection.

At import time the module selects a GPU (TrOCR) or CPU (EasyOCR) image
processing backend, then exposes:

* ``create_result_row_with_images`` -- run OCR on one drawing and build the
  per-question result row,
* ``MathGame`` -- the game state machine (operation generation, timing,
  background OCR worker thread, results assembly),
* ``export_to_clean_dataset`` -- push a finished session to the HuggingFace
  dataset.
"""

import random
import time
import datetime
import gradio as gr
import os
import uuid
import gc
import base64
from io import BytesIO
import numpy as np
from PIL import Image
import threading
import queue
from typing import Dict, Tuple, Optional

# Clean auto-detection: GPU or CPU backend, nothing else.
ocr_module = None
ocr_info = {"model_name": "Unknown", "device": "Unknown"}

try:
    # GPU path: torch importable AND CUDA actually available.
    import torch
    if torch.cuda.is_available():
        from image_processing_gpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info,
        )
        ocr_module = "gpu"
        print("✅ Game Engine: Mode GPU - TrOCR activé")
    else:
        # torch installed but no GPU -> fall back to CPU backend.
        from image_processing_cpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info,
        )
        ocr_module = "cpu"
        print("✅ Game Engine: Mode CPU - EasyOCR activé")
except ImportError:
    # torch not installed -> CPU backend is mandatory.
    from image_processing_cpu import (
        recognize_number_fast_with_image,
        create_thumbnail_fast,
        create_white_canvas,
        cleanup_memory,
        log_memory_usage,
        get_ocr_model_info,
    )
    ocr_module = "cpu"
    print("✅ Game Engine: Mode CPU - EasyOCR activé")

# Fetch the selected model's info once at import time (best effort).
try:
    ocr_info = get_ocr_model_info()
    print(f"🎯 OCR sélectionné: {ocr_info['model_name']} sur {ocr_info['device']}")
except Exception as e:
    print(f"⚠️ Impossible de récupérer les infos OCR: {e}")
    ocr_info = {"model_name": "Error", "device": "Unknown"}

# Dataset imports with graceful degradation (export is optional).
try:
    from datasets import Dataset, load_dataset
    DATASET_AVAILABLE = True
    print("✅ Modules dataset disponibles")
except ImportError as e:
    DATASET_AVAILABLE = False
    print(f"⚠️ Modules dataset non disponibles: {e}")

# Dataset name, kept consistent with the HF space.
DATASET_NAME = "hoololi/calcul_ocr_dataset"

# Operand ranges per operation and difficulty level.
DIFFICULTY_RANGES = {
    "×": {"Facile": (2, 9), "Difficile": (4, 12)},
    "+": {"Facile": (1, 50), "Difficile": (10, 100)},
    "-": {"Facile": (1, 50), "Difficile": (10, 100)},
    "÷": {"Facile": (1, 10), "Difficile": (2, 12)},
}

# Display emoji per operation type (was duplicated inline in three methods).
OPERATION_EMOJI = {
    "×": "✖️",
    "+": "➕",
    "-": "➖",
    "÷": "➗",
    "Aléatoire": "🎲",
}


def create_result_row_with_images(
    i: int,
    image: dict | np.ndarray | Image.Image,
    expected: int,
    operation_data: tuple[int, int, str, int],
) -> dict:
    """Run OCR on one user drawing and build the result row for question *i*.

    Args:
        i: Zero-based question index (displayed as ``i + 1``).
        expected: The correct numeric answer for this question.
        operation_data: ``(a, b, operation, correct_result)`` tuple.

    Returns:
        dict with keys ``html_row``, ``is_correct``, ``recognized``,
        ``recognized_num`` and ``dataset_image_data``.
    """
    # Optimized OCR pass: returns text, an optimized PIL image and the
    # base64 payload destined for the dataset export.
    recognized, optimized_image, dataset_image_data = recognize_number_fast_with_image(image)

    # isdigit() rejects signs/decimals, so int() cannot raise here; the guard
    # also protects against a non-str OCR result.
    if isinstance(recognized, str) and recognized.isdigit():
        recognized_num = int(recognized)
    else:
        recognized_num = 0

    is_correct = recognized_num == expected
    a, b, operation, correct_result = operation_data

    status_icon = "✅" if is_correct else "❌"
    status_text = "Correct" if is_correct else "Incorrect"
    row_color = "#e8f5e8" if is_correct else "#ffe8e8"  # row background tint

    # Thumbnail for the results table.
    image_thumbnail = create_thumbnail_fast(optimized_image, size=(50, 50))

    # Release the optimized image as soon as the thumbnail is made.
    if optimized_image is not None and hasattr(optimized_image, "close"):
        try:
            optimized_image.close()
        except Exception:
            pass

    # NOTE(review): the HTML wrapper markup (table cells) was lost when this
    # file was flattened -- restore the <td> structure to match the table
    # header before shipping.
    return {
        "html_row": f""" {i+1} {a} {operation} {b} {expected} {image_thumbnail} {recognized_num} {status_icon} {status_text} """,
        "is_correct": is_correct,
        "recognized": recognized,
        "recognized_num": recognized_num,
        "dataset_image_data": dataset_image_data,
    }


class MathGame:
    """Math game engine with a background thread for parallel OCR."""

    def __init__(self):
        # Live-session state.
        self.is_running = False
        self.start_time = 0
        self.current_operation = ""
        self.correct_answer = 0
        self.user_images = []
        self.expected_answers = []
        self.operations_history = []   # list of (a, b, op, answer) tuples
        self.question_count = 0
        self.time_remaining = 30
        self.session_data = []         # per-question dicts for dataset export

        # Session configuration.
        self.duration = 30
        self.operation_type = "×"
        self.difficulty = "Facile"

        # Export lifecycle: not_exported -> exporting -> exported.
        self.export_status = "not_exported"
        self.export_timestamp = None
        self.export_result = None

        # Parallel processing: producer (UI thread) feeds the queue, the
        # worker thread fills results_cache keyed by question number.
        self.processing_queue = queue.Queue()
        self.results_cache: Dict[int, dict] = {}
        self.worker_thread: Optional[threading.Thread] = None
        self.processing_active = False

    def get_export_status(self) -> dict[str, str | bool | None]:
        """Return the export state plus whether an export is currently allowed."""
        return {
            "status": self.export_status,
            "timestamp": self.export_timestamp,
            "result": self.export_result,
            "can_export": self.export_status == "not_exported" and len(self.session_data) > 0,
        }

    def mark_export_in_progress(self) -> None:
        """Flag the session as being exported right now."""
        self.export_status = "exporting"
        self.export_timestamp = datetime.datetime.now().isoformat()

    def mark_export_completed(self, result: str) -> None:
        """Record a finished export and its result message."""
        self.export_status = "exported"
        self.export_result = result

    def _start_background_processing(self) -> None:
        """Start the background OCR worker thread (idempotent)."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.processing_active = True
            self.worker_thread = threading.Thread(target=self._process_images_worker, daemon=True)
            self.worker_thread.start()
            print("🔄 Thread de traitement parallèle démarré")

    def _stop_background_processing(self) -> None:
        """Signal the worker thread to stop and wait briefly for it to exit."""
        self.processing_active = False
        if self.worker_thread and self.worker_thread.is_alive():
            print("⏹️ Arrêt du thread de traitement parallèle")
            # Bounded join so the UI thread is never blocked for long; the
            # worker is a daemon thread, so a slow exit is harmless.
            self.worker_thread.join(timeout=2.0)

    def _process_images_worker(self) -> None:
        """Worker loop: consume queued images and cache their OCR results."""
        print("🚀 Worker thread démarré")
        while self.processing_active:
            try:
                # Blocking get with timeout replaces the original
                # empty()-then-get pattern, which raced with the producer and
                # busy-slept between polls.
                question_num, image, expected, operation_data = self.processing_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            try:
                print(f"🔄 Traitement parallèle image {question_num}...")
                start_time = time.time()
                result_data = create_result_row_with_images(question_num, image, expected, operation_data)
                processing_time = time.time() - start_time

                # Dict assignment is atomic under the GIL; end_game polls this.
                self.results_cache[question_num] = result_data
                print(f"✅ Image {question_num} traitée en {processing_time:.1f}s (parallèle)")
            except Exception as e:
                print(f"❌ Erreur traitement parallèle: {e}")
        print("🛑 Worker thread terminé")

    def _add_image_to_processing_queue(
        self,
        question_num: int,
        image: dict | np.ndarray | Image.Image,
        expected: int,
        operation_data: tuple,
    ) -> None:
        """Queue one image for background OCR (no-op when image is None)."""
        if image is not None:
            self.processing_queue.put((question_num, image, expected, operation_data))
            print(f"📝 Image {question_num} ajoutée à la queue de traitement")

    def generate_multiplication(self, difficulty: str) -> tuple[str, int]:
        """Generate a multiplication within the difficulty's operand range."""
        min_val, max_val = DIFFICULTY_RANGES["×"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} × {b}", a * b

    def generate_addition(self, difficulty: str) -> tuple[str, int]:
        """Generate an addition within the difficulty's operand range."""
        min_val, max_val = DIFFICULTY_RANGES["+"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} + {b}", a + b

    def generate_subtraction(self, difficulty: str) -> tuple[str, int]:
        """Generate a subtraction whose result is always non-negative."""
        min_val, max_val = DIFFICULTY_RANGES["-"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, a)  # b <= a guarantees a - b >= 0
        return f"{a} - {b}", a - b

    def generate_division(self, difficulty: str) -> tuple[str, int]:
        """Generate an exact division (dividend built from result * divisor)."""
        min_result, max_result = DIFFICULTY_RANGES["÷"][difficulty]
        result = random.randint(min_result, max_result)
        divisor = random.randint(2, 9) if difficulty == "Facile" else random.randint(2, 12)
        dividend = result * divisor
        return f"{dividend} ÷ {divisor}", result

    def generate_operation(self, operation_type: str, difficulty: str) -> tuple[str, int]:
        """Dispatch to the right generator; unknown types fall back to ×."""
        if operation_type == "×":
            return self.generate_multiplication(difficulty)
        elif operation_type == "+":
            return self.generate_addition(difficulty)
        elif operation_type == "-":
            return self.generate_subtraction(difficulty)
        elif operation_type == "÷":
            return self.generate_division(difficulty)
        elif operation_type == "Aléatoire":
            random_op = random.choice(["×", "+", "-", "÷"])
            return self.generate_operation(random_op, difficulty)
        else:
            return self.generate_multiplication(difficulty)

    def start_game(
        self, duration: str, operation: str, difficulty: str
    ) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Reset all state, start the worker thread and return the first question.

        Returns the 7-tuple of Gradio component updates:
        (operation HTML, fresh canvas, status line, timer line,
        start-button update, next-button update, results HTML).
        """
        # Session configuration.
        self.duration = 60 if duration == "60 secondes" else 30
        self.operation_type = operation
        self.difficulty = difficulty

        # Release any images kept from a previous session.
        if hasattr(self, "user_images") and self.user_images:
            for img in self.user_images:
                if hasattr(img, "close"):
                    try:
                        img.close()
                    except Exception:
                        pass
        if hasattr(self, "session_data") and self.session_data:
            for entry in self.session_data:
                if "user_drawing" in entry and entry["user_drawing"]:
                    entry["user_drawing"] = None
            self.session_data.clear()

        # Reset the parallel-processing machinery.
        self._stop_background_processing()
        self.results_cache.clear()
        while not self.processing_queue.empty():
            try:
                self.processing_queue.get_nowait()
            except queue.Empty:
                break

        # Fresh session state.
        self.is_running = True
        self.start_time = time.time()
        self.user_images = []
        self.expected_answers = []
        self.operations_history = []
        self.question_count = 0
        self.time_remaining = self.duration
        self.session_data = []

        # Reset export lifecycle.
        self.export_status = "not_exported"
        self.export_timestamp = None
        self.export_result = None

        # Restart the background OCR worker.
        self._start_background_processing()
        gc.collect()

        # First operation.
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer

        # Parse "a op b" for the history log.
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))

        emoji = OPERATION_EMOJI.get(self.operation_type, "🔢")

        # NOTE(review): the original HTML wrapper around the operation was
        # lost in extraction -- restore the styled <div> if needed.
        return (
            f'\n{operation_str}\n',
            create_white_canvas(),
            f"🎯 {emoji} {self.operation_type} • {self.difficulty} • Écrivez votre réponse !",
            f"⏱️ Temps restant: {self.time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            "",
        )

    def next_question(
        self, image_data: dict | np.ndarray | Image.Image | None
    ) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Accept the current drawing, queue it for OCR and show the next question.

        Ends the game (delegating to :meth:`end_game`) when time has run out.
        """
        if not self.is_running:
            return (
                f'\n{self.current_operation}\n',
                image_data,
                "❌ Le jeu n'est pas en cours !",
                "⏱️ Temps: 0s",
                gr.update(interactive=True),
                gr.update(interactive=False),
                "",
            )

        elapsed_time = time.time() - self.start_time
        if elapsed_time >= self.duration:
            return self.end_game(image_data)

        if image_data is not None:
            # Keep the image AND hand it to the background worker.
            self.user_images.append(image_data)
            self.expected_answers.append(self.correct_answer)

            # Parse the current operation for the processing payload.
            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            current_operation_data = (a, b, op, self.correct_answer)

            # Kick off parallel OCR on the image we just received.
            self._add_image_to_processing_queue(
                self.question_count, image_data, self.correct_answer, current_operation_data
            )
            self.question_count += 1

        # Next operation.
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer

        # Log it in the history.
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))

        time_remaining = max(0, self.duration - int(elapsed_time))
        self.time_remaining = time_remaining
        if time_remaining <= 0:
            return self.end_game(image_data)

        emoji = OPERATION_EMOJI.get(self.operation_type, "🔢")

        return (
            f'\n{operation_str}\n',
            create_white_canvas(),
            f"🎯 {emoji} Question {self.question_count + 1} • {self.difficulty}",
            f"⏱️ Temps restant: {time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            "",
        )

    def end_game(
        self, final_image: dict | np.ndarray | Image.Image | None
    ) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Stop the session, gather all OCR results and build the final report.

        The last drawing (if any) is processed synchronously; earlier drawings
        are collected from the worker's cache, with a synchronous fallback for
        any result the worker did not finish in time.
        """
        self.is_running = False

        # Stop the background worker before assembling results.
        self._stop_background_processing()
        print("🏁 Fin de jeu - Assemblage des résultats...")

        if final_image is not None:
            self.user_images.append(final_image)
            self.expected_answers.append(self.correct_answer)

            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            final_operation_data = (a, b, op, self.correct_answer)

            # Process the last image immediately (not via the worker).
            print(f"🔄 Traitement final de l'image {self.question_count}...")
            final_result = create_result_row_with_images(
                self.question_count, final_image, self.correct_answer, final_operation_data
            )
            self.results_cache[self.question_count] = final_result
            self.question_count += 1

            if len(self.operations_history) < len(self.user_images):
                self.operations_history.append((a, b, op, self.correct_answer))

        # Wait (bounded) for the worker to finish any in-flight images.
        max_wait = 10
        wait_start = time.time()
        expected_results = len(self.user_images)
        print(f"⏳ Attente de {expected_results} résultats...")
        while len(self.results_cache) < expected_results and (time.time() - wait_start) < max_wait:
            time.sleep(0.1)
        results_ready = len(self.results_cache)
        print(f"✅ {results_ready}/{expected_results} résultats prêts")

        # Assemble results in question order.
        correct_answers = 0
        total_questions = len(self.user_images)
        table_rows_html = ""
        session_timestamp = datetime.datetime.now().isoformat()
        session_id = f"session_{int(datetime.datetime.now().timestamp())}_{str(uuid.uuid4())[:8]}"
        self.session_data = []
        images_saved = 0
        total_image_size_kb = 0

        # OCR model info is session-wide: fetch once instead of per row.
        try:
            ocr_info_data = get_ocr_model_info()
            print(f"🔍 Debug OCR info: {ocr_info_data}")
        except Exception as e:
            print(f"❌ Erreur get_ocr_model_info: {e}")
            ocr_info_data = {"model_name": "Error", "device": "Unknown"}

        print(f"📊 Assemblage de {total_questions} résultats...")
        for i in range(total_questions):
            if i in self.results_cache:
                row_data = self.results_cache[i]
                print(f" ✅ Résultat {i} du cache parallèle")
            else:
                # Worker did not finish this one: process it now (fallback).
                print(f" 🔄 Traitement fallback pour résultat {i}...")
                if i < len(self.operations_history):
                    row_data = create_result_row_with_images(
                        i, self.user_images[i], self.expected_answers[i], self.operations_history[i]
                    )
                else:
                    row_data = {
                        "html_row": f"{i+1}Erreur traitement",
                        "is_correct": False,
                        "recognized": "0",
                        "recognized_num": 0,
                        "dataset_image_data": None,
                    }

            table_rows_html += row_data["html_row"]
            if row_data["is_correct"]:
                correct_answers += 1

            # Dataset entry for this question.
            a, b, operation, correct_result = (
                self.operations_history[i] if i < len(self.operations_history) else (0, 0, "×", 0)
            )
            entry = {
                "session_id": session_id,
                "timestamp": session_timestamp,
                "question_number": i + 1,
                "session_duration": self.duration,
                "operation_type": self.operation_type,
                "difficulty_level": self.difficulty,
                "operand_a": a,
                "operand_b": b,
                "operation": operation,
                "correct_answer": self.expected_answers[i] if i < len(self.expected_answers) else 0,
                "ocr_model": ocr_info_data.get("model_name", "Unknown"),
                "ocr_device": ocr_info_data.get("device", "Unknown"),
                "user_answer_ocr": row_data["recognized"],
                "user_answer_parsed": row_data["recognized_num"],
                "is_correct": row_data["is_correct"],
                "total_questions": total_questions,
                "app_version": "3.0_calcul_ocr_parallel",
            }
            print(f"🔍 Debug entry OCR fields: ocr_model={entry['ocr_model']}, ocr_device={entry['ocr_device']}")

            if row_data["dataset_image_data"]:
                entry["handwriting_image"] = row_data["dataset_image_data"]["image_base64"]
                entry["image_width"] = int(row_data["dataset_image_data"]["compressed_size"][0])
                entry["image_height"] = int(row_data["dataset_image_data"]["compressed_size"][1])
                entry["image_size_kb"] = float(row_data["dataset_image_data"]["file_size_kb"])
                entry["has_image"] = True
                images_saved += 1
                total_image_size_kb += row_data["dataset_image_data"]["file_size_kb"]
            else:
                entry["has_image"] = False

            self.session_data.append(entry)

        accuracy = (correct_answers / total_questions * 100) if total_questions > 0 else 0
        # Back-fill the session-level accuracy on every entry.
        for entry in self.session_data:
            entry["session_accuracy"] = accuracy

        # Memory cleanup: close all retained drawings.
        for img in self.user_images:
            if hasattr(img, "close"):
                try:
                    img.close()
                except Exception:
                    pass
        gc.collect()

        # NOTE(review): the table/report HTML markup was lost in extraction;
        # only the visible text content survives below -- restore the
        # <table>/<div> structure before shipping.
        table_html = f"""
Question A Op B Réponse Votre dessin OCR Statut
{table_rows_html}
"""

        config_display = f"{self.operation_type} • {self.difficulty} • {self.duration}s"
        emoji = OPERATION_EMOJI.get(self.operation_type, "🔢")

        export_info = self.get_export_status()
        if export_info["can_export"]:
            export_section = f"""
📤 Ajouter cette série au dataset ?
✅ {total_questions} réponses • 📊 {accuracy:.1f}% de précision
📸 {images_saved} opérations et images sauvegardées ({total_image_size_kb:.1f}KB)
⚙️ Configuration: {config_display}
"""
        else:
            export_section = ""

        final_results = f"""
🎉 Session terminée !
📈 Résultats
{emoji} {config_display}
{total_questions}
Questions
{correct_answers}
Correctes
{total_questions - correct_answers}
Incorrectes
{accuracy:.1f}%
Précision
📊 Détail des Réponses
{table_html} {export_section}
"""

        return (
            """
🏁 C'est fini !
""",
            create_white_canvas(),
            f"✨ Session {config_display} terminée !",
            "⏱️ Temps écoulé !",
            gr.update(interactive=True),
            gr.update(interactive=False),
            final_results,
        )


def export_to_clean_dataset(session_data: list[dict], dataset_name: Optional[str] = None) -> str:
    """Export a finished session to the ``calcul_ocr_dataset`` HF dataset.

    Args:
        session_data: Per-question entry dicts built by ``MathGame.end_game``.
        dataset_name: Target dataset repo; defaults to :data:`DATASET_NAME`.

    Returns:
        A human-readable success or error message (never raises).
    """
    if dataset_name is None:
        dataset_name = DATASET_NAME

    if not DATASET_AVAILABLE:
        return "❌ Modules dataset non disponibles"

    hf_token = os.getenv("HF_TOKEN") or os.getenv("tk_calcul_ocr")
    if not hf_token:
        return "❌ Token HuggingFace manquant (HF_TOKEN ou tk_calcul_ocr)"

    try:
        print(f"\n🚀 === EXPORT VERS DATASET CALCUL OCR ===")
        print(f"📊 Dataset: {dataset_name}")

        # Fetch OCR info once for the whole session.
        try:
            global_ocr_info = get_ocr_model_info()
            print(f"🔍 Infos OCR globales: {global_ocr_info}")
        except Exception as e:
            print(f"❌ Erreur infos OCR globales: {e}")
            global_ocr_info = {"model_name": "Unknown", "device": "Unknown"}

        # Keep only entries that carry an image, stamping OCR fields on each.
        clean_entries = []
        for entry in session_data:
            if entry.get("has_image", False):
                entry_with_ocr = entry.copy()
                entry_with_ocr["ocr_model"] = global_ocr_info.get("model_name", "Unknown")
                entry_with_ocr["ocr_device"] = global_ocr_info.get("device", "Unknown")
                print(f"🔍 Entry avec OCR: ocr_model={entry_with_ocr['ocr_model']}, ocr_device={entry_with_ocr['ocr_device']}")
                clean_entries.append(entry_with_ocr)

        if len(clean_entries) == 0:
            return "❌ Aucune entrée avec image à exporter"

        # Sanity-check the structure of the first entry.
        sample_entry = clean_entries[0]
        print(f"🔍 Structure première entrée: {list(sample_entry.keys())}")
        print(f"🔍 OCR dans entrée: ocr_model={sample_entry.get('ocr_model', 'MISSING')}, ocr_device={sample_entry.get('ocr_device', 'MISSING')}")

        # Load the existing dataset and append (important: never overwrite).
        try:
            existing_dataset = load_dataset(dataset_name, split="train")
            existing_data = existing_dataset.to_list()
            print(f"📊 {len(existing_data)} entrées existantes trouvées")
            combined_data = existing_data + clean_entries
            clean_dataset = Dataset.from_list(combined_data)
            print(f"📊 Dataset combiné: {len(existing_data)} existantes + {len(clean_entries)} nouvelles = {len(combined_data)} total")
        except Exception as e:
            # Dataset does not exist yet: create it from the new entries.
            print(f"📊 Dataset non trouvé, création nouveau: {e}")
            clean_dataset = Dataset.from_list(clean_entries)
            print(f"📊 Nouveau dataset créé avec {len(clean_entries)} entrées")

        print(f"✅ Dataset créé - Features:")
        for feature_name in clean_dataset.features:
            print(f" - {feature_name}: {clean_dataset.features[feature_name]}")

        # Per-operation statistics for the commit message.
        operations_count = {}
        for entry in clean_entries:
            op = entry.get("operation_type", "unknown")
            operations_count[op] = operations_count.get(op, 0) + 1
        operations_summary = ", ".join(f"{op}: {count}" for op, count in operations_count.items())

        # Push to HuggingFace.
        print(f"📤 Push vers {dataset_name}...")
        clean_dataset.push_to_hub(
            dataset_name,
            private=False,
            token=hf_token,
            commit_message=f"Add {len(clean_entries)} handwriting samples for math OCR ({operations_summary})",
        )

        cleanup_memory()
        return f"""✅ Session ajoutée au dataset avec succès ! 📊 Dataset: {dataset_name} 📸 Images: {len(clean_entries)} 🔢 Opérations: {operations_summary} 📈 Total: {len(clean_dataset)} 🔗 Le dataset est consultable ici : https://huggingface.co/datasets/{dataset_name}"""

    except Exception as e:
        print(f"❌ ERREUR: {e}")
        import traceback
        traceback.print_exc()
        return f"❌ Erreur: {str(e)}"