# ==========================================
# game_engine.py - Calcul OCR v3.0 CLEAN
# ==========================================
"""
Moteur de jeu mathématique avec traitement parallèle et auto-détection OCR
"""
import random
import time
import datetime
import gradio as gr
import os
import uuid
import gc
import base64
from io import BytesIO
import numpy as np
from PIL import Image
import threading
import queue
from typing import Dict, Tuple, Optional
# Clean auto-detection: exactly one OCR backend is loaded — GPU OR CPU
ocr_module = None  # set to "gpu" or "cpu" once detection completes
ocr_info = {"model_name": "Unknown", "device": "Unknown"}
try:
    # GPU path: torch must be importable AND CUDA actually available
    import torch
    if torch.cuda.is_available():
        from image_processing_gpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info
        )
        ocr_module = "gpu"
        print("✅ Game Engine: Mode GPU - TrOCR activé")
    else:
        # Torch installed but no GPU → fall back to the CPU backend
        from image_processing_cpu import (
            recognize_number_fast_with_image,
            create_thumbnail_fast,
            create_white_canvas,
            cleanup_memory,
            log_memory_usage,
            get_ocr_model_info
        )
        ocr_module = "cpu"
        print("✅ Game Engine: Mode CPU - EasyOCR activé")
except ImportError:
    # Torch not installed → CPU backend is the only option
    from image_processing_cpu import (
        recognize_number_fast_with_image,
        create_thumbnail_fast,
        create_white_canvas,
        cleanup_memory,
        log_memory_usage,
        get_ocr_model_info
    )
    ocr_module = "cpu"
    print("✅ Game Engine: Mode CPU - EasyOCR activé")
# Retrieve info about the selected OCR model (best effort — failure is non-fatal)
try:
    ocr_info = get_ocr_model_info()
    print(f"🎯 OCR sélectionné: {ocr_info['model_name']} sur {ocr_info['device']}")
except Exception as e:
    print(f"⚠️ Impossible de récupérer les infos OCR: {e}")
    ocr_info = {"model_name": "Error", "device": "Unknown"}
# Dataset imports with error handling (export to HF Hub is optional)
try:
    from datasets import Dataset, load_dataset
    DATASET_AVAILABLE = True
    print("✅ Modules dataset disponibles")
except ImportError as e:
    DATASET_AVAILABLE = False
    print(f"⚠️ Modules dataset non disponibles: {e}")
# Dataset repo id, kept consistent with the HuggingFace Space
DATASET_NAME = "hoololi/calcul_ocr_dataset"
# (min, max) operand/result ranges per operation symbol and difficulty level
DIFFICULTY_RANGES = {
    "×": {"Facile": (2, 9), "Difficile": (4, 12)},
    "+": {"Facile": (1, 50), "Difficile": (10, 100)},
    "-": {"Facile": (1, 50), "Difficile": (10, 100)},
    "÷": {"Facile": (1, 10), "Difficile": (2, 12)}
}
def create_result_row_with_images(i: int, image: dict | np.ndarray | Image.Image, expected: int, operation_data: tuple[int, int, str, int]) -> dict:
    """Run OCR on one drawing and build its result-table row.

    Args:
        i: Zero-based question index (rendered as ``i + 1``).
        image: Raw drawing as delivered by the UI (dict, array or PIL image).
        expected: Correct numeric answer for this question.
        operation_data: ``(a, b, operation, correct_result)`` tuple.

    Returns:
        dict with keys 'html_row' (table-row markup), 'is_correct',
        'recognized' (raw OCR text), 'recognized_num' (parsed int, 0 on
        any parse failure) and 'dataset_image_data' (compressed image
        payload for export, or None).
    """
    # Optimized OCR pass (backend selected at import time: GPU or CPU)
    recognized, optimized_image, dataset_image_data = recognize_number_fast_with_image(image)
    # Parse defensively: non-digit, empty or non-string OCR output counts as 0.
    # (Fixed: previously a bare `except:` which also swallowed SystemExit /
    # KeyboardInterrupt.)
    try:
        recognized_num = int(recognized) if recognized.isdigit() else 0
    except (AttributeError, TypeError, ValueError):
        recognized_num = 0
    is_correct = recognized_num == expected
    a, b, operation, correct_result = operation_data
    status_icon = "✅" if is_correct else "❌"
    status_text = "Correct" if is_correct else "Incorrect"
    row_color = "#e8f5e8" if is_correct else "#ffe8e8"
    # Thumbnail displayed in the results table
    image_thumbnail = create_thumbnail_fast(optimized_image, size=(50, 50))
    # Release the intermediate image as soon as the thumbnail exists
    if optimized_image and hasattr(optimized_image, 'close'):
        try:
            optimized_image.close()
        except Exception:
            pass  # best effort — a close failure must not break the row
    return {
        'html_row': f"""
| {i+1} |
{a} |
{operation} |
{b} |
{expected} |
{image_thumbnail} |
{recognized_num} |
{status_icon} {status_text} |
""",
        'is_correct': is_correct,
        'recognized': recognized,
        'recognized_num': recognized_num,
        'dataset_image_data': dataset_image_data
    }
class MathGame:
    """Math game engine with background (parallel) OCR processing.

    Lifecycle: start_game() → next_question() once per answer → end_game().
    Each submitted drawing is pushed to a worker thread so OCR runs while
    the player keeps answering; end_game() assembles the cached results,
    with a synchronous fallback for any image the worker did not finish.
    """
    def __init__(self):
        # --- Live game state ---
        self.is_running = False
        self.start_time = 0            # time.time() at session start
        self.current_operation = ""    # e.g. "7 × 8"
        self.correct_answer = 0
        self.user_images = []          # raw drawings, in question order
        self.expected_answers = []     # expected int answer per question
        self.operations_history = []   # (a, b, op, result) per question
        self.question_count = 0
        self.time_remaining = 30
        self.session_data = []         # per-question dicts for dataset export
        # Session configuration
        self.duration = 30
        self.operation_type = "×"
        self.difficulty = "Facile"
        # Export management (one export per session)
        self.export_status = "not_exported"
        self.export_timestamp = None
        self.export_result = None
        # Parallel processing machinery
        self.processing_queue = queue.Queue()
        self.results_cache: Dict[int, dict] = {}
        self.worker_thread: Optional[threading.Thread] = None
        self.processing_active = False
    def get_export_status(self) -> dict[str, str | bool | None]:
        """Return the export state; export is allowed only once per session
        and only when there is session data to export."""
        return {
            "status": self.export_status,
            "timestamp": self.export_timestamp,
            "result": self.export_result,
            "can_export": self.export_status == "not_exported" and len(self.session_data) > 0
        }
    def mark_export_in_progress(self) -> None:
        """Flag the export as started and record an ISO timestamp."""
        self.export_status = "exporting"
        self.export_timestamp = datetime.datetime.now().isoformat()
    def mark_export_completed(self, result: str) -> None:
        """Flag the export as done, keeping the result message for display."""
        self.export_status = "exported"
        self.export_result = result
    def _start_background_processing(self) -> None:
        """Start the background processing thread (idempotent)."""
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.processing_active = True
            self.worker_thread = threading.Thread(target=self._process_images_worker, daemon=True)
            self.worker_thread.start()
            print("🔄 Thread de traitement parallèle démarré")
    def _stop_background_processing(self) -> None:
        """Ask the worker to stop; its loop exits when it sees the flag."""
        self.processing_active = False
        if self.worker_thread and self.worker_thread.is_alive():
            print("⏹️ Arrêt du thread de traitement parallèle")
    def _process_images_worker(self) -> None:
        """Worker-thread loop: OCR queued images and cache the results."""
        print("🚀 Worker thread démarré")
        while self.processing_active:
            try:
                if not self.processing_queue.empty():
                    question_num, image, expected, operation_data = self.processing_queue.get(timeout=1)
                    print(f"🔄 Traitement parallèle image {question_num}...")
                    start_time = time.time()
                    result_data = create_result_row_with_images(question_num, image, expected, operation_data)
                    processing_time = time.time() - start_time
                    # Cache the result keyed by question number
                    self.results_cache[question_num] = result_data
                    print(f"✅ Image {question_num} traitée en {processing_time:.1f}s (parallèle)")
                else:
                    time.sleep(0.1)
            except queue.Empty:
                continue
            except Exception as e:
                # Keep the worker alive: one bad image must not kill the loop
                print(f"❌ Erreur traitement parallèle: {e}")
        print("🛑 Worker thread terminé")
    def _add_image_to_processing_queue(self, question_num: int, image: dict | np.ndarray | Image.Image,
                                       expected: int, operation_data: tuple) -> None:
        """Queue one drawing for background OCR (no-op when image is None)."""
        if image is not None:
            self.processing_queue.put((question_num, image, expected, operation_data))
            print(f"📝 Image {question_num} ajoutée à la queue de traitement")
    def generate_multiplication(self, difficulty: str) -> tuple[str, int]:
        """Generate a multiplication problem; returns (display, answer)."""
        min_val, max_val = DIFFICULTY_RANGES["×"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} × {b}", a * b
    def generate_addition(self, difficulty: str) -> tuple[str, int]:
        """Generate an addition problem; returns (display, answer)."""
        min_val, max_val = DIFFICULTY_RANGES["+"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, max_val)
        return f"{a} + {b}", a + b
    def generate_subtraction(self, difficulty: str) -> tuple[str, int]:
        """Generate a subtraction (result always non-negative: b <= a)."""
        min_val, max_val = DIFFICULTY_RANGES["-"][difficulty]
        a = random.randint(min_val, max_val)
        b = random.randint(min_val, a)
        return f"{a} - {b}", a - b
    def generate_division(self, difficulty: str) -> tuple[str, int]:
        """Generate an exact division (dividend built as result × divisor)."""
        min_result, max_result = DIFFICULTY_RANGES["÷"][difficulty]
        result = random.randint(min_result, max_result)
        if difficulty == "Facile":
            divisor = random.randint(2, 9)
        else:
            divisor = random.randint(2, 12)
        dividend = result * divisor
        return f"{dividend} ÷ {divisor}", result
    def generate_operation(self, operation_type: str, difficulty: str) -> tuple[str, int]:
        """Dispatch to the generator for the given operation type.
        "Aléatoire" picks one of the four operations at random; unknown
        types fall back to multiplication."""
        if operation_type == "×":
            return self.generate_multiplication(difficulty)
        elif operation_type == "+":
            return self.generate_addition(difficulty)
        elif operation_type == "-":
            return self.generate_subtraction(difficulty)
        elif operation_type == "÷":
            return self.generate_division(difficulty)
        elif operation_type == "Aléatoire":
            random_op = random.choice(["×", "+", "-", "÷"])
            return self.generate_operation(random_op, difficulty)
        else:
            return self.generate_multiplication(difficulty)
    def start_game(self, duration: str, operation: str, difficulty: str) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Start a session with the chosen configuration.

        Returns the 7-tuple of Gradio component updates: operation display,
        fresh canvas, status text, timer text, start-button state,
        validate-button state, and (empty) results HTML.
        """
        # Configuration
        self.duration = 60 if duration == "60 secondes" else 30
        self.operation_type = operation
        self.difficulty = difficulty
        # Cleanup: close any leftover images from a previous session
        if hasattr(self, 'user_images') and self.user_images:
            for img in self.user_images:
                if hasattr(img, 'close'):
                    try:
                        img.close()
                    except:
                        pass
        if hasattr(self, 'session_data') and self.session_data:
            for entry in self.session_data:
                if 'user_drawing' in entry and entry['user_drawing']:
                    entry['user_drawing'] = None
            self.session_data.clear()
        # Reset, including the parallel-processing machinery
        self._stop_background_processing()
        self.results_cache.clear()
        while not self.processing_queue.empty():
            try:
                self.processing_queue.get_nowait()
            except queue.Empty:
                break
        self.is_running = True
        self.start_time = time.time()
        self.user_images = []
        self.expected_answers = []
        self.operations_history = []
        self.question_count = 0
        self.time_remaining = self.duration
        self.session_data = []
        # Reset export state for the new session
        self.export_status = "not_exported"
        self.export_timestamp = None
        self.export_result = None
        # Start the background OCR worker
        self._start_background_processing()
        gc.collect()
        # First operation
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer
        # Parse the operation for the history
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))
        # Display adapted to the operation type
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        return (
            f'{operation_str}
',
            create_white_canvas(),
            f"🎯 {emoji} {self.operation_type} • {self.difficulty} • Écrivez votre réponse !",
            f"⏱️ Temps restant: {self.time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            ""
        )
    def next_question(self, image_data: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Accept the current drawing, queue it for OCR, show the next operation.

        Ends the game (delegating to end_game) when the time budget is
        exhausted. Returns the same 7-tuple of UI updates as start_game.
        """
        if not self.is_running:
            return (
                f'{self.current_operation}
',
                image_data,
                "❌ Le jeu n'est pas en cours !",
                "⏱️ Temps: 0s",
                gr.update(interactive=True),
                gr.update(interactive=False),
                ""
            )
        elapsed_time = time.time() - self.start_time
        if elapsed_time >= self.duration:
            return self.end_game(image_data)
        if image_data is not None:
            # Keep the image AND hand it to the background OCR worker
            self.user_images.append(image_data)
            self.expected_answers.append(self.correct_answer)
            # Parse the current operation for the processing metadata
            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            current_operation_data = (a, b, op, self.correct_answer)
            # Start processing the image we just received, in parallel
            self._add_image_to_processing_queue(self.question_count, image_data, self.correct_answer, current_operation_data)
            self.question_count += 1
        # New operation
        operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
        self.current_operation = operation_str
        self.correct_answer = answer
        # Parse for the history
        parts = operation_str.split()
        a, op, b = int(parts[0]), parts[1], int(parts[2])
        self.operations_history.append((a, b, op, answer))
        time_remaining = max(0, self.duration - int(elapsed_time))
        self.time_remaining = time_remaining
        if time_remaining <= 0:
            return self.end_game(image_data)
        # Emoji for the operation type
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        return (
            f'{operation_str}
',
            create_white_canvas(),
            f"🎯 {emoji} Question {self.question_count + 1} • {self.difficulty}",
            f"⏱️ Temps restant: {time_remaining}s",
            gr.update(interactive=False),
            gr.update(interactive=True),
            ""
        )
    def end_game(self, final_image: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
        """Stop the session, gather all OCR results and build the report.

        The last drawing (if any) is OCR'd synchronously; earlier drawings
        are awaited from the background worker (bounded wait), with a
        synchronous fallback for anything still missing. Also builds the
        per-question session_data entries used for dataset export.
        """
        self.is_running = False
        # Stop the background worker
        self._stop_background_processing()
        print("🏁 Fin de jeu - Assemblage des résultats...")
        if final_image is not None:
            self.user_images.append(final_image)
            self.expected_answers.append(self.correct_answer)
            # Metadata for the final image
            parts = self.current_operation.split()
            a, op, b = int(parts[0]), parts[1], int(parts[2])
            final_operation_data = (a, b, op, self.correct_answer)
            # Process the last image immediately (not via the worker)
            print(f"🔄 Traitement final de l'image {self.question_count}...")
            final_result = create_result_row_with_images(self.question_count, final_image, self.correct_answer, final_operation_data)
            self.results_cache[self.question_count] = final_result
            self.question_count += 1
            if len(self.operations_history) < len(self.user_images):
                self.operations_history.append((a, b, op, self.correct_answer))
        # Wait (bounded at max_wait seconds) for all queued images
        max_wait = 10
        wait_start = time.time()
        expected_results = len(self.user_images)
        print(f"⏳ Attente de {expected_results} résultats...")
        while len(self.results_cache) < expected_results and (time.time() - wait_start) < max_wait:
            time.sleep(0.1)
        results_ready = len(self.results_cache)
        print(f"✅ {results_ready}/{expected_results} résultats prêts")
        # Assemble results in question order
        correct_answers = 0
        total_questions = len(self.user_images)
        table_rows_html = ""
        session_timestamp = datetime.datetime.now().isoformat()
        session_id = f"session_{int(datetime.datetime.now().timestamp())}_{str(uuid.uuid4())[:8]}"
        self.session_data = []
        images_saved = 0
        total_image_size_kb = 0
        print(f"📊 Assemblage de {total_questions} résultats...")
        for i in range(total_questions):
            if i in self.results_cache:
                row_data = self.results_cache[i]
                print(f" ✅ Résultat {i} du cache parallèle")
            else:
                # Fallback: synchronously process what the worker missed
                print(f" 🔄 Traitement fallback pour résultat {i}...")
                if i < len(self.operations_history):
                    row_data = create_result_row_with_images(i, self.user_images[i], self.expected_answers[i], self.operations_history[i])
                else:
                    row_data = {
                        'html_row': f'| {i+1} | Erreur traitement |
',
                        'is_correct': False,
                        'recognized': "0",
                        'recognized_num': 0,
                        'dataset_image_data': None
                    }
            table_rows_html += row_data['html_row']
            if row_data['is_correct']:
                correct_answers += 1
            # Dataset entry structure, with OCR debug info
            a, b, operation, correct_result = self.operations_history[i] if i < len(self.operations_history) else (0, 0, "×", 0)
            try:
                ocr_info_data = get_ocr_model_info()
                print(f"🔍 Debug OCR info: {ocr_info_data}")
            except Exception as e:
                print(f"❌ Erreur get_ocr_model_info: {e}")
                ocr_info_data = {"model_name": "Error", "device": "Unknown"}
            entry = {
                "session_id": session_id,
                "timestamp": session_timestamp,
                "question_number": i + 1,
                "session_duration": self.duration,
                "operation_type": self.operation_type,
                "difficulty_level": self.difficulty,
                "operand_a": a,
                "operand_b": b,
                "operation": operation,
                "correct_answer": self.expected_answers[i] if i < len(self.expected_answers) else 0,
                "ocr_model": ocr_info_data.get("model_name", "Unknown"),
                "ocr_device": ocr_info_data.get("device", "Unknown"),
                "user_answer_ocr": row_data['recognized'],
                "user_answer_parsed": row_data['recognized_num'],
                "is_correct": row_data['is_correct'],
                "total_questions": total_questions,
                "app_version": "3.0_calcul_ocr_parallel"
            }
            print(f"🔍 Debug entry OCR fields: ocr_model={entry['ocr_model']}, ocr_device={entry['ocr_device']}")
            if row_data['dataset_image_data']:
                entry["handwriting_image"] = row_data['dataset_image_data']["image_base64"]
                entry["image_width"] = int(row_data['dataset_image_data']["compressed_size"][0])
                entry["image_height"] = int(row_data['dataset_image_data']["compressed_size"][1])
                entry["image_size_kb"] = float(row_data['dataset_image_data']["file_size_kb"])
                entry["has_image"] = True
                images_saved += 1
                total_image_size_kb += row_data['dataset_image_data']["file_size_kb"]
            else:
                entry["has_image"] = False
            self.session_data.append(entry)
        accuracy = (correct_answers / total_questions * 100) if total_questions > 0 else 0
        # Back-fill the session-level accuracy on every entry
        for entry in self.session_data:
            entry["session_accuracy"] = accuracy
        # Memory cleanup
        for img in self.user_images:
            if hasattr(img, 'close'):
                try:
                    img.close()
                except:
                    pass
        gc.collect()
        # Results HTML
        table_html = f"""
| Question |
A |
Op |
B |
Réponse |
Votre dessin |
OCR |
Statut |
{table_rows_html}
"""
        # Session configuration shown in the summary
        config_display = f"{self.operation_type} • {self.difficulty} • {self.duration}s"
        operation_emoji = {
            "×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
        }
        emoji = operation_emoji.get(self.operation_type, "🔢")
        export_info = self.get_export_status()
        if export_info["can_export"]:
            export_section = f"""
📤 Ajouter cette série au dataset ?
✅ {total_questions} réponses • 📊 {accuracy:.1f}% de précision
📸 {images_saved} opérations et images sauvegardées ({total_image_size_kb:.1f}KB)
⚙️ Configuration: {config_display}
"""
        else:
            export_section = ""
        final_results = f"""
🎉 Session terminée !
📈 Résultats
{emoji} {config_display}
{total_questions}
Questions
{correct_answers}
Correctes
{total_questions - correct_answers}
Incorrectes
{accuracy:.1f}%
Précision
📊 Détail des Réponses
{table_html}
{export_section}
"""
        return (
            """🏁 C'est fini !
""",
            create_white_canvas(),
            f"✨ Session {config_display} terminée !",
            "⏱️ Temps écoulé !",
            gr.update(interactive=True),
            gr.update(interactive=False),
            final_results
        )
def export_to_clean_dataset(session_data: list[dict], dataset_name: str | None = None) -> str:
    """Append a finished session to the calcul_ocr dataset on HuggingFace.

    Only entries carrying a drawing (``has_image`` True) are exported; the
    current OCR model/device info is stamped on every exported entry so the
    schema stays uniform. Existing remote rows are downloaded and combined
    first, because push_to_hub would otherwise overwrite history.

    Args:
        session_data: Per-question dicts produced by MathGame.end_game().
        dataset_name: Target repo id; defaults to DATASET_NAME.
            (Fixed annotation: was ``str = None``, an implicit Optional.)

    Returns:
        Human-readable status string (success summary or error message).
    """
    if dataset_name is None:
        dataset_name = DATASET_NAME  # use the module-level default
    if not DATASET_AVAILABLE:
        return "❌ Modules dataset non disponibles"
    # The token may live under either secret name depending on the Space setup
    hf_token = os.getenv("HF_TOKEN") or os.getenv("tk_calcul_ocr")
    if not hf_token:
        return "❌ Token HuggingFace manquant (HF_TOKEN ou tk_calcul_ocr)"
    try:
        print(f"\n🚀 === EXPORT VERS DATASET CALCUL OCR ===")
        print(f"📊 Dataset: {dataset_name}")
        # Keep only entries with images; stamp OCR info on each of them
        clean_entries = []
        # Fetch the OCR model info once for the whole session
        try:
            global_ocr_info = get_ocr_model_info()
            print(f"🔍 Infos OCR globales: {global_ocr_info}")
        except Exception as e:
            print(f"❌ Erreur infos OCR globales: {e}")
            global_ocr_info = {"model_name": "Unknown", "device": "Unknown"}
        for entry in session_data:
            if entry.get('has_image', False):
                # Explicitly (re)set the OCR fields so the schema is uniform
                entry_with_ocr = entry.copy()
                entry_with_ocr["ocr_model"] = global_ocr_info.get("model_name", "Unknown")
                entry_with_ocr["ocr_device"] = global_ocr_info.get("device", "Unknown")
                print(f"🔍 Entry avec OCR: ocr_model={entry_with_ocr['ocr_model']}, ocr_device={entry_with_ocr['ocr_device']}")
                clean_entries.append(entry_with_ocr)
        if len(clean_entries) == 0:
            return "❌ Aucune entrée avec image à exporter"
        # Sanity-check the structure of the first entry
        sample_entry = clean_entries[0]
        print(f"🔍 Structure première entrée: {list(sample_entry.keys())}")
        print(f"🔍 OCR dans entrée: ocr_model={sample_entry.get('ocr_model', 'MISSING')}, ocr_device={sample_entry.get('ocr_device', 'MISSING')}")
        # Load the existing dataset and combine (IMPORTANT: push overwrites)
        try:
            existing_dataset = load_dataset(dataset_name, split="train")
            existing_data = existing_dataset.to_list()
            print(f"📊 {len(existing_data)} entrées existantes trouvées")
            # Combine old + new rows
            combined_data = existing_data + clean_entries
            clean_dataset = Dataset.from_list(combined_data)
            print(f"📊 Dataset combiné: {len(existing_data)} existantes + {len(clean_entries)} nouvelles = {len(combined_data)} total")
        except Exception as e:
            # Dataset missing/unreadable → create it from the new entries only
            print(f"📊 Dataset non trouvé, création nouveau: {e}")
            clean_dataset = Dataset.from_list(clean_entries)
            print(f"📊 Nouveau dataset créé avec {len(clean_entries)} entrées")
        print(f"✅ Dataset créé - Features:")
        for feature_name in clean_dataset.features:
            print(f" - {feature_name}: {clean_dataset.features[feature_name]}")
        # Per-operation statistics for the commit message
        operations_count = {}
        for entry in clean_entries:
            op = entry.get('operation_type', 'unknown')
            operations_count[op] = operations_count.get(op, 0) + 1
        operations_summary = ", ".join([f"{op}: {count}" for op, count in operations_count.items()])
        # Push to HuggingFace
        print(f"📤 Push vers {dataset_name}...")
        clean_dataset.push_to_hub(
            dataset_name,
            private=False,
            token=hf_token,
            commit_message=f"Add {len(clean_entries)} handwriting samples for math OCR ({operations_summary})"
        )
        cleanup_memory()
        return f"""✅ Session ajoutée au dataset avec succès !
📊 Dataset: {dataset_name}
📸 Images: {len(clean_entries)}
🔢 Opérations: {operations_summary}
📈 Total: {len(clean_dataset)}
🔗 Le dataset est consultable ici : https://huggingface.co/datasets/{dataset_name}"""
    except Exception as e:
        # Top-level boundary: surface the failure as a user-visible string
        print(f"❌ ERREUR: {e}")
        import traceback
        traceback.print_exc()
        return f"❌ Erreur: {str(e)}"