exopy_ggame / database.py
dav74's picture
Upload database.py
f725b53 verified
import sqlite3
import os
# The SQLite database and the plain-text exercise file both live in the
# same directory as this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH, TXT_PATH = (
    os.path.join(BASE_DIR, filename) for filename in ("code.db", "code.txt")
)
# DDL run on every start-up; IF NOT EXISTS makes each statement idempotent.
_SCHEMA_STATEMENTS = (
    """
CREATE TABLE IF NOT EXISTS code (
id INTEGER PRIMARY KEY,
titre TEXT,
niveau INTEGER,
mots_cle TEXT,
enonce TEXT NOT NULL,
test TEXT
)
""",
    """
CREATE TABLE IF NOT EXISTS multiplayer_sessions (
game_code TEXT PRIMARY KEY,
exercise_id INTEGER,
status TEXT, -- 'open', 'active', 'finished'
winner TEXT,
include_terminale BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
    """
CREATE TABLE IF NOT EXISTS multiplayer_players (
game_code TEXT,
username TEXT,
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (game_code, username)
)
""",
    """
CREATE TABLE IF NOT EXISTS br_sessions (
game_code TEXT PRIMARY KEY,
niveau INTEGER,
status TEXT, -- 'open', 'active', 'round_end', 'finished'
winner TEXT,
current_exercise_id INTEGER,
used_exercises TEXT, -- JSON string list of IDs
round_start_time TIMESTAMP,
round_end_time TIMESTAMP,
include_terminale BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
    """
CREATE TABLE IF NOT EXISTS br_players (
game_code TEXT,
username TEXT,
state TEXT, -- 'playing', 'qualified', 'eliminated', 'eliminated_this_round'
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (game_code, username)
)
""",
)

# Columns added after the first release; applied to databases created earlier.
_COLUMN_MIGRATIONS = (
    "ALTER TABLE multiplayer_players ADD COLUMN joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
    "ALTER TABLE br_players ADD COLUMN joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
    "ALTER TABLE multiplayer_sessions ADD COLUMN include_terminale BOOLEAN DEFAULT 0",
    "ALTER TABLE br_sessions ADD COLUMN include_terminale BOOLEAN DEFAULT 0",
)


def _normalize_stored_exercises(conn, cursor):
    """Clean rows already in `code`: strip titles ending in a line break, coerce `niveau` to int."""
    try:
        cursor.execute("SELECT id, titre, niveau FROM code")
        for r_id, r_titre, r_niveau in cursor.fetchall():
            needs_update = False
            new_titre = r_titre
            new_niveau = r_niveau
            if isinstance(r_titre, str) and r_titre.endswith(('\n', '\r')):
                new_titre = r_titre.strip()
                needs_update = True
            try:
                if isinstance(r_niveau, str):
                    new_niveau = int(r_niveau.strip())
                    needs_update = True
                elif not isinstance(r_niveau, int):
                    new_niveau = int(r_niveau)
                    needs_update = True
            except (TypeError, ValueError, OverflowError):
                # Value cannot be read as an integer (e.g. NULL or free text):
                # leave the stored level untouched.
                pass
            if needs_update:
                cursor.execute(
                    "UPDATE code SET titre = ?, niveau = ? WHERE id = ?",
                    (new_titre, new_niveau, r_id),
                )
        conn.commit()
    except sqlite3.OperationalError:
        pass  # Table might not exist yet


def _apply_column_migrations(cursor):
    """Run each ADD COLUMN migration, tolerating columns that already exist."""
    import logging

    # NOTE(review): SQLite documents restrictions on ADD COLUMN with a
    # CURRENT_TIMESTAMP default; if they apply, the `joined_at` migrations can
    # never succeed on an old database -- confirm against the SQLite docs.
    for statement in _COLUMN_MIGRATIONS:
        try:
            cursor.execute(statement)
        except sqlite3.OperationalError as exc:
            # "duplicate column name" is the expected outcome on an up-to-date
            # database; anything else is reported but stays non-fatal.
            if "duplicate column" not in str(exc).lower():
                logging.warning(f"Migration not applied ({statement}): {exc}")


def _load_exercises_from_file(cursor):
    """Insert the exercises described in TXT_PATH and return how many were inserted.

    The file is a flat sequence of fields separated by ``;;;``; every five
    consecutive fields (titre, niveau, mots_cle, enonce, test) form one exercise.
    """
    import logging

    with open(TXT_PATH, 'r', encoding='utf-8') as f:
        content = f.read()

    fields = [part.strip() for part in content.split(";;;")]
    # A file that ends with the separator yields one blank trailing field.
    if fields and not fields[-1]:
        fields.pop()

    loaded = 0
    exercise_id = 1
    # Incomplete trailing groups (fewer than five fields) are ignored.
    for start in range(0, len(fields) - 4, 5):
        titre, niveau, mots_cle, enonce, test = fields[start:start + 5]
        try:
            cursor.execute(
                "INSERT INTO code (id, titre, niveau, mots_cle, enonce, test) VALUES (?, ?, ?, ?, ?, ?)",
                (exercise_id, titre, int(niveau), mots_cle, enonce, test),
            )
        except (ValueError, sqlite3.Error) as e:
            logging.warning(f"Skipping malformed exercise at index {start}: {e}")
        else:
            # Ids stay contiguous: a skipped exercise does not consume an id.
            exercise_id += 1
            loaded += 1
    return loaded


def create_table():
    """Create the schema, migrate older databases and seed the exercise table.

    Steps, in order:
      1. create the five tables if they are missing;
      2. normalise existing `code` rows (titles ending in a line break,
         non-integer levels);
      3. add columns introduced by later versions;
      4. if `code` is empty, load the exercises from TXT_PATH.

    Loading problems are logged rather than raised. The connection is closed
    on every exit path, including when a statement raises.
    """
    import logging

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        for statement in _SCHEMA_STATEMENTS:
            cursor.execute(statement)

        _normalize_stored_exercises(conn, cursor)
        _apply_column_migrations(cursor)

        # Exercises are only (re)loaded when the table is empty.
        cursor.execute("SELECT count(*) FROM code")
        if cursor.fetchone()[0] != 0:
            return

        if not os.path.exists(TXT_PATH):
            logging.error(f"CRITICAL: {TXT_PATH} not found! Exercises will not be loaded.")
            return

        try:
            num_exercises = _load_exercises_from_file(cursor)
            conn.commit()
            logging.info(f"Successfully loaded {num_exercises} exercises from {TXT_PATH}")
        except Exception as e:
            # Deliberate best-effort boundary: a broken seed file must not
            # prevent the application from starting.
            logging.error(f"Error loading exercises: {e}")
    finally:
        conn.close()
def return_title():
    """Return the id, title and level of every exercise.

    Returns:
        A list of dicts with the keys "id", "title" and "niveau", one per row
        of the `code` table. Newline characters are removed from titles; a
        NULL title is reported as an empty string instead of raising.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute("SELECT id, titre, niveau FROM code").fetchall()
    finally:
        # Close even if the query fails (e.g. the table is missing).
        conn.close()
    return [
        {"id": row_id, "title": (titre or "").replace("\n", ""), "niveau": niveau}
        for row_id, titre, niveau in rows
    ]
def return_exercise(id):
    """Fetch one exercise by primary key.

    Args:
        id: Value matched against the `id` column of the `code` table.

    Returns:
        The matching row as a tuple holding every column of `code`, or None
        when no row has that id.
    """
    connection = sqlite3.connect(DB_PATH)
    matches = connection.execute("SELECT * FROM code WHERE id = ?", (id,)).fetchall()
    connection.close()
    return matches[0] if matches else None
def return_mot_cle():
    """Return the keywords of every exercise.

    Returns:
        A list of dicts with the keys "id" and "mot_cle", one per row of the
        `code` table. Newline characters are removed from the keyword string;
        a NULL `mots_cle` is reported as an empty string instead of raising.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute("SELECT id, mots_cle FROM code").fetchall()
    finally:
        # Close even if the query fails (e.g. the table is missing).
        conn.close()
    return [
        {"id": row_id, "mot_cle": (mots_cle or "").replace("\n", "")}
        for row_id, mots_cle in rows
    ]
def return_niveau():
    """Return the level of every exercise as a list of {"id", "niveau"} dicts."""
    connection = sqlite3.connect(DB_PATH)
    levels = []
    for exercise_id, niveau in connection.execute("SELECT id, niveau FROM code").fetchall():
        levels.append({"id": exercise_id, "niveau": niveau})
    connection.close()
    return levels
def raffle_exercise(niveau, include_terminale=False):
    """Pick one random exercise id at the given level.

    Args:
        niveau: Level to draw from; converted with int() before querying.
        include_terminale: When falsy, exercises whose title matches the SQL
            pattern '(T) %' are left out of the draw.

    Returns:
        The id of a randomly chosen exercise, or None when nothing matches.
    """
    import logging
    import random

    db = sqlite3.connect(DB_PATH)
    params = [int(niveau)]
    title_filter = "" if include_terminale else " AND titre NOT LIKE '(T) %'"
    query = "SELECT id FROM code WHERE niveau = ?" + title_filter
    candidates = db.execute(query, tuple(params)).fetchall()
    logging.info(f"raffle_exercise: niveau={niveau}, include_terminale={include_terminale}, query={query}, params={params}, found {len(candidates)} candidates")
    db.close()
    if candidates:
        return random.choice(candidates)[0]
    return None
def raffle_exercises(niveau, count=3, include_terminale=False):
    """Draw several distinct random exercise ids.

    Args:
        niveau: Level to draw from; the value 0 disables the level filter.
        count: Maximum number of ids to return.
        include_terminale: When falsy, exercises whose title matches the SQL
            pattern '(T) %' are left out of the draw.

    Returns:
        A list of at most `count` exercise ids (fewer if not enough exercises
        match), or an empty list when nothing matches.
    """
    import random

    db = sqlite3.connect(DB_PATH)
    filters = []
    params = []
    if niveau != 0:
        filters.append("niveau = ?")
        params.append(int(niveau))
    if not include_terminale:
        filters.append("titre NOT LIKE '(T) %'")
    where_clause = (" WHERE " + " AND ".join(filters)) if filters else ""
    rows = db.execute("SELECT id FROM code" + where_clause, tuple(params)).fetchall()
    db.close()
    if not rows:
        return []
    # Never ask for more samples than there are candidates.
    picked = random.sample(rows, min(len(rows), count))
    return [row[0] for row in picked]
def create_multiplayer_game(niveau, include_terminale=False):
    """Create a new multiplayer session in the 'open' state.

    Args:
        niveau: Level passed to raffle_exercise() to choose the exercise.
        include_terminale: Forwarded to raffle_exercise() and stored on the
            session row.

    Returns:
        The 6-character game code (uppercase letters and digits) of the new
        session, or None when no exercise is available or the database write
        fails.
    """
    import logging
    import random
    import string

    exo_id = raffle_exercise(niveau, include_terminale)
    if not exo_id:
        logging.warning(f'create_multiplayer_game: raffle_exercise returned None for level {niveau}')
        return None

    alphabet = string.ascii_uppercase + string.digits
    max_attempts = 10  # retries when a generated code is already taken
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        for _ in range(max_attempts):
            game_code = ''.join(random.choices(alphabet, k=6))
            try:
                cursor.execute(
                    "INSERT INTO multiplayer_sessions (game_code, exercise_id, status, winner, include_terminale) VALUES (?, ?, ?, ?, ?)",
                    (game_code, exo_id, 'open', None, include_terminale),
                )
            except sqlite3.IntegrityError:
                # game_code is the primary key: this code is already in use,
                # so draw another one instead of failing the whole request.
                continue
            conn.commit()
            return game_code
        logging.error(f"Database error: no free game code found after {max_attempts} attempts")
        return None
    except Exception as e:
        # Best-effort boundary kept from the original contract: callers get
        # None rather than an exception when the session cannot be stored.
        logging.error(f"Database error: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()
def get_multiplayer_game(game_code):
    """Look up a multiplayer session.

    Returns:
        An (exercise_id, status, winner) tuple, or None when the game code is
        unknown.
    """
    db = sqlite3.connect(DB_PATH)
    session = db.execute(
        "SELECT exercise_id, status, winner FROM multiplayer_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    db.close()
    return session
def add_player_to_game(game_code, username):
    """Register `username` in a multiplayer game.

    The INSERT uses OR IGNORE, so a (game_code, username) pair that is already
    present is left as it is.
    """
    db = sqlite3.connect(DB_PATH)
    db.execute(
        "INSERT OR IGNORE INTO multiplayer_players (game_code, username) VALUES (?, ?)",
        (game_code, username),
    )
    db.commit()
    db.close()
def remove_player_from_game(game_code, username):
    """Delete `username` from the player list of a multiplayer game."""
    db = sqlite3.connect(DB_PATH)
    db.execute(
        "DELETE FROM multiplayer_players WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    db.commit()
    db.close()
def get_game_players(game_code):
    """Return the usernames registered in a multiplayer game.

    Returns:
        A list of usernames ordered by ascending `joined_at`. The list is also
        written to the log at INFO level.
    """
    import logging

    db = sqlite3.connect(DB_PATH)
    found = db.execute(
        "SELECT username FROM multiplayer_players WHERE game_code = ? ORDER BY joined_at ASC",
        (game_code,),
    ).fetchall()
    db.close()
    players_list = []
    for (username,) in found:
        players_list.append(username)
    logging.info(f"DEBUG: Players in {game_code}: {players_list}")
    return players_list
def start_multiplayer_game_in_db(game_code):
    """Set the status of a multiplayer session to 'active'."""
    db = sqlite3.connect(DB_PATH)
    db.execute(
        "UPDATE multiplayer_sessions SET status = 'active' WHERE game_code = ?",
        (game_code,),
    )
    db.commit()
    db.close()
def set_multiplayer_winner(game_code, username):
    """Record the winner of a multiplayer game; the first caller wins.

    Args:
        game_code: Code of the session to close.
        username: Player claiming the win.

    Returns:
        The recorded winner: `username` if this call claimed the win (or if
        the session does not exist), otherwise the name of the player who was
        already stored as winner.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # Claim the win in a single conditional UPDATE so that two players
        # finishing at the same moment cannot both overwrite the winner.
        cursor.execute(
            "UPDATE multiplayer_sessions SET winner = ?, status = 'finished' "
            "WHERE game_code = ? AND (winner IS NULL OR winner = '')",
            (username, game_code),
        )
        claimed = cursor.rowcount > 0
        conn.commit()
        if claimed:
            return username

        # Nothing was updated: either someone already won or the game code
        # is unknown.
        cursor.execute("SELECT winner FROM multiplayer_sessions WHERE game_code = ?", (game_code,))
        row = cursor.fetchone()
        if row and row[0]:  # Already has a winner
            return row[0]
        return username
    finally:
        # Previously the "already has a winner" path returned without closing.
        conn.close()
def replay_multiplayer_game(game_code):
    """Restart a multiplayer game with another exercise of the same level.

    The level is taken from the session's current exercise (1 if that exercise
    no longer exists). A different exercise of that level is preferred; when
    there is none, any exercise of the level may be drawn, and if the level has
    no exercise at all the current one is kept. The session is then set back
    to 'active' with no winner.

    Returns:
        The id of the exercise now attached to the session, or None when the
        game code is unknown.
    """
    import random

    db = sqlite3.connect(DB_PATH)
    cur = db.cursor()
    session = cur.execute(
        "SELECT exercise_id, include_terminale FROM multiplayer_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    if session is None:
        db.close()
        return None

    previous_id, terminale_flag = session
    include_terminale = bool(terminale_flag)  # NULL counts as False

    level_row = cur.execute("SELECT niveau FROM code WHERE id = ?", (previous_id,)).fetchone()
    level = int(level_row[0] if level_row else 1)

    title_filter = "" if include_terminale else " AND titre NOT LIKE '(T) %'"
    candidates = cur.execute(
        "SELECT id FROM code WHERE niveau = ? AND id != ?" + title_filter,
        (level, previous_id),
    ).fetchall()
    if not candidates:
        # No alternative exercise: allow the same one to be drawn again.
        candidates = cur.execute(
            "SELECT id FROM code WHERE niveau = ?" + title_filter,
            (level,),
        ).fetchall()

    next_id = random.choice(candidates)[0] if candidates else previous_id
    cur.execute(
        "UPDATE multiplayer_sessions SET exercise_id = ?, status = 'active', winner = NULL WHERE game_code = ?",
        (next_id, game_code),
    )
    db.commit()
    db.close()
    return next_id
def create_br_game(niveau, include_terminale=False):
    """Create a new battle-royale session in the 'open' state.

    The first exercise is drawn with raffle_exercise() and stored both as the
    current exercise and as the only entry of the JSON `used_exercises` list.

    Args:
        niveau: Level used to draw exercises; stored on the session row.
        include_terminale: Forwarded to raffle_exercise() and stored on the
            session row.

    Returns:
        The 6-character game code (uppercase letters and digits) of the new
        session, or None when no exercise is available or the database write
        fails.
    """
    import json
    import logging
    import random
    import string

    exo_id = raffle_exercise(niveau, include_terminale)
    if not exo_id:
        logging.warning(f'create_br_game: raffle_exercise returned None for level {niveau}')
        return None

    alphabet = string.ascii_uppercase + string.digits
    used_exercises = json.dumps([exo_id])
    max_attempts = 10  # retries when a generated code is already taken
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        for _ in range(max_attempts):
            game_code = ''.join(random.choices(alphabet, k=6))
            try:
                cursor.execute(
                    "INSERT INTO br_sessions (game_code, niveau, status, current_exercise_id, used_exercises, include_terminale) VALUES (?, ?, 'open', ?, ?, ?)",
                    (game_code, niveau, exo_id, used_exercises, include_terminale),
                )
            except sqlite3.IntegrityError:
                # game_code is the primary key: this code is already in use,
                # so draw another one instead of failing the whole request.
                continue
            conn.commit()
            return game_code
        logging.error(f"Database error: no free game code found after {max_attempts} attempts")
        return None
    except Exception as e:
        # Best-effort boundary kept from the original contract: callers get
        # None rather than an exception when the session cannot be stored.
        logging.error(f"Database error: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()
def get_br_game(game_code):
    """Look up a battle-royale session.

    Returns:
        A tuple (niveau, status, winner, current_exercise_id, used_exercises,
        round_start_time, round_end_time), or None when the game code is
        unknown.
    """
    db = sqlite3.connect(DB_PATH)
    session = db.execute(
        "SELECT niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time FROM br_sessions WHERE game_code = ?",
        (game_code,),
    ).fetchone()
    db.close()
    return session
def add_player_to_br(game_code, username):
    """Register `username` in a battle-royale game with the state 'playing'.

    The INSERT uses OR IGNORE, so a (game_code, username) pair that is already
    present keeps its current state.
    """
    db = sqlite3.connect(DB_PATH)
    db.execute(
        "INSERT OR IGNORE INTO br_players (game_code, username, state) VALUES (?, ?, 'playing')",
        (game_code, username),
    )
    db.commit()
    db.close()
def remove_player_from_br(game_code, username):
    """Delete `username` from the player list of a battle-royale game."""
    db = sqlite3.connect(DB_PATH)
    db.execute(
        "DELETE FROM br_players WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    db.commit()
    db.close()
def get_br_players(game_code):
    """Return the players of a battle-royale game with their current state.

    Returns:
        A list of {"username", "state"} dicts ordered by ascending `joined_at`.
    """
    db = sqlite3.connect(DB_PATH)
    found = db.execute(
        "SELECT username, state FROM br_players WHERE game_code = ? ORDER BY joined_at ASC",
        (game_code,),
    ).fetchall()
    db.close()
    players = []
    for username, state in found:
        players.append({"username": username, "state": state})
    return players
def start_br_round(game_code):
    """Start a new battle-royale round with an exercise not used before.

    Picks a random exercise of the session's level that is not in the
    session's `used_exercises` list (the list is reset once every exercise has
    been played), puts qualified players back to 'playing', and marks the
    session 'active' with a fresh `round_start_time`.

    Args:
        game_code: Code of the battle-royale session.

    Returns:
        True when the round was started, False when the game code is unknown.
    """
    import json
    import random
    from datetime import datetime

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        # Fetch the current session.
        cursor.execute("SELECT niveau, used_exercises, include_terminale FROM br_sessions WHERE game_code = ?", (game_code,))
        row = cursor.fetchone()
        if not row:
            return False

        niveau, used_raw, terminale_flag = row
        include_terminale = bool(terminale_flag)  # NULL counts as False
        try:
            used_exercises = json.loads(used_raw) if used_raw else []
        except (TypeError, ValueError):
            # Corrupt history must not block the game: start from a clean list.
            used_exercises = []
        if not isinstance(used_exercises, list):
            used_exercises = []

        # Choose a new exercise that has not been used yet.
        query = "SELECT id FROM code WHERE niveau = ?"
        if not include_terminale:
            query += " AND titre NOT LIKE '(T) %'"
        cursor.execute(query, (int(niveau),))
        all_exos = [r[0] for r in cursor.fetchall()]
        available_exos = [exo for exo in all_exos if exo not in used_exercises]

        if available_exos:
            new_exo_id = random.choice(available_exos)
        else:
            # Every exercise has been played: reset the history and draw from
            # the full list (falls back to id 1 when the level has none).
            new_exo_id = random.choice(all_exos) if all_exos else 1
            used_exercises = []
        used_exercises.append(new_exo_id)

        # Naive local time on purpose: the other round functions in this file
        # compare the stored value against datetime.now().
        now = datetime.now()

        # Only qualified players go back to 'playing'; eliminated ones stay out.
        cursor.execute("UPDATE br_players SET state = 'playing' WHERE game_code = ? AND state = 'qualified'", (game_code,))
        # Also covers players left in an 'open' state, if any.
        cursor.execute("UPDATE br_players SET state = 'playing' WHERE game_code = ? AND state = 'open'", (game_code,))

        cursor.execute(
            "\n"
            "UPDATE br_sessions\n"
            "SET status = 'active',\n"
            "current_exercise_id = ?,\n"
            "used_exercises = ?,\n"
            "round_start_time = ?,\n"
            "round_end_time = NULL\n"
            "WHERE game_code = ?\n",
            (new_exo_id, json.dumps(used_exercises), now.isoformat(), game_code),
        )
        conn.commit()
        return True
    finally:
        conn.close()
def qualify_br_player(game_code, username):
    """Mark a player as qualified and close the round when it is decided.

    After the update, the round ends if at most one player is still 'playing':
    a single remaining player becomes 'eliminated_this_round', and the session
    is switched to 'round_end' with `round_end_time` set to the current time.

    Returns:
        True when this call ended the round, False otherwise.
    """
    from datetime import datetime

    db = sqlite3.connect(DB_PATH)
    cur = db.cursor()
    cur.execute(
        "UPDATE br_players SET state = 'qualified' WHERE game_code = ? AND username = ?",
        (game_code, username),
    )
    db.commit()

    # Who has not finished yet?
    cur.execute(
        "SELECT username FROM br_players WHERE game_code = ? AND state = 'playing'",
        (game_code,),
    )
    still_playing = [row[0] for row in cur.fetchall()]

    if len(still_playing) > 1:
        db.close()
        return False

    if still_playing:
        # The last player still working is eliminated.
        cur.execute(
            "UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND username = ?",
            (game_code, still_playing[0]),
        )
        db.commit()
    # An empty list means nobody is left playing (e.g. a solo game); the
    # round ends without an elimination.

    cur.execute(
        "UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?",
        (datetime.now().isoformat(), game_code),
    )
    db.commit()
    db.close()
    return True
def check_br_time_limit(game_code):
    """End the current battle-royale round if its time limit has elapsed.

    The limit depends on the session level (120 s, 300 s, 540 s, 900 s for
    levels 1 to 4; 120 s for any other value). When it is exceeded, every
    player still 'playing' becomes 'eliminated_this_round' and the session is
    switched to 'round_end'.

    Returns:
        True when this call ended the round; False when the session is
        unknown, not 'active', has no readable start time, or still has time
        left.
    """
    from datetime import datetime

    time_limits = {1: 120, 2: 300, 3: 540, 4: 900}

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT niveau, round_start_time, status FROM br_sessions WHERE game_code = ?", (game_code,))
        row = cursor.fetchone()
        if not row:
            return False
        niveau, start_time_str, status = row
        if status != 'active' or not start_time_str:
            return False

        try:
            start_time = datetime.fromisoformat(start_time_str)
        except (TypeError, ValueError):
            # Stored value is not an ISO timestamp: treat as "no round running".
            return False

        limit_seconds = time_limits.get(niveau, 120)
        now = datetime.now()
        if (now - start_time).total_seconds() < limit_seconds:
            return False

        # Time is up: everyone still 'playing' is eliminated for this round.
        cursor.execute("UPDATE br_players SET state = 'eliminated_this_round' WHERE game_code = ? AND state = 'playing'", (game_code,))
        cursor.execute("UPDATE br_sessions SET status = 'round_end', round_end_time = ? WHERE game_code = ?", (now.isoformat(), game_code))
        conn.commit()
        return True
    finally:
        conn.close()
def process_br_round_end(game_code):
    """Advance a battle-royale game once the end-of-round pause is over.

    Ten seconds after `round_end_time`, players 'eliminated_this_round' become
    'eliminated'. If at most one player is 'qualified' the session is marked
    'finished' (with that player as winner, or no winner); otherwise the next
    round is started with start_br_round().

    Returns:
        None when the session is unknown, not in 'round_end', or has no
        readable end time; "waiting" while the pause is still running;
        "finished" when the game ended; "next_round" when a new round was
        started.
    """
    from datetime import datetime

    pause_seconds = 10  # delay between the end of a round and the next step

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT status, round_end_time FROM br_sessions WHERE game_code = ?", (game_code,))
        row = cursor.fetchone()
        if not row or row[0] != 'round_end' or not row[1]:
            return None

        try:
            end_time = datetime.fromisoformat(row[1])
        except (TypeError, ValueError):
            # Stored value is not an ISO timestamp: nothing can be decided.
            return None

        if (datetime.now() - end_time).total_seconds() < pause_seconds:
            return "waiting"

        # 1. Players eliminated during this round are now definitively out.
        cursor.execute("UPDATE br_players SET state = 'eliminated' WHERE game_code = ? AND state = 'eliminated_this_round'", (game_code,))
        conn.commit()

        # 2. Count the players who qualified.
        cursor.execute("SELECT username FROM br_players WHERE game_code = ? AND state = 'qualified'", (game_code,))
        qualified = [r[0] for r in cursor.fetchall()]

        if len(qualified) <= 1:
            # End of the game.
            winner = qualified[0] if len(qualified) == 1 else None
            cursor.execute("UPDATE br_sessions SET status = 'finished', winner = ? WHERE game_code = ?", (winner, game_code))
            conn.commit()
            return "finished"
    finally:
        conn.close()

    # Start the next round only after this connection is closed, because
    # start_br_round() opens its own connection to the same database.
    start_br_round(game_code)
    return "next_round"