import mysql.connector from mysql.connector import Error from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from passlib.context import CryptContext from typing import Optional import jwt from jwt import PyJWTError from datetime import datetime, timedelta, timezone import requests app = FastAPI() class UserDetail(BaseModel): FirstName: str LastName: str Email: str Authorization: str username: str # Configuration de la base de données MySQL MYSQL_HOST = "users_db" MYSQL_USER = "root" MYSQL_PASSWORD = "Rakuten" MYSQL_DB = "rakuten_db" # Configuration pour le JWT SECRET_KEY = "secret" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRATION = 45 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Fonction pour interroger la base de données et récupérer les informations d'identification de l'utilisateur def get_user(username: str): # return users_db.get(username) try: connection = mysql.connector.connect( host=MYSQL_HOST, user=MYSQL_USER, port="3306", password=MYSQL_PASSWORD, database=MYSQL_DB ) if connection.is_connected(): cursor = connection.cursor(dictionary=True) query = f"SELECT * FROM Users WHERE username = '{username}'" cursor.execute(query) user_connected = cursor.fetchone() print("userconnected : ",user_connected) return user_connected except Error as e: print(f"Error while querying MySQL: {e}") finally: cursor.close() connection.close() class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta to_encode.update({"exp": expire}) else: expire = None # datetime.now(timezone.utc) + timedelta(minutes=15) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(jwt=token, key=SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except PyJWTError as e: raise credentials_exception user = get_user(username) if user is None: raise credentials_exception return user @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """ Description: Cette route permet à un utilisateur de s'authentifier en fournissant un nom d'utilisateur et un mot de passe. Si l'authentification est réussie, elle renvoie un jeton d'accès JWT. Args: - form_data (OAuth2PasswordRequestForm, dépendance): Les données de formulaire contenant le nom d'utilisateur et le mot de passe. Returns: - Token: Un modèle de jeton d'accès JWT. Raises: - HTTPException(400, detail="Incorrect username or password"): Si l'authentification échoue en raison d'un nom d'utilisateur ou d'un mot de passe incorrect, une exception HTTP 400 Bad Request est levée. """ user = get_user(form_data.username) hashed_password = user["password"] if not user or not verify_password(form_data.password, hashed_password): raise HTTPException(status_code=400, detail="Incorrect username or password") access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRATION) access_token = create_access_token(data={"sub": form_data.username}, expires_delta=None) #, expires_delta=access_token_expires) return {"access_token": access_token, "token_type": "bearer"} @app.get("/") def read_public_data(): """ Description: Cette route renvoie un message "Hello World!". Args: Aucun argument requis. Returns: - JSON: Renvoie un JSON contenant un message de salutation. Raises: Aucune exception n'est levée. """ return {"message": "Hello World!"} @app.get("/secured") def read_private_data(current_user: dict = Depends(get_current_user)): """ Description: Cette route renvoie un dictionnaire avec les données de l'utilisateur identifié avec le token. Args: - current_user (str, dépendance): Les donnée de l'utilisateur, obtenues grâce au username codé dans le token. Returns: - current_user: Dictionnaire avec toutes les données de l'utilisateur authentifié grâce au token. Raises: - HTTPException(401, detail="Unauthorized"): Si l'utilisateur n'est pas authentifié, une exception HTTP 401 Unauthorized est levée. """ return {**current_user} ''' @app.on_event("startup") async def startup_event(): @app.on_event("shutdown") async def shutdown_event(): '''