Spaces:
Running
Running
| import os | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta, timezone | |
| from typing import List | |
| from fastapi import FastAPI, Depends, HTTPException, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from database import create_table, return_exercise, create_multiplayer_game, get_multiplayer_game, set_multiplayer_winner, add_player_to_game, get_game_players, start_multiplayer_game_in_db, raffle_exercises, remove_player_from_game, replay_multiplayer_game, create_br_game, get_br_game, add_player_to_br, remove_player_from_br, get_br_players, start_br_round, qualify_br_player, check_br_time_limit, process_br_round_end | |
| import logging | |
| from jose import JWTError, jwt | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| logging.basicConfig(level=logging.INFO) | |
| load_dotenv() | |
| # Configuration JWT | |
| SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-for-jwt-keep-it-safe") | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") | |
| def create_access_token(data: dict, expires_delta: timedelta = None): | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.now(timezone.utc) + expires_delta | |
| else: | |
| expire = datetime.now(timezone.utc) + timedelta(minutes=15) | |
| to_encode.update({"exp": expire, "aud": "exopy-game"}) | |
| encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| return encoded_jwt | |
| async def get_current_user(token: str = Depends(oauth2_scheme)): | |
| credentials_exception = HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], audience="exopy-game") | |
| username: str = payload.get("sub") | |
| if username is None: | |
| raise credentials_exception | |
| return username | |
| except JWTError: | |
| raise credentials_exception | |
| create_table() | |
| app = FastAPI(title="Exopy Multi - Serveur") | |
| origins = [ | |
| "https://pixees.fr", | |
| "https://pixees.fr/informatiquelycee/", | |
| "http://localhost:5173", | |
| "http://localhost:5174", | |
| "http://localhost:5175", | |
| "http://localhost:5176", | |
| "http://localhost:3000", | |
| "http://127.0.0.1:5173", | |
| "http://127.0.0.1:5174", | |
| "http://127.0.0.1:5175", | |
| "http://127.0.0.1:5176", | |
| "http://127.0.0.1:3000", | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| class Token(BaseModel): | |
| access_token: str | |
| token_type: str | |
| async def login(form_data: OAuth2PasswordRequestForm = Depends()): | |
| # NO AUTH VERIFICATION as requested. Use username as sub. | |
| # We just ensure a username is provided. | |
| if not form_data.username: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Nickname is required", | |
| ) | |
| if len(form_data.username) > 10: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Nickname must be 10 characters or less", | |
| ) | |
| access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
| access_token = create_access_token( | |
| data={"sub": form_data.username}, expires_delta=access_token_expires | |
| ) | |
| return {"access_token": access_token, "token_type": "bearer"} | |
| def get_exercise(id : int, current_user: str = Depends(get_current_user)): | |
| ex = return_exercise(id) | |
| logging.info(f"Endpoint /exercise/{id} called") | |
| return {"id" : id, "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]} | |
| def get_timetrial_exercises(niveau: int, include_terminale: bool = False): | |
| # niveau 0 means all levels | |
| ids = raffle_exercises(niveau, count=3, include_terminale=include_terminale) | |
| if not ids: | |
| raise HTTPException(status_code=404, detail="No exercises found for this level") | |
| exercises = [] | |
| for id in ids: | |
| ex = return_exercise(id) | |
| if ex: | |
| exercises.append({ | |
| "id" : id, | |
| "title" : ex[1].replace("\n",""), | |
| "niveau": ex[2], | |
| "enonce" : ex[4], | |
| "test": ex[5] | |
| }) | |
| return exercises | |
| def create_game(req: dict, current_user: str = Depends(get_current_user)): | |
| niveau = req.get("niveau") | |
| include_terminale = req.get("include_terminale", False) | |
| game_code = create_multiplayer_game(niveau, include_terminale) | |
| if not game_code: | |
| logging.error(f"Failed to create multiplayer game: level={niveau}, terminale={include_terminale}") | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"No exercises found for level {niveau} (Terminale: {include_terminale})" | |
| ) | |
| # Add creator to players | |
| add_player_to_game(game_code, current_user) | |
| return { | |
| "game_code": game_code, | |
| "players": [current_user] | |
| } | |
| def join_game(code: str, current_user: str = Depends(get_current_user)): | |
| logging.info(f"Lobby: Joueur {current_user} interroge la partie {code}") | |
| game = get_multiplayer_game(code.upper()) | |
| if not game: | |
| raise HTTPException(status_code=404, detail="Game not found") | |
| # Add joining player to list only if not already in game (prevents unnecessary locks during polling) | |
| players = get_game_players(code.upper()) | |
| if current_user not in players: | |
| add_player_to_game(code.upper(), current_user) | |
| players = get_game_players(code.upper()) | |
| logging.info(f"Lobby: Players found for {code}: {players}") | |
| status = game[1] | |
| response = { | |
| "status": status, | |
| "players": players, | |
| "winner": game[2] | |
| } | |
| # Only reveal exercise if game has started | |
| if status != 'open': | |
| ex = return_exercise(game[0]) | |
| response["exercise"] = {"id" : game[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]} | |
| return response | |
| def get_status(code: str, current_user: str = Depends(get_current_user)): | |
| game = get_multiplayer_game(code.upper()) | |
| if not game: | |
| raise HTTPException(status_code=404, detail="Game not found") | |
| players = get_game_players(code.upper()) | |
| # Si le joueur n'est plus dans la liste (et qu'il ne vient pas juste de créer/rejoindre via une autre fonction) | |
| if current_user not in players: | |
| return {"status": "kicked", "players": players} | |
| status = game[1] | |
| response = { | |
| "status": status, | |
| "players": players, | |
| "winner": game[2] | |
| } | |
| if status != 'open': | |
| ex = return_exercise(game[0]) | |
| response["exercise"] = {"id" : game[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]} | |
| return response | |
| def kick_player(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| target_user = req.get("target_user") | |
| if not game_code or not target_user: | |
| raise HTTPException(status_code=400, detail="Missing parameters") | |
| players = get_game_players(game_code.upper()) | |
| if players and players[0] == current_user: | |
| remove_player_from_game(game_code.upper(), target_user) | |
| return {"status": "success"} | |
| else: | |
| raise HTTPException(status_code=403, detail="Not authorized to kick players") | |
| def leave_game(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| if not game_code: | |
| raise HTTPException(status_code=400, detail="Missing parameters") | |
| remove_player_from_game(game_code.upper(), current_user) | |
| return {"status": "success"} | |
| def start_game(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| # In a real app, we'd check if current_user is creator | |
| start_multiplayer_game_in_db(game_code.upper()) | |
| return {"status": "active"} | |
| def declare_winner(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| username = current_user | |
| winner = set_multiplayer_winner(game_code.upper(), username) | |
| return {"winner": winner} | |
| def replay_game(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| players = get_game_players(game_code.upper()) | |
| if players and players[0] == current_user: | |
| new_exo = replay_multiplayer_game(game_code.upper()) | |
| if new_exo: | |
| return {"status": "success"} | |
| else: | |
| raise HTTPException(status_code=500, detail="Could not generate new exercise") | |
| else: | |
| raise HTTPException(status_code=403, detail="Not authorized to restart game") | |
| # --- BATTLE ROYAL ROUTES --- | |
| def create_br(req: dict, current_user: str = Depends(get_current_user)): | |
| niveau = req.get("niveau", 1) | |
| include_terminale = req.get("include_terminale", False) | |
| game_code = create_br_game(niveau, include_terminale) | |
| if not game_code: | |
| raise HTTPException(status_code=500, detail="Could not create game") | |
| add_player_to_br(game_code, current_user) | |
| return {"game_code": game_code} | |
| def join_br(code: str, current_user: str = Depends(get_current_user)): | |
| game = get_br_game(code.upper()) | |
| if not game: | |
| raise HTTPException(status_code=404, detail="Game not found") | |
| players = get_br_players(code.upper()) | |
| player_names = [p["username"] for p in players] | |
| if current_user not in player_names: | |
| if game[1] == 'open': # status | |
| add_player_to_br(code.upper(), current_user) | |
| else: | |
| raise HTTPException(status_code=403, detail="Game already started") | |
| return {"status": "success"} | |
| def get_br_status(code: str, current_user: str = Depends(get_current_user)): | |
| game = get_br_game(code.upper()) | |
| if not game: | |
| raise HTTPException(status_code=404, detail="Game not found") | |
| # Variables de la DB | |
| niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game | |
| game_code = code.upper() | |
| # 1. Traitement des règles de temps / fin de tour (Côté Serveur) | |
| if status == 'active': | |
| if check_br_time_limit(game_code): | |
| # L'état a changé, on le recharge | |
| game = get_br_game(game_code) | |
| niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game | |
| elif status == 'round_end': | |
| res = process_br_round_end(game_code) | |
| if res in ["finished", "next_round"]: | |
| # L'état a changé | |
| game = get_br_game(game_code) | |
| niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game | |
| players = get_br_players(game_code) | |
| player_names = [p["username"] for p in players] | |
| if current_user not in player_names: | |
| return {"status": "kicked"} | |
| # Calcul temps restant | |
| from datetime import datetime | |
| time_remaining = 0 | |
| if status == 'active' and round_start_time: | |
| time_limits = {1: 120, 2: 300, 3: 540, 4: 900} | |
| limit = time_limits.get(niveau, 120) | |
| start = datetime.fromisoformat(round_start_time) | |
| elapsed = (datetime.now() - start).total_seconds() | |
| time_remaining = max(0, limit - elapsed) | |
| response = { | |
| "status": status, | |
| "players": players, | |
| "niveau": niveau, | |
| "time_remaining": time_remaining, | |
| "winner": winner | |
| } | |
| # Envoi de l'exercice seulement si la partie est active | |
| if status == 'active' and current_exercise_id: | |
| ex = return_exercise(current_exercise_id) | |
| if ex: | |
| response["exercise"] = {"id" : ex[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]} | |
| # Envoi du compte à rebours de fin de tour | |
| if status == 'round_end' and round_end_time: | |
| start = datetime.fromisoformat(round_end_time) | |
| elapsed = (datetime.now() - start).total_seconds() | |
| response["round_countdown"] = max(0, 10 - elapsed) | |
| return response | |
| def start_br(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| players = get_br_players(game_code.upper()) | |
| if players and players[0]["username"] == current_user: | |
| success = start_br_round(game_code.upper()) | |
| if success: | |
| return {"status": "success"} | |
| else: | |
| raise HTTPException(status_code=500, detail="Could not start round") | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| def qualify_br(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| qualify_br_player(game_code.upper(), current_user) | |
| return {"status": "success"} | |
| def kick_br(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| target_user = req.get("target_user") | |
| players = get_br_players(game_code.upper()) | |
| if players and players[0]["username"] == current_user: | |
| remove_player_from_br(game_code.upper(), target_user) | |
| return {"status": "success"} | |
| raise HTTPException(status_code=403, detail="Not authorized") | |
| def leave_br(req: dict, current_user: str = Depends(get_current_user)): | |
| game_code = req.get("game_code") | |
| if not game_code: | |
| raise HTTPException(status_code=400, detail="Missing parameters") | |
| remove_player_from_br(game_code.upper(), current_user) | |
| return {"status": "success"} | |
| def db_stats(): | |
| import sqlite3 | |
| from database import DB_PATH | |
| try: | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT niveau, COUNT(*) FROM code GROUP BY niveau") | |
| stats = cursor.fetchall() | |
| cursor.execute("SELECT COUNT(*) FROM code WHERE titre LIKE '(T) %'") | |
| terminale_count = cursor.fetchone()[0] | |
| conn.close() | |
| return { | |
| "exercises_per_level": {str(s[0]): s[1] for s in stats}, | |
| "total_terminale": terminale_count | |
| } | |
| except Exception as e: | |
| return {"error": str(e)} | |