"""SQLite persistence layer for the exercise catalogue and multiplayer games.

The module stores three kinds of data in the database located at ``DB_PATH``:

* ``code`` -- the exercise catalogue, seeded from ``TXT_PATH`` when empty;
* ``multiplayer_sessions`` / ``multiplayer_players`` -- "first to solve wins"
  games played on a single exercise;
* ``br_sessions`` / ``br_players`` -- battle-royale games played over several
  rounds, the slowest player of each round being eliminated.

Every public function opens its own short-lived connection and always closes
it, including when an exception is raised.
"""
import json
import logging
import os
import random
import sqlite3
import string
from contextlib import closing
from datetime import datetime

# Define base path for the server directory
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, "code.db")
TXT_PATH = os.path.join(BASE_DIR, "code.txt")

# Layout of code.txt: fields separated by ";;;", five fields per exercise
# (titre, niveau, mots_cle, enonce, test).
_FIELD_SEPARATOR = ";;;"
_FIELDS_PER_EXERCISE = 5

# Game codes are six characters drawn from A-Z and 0-9.
_GAME_CODE_ALPHABET = string.ascii_uppercase + string.digits
_GAME_CODE_LENGTH = 6

# Battle-royale round duration in seconds, keyed by exercise level.
_BR_TIME_LIMITS = {1: 120, 2: 300, 3: 540, 4: 900}
_BR_DEFAULT_TIME_LIMIT = 120
# Seconds spent in the 'round_end' state before the next round may start.
_BR_ROUND_END_PAUSE = 10

# Exercises whose title starts with "(T) " are excluded unless the caller
# passes include_terminale=True.
_TERMINALE_FILTER = "titre NOT LIKE '(T) %'"

_SCHEMA = (
    """
        CREATE TABLE IF NOT EXISTS code (
            id INTEGER PRIMARY KEY,
            titre TEXT,
            niveau INTEGER,
            mots_cle TEXT,
            enonce TEXT NOT NULL,
            test TEXT
        )
    """,
    """
        CREATE TABLE IF NOT EXISTS multiplayer_sessions (
            game_code TEXT PRIMARY KEY,
            exercise_id INTEGER,
            status TEXT, -- 'open', 'active', 'finished'
            winner TEXT,
            include_terminale BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,
    """
        CREATE TABLE IF NOT EXISTS multiplayer_players (
            game_code TEXT,
            username TEXT,
            joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (game_code, username)
        )
    """,
    """
        CREATE TABLE IF NOT EXISTS br_sessions (
            game_code TEXT PRIMARY KEY,
            niveau INTEGER,
            status TEXT, -- 'open', 'active', 'round_end', 'finished'
            winner TEXT,
            current_exercise_id INTEGER,
            used_exercises TEXT, -- JSON string list of IDs
            round_start_time TIMESTAMP,
            round_end_time TIMESTAMP,
            include_terminale BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,
    """
        CREATE TABLE IF NOT EXISTS br_players (
            game_code TEXT,
            username TEXT,
            state TEXT, -- 'playing', 'qualified', 'eliminated', 'eliminated_this_round'
            joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (game_code, username)
        )
    """,
)


def _connect():
    """Return a context manager yielding a connection that is always closed.

    ``sqlite3.Connection`` used directly in a ``with`` statement only manages
    the transaction; ``closing`` is what guarantees ``close()`` is called.
    """
    return closing(sqlite3.connect(DB_PATH))


def _strip_newlines(value):
    """Return *value* without newline characters; NULL becomes ``""``."""
    return (value or "").replace("\n", "")


def _new_game_code():
    """Return a random six-character game code."""
    return ''.join(random.choices(_GAME_CODE_ALPHABET, k=_GAME_CODE_LENGTH))


def _build_exercise_query(niveau=None, include_terminale=False, exclude_id=None):
    """Build the ``SELECT id FROM code`` query used by every raffle.

    Args:
        niveau: level to match, or ``None`` for every level.
        include_terminale: when false, "(T) " exercises are filtered out.
        exclude_id: exercise id to leave out, or ``None``.

    Returns:
        A ``(query, params)`` pair ready for ``cursor.execute``.
    """
    conditions = []
    params = []
    if niveau is not None:
        conditions.append("niveau = ?")
        params.append(niveau)
    if exclude_id is not None:
        conditions.append("id != ?")
        params.append(exclude_id)
    if not include_terminale:
        conditions.append(_TERMINALE_FILTER)
    query = "SELECT id FROM code"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    return query, params


def _ensure_column(cursor, table, column, definition):
    """Add *column* to *table* if it is missing; return True when added.

    *table*, *column* and *definition* are module-internal constants, never
    user input, which is why they may be interpolated into the statement.
    """
    cursor.execute(f"PRAGMA table_info({table})")
    existing = {row[1] for row in cursor.fetchall()}
    if column in existing:
        return False
    cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    return True


def _normalize_existing_exercises(conn, cursor):
    """Clean up rows of ``code`` written by older versions of the loader.

    Titles ending with a line break are stripped and levels stored as
    something other than an integer are converted when possible.
    """
    try:
        cursor.execute("SELECT id, titre, niveau FROM code")
        for r_id, r_titre, r_niveau in cursor.fetchall():
            new_titre = r_titre
            new_niveau = r_niveau
            needs_update = False
            if isinstance(r_titre, str) and r_titre.endswith(('\n', '\r')):
                new_titre = r_titre.strip()
                needs_update = True
            try:
                if isinstance(r_niveau, str):
                    new_niveau = int(r_niveau.strip())
                    needs_update = True
                elif not isinstance(r_niveau, int):
                    new_niveau = int(r_niveau)
                    needs_update = True
            except (TypeError, ValueError, OverflowError):
                # Best effort: a level that cannot be converted is left as is.
                pass
            if needs_update:
                cursor.execute(
                    "UPDATE code SET titre = ?, niveau = ? WHERE id = ?",
                    (new_titre, new_niveau, r_id),
                )
        conn.commit()
    except sqlite3.OperationalError:
        pass  # Table might not exist yet


def _migrate_columns(conn, cursor):
    """Add the columns that databases created by older versions lack."""
    for table in ("multiplayer_players", "br_players"):
        # SQLite rejects ADD COLUMN with a CURRENT_TIMESTAMP default, so the
        # column is added without a default and existing rows are backfilled;
        # new rows get their timestamp explicitly at insertion time.
        if _ensure_column(cursor, table, "joined_at", "TIMESTAMP"):
            cursor.execute(
                f"UPDATE {table} SET joined_at = CURRENT_TIMESTAMP WHERE joined_at IS NULL"
            )
    for table in ("multiplayer_sessions", "br_sessions"):
        _ensure_column(cursor, table, "include_terminale", "BOOLEAN DEFAULT 0")
    conn.commit()


def _load_exercises_from_file(conn, cursor):
    """Seed the ``code`` table from ``TXT_PATH``.

    Malformed exercises (non-integer level, database rejection) are skipped
    with a warning; ids are assigned sequentially to the accepted ones.
    """
    if not os.path.exists(TXT_PATH):
        logging.error(f"CRITICAL: {TXT_PATH} not found! Exercises will not be loaded.")
        return
    try:
        with open(TXT_PATH, 'r', encoding='utf-8') as f:
            content = f.read()
        fields = [part.strip() for part in content.split(_FIELD_SEPARATOR)]
        # A file ending with the separator yields one empty trailing field.
        if fields and not fields[-1]:
            fields.pop()
        exercise_id = 1
        num_exercises = 0
        last_start = len(fields) - _FIELDS_PER_EXERCISE
        for i in range(0, last_start + 1, _FIELDS_PER_EXERCISE):
            titre, niveau, mots_cle, enonce, test = fields[i:i + _FIELDS_PER_EXERCISE]
            try:
                cursor.execute(
                    "INSERT INTO code (id, titre, niveau, mots_cle, enonce, test) VALUES (?, ?, ?, ?, ?, ?)",
                    (exercise_id, titre, int(niveau), mots_cle, enonce, test),
                )
            except (ValueError, sqlite3.Error) as e:
                logging.warning(f"Skipping malformed exercise at index {i}: {e}")
            else:
                exercise_id += 1
                num_exercises += 1
        conn.commit()
        logging.info(f"Successfully loaded {num_exercises} exercises from {TXT_PATH}")
    except (OSError, UnicodeError, sqlite3.Error) as e:
        logging.error(f"Error loading exercises: {e}")


def create_table():
    """Create the schema, run the migrations and seed the exercises.

    Safe to call repeatedly: tables and columns are only created when
    missing, and ``code`` is only seeded from ``TXT_PATH`` when it is empty.
    """
    with _connect() as conn:
        cursor = conn.cursor()
        for statement in _SCHEMA:
            cursor.execute(statement)
        # Data cleanup/migration for existing exercises
        _normalize_existing_exercises(conn, cursor)
        _migrate_columns(conn, cursor)
        # Exercises are loaded from code.txt only when the table is empty.
        cursor.execute("SELECT count(*) FROM code")
        if cursor.fetchone()[0] == 0:
            _load_exercises_from_file(conn, cursor)


def return_title():
    """Return ``[{"id", "title", "niveau"}, ...]`` for every exercise."""
    with _connect() as conn:
        rows = conn.execute("SELECT id, titre, niveau FROM code").fetchall()
    return [
        {"id": r_id, "title": _strip_newlines(titre), "niveau": niveau}
        for r_id, titre, niveau in rows
    ]


def return_exercise(id):
    """Return the full ``code`` row for exercise *id*, or ``None``."""
    with _connect() as conn:
        return conn.execute("SELECT * FROM code WHERE id = ?", (id,)).fetchone()


def return_mot_cle():
    """Return ``[{"id", "mot_cle"}, ...]`` for every exercise."""
    with _connect() as conn:
        rows = conn.execute("SELECT id, mots_cle FROM code").fetchall()
    return [{"id": r_id, "mot_cle": _strip_newlines(mots_cle)} for r_id, mots_cle in rows]


def return_niveau():
    """Return ``[{"id", "niveau"}, ...]`` for every exercise."""
    with _connect() as conn:
        rows = conn.execute("SELECT id, niveau FROM code").fetchall()
    return [{"id": r_id, "niveau": niveau} for r_id, niveau in rows]


def raffle_exercise(niveau, include_terminale=False):
    """Return the id of a random exercise of level *niveau*, or ``None``.

    Raises:
        ValueError, TypeError: if *niveau* cannot be converted to ``int``.
    """
    query, params = _build_exercise_query(int(niveau), include_terminale)
    with _connect() as conn:
        rows = conn.execute(query, tuple(params)).fetchall()
    logging.info(f"raffle_exercise: niveau={niveau}, include_terminale={include_terminale}, query={query}, params={params}, found {len(rows)} candidates")
    if not rows:
        return None
    return random.choice(rows)[0]


def raffle_exercises(niveau, count=3, include_terminale=False):
    """Return up to *count* distinct random exercise ids.

    A *niveau* of ``0`` selects among every level. Fewer than *count* ids are
    returned when not enough exercises match; a non-positive *count* yields
    an empty list.
    """
    level = int(niveau) if niveau != 0 else None
    query, params = _build_exercise_query(level, include_terminale)
    with _connect() as conn:
        rows = conn.execute(query, tuple(params)).fetchall()
    if not rows:
        return []
    # Never ask for more than what is available, nor for a negative amount.
    actual_count = max(0, min(len(rows), count))
    return [row[0] for row in random.sample(rows, actual_count)]


def create_multiplayer_game(niveau, include_terminale=False):
    """Create an 'open' multiplayer session on a random exercise.

    Returns:
        The new game code, or ``None`` when no exercise matches or the
        session could not be stored.
    """
    game_code = _new_game_code()
    exo_id = raffle_exercise(niveau, include_terminale)
    if exo_id is None:
        logging.warning(f'create_multiplayer_game: raffle_exercise returned None for level {niveau}')
        return None
    try:
        with _connect() as conn:
            conn.execute(
                "INSERT INTO multiplayer_sessions (game_code, exercise_id, status, winner, include_terminale) VALUES (?, ?, ?, ?, ?)",
                (game_code, exo_id, 'open', None, include_terminale),
            )
            conn.commit()
        return game_code
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return None


def get_multiplayer_game(game_code):
    """Return ``(exercise_id, status, winner)`` for the session, or ``None``."""
    with _connect() as conn:
        return conn.execute(
            "SELECT exercise_id, status, winner FROM multiplayer_sessions WHERE game_code = ?",
            (game_code,),
        ).fetchone()


def add_player_to_game(game_code, username):
    """Register *username* in the multiplayer game; no-op if already there."""
    with _connect() as conn:
        # joined_at is set explicitly so that tables migrated from an older
        # schema (column added without a default) still record the join time.
        conn.execute(
            "INSERT OR IGNORE INTO multiplayer_players (game_code, username, joined_at) VALUES (?, ?, CURRENT_TIMESTAMP)",
            (game_code, username),
        )
        conn.commit()


def remove_player_from_game(game_code, username):
    """Remove *username* from the multiplayer game."""
    with _connect() as conn:
        conn.execute(
            "DELETE FROM multiplayer_players WHERE game_code = ? AND username = ?",
            (game_code, username),
        )
        conn.commit()


def get_game_players(game_code):
    """Return the usernames of the multiplayer game in join order."""
    with _connect() as conn:
        # rowid breaks ties between players who joined within the same second.
        rows = conn.execute(
            "SELECT username FROM multiplayer_players WHERE game_code = ? ORDER BY joined_at ASC, rowid ASC",
            (game_code,),
        ).fetchall()
    players_list = [row[0] for row in rows]
    logging.info(f"DEBUG: Players in {game_code}: {players_list}")
    return players_list


def start_multiplayer_game_in_db(game_code):
    """Mark the multiplayer session as 'active'."""
    with _connect() as conn:
        conn.execute(
            "UPDATE multiplayer_sessions SET status = 'active' WHERE game_code = ?",
            (game_code,),
        )
        conn.commit()


def set_multiplayer_winner(game_code, username):
    """Record *username* as winner unless the game already has one.

    The check and the write happen in a single conditional UPDATE so that two
    players finishing at the same moment cannot both be declared winner.

    Returns:
        The winner stored for the game: the earlier winner if there was one,
        *username* otherwise (including when the game does not exist).
    """
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE multiplayer_sessions SET winner = ?, status = 'finished' "
            "WHERE game_code = ? AND (winner IS NULL OR winner = '')",
            (username, game_code),
        )
        conn.commit()
        cursor.execute(
            "SELECT winner FROM multiplayer_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
    if row and row[0]:
        return row[0]
    return username


def replay_multiplayer_game(game_code):
    """Restart the game on another exercise of the same level.

    A different exercise is preferred; the current one is reused only when
    the level offers no alternative. The session becomes 'active' again and
    its winner is cleared.

    Returns:
        The id of the exercise now attached to the game, or ``None`` when
        the game does not exist.
    """
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT exercise_id, include_terminale FROM multiplayer_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row:
            return None
        exercise_id = row[0]
        include_terminale = bool(row[1])

        cursor.execute("SELECT niveau FROM code WHERE id = ?", (exercise_id,))
        niveau_row = cursor.fetchone()
        # Level 1 is the fallback when the current exercise no longer exists.
        niveau = int(niveau_row[0]) if niveau_row else 1

        query, params = _build_exercise_query(niveau, include_terminale, exclude_id=exercise_id)
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall()
        if not rows:
            # No alternative at this level: allow the current exercise again.
            query, params = _build_exercise_query(niveau, include_terminale)
            cursor.execute(query, tuple(params))
            rows = cursor.fetchall()

        new_exo_id = random.choice(rows)[0] if rows else exercise_id
        cursor.execute(
            "UPDATE multiplayer_sessions SET exercise_id = ?, status = 'active', winner = NULL WHERE game_code = ?",
            (new_exo_id, game_code),
        )
        conn.commit()
    return new_exo_id


def create_br_game(niveau, include_terminale=False):
    """Create an 'open' battle-royale session on a random exercise.

    Returns:
        The new game code, or ``None`` when no exercise matches or the
        session could not be stored.
    """
    game_code = _new_game_code()
    exo_id = raffle_exercise(niveau, include_terminale)
    if exo_id is None:
        logging.warning(f'create_br_game: raffle_exercise returned None for level {niveau}')
        return None
    try:
        with _connect() as conn:
            used_exercises = json.dumps([exo_id])
            conn.execute(
                "INSERT INTO br_sessions (game_code, niveau, status, current_exercise_id, used_exercises, include_terminale) VALUES (?, ?, 'open', ?, ?, ?)",
                (game_code, niveau, exo_id, used_exercises, include_terminale),
            )
            conn.commit()
        return game_code
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return None


def get_br_game(game_code):
    """Return the battle-royale session row, or ``None``.

    The row is ``(niveau, status, winner, current_exercise_id,
    used_exercises, round_start_time, round_end_time)``.
    """
    with _connect() as conn:
        return conn.execute(
            "SELECT niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time FROM br_sessions WHERE game_code = ?",
            (game_code,),
        ).fetchone()


def add_player_to_br(game_code, username):
    """Register *username* as 'playing'; no-op if already registered."""
    with _connect() as conn:
        # joined_at is set explicitly so that tables migrated from an older
        # schema (column added without a default) still record the join time.
        conn.execute(
            "INSERT OR IGNORE INTO br_players (game_code, username, state, joined_at) VALUES (?, ?, 'playing', CURRENT_TIMESTAMP)",
            (game_code, username),
        )
        conn.commit()


def remove_player_from_br(game_code, username):
    """Remove *username* from the battle-royale game."""
    with _connect() as conn:
        conn.execute(
            "DELETE FROM br_players WHERE game_code = ? AND username = ?",
            (game_code, username),
        )
        conn.commit()


def get_br_players(game_code):
    """Return ``[{"username", "state"}, ...]`` in join order."""
    with _connect() as conn:
        # rowid breaks ties between players who joined within the same second.
        rows = conn.execute(
            "SELECT username, state FROM br_players WHERE game_code = ? ORDER BY joined_at ASC, rowid ASC",
            (game_code,),
        ).fetchall()
    return [{"username": username, "state": state} for username, state in rows]


def start_br_round(game_code):
    """Start a new battle-royale round on an exercise not used yet.

    Qualified players go back to 'playing', eliminated ones stay eliminated,
    and the session becomes 'active' with a fresh ``round_start_time``.

    Returns:
        ``True`` when the round was started, ``False`` when the game does
        not exist.
    """
    with _connect() as conn:
        cursor = conn.cursor()
        # Fetch the current session.
        cursor.execute(
            "SELECT niveau, used_exercises, include_terminale FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row:
            return False
        niveau, used_raw, include_raw = row
        used_exercises = json.loads(used_raw) if used_raw else []
        include_terminale = bool(include_raw)

        # Pick an exercise that has not been played in this game yet.
        query, params = _build_exercise_query(int(niveau), include_terminale)
        cursor.execute(query, tuple(params))
        all_exos = [r[0] for r in cursor.fetchall()]
        already_used = set(used_exercises)
        available_exos = [exo for exo in all_exos if exo not in already_used]

        if available_exos:
            new_exo_id = random.choice(available_exos)
        else:
            # Every exercise has been played: start the cycle over.
            new_exo_id = random.choice(all_exos) if all_exos else 1
            used_exercises = []
        used_exercises.append(new_exo_id)

        # Only qualified players are reset; eliminated ones stay eliminated.
        cursor.execute(
            "UPDATE br_players SET state = 'playing' WHERE game_code = ? AND state = 'qualified'",
            (game_code,),
        )
        # Also covers players still in an initial 'open' state, if any.
        cursor.execute(
            "UPDATE br_players SET state = 'playing' WHERE game_code = ? AND state = 'open'",
            (game_code,),
        )
        cursor.execute(
            """
            UPDATE br_sessions
            SET status = 'active',
                current_exercise_id = ?,
                used_exercises = ?,
                round_start_time = ?,
                round_end_time = NULL
            WHERE game_code = ?
            """,
            (new_exo_id, json.dumps(used_exercises), datetime.now().isoformat(), game_code),
        )
        conn.commit()
    return True


def qualify_br_player(game_code, username):
    """Mark *username* as qualified and close the round when it is decided.

    The round ends when at most one player is still 'playing'; that last
    player, if any, becomes 'eliminated_this_round'.

    Returns:
        ``True`` when this qualification ended the round, ``False`` otherwise.
    """
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE br_players SET state = 'qualified' WHERE game_code = ? AND username = ?",
            (game_code, username),
        )
        # The round must be closed when a single player is left 'playing'.
        cursor.execute(
            "SELECT username FROM br_players WHERE game_code = ? AND state = 'playing'",
            (game_code,),
        )
        remaining_playing = cursor.fetchall()

        round_ended = len(remaining_playing) <= 1
        if len(remaining_playing) == 1:
            # The last player still playing is eliminated.
            cursor.execute(
                "UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND username = ?",
                (game_code, remaining_playing[0][0]),
            )
        # With nobody left, everyone qualified at once or the game is solo.
        if round_ended:
            cursor.execute(
                "UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?",
                (datetime.now().isoformat(), game_code),
            )
        conn.commit()
    return round_ended


def check_br_time_limit(game_code):
    """End the active round if its time limit has elapsed.

    When time is up, every player still 'playing' becomes
    'eliminated_this_round' and the session moves to 'round_end'.

    Returns:
        ``True`` when the round was ended by this call, ``False`` otherwise
        (unknown game, no active round, unreadable start time, time left).
    """
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT niveau, round_start_time, status FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row or row[2] != 'active' or not row[1]:
            return False
        niveau, start_time_str = row[0], row[1]
        try:
            start_time = datetime.fromisoformat(start_time_str)
        except (TypeError, ValueError):
            return False

        limit_seconds = _BR_TIME_LIMITS.get(niveau, _BR_DEFAULT_TIME_LIMIT)
        now = datetime.now()
        if (now - start_time).total_seconds() < limit_seconds:
            return False

        # Time is up: everyone still 'playing' is eliminated.
        cursor.execute(
            "UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND state = 'playing'",
            (game_code,),
        )
        cursor.execute(
            "UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?",
            (now.isoformat(), game_code),
        )
        conn.commit()
    return True


def process_br_round_end(game_code):
    """Move a game out of 'round_end' once the pause has elapsed.

    Returns:
        ``None`` when the game is not in a readable 'round_end' state,
        ``"waiting"`` while the pause is running, ``"finished"`` when at most
        one qualified player remains (that player, if any, is stored as
        winner), or ``"next_round"`` after a new round has been started.
    """
    with _connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT status, round_end_time FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row or row[0] != 'round_end' or not row[1]:
            return None
        try:
            end_time = datetime.fromisoformat(row[1])
        except (TypeError, ValueError):
            return None

        if (datetime.now() - end_time).total_seconds() < _BR_ROUND_END_PAUSE:
            return "waiting"

        # 1. Players eliminated during this round become permanently eliminated.
        cursor.execute(
            "UPDATE br_players SET state = 'eliminated' WHERE game_code = ? AND state = 'eliminated_this_round'",
            (game_code,),
        )
        conn.commit()

        # 2. Count the qualified players that are left.
        cursor.execute(
            "SELECT username FROM br_players WHERE game_code = ? AND state = 'qualified'",
            (game_code,),
        )
        qualified = [r[0] for r in cursor.fetchall()]
        if len(qualified) <= 1:
            # End of the game.
            winner = qualified[0] if qualified else None
            cursor.execute(
                "UPDATE br_sessions SET status = 'finished', winner = ? WHERE game_code = ?",
                (winner, game_code),
            )
            conn.commit()
            return "finished"

    # This connection is closed before start_br_round opens its own.
    start_br_round(game_code)
    return "next_round"