exopy_ggame / main.py
dav74's picture
Upload main.py
556ec16 verified
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta, timezone
from typing import List
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from database import create_table, return_exercise, create_multiplayer_game, get_multiplayer_game, set_multiplayer_winner, add_player_to_game, get_game_players, start_multiplayer_game_in_db, raffle_exercises, remove_player_from_game, replay_multiplayer_game, create_br_game, get_br_game, add_player_to_br, remove_player_from_br, get_br_players, start_br_round, qualify_br_player, check_br_time_limit, process_br_round_end
import logging
from jose import JWTError, jwt
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
logging.basicConfig(level=logging.INFO)
load_dotenv()
# Configuration JWT
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-for-jwt-keep-it-safe")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire, "aud": "exopy-game"})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], audience="exopy-game")
username: str = payload.get("sub")
if username is None:
raise credentials_exception
return username
except JWTError:
raise credentials_exception
create_table()
app = FastAPI(title="Exopy Multi - Serveur")
origins = [
"https://pixees.fr",
"https://pixees.fr/informatiquelycee/",
"http://localhost:5173",
"http://localhost:5174",
"http://localhost:5175",
"http://localhost:5176",
"http://localhost:3000",
"http://127.0.0.1:5173",
"http://127.0.0.1:5174",
"http://127.0.0.1:5175",
"http://127.0.0.1:5176",
"http://127.0.0.1:3000",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Token(BaseModel):
access_token: str
token_type: str
@app.post("/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# NO AUTH VERIFICATION as requested. Use username as sub.
# We just ensure a username is provided.
if not form_data.username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Nickname is required",
)
if len(form_data.username) > 10:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Nickname must be 10 characters or less",
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": form_data.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get('/exercise/{id}')
def get_exercise(id : int, current_user: str = Depends(get_current_user)):
ex = return_exercise(id)
logging.info(f"Endpoint /exercise/{id} called")
return {"id" : id, "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]}
@app.get('/timetrial/exercises/{niveau}')
def get_timetrial_exercises(niveau: int, include_terminale: bool = False):
# niveau 0 means all levels
ids = raffle_exercises(niveau, count=3, include_terminale=include_terminale)
if not ids:
raise HTTPException(status_code=404, detail="No exercises found for this level")
exercises = []
for id in ids:
ex = return_exercise(id)
if ex:
exercises.append({
"id" : id,
"title" : ex[1].replace("\n",""),
"niveau": ex[2],
"enonce" : ex[4],
"test": ex[5]
})
return exercises
@app.post('/multiplayer/create')
def create_game(req: dict, current_user: str = Depends(get_current_user)):
niveau = req.get("niveau")
include_terminale = req.get("include_terminale", False)
game_code = create_multiplayer_game(niveau, include_terminale)
if not game_code:
logging.error(f"Failed to create multiplayer game: level={niveau}, terminale={include_terminale}")
raise HTTPException(
status_code=404,
detail=f"No exercises found for level {niveau} (Terminale: {include_terminale})"
)
# Add creator to players
add_player_to_game(game_code, current_user)
return {
"game_code": game_code,
"players": [current_user]
}
@app.get('/multiplayer/join/{code}')
def join_game(code: str, current_user: str = Depends(get_current_user)):
logging.info(f"Lobby: Joueur {current_user} interroge la partie {code}")
game = get_multiplayer_game(code.upper())
if not game:
raise HTTPException(status_code=404, detail="Game not found")
# Add joining player to list only if not already in game (prevents unnecessary locks during polling)
players = get_game_players(code.upper())
if current_user not in players:
add_player_to_game(code.upper(), current_user)
players = get_game_players(code.upper())
logging.info(f"Lobby: Players found for {code}: {players}")
status = game[1]
response = {
"status": status,
"players": players,
"winner": game[2]
}
# Only reveal exercise if game has started
if status != 'open':
ex = return_exercise(game[0])
response["exercise"] = {"id" : game[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]}
return response
@app.get('/multiplayer/status/{code}')
def get_status(code: str, current_user: str = Depends(get_current_user)):
game = get_multiplayer_game(code.upper())
if not game:
raise HTTPException(status_code=404, detail="Game not found")
players = get_game_players(code.upper())
# Si le joueur n'est plus dans la liste (et qu'il ne vient pas juste de créer/rejoindre via une autre fonction)
if current_user not in players:
return {"status": "kicked", "players": players}
status = game[1]
response = {
"status": status,
"players": players,
"winner": game[2]
}
if status != 'open':
ex = return_exercise(game[0])
response["exercise"] = {"id" : game[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]}
return response
@app.post('/multiplayer/kick')
def kick_player(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
target_user = req.get("target_user")
if not game_code or not target_user:
raise HTTPException(status_code=400, detail="Missing parameters")
players = get_game_players(game_code.upper())
if players and players[0] == current_user:
remove_player_from_game(game_code.upper(), target_user)
return {"status": "success"}
else:
raise HTTPException(status_code=403, detail="Not authorized to kick players")
@app.post('/multiplayer/leave')
def leave_game(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
if not game_code:
raise HTTPException(status_code=400, detail="Missing parameters")
remove_player_from_game(game_code.upper(), current_user)
return {"status": "success"}
@app.post('/multiplayer/start')
def start_game(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
# In a real app, we'd check if current_user is creator
start_multiplayer_game_in_db(game_code.upper())
return {"status": "active"}
@app.post('/multiplayer/win')
def declare_winner(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
username = current_user
winner = set_multiplayer_winner(game_code.upper(), username)
return {"winner": winner}
@app.post('/multiplayer/replay')
def replay_game(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
players = get_game_players(game_code.upper())
if players and players[0] == current_user:
new_exo = replay_multiplayer_game(game_code.upper())
if new_exo:
return {"status": "success"}
else:
raise HTTPException(status_code=500, detail="Could not generate new exercise")
else:
raise HTTPException(status_code=403, detail="Not authorized to restart game")
# --- BATTLE ROYAL ROUTES ---
@app.post('/battleroyal/create')
def create_br(req: dict, current_user: str = Depends(get_current_user)):
niveau = req.get("niveau", 1)
include_terminale = req.get("include_terminale", False)
game_code = create_br_game(niveau, include_terminale)
if not game_code:
raise HTTPException(status_code=500, detail="Could not create game")
add_player_to_br(game_code, current_user)
return {"game_code": game_code}
@app.get('/battleroyal/join/{code}')
def join_br(code: str, current_user: str = Depends(get_current_user)):
game = get_br_game(code.upper())
if not game:
raise HTTPException(status_code=404, detail="Game not found")
players = get_br_players(code.upper())
player_names = [p["username"] for p in players]
if current_user not in player_names:
if game[1] == 'open': # status
add_player_to_br(code.upper(), current_user)
else:
raise HTTPException(status_code=403, detail="Game already started")
return {"status": "success"}
@app.get('/battleroyal/status/{code}')
def get_br_status(code: str, current_user: str = Depends(get_current_user)):
game = get_br_game(code.upper())
if not game:
raise HTTPException(status_code=404, detail="Game not found")
# Variables de la DB
niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game
game_code = code.upper()
# 1. Traitement des règles de temps / fin de tour (Côté Serveur)
if status == 'active':
if check_br_time_limit(game_code):
# L'état a changé, on le recharge
game = get_br_game(game_code)
niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game
elif status == 'round_end':
res = process_br_round_end(game_code)
if res in ["finished", "next_round"]:
# L'état a changé
game = get_br_game(game_code)
niveau, status, winner, current_exercise_id, used_exercises, round_start_time, round_end_time = game
players = get_br_players(game_code)
player_names = [p["username"] for p in players]
if current_user not in player_names:
return {"status": "kicked"}
# Calcul temps restant
from datetime import datetime
time_remaining = 0
if status == 'active' and round_start_time:
time_limits = {1: 120, 2: 300, 3: 540, 4: 900}
limit = time_limits.get(niveau, 120)
start = datetime.fromisoformat(round_start_time)
elapsed = (datetime.now() - start).total_seconds()
time_remaining = max(0, limit - elapsed)
response = {
"status": status,
"players": players,
"niveau": niveau,
"time_remaining": time_remaining,
"winner": winner
}
# Envoi de l'exercice seulement si la partie est active
if status == 'active' and current_exercise_id:
ex = return_exercise(current_exercise_id)
if ex:
response["exercise"] = {"id" : ex[0], "title" : ex[1].replace("\n",""), "niveau": ex[2], "enonce" : ex[4], "test": ex[5]}
# Envoi du compte à rebours de fin de tour
if status == 'round_end' and round_end_time:
start = datetime.fromisoformat(round_end_time)
elapsed = (datetime.now() - start).total_seconds()
response["round_countdown"] = max(0, 10 - elapsed)
return response
@app.post('/battleroyal/start')
def start_br(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
players = get_br_players(game_code.upper())
if players and players[0]["username"] == current_user:
success = start_br_round(game_code.upper())
if success:
return {"status": "success"}
else:
raise HTTPException(status_code=500, detail="Could not start round")
raise HTTPException(status_code=403, detail="Not authorized")
@app.post('/battleroyal/qualify')
def qualify_br(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
qualify_br_player(game_code.upper(), current_user)
return {"status": "success"}
@app.post('/battleroyal/kick')
def kick_br(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
target_user = req.get("target_user")
players = get_br_players(game_code.upper())
if players and players[0]["username"] == current_user:
remove_player_from_br(game_code.upper(), target_user)
return {"status": "success"}
raise HTTPException(status_code=403, detail="Not authorized")
@app.post('/battleroyal/leave')
def leave_br(req: dict, current_user: str = Depends(get_current_user)):
game_code = req.get("game_code")
if not game_code:
raise HTTPException(status_code=400, detail="Missing parameters")
remove_player_from_br(game_code.upper(), current_user)
return {"status": "success"}
@app.get('/debug/db-stats')
def db_stats():
import sqlite3
from database import DB_PATH
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT niveau, COUNT(*) FROM code GROUP BY niveau")
stats = cursor.fetchall()
cursor.execute("SELECT COUNT(*) FROM code WHERE titre LIKE '(T) %'")
terminale_count = cursor.fetchone()[0]
conn.close()
return {
"exercises_per_level": {str(s[0]): s[1] for s in stats},
"total_terminale": terminale_count
}
except Exception as e:
return {"error": str(e)}