Spaces:
Sleeping
Sleeping
# ==========================================
# game_engine.py - Calcul OCR v3.0 CLEAN
# ==========================================
"""
Moteur de jeu mathématique avec traitement parallèle et auto-détection OCR
"""
| import random | |
| import time | |
| import datetime | |
| import gradio as gr | |
| import os | |
| import uuid | |
| import gc | |
| import base64 | |
| from io import BytesIO | |
| import numpy as np | |
| from PIL import Image | |
| import threading | |
| import queue | |
| from typing import Dict, Tuple, Optional | |
# Clean auto-detection: select exactly one OCR backend, GPU or CPU.
ocr_module = None  # set to "gpu" or "cpu" once a backend import succeeds
ocr_info = {"model_name": "Unknown", "device": "Unknown"}
try:
    # GPU probe: torch must be importable AND a CUDA device visible.
    import torch
    if torch.cuda.is_available():
        from image_processing_gpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info
        )
        ocr_module = "gpu"
        print("✅ Game Engine: Mode GPU - TrOCR activé")
    else:
        # torch is installed but no GPU is available -> CPU backend.
        from image_processing_cpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info
        )
        ocr_module = "cpu"
        print("✅ Game Engine: Mode CPU - EasyOCR activé")
except ImportError:
    # torch not installed -> CPU backend is the only option.
    # NOTE(review): this also catches an ImportError raised while importing
    # image_processing_gpu itself — confirm that silent fallback is intended.
    from image_processing_cpu import (
        recognize_number_fast_with_image,
        create_thumbnail_fast,
        create_white_canvas,
        cleanup_memory,
        log_memory_usage,
        get_ocr_model_info
    )
    ocr_module = "cpu"
    print("✅ Game Engine: Mode CPU - EasyOCR activé")
# Query the selected backend for its model metadata (best effort).
try:
    ocr_info = get_ocr_model_info()
    print(f"🎯 OCR sélectionné: {ocr_info['model_name']} sur {ocr_info['device']}")
except Exception as e:
    print(f"⚠️ Impossible de récupérer les infos OCR: {e}")
    ocr_info = {"model_name": "Error", "device": "Unknown"}
# Dataset support is optional: record availability instead of failing at import.
try:
    from datasets import Dataset, load_dataset
except ImportError as e:
    DATASET_AVAILABLE = False
    print(f"⚠️ Modules dataset non disponibles: {e}")
else:
    DATASET_AVAILABLE = True
    print("✅ Modules dataset disponibles")

# Target dataset, kept consistent with the HuggingFace space name.
DATASET_NAME = "hoololi/calcul_ocr_dataset"

# Operand ranges keyed by operation symbol, then by difficulty label.
DIFFICULTY_RANGES = {
    "×": {"Facile": (2, 9), "Difficile": (4, 12)},
    "+": {"Facile": (1, 50), "Difficile": (10, 100)},
    "-": {"Facile": (1, 50), "Difficile": (10, 100)},
    "÷": {"Facile": (1, 10), "Difficile": (2, 12)},
}
def create_result_row_with_images(i: int, image: dict | np.ndarray | Image.Image, expected: int, operation_data: tuple[int, int, str, int]) -> dict:
    """Run OCR on one drawing and build its HTML result-table row.

    Args:
        i: Zero-based question index (rendered as ``i + 1``).
        image: Raw drawing as delivered by the Gradio sketch component.
        expected: Correct numeric answer for this question.
        operation_data: ``(a, b, operation_symbol, correct_result)``.

    Returns:
        dict with keys ``html_row``, ``is_correct``, ``recognized``,
        ``recognized_num`` and ``dataset_image_data``.
    """
    # Optimized OCR pass; also returns the image prepared for dataset export.
    recognized, optimized_image, dataset_image_data = recognize_number_fast_with_image(image)
    # Parse the OCR text defensively: strip padding first, and treat any
    # non-numeric / non-string output as 0 instead of crashing.
    try:
        text = str(recognized).strip()
        recognized_num = int(text) if text.isdigit() else 0
    except (TypeError, ValueError):
        recognized_num = 0
    is_correct = recognized_num == expected
    a, b, operation, correct_result = operation_data
    status_icon = "✅" if is_correct else "❌"
    status_text = "Correct" if is_correct else "Incorrect"
    row_color = "#e8f5e8" if is_correct else "#ffe8e8"
    # Thumbnail shown in the results table
    image_thumbnail = create_thumbnail_fast(optimized_image, size=(50, 50))
    # Release the intermediate PIL image as early as possible (best effort —
    # a failed close must never fail the row).
    if optimized_image and hasattr(optimized_image, 'close'):
        try:
            optimized_image.close()
        except Exception:
            pass
    return {
        'html_row': f"""
        <tr style="background-color: {row_color};">
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; color: #333;">{i+1}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; font-weight: bold; color: #333;">{a}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; font-weight: bold; color: #333;">{operation}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; font-weight: bold; color: #333;">{b}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; font-weight: bold; color: #333;">{expected}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd;">{image_thumbnail}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; font-weight: bold; color: #333;">{recognized_num}</td>
            <td style="text-align: center; padding: 8px; border: 1px solid #ddd; color: #333;">{status_icon} {status_text}</td>
        </tr>
        """,
        'is_correct': is_correct,
        'recognized': recognized,
        'recognized_num': recognized_num,
        'dataset_image_data': dataset_image_data
    }
class MathGame:
    """Timed math-quiz engine with background (parallel) OCR processing.

    Flow: ``start_game`` resets all state and spawns a daemon worker
    thread; each ``next_question`` stores the player's drawing and
    enqueues it for OCR while the next operation is displayed;
    ``end_game`` waits briefly for the worker, assembles the HTML
    results table and builds ``session_data`` for a later dataset
    export.
    """

    def __init__(self):
        # --- live session state ---
        self.is_running = False
        self.start_time = 0            # time.time() captured at session start
        self.current_operation = ""    # display string, e.g. "3 × 4"
        self.correct_answer = 0        # answer of current_operation
        self.user_images = []          # one raw drawing per answered question
        self.expected_answers = []     # expected answer per drawing (same order)
        self.operations_history = []   # (a, b, op, answer) per question
        self.question_count = 0        # equals len(user_images) while running
        self.time_remaining = 30
        self.session_data = []         # per-question dicts built in end_game
        # --- session configuration ---
        self.duration = 30             # seconds: 30 or 60
        self.operation_type = "×"
        self.difficulty = "Facile"
        # --- export bookkeeping ---
        self.export_status = "not_exported"  # not_exported | exporting | exported
        self.export_timestamp = None
        self.export_result = None
        # --- parallel processing ---
        self.processing_queue = queue.Queue()      # (idx, image, expected, op_data)
        self.results_cache: Dict[int, dict] = {}   # question idx -> result row dict
        self.worker_thread: Optional[threading.Thread] = None
        self.processing_active = False             # worker loop run flag

    def get_export_status(self) -> dict[str, str | bool | None]:
        """Return export state plus whether an export is currently allowed."""
        return {
            "status": self.export_status,
            "timestamp": self.export_timestamp,
            "result": self.export_result,
            "can_export": self.export_status == "not_exported" and len(self.session_data) > 0
        }

    def mark_export_in_progress(self) -> None:
        """Flag the session as exporting and record the start timestamp."""
        self.export_status = "exporting"
        self.export_timestamp = datetime.datetime.now().isoformat()

    def mark_export_completed(self, result: str) -> None:
        """Record the outcome message of a finished export."""
        self.export_status = "exported"
        self.export_result = result

    def _start_background_processing(self) -> None:
        """Start the background OCR worker thread (idempotent)."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.processing_active = True
            # Daemon thread: never blocks interpreter shutdown.
            self.worker_thread = threading.Thread(target=self._process_images_worker, daemon=True)
            self.worker_thread.start()
            print("🔄 Thread de traitement parallèle démarré")

    def _stop_background_processing(self) -> None:
        """Signal the worker loop to stop (no join — the thread is a daemon)."""
        self.processing_active = False
        if self.worker_thread and self.worker_thread.is_alive():
            print("⏹️ Arrêt du thread de traitement parallèle")

    def _process_images_worker(self) -> None:
        """Worker loop: OCR queued drawings and cache the result rows."""
        print("🚀 Worker thread démarré")
        while self.processing_active:
            try:
                if not self.processing_queue.empty():
                    question_num, image, expected, operation_data = self.processing_queue.get(timeout=1)
                    print(f"🔄 Traitement parallèle image {question_num}...")
                    start_time = time.time()
                    result_data = create_result_row_with_images(question_num, image, expected, operation_data)
                    processing_time = time.time() - start_time
                    # Cache by question index; end_game assembles from this cache.
                    self.results_cache[question_num] = result_data
                    print(f"✅ Image {question_num} traitée en {processing_time:.1f}s (parallèle)")
                else:
                    # Idle poll when the queue looks empty.
                    time.sleep(0.1)
            except queue.Empty:
                # get(timeout=1) may still time out despite the empty() check.
                continue
            except Exception as e:
                print(f"❌ Erreur traitement parallèle: {e}")
        print("🛑 Worker thread terminé")

    def _add_image_to_processing_queue(self, question_num: int, image: dict | np.ndarray | Image.Image,
                                       expected: int, operation_data: tuple) -> None:
        """Enqueue one drawing for background OCR (no-op when image is None)."""
        if image is not None:
            self.processing_queue.put((question_num, image, expected, operation_data))
            print(f"📝 Image {question_num} ajoutée à la queue de traitement")

    def generate_multiplication(self, difficulty: str) -> tuple[str, int]:
        """Generate a multiplication; returns (display string, answer)."""
        min_val, max_val = DIFFICULTY_RANGES["×"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} × {b}", a * b

    def generate_addition(self, difficulty: str) -> tuple[str, int]:
        """Generate an addition; returns (display string, answer)."""
        min_val, max_val = DIFFICULTY_RANGES["+"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} + {b}", a + b

    def generate_subtraction(self, difficulty: str) -> tuple[str, int]:
        """Generate a subtraction whose result is always non-negative."""
        min_val, max_val = DIFFICULTY_RANGES["-"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, a)  # b <= a keeps the result >= 0
        return f"{a} - {b}", a - b

    def generate_division(self, difficulty: str) -> tuple[str, int]:
        """Generate an exact division (dividend built as result × divisor)."""
        min_result, max_result = DIFFICULTY_RANGES["÷"][difficulty]
        result = random.randint(min_result, max_result)
        if difficulty == "Facile":
            divisor = random.randint(2, 9)
        else:
            divisor = random.randint(2, 12)
        dividend = result * divisor
        return f"{dividend} ÷ {divisor}", result

    def generate_operation(self, operation_type: str, difficulty: str) -> tuple[str, int]:
        """Dispatch to the matching generator; unknown types fall back to ×."""
        if operation_type == "×":
            return self.generate_multiplication(difficulty)
        elif operation_type == "+":
            return self.generate_addition(difficulty)
        elif operation_type == "-":
            return self.generate_subtraction(difficulty)
        elif operation_type == "÷":
            return self.generate_division(difficulty)
        elif operation_type == "Aléatoire":
            # Pick a concrete operation at random, then recurse once.
            random_op = random.choice(["×", "+", "-", "÷"])
            return self.generate_operation(random_op, difficulty)
        else:
            return self.generate_multiplication(difficulty)

    def start_game(self, duration: str, operation: str, difficulty: str) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Reset all state, start the worker thread, show the first question.

        Returns the 7-tuple of Gradio component updates:
        (operation HTML, blank canvas, status text, timer text,
        start-button update, next-button update, results HTML).
        """
        # Session configuration
        self.duration = 60 if duration == "60 secondes" else 30
        self.operation_type = operation
        self.difficulty = difficulty
        # Release images and data kept from a previous session
        if hasattr(self, 'user_images') and self.user_images:
            for img in self.user_images:
                if hasattr(img, 'close'):
                    try:
                        img.close()
                    except:
                        pass
        if hasattr(self, 'session_data') and self.session_data:
            for entry in self.session_data:
                if 'user_drawing' in entry and entry['user_drawing']:
                    entry['user_drawing'] = None
            self.session_data.clear()
        # Reset the parallel pipeline: stop worker, clear cache, drain queue
        self._stop_background_processing()
        self.results_cache.clear()
        while not self.processing_queue.empty():
            try:
                self.processing_queue.get_nowait()
            except queue.Empty:
                break
        self.is_running = True
        self.start_time = time.time()
        self.user_images = []
        self.expected_answers = []
        self.operations_history = []
        self.question_count = 0
        self.time_remaining = self.duration
        self.session_data = []
        # Reset export state for the new session
        self.export_status = "not_exported"
        self.export_timestamp = None
        self.export_result = None
        # Start background OCR processing
        self._start_background_processing()
        gc.collect()
        # First operation
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer
        # Parse the "a op b" display string back into parts for the history
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))
        # Emoji matching the chosen operation type
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        return (
            f'<div style="font-size: 3em; font-weight: bold; text-align: center; padding: 20px; background: linear-gradient(45deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px;">{operation_str}</div>',
            create_white_canvas(),
            f"🎯 {emoji} {self.operation_type} • {self.difficulty} • Écrivez votre réponse !",
            f"⏱️ Temps restant: {self.time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            ""
        )

    def next_question(self, image_data: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Store the player's drawing, enqueue it for OCR, show the next question.

        Ends the game instead when the configured duration has elapsed.
        Returns the same 7-tuple of Gradio updates as ``start_game``.
        """
        if not self.is_running:
            # Guard: session not started (or already ended).
            return (
                f'<div style="font-size: 3em; font-weight: bold; text-align: center; padding: 20px; background: linear-gradient(45deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px;">{self.current_operation}</div>',
                image_data,
                "❌ Le jeu n'est pas en cours !",
                "⏱️ Temps: 0s",
                gr.update(interactive=True),
                gr.update(interactive=False),
                ""
            )
        elapsed_time = time.time() - self.start_time
        if elapsed_time >= self.duration:
            return self.end_game(image_data)
        if image_data is not None:
            # Record the drawing AND hand it to the parallel pipeline
            self.user_images.append(image_data)
            self.expected_answers.append(self.correct_answer)
            # Parse the current operation for the processing payload
            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            current_operation_data = (a, b, op, self.correct_answer)
            # OCR runs in the background while the player answers the next one
            self._add_image_to_processing_queue(self.question_count, image_data, self.correct_answer, current_operation_data)
            self.question_count += 1
        # Next operation
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer
        # Parse for the history
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))
        time_remaining = max(0, self.duration - int(elapsed_time))
        self.time_remaining = time_remaining
        if time_remaining <= 0:
            return self.end_game(image_data)
        # Emoji for the operation type
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        return (
            f'<div style="font-size: 3em; font-weight: bold; text-align: center; padding: 20px; background: linear-gradient(45deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px;">{operation_str}</div>',
            create_white_canvas(),
            f"🎯 {emoji} Question {self.question_count + 1} • {self.difficulty}",
            f"⏱️ Temps restant: {time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            ""
        )

    def end_game(self, final_image: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Finish the session: collect OCR results and render the final report.

        Processes the last drawing synchronously, waits (bounded) for the
        worker to drain the queue, assembles the HTML results table, and
        fills ``session_data`` for a possible dataset export.
        """
        self.is_running = False
        # Stop the parallel pipeline
        self._stop_background_processing()
        print("🏁 Fin de jeu - Assemblage des résultats...")
        if final_image is not None:
            self.user_images.append(final_image)
            self.expected_answers.append(self.correct_answer)
            # Payload for the last image
            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            final_operation_data = (a, b, op, self.correct_answer)
            # Process the last image synchronously (worker is stopping)
            print(f"🔄 Traitement final de l'image {self.question_count}...")
            final_result = create_result_row_with_images(self.question_count, final_image, self.correct_answer, final_operation_data)
            self.results_cache[self.question_count] = final_result
            self.question_count += 1
            if len(self.operations_history) < len(self.user_images):
                self.operations_history.append((a, b, op, self.correct_answer))
        # Bounded wait for the background worker to finish pending images
        max_wait = 10  # seconds
        wait_start = time.time()
        expected_results = len(self.user_images)
        print(f"⏳ Attente de {expected_results} résultats...")
        while len(self.results_cache) < expected_results and (time.time() - wait_start) < max_wait:
            time.sleep(0.1)
        results_ready = len(self.results_cache)
        print(f"✅ {results_ready}/{expected_results} résultats prêts")
        # Assemble results in question order
        correct_answers = 0
        total_questions = len(self.user_images)
        table_rows_html = ""
        session_timestamp = datetime.datetime.now().isoformat()
        session_id = f"session_{int(datetime.datetime.now().timestamp())}_{str(uuid.uuid4())[:8]}"
        self.session_data = []
        images_saved = 0
        total_image_size_kb = 0
        print(f"📊 Assemblage de {total_questions} résultats...")
        for i in range(total_questions):
            if i in self.results_cache:
                row_data = self.results_cache[i]
                print(f"  ✅ Résultat {i} du cache parallèle")
            else:
                # Fallback: OCR synchronously anything the worker missed
                print(f"  🔄 Traitement fallback pour résultat {i}...")
                if i < len(self.operations_history):
                    row_data = create_result_row_with_images(i, self.user_images[i], self.expected_answers[i], self.operations_history[i])
                else:
                    row_data = {
                        'html_row': f'<tr><td>{i+1}</td><td colspan="7">Erreur traitement</td></tr>',
                        'is_correct': False,
                        'recognized': "0",
                        'recognized_num': 0,
                        'dataset_image_data': None
                    }
            table_rows_html += row_data['html_row']
            if row_data['is_correct']:
                correct_answers += 1
            # Build the dataset entry (with OCR metadata debug output)
            a, b, operation, correct_result = self.operations_history[i] if i < len(self.operations_history) else (0, 0, "×", 0)
            try:
                ocr_info_data = get_ocr_model_info()
                print(f"🔍 Debug OCR info: {ocr_info_data}")
            except Exception as e:
                print(f"❌ Erreur get_ocr_model_info: {e}")
                ocr_info_data = {"model_name": "Error", "device": "Unknown"}
            entry = {
                "session_id": session_id,
                "timestamp": session_timestamp,
                "question_number": i + 1,
                "session_duration": self.duration,
                "operation_type": self.operation_type,
                "difficulty_level": self.difficulty,
                "operand_a": a,
                "operand_b": b,
                "operation": operation,
                "correct_answer": self.expected_answers[i] if i < len(self.expected_answers) else 0,
                "ocr_model": ocr_info_data.get("model_name", "Unknown"),
                "ocr_device": ocr_info_data.get("device", "Unknown"),
                "user_answer_ocr": row_data['recognized'],
                "user_answer_parsed": row_data['recognized_num'],
                "is_correct": row_data['is_correct'],
                "total_questions": total_questions,
                "app_version": "3.0_calcul_ocr_parallel"
            }
            print(f"🔍 Debug entry OCR fields: ocr_model={entry['ocr_model']}, ocr_device={entry['ocr_device']}")
            if row_data['dataset_image_data']:
                # Attach the compressed drawing to the dataset entry
                entry["handwriting_image"] = row_data['dataset_image_data']["image_base64"]
                entry["image_width"] = int(row_data['dataset_image_data']["compressed_size"][0])
                entry["image_height"] = int(row_data['dataset_image_data']["compressed_size"][1])
                entry["image_size_kb"] = float(row_data['dataset_image_data']["file_size_kb"])
                entry["has_image"] = True
                images_saved += 1
                total_image_size_kb += row_data['dataset_image_data']["file_size_kb"]
            else:
                entry["has_image"] = False
            self.session_data.append(entry)
        accuracy = (correct_answers / total_questions * 100) if total_questions > 0 else 0
        # Back-fill session-level accuracy into every entry
        for entry in self.session_data:
            entry["session_accuracy"] = accuracy
        # Memory cleanup
        for img in self.user_images:
            if hasattr(img, 'close'):
                try:
                    img.close()
                except:
                    pass
        gc.collect()
        # Results table HTML
        table_html = f"""
        <div style="overflow-x: auto; margin: 20px 0;">
            <table style="width: 100%; border-collapse: collapse; border: 2px solid #4a90e2;">
                <thead>
                    <tr style="background: #4a90e2; color: white;">
                        <th style="padding: 8px;">Question</th>
                        <th style="padding: 8px;">A</th>
                        <th style="padding: 8px;">Op</th>
                        <th style="padding: 8px;">B</th>
                        <th style="padding: 8px;">Réponse</th>
                        <th style="padding: 8px;">Votre dessin</th>
                        <th style="padding: 8px;">OCR</th>
                        <th style="padding: 8px;">Statut</th>
                    </tr>
                </thead>
                <tbody>
                    {table_rows_html}
                </tbody>
            </table>
        </div>
        """
        # Session configuration summary for display
        config_display = f"{self.operation_type} • {self.difficulty} • {self.duration}s"
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        export_info = self.get_export_status()
        if export_info["can_export"]:
            export_section = f"""
            <div style="margin-top: 20px; padding: 15px; background-color: #e8f5e8; border-radius: 8px;">
                <h3 style="color: #2e7d32;">📤 Ajouter cette série au dataset ?</h3>
                <p style="color: #2e7d32;">
                    ✅ {total_questions} réponses • 📊 {accuracy:.1f}% de précision<br>
                    📸 {images_saved} opérations et images sauvegardées ({total_image_size_kb:.1f}KB)<br>
                    ⚙️ Configuration: {config_display}
                </p>
            </div>
            """
        else:
            export_section = ""
        final_results = f"""
        <div style="margin: 20px 0;">
            <h1 style="text-align: center; color: #4a90e2;">🎉 Session terminée !</h1>
            <div style="background: linear-gradient(45deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
                <h2>📈 Résultats</h2>
                <div style="text-align: center; margin-bottom: 15px;">
                    <strong>{emoji} {config_display}</strong>
                </div>
                <div style="display: flex; justify-content: space-around; flex-wrap: wrap;">
                    <div style="text-align: center; margin: 10px;">
                        <div style="font-size: 2em; font-weight: bold;">{total_questions}</div>
                        <div>Questions</div>
                    </div>
                    <div style="text-align: center; margin: 10px;">
                        <div style="font-size: 2em; font-weight: bold; color: #90EE90;">{correct_answers}</div>
                        <div>Correctes</div>
                    </div>
                    <div style="text-align: center; margin: 10px;">
                        <div style="font-size: 2em; font-weight: bold; color: #FFB6C1;">{total_questions - correct_answers}</div>
                        <div>Incorrectes</div>
                    </div>
                    <div style="text-align: center; margin: 10px;">
                        <div style="font-size: 2em; font-weight: bold;">{accuracy:.1f}%</div>
                        <div>Précision</div>
                    </div>
                </div>
            </div>
            <h2 style="color: #4a90e2;">📊 Détail des Réponses</h2>
            {table_html}
            {export_section}
        </div>
        """
        return (
            """<div style="font-size: 3em; font-weight: bold; text-align: center; padding: 20px; background: linear-gradient(45deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px;">🏁 C'est fini !</div>""",
            create_white_canvas(),
            f"✨ Session {config_display} terminée !",
            "⏱️ Temps écoulé !",
            gr.update(interactive=True),
            gr.update(interactive=False),
            final_results
        )
def export_to_clean_dataset(session_data: list[dict], dataset_name: str | None = None) -> str:
    """Export a finished session to the ``calcul_ocr_dataset`` on HuggingFace.

    Entries with an attached image are enriched with the current OCR
    backend metadata, merged with the existing remote dataset (if any),
    and pushed to the hub.

    Args:
        session_data: Per-question dicts built by ``MathGame.end_game``.
        dataset_name: Target dataset repo; defaults to ``DATASET_NAME``.

    Returns:
        A human-readable status message (success summary or error).
    """
    if dataset_name is None:
        dataset_name = DATASET_NAME  # fall back to the module-level default
    if not DATASET_AVAILABLE:
        return "❌ Modules dataset non disponibles"
    hf_token = os.getenv("HF_TOKEN") or os.getenv("tk_calcul_ocr")
    if not hf_token:
        return "❌ Token HuggingFace manquant (HF_TOKEN ou tk_calcul_ocr)"
    try:
        print("\n🚀 === EXPORT VERS DATASET CALCUL OCR ===")
        print(f"📊 Dataset: {dataset_name}")
        # Keep only entries that carry an image; OCR metadata added below.
        clean_entries = []
        # Fetch the OCR backend info once for the whole session (best effort).
        try:
            global_ocr_info = get_ocr_model_info()
            print(f"🔍 Infos OCR globales: {global_ocr_info}")
        except Exception as e:
            print(f"❌ Erreur infos OCR globales: {e}")
            global_ocr_info = {"model_name": "Unknown", "device": "Unknown"}
        for entry in session_data:
            if entry.get('has_image', False):
                # Copy so the caller's session_data is never mutated.
                entry_with_ocr = entry.copy()
                entry_with_ocr["ocr_model"] = global_ocr_info.get("model_name", "Unknown")
                entry_with_ocr["ocr_device"] = global_ocr_info.get("device", "Unknown")
                print(f"🔍 Entry avec OCR: ocr_model={entry_with_ocr['ocr_model']}, ocr_device={entry_with_ocr['ocr_device']}")
                clean_entries.append(entry_with_ocr)
        if len(clean_entries) == 0:
            return "❌ Aucune entrée avec image à exporter"
        # Sanity-check the structure of the first entry.
        sample_entry = clean_entries[0]
        print(f"🔍 Structure première entrée: {list(sample_entry.keys())}")
        print(f"🔍 OCR dans entrée: ocr_model={sample_entry.get('ocr_model', 'MISSING')}, ocr_device={sample_entry.get('ocr_device', 'MISSING')}")
        # Merge with the existing remote dataset so pushes are additive.
        try:
            existing_dataset = load_dataset(dataset_name, split="train")
            existing_data = existing_dataset.to_list()
            print(f"📊 {len(existing_data)} entrées existantes trouvées")
            combined_data = existing_data + clean_entries
            clean_dataset = Dataset.from_list(combined_data)
            print(f"📊 Dataset combiné: {len(existing_data)} existantes + {len(clean_entries)} nouvelles = {len(combined_data)} total")
        except Exception as e:
            # Remote dataset missing (first export): create it from scratch.
            print(f"📊 Dataset non trouvé, création nouveau: {e}")
            clean_dataset = Dataset.from_list(clean_entries)
            print(f"📊 Nouveau dataset créé avec {len(clean_entries)} entrées")
        print(f"✅ Dataset créé - Features:")
        for feature_name in clean_dataset.features:
            print(f"  - {feature_name}: {clean_dataset.features[feature_name]}")
        # Per-operation statistics for the commit message.
        operations_count = {}
        for entry in clean_entries:
            op = entry.get('operation_type', 'unknown')
            operations_count[op] = operations_count.get(op, 0) + 1
        operations_summary = ", ".join([f"{op}: {count}" for op, count in operations_count.items()])
        # Push to HuggingFace Hub.
        print(f"📤 Push vers {dataset_name}...")
        clean_dataset.push_to_hub(
            dataset_name,
            private=False,
            token=hf_token,
            commit_message=f"Add {len(clean_entries)} handwriting samples for math OCR ({operations_summary})"
        )
        cleanup_memory()
        return f"""✅ Session ajoutée au dataset avec succès !
📊 Dataset: {dataset_name}
📸 Images: {len(clean_entries)}
🔢 Opérations: {operations_summary}
📈 Total: {len(clean_dataset)}
🔗 Le dataset est consultable ici : https://huggingface.co/datasets/{dataset_name}"""
    except Exception as e:
        print(f"❌ ERREUR: {e}")
        import traceback
        traceback.print_exc()
        return f"❌ Erreur: {str(e)}"