Spaces:
Running
Running
| import sqlite3 | |
| import os | |
# Absolute directory containing this module. The SQLite database file and the
# exercise source text file are both resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, "code.db")
TXT_PATH = os.path.join(BASE_DIR, "code.txt")
def _create_schema(cursor):
    """Create every table used by this module if it does not exist yet."""
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS code (
            id INTEGER PRIMARY KEY,
            titre TEXT,
            niveau INTEGER,
            mots_cle TEXT,
            enonce TEXT NOT NULL,
            test TEXT
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS multiplayer_sessions (
            game_code TEXT PRIMARY KEY,
            exercise_id INTEGER,
            status TEXT, -- 'open', 'active', 'finished'
            winner TEXT,
            include_terminale BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS multiplayer_players (
            game_code TEXT,
            username TEXT,
            joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (game_code, username)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS br_sessions (
            game_code TEXT PRIMARY KEY,
            niveau INTEGER,
            status TEXT, -- 'open', 'active', 'round_end', 'finished'
            winner TEXT,
            current_exercise_id INTEGER,
            used_exercises TEXT, -- JSON string list of IDs
            round_start_time TIMESTAMP,
            round_end_time TIMESTAMP,
            include_terminale BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS br_players (
            game_code TEXT,
            username TEXT,
            state TEXT, -- 'playing', 'qualified', 'eliminated', 'eliminated_this_round'
            joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (game_code, username)
        )
    """)


def _normalize_existing_exercises(cursor):
    """Strip trailing line breaks from titles and coerce levels to int."""
    cursor.execute("SELECT id, titre, niveau FROM code")
    for row_id, titre, niveau in cursor.fetchall():
        new_titre, new_niveau = titre, niveau
        changed = False
        if isinstance(titre, str) and titre.endswith(("\n", "\r")):
            new_titre = titre.strip()
            changed = True
        if not isinstance(niveau, int):
            try:
                if isinstance(niveau, str):
                    new_niveau = int(niveau.strip())
                else:
                    new_niveau = int(niveau)
                changed = True
            except (TypeError, ValueError, OverflowError):
                # Unparseable level (NULL, non-numeric text, ...): leave it as is.
                pass
        if changed:
            cursor.execute(
                "UPDATE code SET titre = ?, niveau = ? WHERE id = ?",
                (new_titre, new_niveau, row_id),
            )


def _ensure_column(cursor, table, column, declaration):
    """Add ``column`` to ``table`` unless the table already has it.

    ``table``, ``column`` and ``declaration`` are interpolated into the SQL
    text, so they must be trusted constants (never user input).
    """
    import logging

    cursor.execute(f"PRAGMA table_info({table})")
    # Column 1 of each PRAGMA table_info row is the column name.
    if column in {info[1] for info in cursor.fetchall()}:
        return
    try:
        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {declaration}")
    except sqlite3.OperationalError as exc:
        # NOTE(review): SQLite refuses ADD COLUMN with a non-constant default
        # such as CURRENT_TIMESTAMP, so the joined_at migrations are expected
        # to fail on a legacy table. The failure is logged rather than being
        # mistaken for "column already exists".
        logging.warning(f"Could not add column {column} to {table}: {exc}")


def _load_exercises_from_text(cursor, path):
    """Insert the exercises described in ``path`` and return how many were added.

    The file is a flat list of fields separated by ``;;;``; every five
    consecutive fields (title, level, keywords, statement, test) form one
    exercise. Exercises whose level is not an integer, or whose insert fails,
    are skipped with a warning. Ids are assigned sequentially from 1 to the
    exercises that were actually inserted.
    """
    import logging

    with open(path, "r", encoding="utf-8") as handle:
        content = handle.read()

    fields = [part.strip() for part in content.split(";;;")]
    # A file ending with the separator yields one empty trailing field.
    if fields and not fields[-1]:
        fields.pop()

    loaded = 0
    next_id = 1
    for start in range(0, len(fields) - 4, 5):
        titre, niveau, mots_cle, enonce, test = fields[start:start + 5]
        try:
            cursor.execute(
                "INSERT INTO code (id, titre, niveau, mots_cle, enonce, test) VALUES (?, ?, ?, ?, ?, ?)",
                (next_id, titre, int(niveau), mots_cle, enonce, test),
            )
        except (ValueError, sqlite3.Error) as e:
            logging.warning(f"Skipping malformed exercise at index {start}: {e}")
            continue
        next_id += 1
        loaded += 1
    return loaded


def create_table():
    """Create the schema, run the light migrations and seed the exercises.

    Steps, in order:
      1. create all tables if they are missing;
      2. clean up existing ``code`` rows (title line breaks, non-int levels);
      3. add the ``joined_at`` / ``include_terminale`` columns to tables that
         lack them;
      4. if the ``code`` table is empty, load the exercises from ``TXT_PATH``.

    Loading is best-effort: a missing or unreadable exercise file is logged
    and the function returns normally. The connection is always closed.
    """
    import logging

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        _create_schema(cursor)

        # Data cleanup/migration for existing exercises.
        try:
            _normalize_existing_exercises(cursor)
            conn.commit()
        except sqlite3.OperationalError as exc:
            logging.warning(f"Exercise cleanup migration skipped: {exc}")

        # Column migrations for databases created by an older schema.
        _ensure_column(cursor, "multiplayer_players", "joined_at",
                       "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
        _ensure_column(cursor, "br_players", "joined_at",
                       "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
        _ensure_column(cursor, "multiplayer_sessions", "include_terminale",
                       "BOOLEAN DEFAULT 0")
        _ensure_column(cursor, "br_sessions", "include_terminale",
                       "BOOLEAN DEFAULT 0")

        # Exercises are only loaded from the text file when the table is empty.
        cursor.execute("SELECT count(*) FROM code")
        if cursor.fetchone()[0] != 0:
            return

        if not os.path.exists(TXT_PATH):
            logging.error(f"CRITICAL: {TXT_PATH} not found! Exercises will not be loaded.")
            return

        try:
            num_exercises = _load_exercises_from_text(cursor, TXT_PATH)
            conn.commit()
            logging.info(f"Successfully loaded {num_exercises} exercises from {TXT_PATH}")
        except Exception as e:
            # Startup must stay best-effort: report the failure, do not crash.
            logging.error(f"Error loading exercises: {e}")
    finally:
        conn.close()
def return_title():
    """Return one ``{"id", "title", "niveau"}`` dict per row of ``code``.

    Newline characters are removed from each title.
    """
    connection = sqlite3.connect(DB_PATH)
    exercises = connection.execute("SELECT id, titre, niveau FROM code").fetchall()
    connection.close()

    summaries = []
    for exo_id, titre, niveau in exercises:
        summaries.append({"id": exo_id, "title": titre.replace("\n", ""), "niveau": niveau})
    return summaries
def return_exercise(id):
    """Return the full ``code`` row whose id is ``id``, or None if absent."""
    connection = sqlite3.connect(DB_PATH)
    # ``id`` is the primary key, so at most one row can match.
    match = connection.execute("SELECT * FROM code WHERE id = ?", (id,)).fetchone()
    connection.close()
    return match
def return_mot_cle():
    """Return one ``{"id", "mot_cle"}`` dict per row of ``code``.

    Newline characters are removed from the keyword string.
    """
    connection = sqlite3.connect(DB_PATH)
    keyword_rows = connection.execute("SELECT id, mots_cle FROM code").fetchall()
    connection.close()

    result = []
    for exo_id, mots_cle in keyword_rows:
        result.append({"id": exo_id, "mot_cle": mots_cle.replace("\n", "")})
    return result
def return_niveau():
    """Return one ``{"id", "niveau"}`` dict per row of ``code``."""
    connection = sqlite3.connect(DB_PATH)
    level_rows = connection.execute("SELECT id, niveau FROM code").fetchall()
    connection.close()

    result = []
    for exo_id, niveau in level_rows:
        result.append({"id": exo_id, "niveau": niveau})
    return result
def raffle_exercise(niveau, include_terminale=False):
    """Pick a random exercise id of the given level.

    Unless ``include_terminale`` is true, exercises whose title starts with
    ``"(T) "`` are excluded. Returns None when no exercise matches.
    """
    import logging
    import random

    connection = sqlite3.connect(DB_PATH)
    cursor = connection.cursor()

    params = [int(niveau)]
    query = "SELECT id FROM code WHERE niveau = ?"
    if not include_terminale:
        query += " AND titre NOT LIKE '(T) %'"

    rows = cursor.execute(query, tuple(params)).fetchall()
    logging.info(f"raffle_exercise: niveau={niveau}, include_terminale={include_terminale}, query={query}, params={params}, found {len(rows)} candidates")
    connection.close()

    return random.choice(rows)[0] if rows else None
def raffle_exercises(niveau, count=3, include_terminale=False):
    """Pick up to ``count`` distinct random exercise ids.

    A ``niveau`` of 0 means "any level". Unless ``include_terminale`` is
    true, exercises whose title starts with ``"(T) "`` are excluded. Returns
    an empty list when nothing matches, and fewer than ``count`` ids when
    fewer exercises are available.
    """
    import random

    connection = sqlite3.connect(DB_PATH)

    filters = []
    params = []
    if niveau != 0:
        filters.append("niveau = ?")
        params.append(int(niveau))
    if not include_terminale:
        filters.append("titre NOT LIKE '(T) %'")
    where_clause = (" WHERE " + " AND ".join(filters)) if filters else ""

    candidates = connection.execute("SELECT id FROM code" + where_clause, tuple(params)).fetchall()
    connection.close()

    if not candidates:
        return []

    # Never ask for more exercises than are available.
    picked = random.sample(candidates, min(len(candidates), count))
    return [row[0] for row in picked]
def create_multiplayer_game(niveau, include_terminale=False):
    """Create an 'open' multiplayer session and return its 6-character code.

    An exercise of level ``niveau`` is drawn with ``raffle_exercise``. The
    game code is made of uppercase letters and digits; if it collides with an
    existing session a new one is drawn (a few attempts at most).

    Returns None when no exercise is available, when no free code could be
    allocated, or on a database error (which is logged).
    """
    import logging
    import random
    import string

    exo_id = raffle_exercise(niveau, include_terminale)
    if not exo_id:
        logging.warning(f'create_multiplayer_game: raffle_exercise returned None for level {niveau}')
        return None

    alphabet = string.ascii_uppercase + string.digits
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        for _ in range(5):
            game_code = ''.join(random.choices(alphabet, k=6))
            try:
                cursor.execute(
                    "INSERT INTO multiplayer_sessions (game_code, exercise_id, status, winner, include_terminale) VALUES (?, ?, ?, ?, ?)",
                    (game_code, exo_id, 'open', None, include_terminale),
                )
            except sqlite3.IntegrityError:
                # game_code is the primary key: this code is taken, draw another.
                continue
            conn.commit()
            return game_code
        logging.error("create_multiplayer_game: could not allocate a unique game code")
        return None
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()
def get_multiplayer_game(game_code):
    """Return ``(exercise_id, status, winner)`` for the session, or None."""
    connection = sqlite3.connect(DB_PATH)
    session = connection.execute(
        "SELECT exercise_id, status, winner FROM multiplayer_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    connection.close()
    return session
def add_player_to_game(game_code, username):
    """Register ``username`` in the multiplayer game; a duplicate is ignored."""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(
        "INSERT OR IGNORE INTO multiplayer_players (game_code, username) VALUES (?, ?)",
        (game_code, username),
    )
    connection.commit()
    connection.close()
def remove_player_from_game(game_code, username):
    """Delete ``username`` from the players of the multiplayer game."""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(
        "DELETE FROM multiplayer_players WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    connection.commit()
    connection.close()
def get_game_players(game_code):
    """Return the usernames in the multiplayer game, ordered by join time."""
    import logging

    connection = sqlite3.connect(DB_PATH)
    rows = connection.execute(
        "SELECT username FROM multiplayer_players WHERE game_code = ? ORDER BY joined_at ASC",
        (game_code,),
    ).fetchall()
    connection.close()

    players_list = [username for (username,) in rows]
    logging.info(f"DEBUG: Players in {game_code}: {players_list}")
    return players_list
def start_multiplayer_game_in_db(game_code):
    """Set the status of the multiplayer session to 'active'."""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(
        "UPDATE multiplayer_sessions SET status = 'active' WHERE game_code = ?",
        (game_code,),
    )
    connection.commit()
    connection.close()
def set_multiplayer_winner(game_code, username):
    """Record ``username`` as winner unless the session already has one.

    The winner is claimed with a single conditional UPDATE, so the first
    caller wins and later callers cannot overwrite it. The session is marked
    'finished' when the claim succeeds.

    Returns the effective winner: ``username`` if the claim succeeded (or if
    the session does not exist), otherwise the previously recorded winner.
    The connection is closed on every path.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE multiplayer_sessions SET winner = ?, status = 'finished' "
            "WHERE game_code = ? AND (winner IS NULL OR winner = '')",
            (username, game_code),
        )
        claimed = cursor.rowcount > 0
        conn.commit()
        if claimed:
            return username

        # Nothing was updated: either someone already won or the game is unknown.
        cursor.execute("SELECT winner FROM multiplayer_sessions WHERE game_code = ?", (game_code,))
        row = cursor.fetchone()
        if row and row[0]:
            return row[0]
        return username
    finally:
        conn.close()
def replay_multiplayer_game(game_code):
    """Restart a multiplayer session with a new exercise of the same level.

    A different exercise of the same level is preferred; if none exists, any
    exercise of that level may be drawn, and if there is still none the
    current exercise is kept. The session is set to 'active' with no winner.

    Returns the new exercise id, or None when the session does not exist.
    """
    import random

    connection = sqlite3.connect(DB_PATH)
    cursor = connection.cursor()

    session = cursor.execute(
        "SELECT exercise_id, include_terminale FROM multiplayer_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    if session is None:
        connection.close()
        return None

    previous_id, terminale_flag = session
    include_terminale = terminale_flag is not None and bool(terminale_flag)

    level_row = cursor.execute("SELECT niveau FROM code WHERE id = ?", (previous_id,)).fetchone()
    # Fall back to level 1 when the previous exercise no longer exists.
    niveau = level_row[0] if level_row else 1

    terminale_filter = "" if include_terminale else " AND titre NOT LIKE '(T) %'"

    candidates = cursor.execute(
        "SELECT id FROM code WHERE niveau = ? AND id != ?" + terminale_filter,
        (int(niveau), previous_id),
    ).fetchall()
    if not candidates:
        # No other exercise at this level: allow the previous one again.
        candidates = cursor.execute(
            "SELECT id FROM code WHERE niveau = ?" + terminale_filter,
            (int(niveau),),
        ).fetchall()

    if candidates:
        new_exo_id = random.choice(candidates)[0]
    else:
        new_exo_id = previous_id

    cursor.execute(
        "UPDATE multiplayer_sessions SET exercise_id = ?, status = 'active', winner = NULL WHERE game_code = ?",
        (new_exo_id, game_code),
    )
    connection.commit()
    connection.close()
    return new_exo_id
def create_br_game(niveau, include_terminale=False):
    """Create an 'open' battle-royale session and return its 6-character code.

    A first exercise of level ``niveau`` is drawn with ``raffle_exercise`` and
    stored both as the current exercise and as the only entry of the JSON
    ``used_exercises`` list. The game code is made of uppercase letters and
    digits; if it collides with an existing session a new one is drawn (a few
    attempts at most).

    Returns None when no exercise is available, when no free code could be
    allocated, or on a database error (which is logged).
    """
    import json
    import logging
    import random
    import string

    exo_id = raffle_exercise(niveau, include_terminale)
    if not exo_id:
        logging.warning(f'create_br_game: raffle_exercise returned None for level {niveau}')
        return None

    alphabet = string.ascii_uppercase + string.digits
    used_exercises = json.dumps([exo_id])
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        for _ in range(5):
            game_code = ''.join(random.choices(alphabet, k=6))
            try:
                cursor.execute(
                    "INSERT INTO br_sessions (game_code, niveau, status, current_exercise_id, used_exercises, include_terminale) VALUES (?, ?, 'open', ?, ?, ?)",
                    (game_code, niveau, exo_id, used_exercises, include_terminale),
                )
            except sqlite3.IntegrityError:
                # game_code is the primary key: this code is taken, draw another.
                continue
            conn.commit()
            return game_code
        logging.error("create_br_game: could not allocate a unique game code")
        return None
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()
def get_br_game(game_code):
    """Return the battle-royale session row for ``game_code``, or None.

    The row is ``(niveau, status, winner, current_exercise_id,
    used_exercises, round_start_time, round_end_time)``.
    """
    connection = sqlite3.connect(DB_PATH)
    session = connection.execute(
        "SELECT niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time FROM br_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    connection.close()
    return session
def add_player_to_br(game_code, username):
    """Register ``username`` in the battle-royale game in state 'playing'.

    A player already registered for this game is left untouched.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.execute(
        "INSERT OR IGNORE INTO br_players (game_code, username, state) VALUES (?, ?, 'playing')",
        (game_code, username),
    )
    connection.commit()
    connection.close()
def remove_player_from_br(game_code, username):
    """Delete ``username`` from the players of the battle-royale game."""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(
        "DELETE FROM br_players WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    connection.commit()
    connection.close()
def get_br_players(game_code):
    """Return ``{"username", "state"}`` dicts for the game, ordered by join time."""
    connection = sqlite3.connect(DB_PATH)
    rows = connection.execute(
        "SELECT username, state FROM br_players WHERE game_code = ? ORDER BY joined_at ASC",
        (game_code,),
    ).fetchall()
    connection.close()

    players = []
    for name, state in rows:
        players.append({"username": name, "state": state})
    return players
def start_br_round(game_code):
    """Start a new battle-royale round for ``game_code``.

    Picks an exercise of the session's level that has not been used yet
    (honouring ``include_terminale``), puts 'qualified' players back to
    'playing', and marks the session 'active' with a fresh
    ``round_start_time`` and no ``round_end_time``. Eliminated players stay
    eliminated.

    Returns False when the session does not exist, True otherwise. The
    connection is closed on every path.
    """
    import json
    import random
    from datetime import datetime

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        # Fetch the current session.
        cursor.execute(
            "SELECT niveau, used_exercises, include_terminale FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if row is None:
            return False

        niveau, used_raw, terminale_flag = row
        used_exercises = json.loads(used_raw) if used_raw else []
        include_terminale = bool(terminale_flag)

        # Choose a new exercise that has not been used in this game yet.
        query = "SELECT id FROM code WHERE niveau = ?"
        if not include_terminale:
            query += " AND titre NOT LIKE '(T) %'"
        cursor.execute(query, (int(niveau),))
        all_exos = [r[0] for r in cursor.fetchall()]

        already_used = set(used_exercises)
        available_exos = [exo for exo in all_exos if exo not in already_used]

        if available_exos:
            new_exo_id = random.choice(available_exos)
        else:
            # Every exercise has been played: start over with an empty history.
            # With no exercise at all for this level, fall back to id 1.
            new_exo_id = random.choice(all_exos) if all_exos else 1
            used_exercises = []
        used_exercises.append(new_exo_id)

        # Only qualified players (and any still marked 'open') resume playing.
        cursor.execute(
            "UPDATE br_players SET state = 'playing' WHERE game_code = ? AND state IN ('qualified', 'open')",
            (game_code,),
        )
        cursor.execute("""
            UPDATE br_sessions
            SET status = 'active',
                current_exercise_id = ?,
                used_exercises = ?,
                round_start_time = ?,
                round_end_time = NULL
            WHERE game_code = ?
        """, (new_exo_id, json.dumps(used_exercises), datetime.now().isoformat(), game_code))
        conn.commit()
        return True
    finally:
        conn.close()
def qualify_br_player(game_code, username):
    """Mark ``username`` as 'qualified' and close the round if it is over.

    The round ends when at most one player is still 'playing'; a single
    remaining player is marked 'eliminated_this_round'. When the round ends
    the session goes to 'round_end' and ``round_end_time`` is stamped.

    Returns True when this call ended the round, False otherwise.
    """
    connection = sqlite3.connect(DB_PATH)
    cursor = connection.cursor()

    cursor.execute(
        "UPDATE br_players SET state = 'qualified' WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    connection.commit()

    # Who is still playing after this qualification?
    cursor.execute(
        "SELECT username FROM br_players WHERE game_code = ? AND state = 'playing'",
        (game_code,),
    )
    still_playing = [row[0] for row in cursor.fetchall()]

    if len(still_playing) == 1:
        # The last player left is eliminated for this round.
        cursor.execute(
            "UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND username = ?",
            (game_code, still_playing[0]),
        )
        connection.commit()

    # Zero left covers a solo player or everyone qualifying at once.
    round_ended = len(still_playing) <= 1
    if round_ended:
        from datetime import datetime
        cursor.execute(
            "UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?",
            (datetime.now().isoformat(), game_code),
        )
        connection.commit()

    connection.close()
    return round_ended
def check_br_time_limit(game_code):
    """End the current round if its time limit has elapsed.

    The limit depends on the session level (1: 120 s, 2: 300 s, 3: 540 s,
    4: 900 s, anything else: 120 s). When it is exceeded, every player still
    'playing' becomes 'eliminated_this_round' and the session goes to
    'round_end' with ``round_end_time`` stamped.

    Returns True when the round was ended by this call. Returns False when
    the session is missing, not 'active', has no or an unparseable start
    time, or still has time left. The connection is closed on every path.
    """
    from datetime import datetime

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT niveau, round_start_time, status FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row:
            return False

        niveau, start_time_str, status = row
        if status != 'active' or not start_time_str:
            return False

        try:
            start_time = datetime.fromisoformat(start_time_str)
        except (TypeError, ValueError):
            # Stored value is not an ISO timestamp: treat as "no limit reached".
            return False

        time_limits = {1: 120, 2: 300, 3: 540, 4: 900}
        limit_seconds = time_limits.get(niveau, 120)

        now = datetime.now()
        if (now - start_time).total_seconds() < limit_seconds:
            return False

        # Time is up: everyone still playing is eliminated for this round.
        cursor.execute(
            "UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND state = 'playing'",
            (game_code,),
        )
        cursor.execute(
            "UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?",
            (now.isoformat(), game_code),
        )
        conn.commit()
        return True
    finally:
        conn.close()
def process_br_round_end(game_code):
    """Advance a battle-royale game once its end-of-round pause is over.

    Returns:
        None: the session is missing, not in 'round_end', or has no or an
            unparseable ``round_end_time``.
        "waiting": fewer than 10 seconds have passed since the round ended.
        "finished": at most one qualified player remains; the session is
            marked 'finished' with that player (or NULL) as winner.
        "next_round": several qualified players remain and a new round was
            started through ``start_br_round``.

    In the last two cases players 'eliminated_this_round' become
    'eliminated'. The connection is closed on every path.
    """
    from datetime import datetime

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT status, round_end_time FROM br_sessions WHERE game_code = ?",
            (game_code,),
        )
        row = cursor.fetchone()
        if not row or row[0] != 'round_end' or not row[1]:
            return None

        try:
            end_time = datetime.fromisoformat(row[1])
        except (TypeError, ValueError):
            return None

        if (datetime.now() - end_time).total_seconds() < 10:
            return "waiting"

        # Pause is over: this round's eliminations become final.
        cursor.execute(
            "UPDATE br_players SET state = 'eliminated' WHERE game_code = ? AND state = 'eliminated_this_round'",
            (game_code,),
        )

        # Count the players who qualified for the next round.
        cursor.execute(
            "SELECT username FROM br_players WHERE game_code = ? AND state = 'qualified'",
            (game_code,),
        )
        qualified = [r[0] for r in cursor.fetchall()]

        if len(qualified) <= 1:
            # Game over: the single qualified player wins (or nobody does).
            winner = qualified[0] if qualified else None
            cursor.execute(
                "UPDATE br_sessions SET status = 'finished', winner = ? WHERE game_code = ?",
                (winner, game_code),
            )
            conn.commit()
            return "finished"

        conn.commit()
    finally:
        conn.close()

    # start_br_round opens its own connection, so it runs after ours is closed.
    start_br_round(game_code)
    return "next_round"