Spaces:
Paused
Paused
File size: 5,493 Bytes
eb5ec73 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | import mysql.connector
from mysql.connector import Error
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from passlib.context import CryptContext
from typing import Optional
import jwt
from jwt import PyJWTError
from datetime import datetime, timedelta, timezone
import requests
app = FastAPI()
class UserDetail(BaseModel):
FirstName: str
LastName: str
Email: str
Authorization: str
username: str
# Configuration de la base de données MySQL
MYSQL_HOST = "users_db"
MYSQL_USER = "root"
MYSQL_PASSWORD = "Rakuten"
MYSQL_DB = "rakuten_db"
# Configuration pour le JWT
SECRET_KEY = "secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRATION = 45
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Fonction pour interroger la base de données et récupérer les informations d'identification de l'utilisateur
def get_user(username: str):
# return users_db.get(username)
try:
connection = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
port="3306",
password=MYSQL_PASSWORD,
database=MYSQL_DB
)
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
query = f"SELECT * FROM Users WHERE username = '{username}'"
cursor.execute(query)
user_connected = cursor.fetchone()
print("userconnected : ",user_connected)
return user_connected
except Error as e:
print(f"Error while querying MySQL: {e}")
finally:
cursor.close()
connection.close()
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
else:
expire = None # datetime.now(timezone.utc) + timedelta(minutes=15)
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(jwt=token, key=SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError as e:
raise credentials_exception
user = get_user(username)
if user is None:
raise credentials_exception
return user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
Description:
Cette route permet à un utilisateur de s'authentifier en fournissant un nom d'utilisateur et un mot de passe. Si l'authentification est réussie, elle renvoie un jeton d'accès JWT.
Args:
- form_data (OAuth2PasswordRequestForm, dépendance): Les données de formulaire contenant le nom d'utilisateur et le mot de passe.
Returns:
- Token: Un modèle de jeton d'accès JWT.
Raises:
- HTTPException(400, detail="Incorrect username or password"): Si l'authentification échoue en raison d'un nom d'utilisateur ou d'un mot de passe incorrect, une exception HTTP 400 Bad Request est levée.
"""
user = get_user(form_data.username)
hashed_password = user["password"]
if not user or not verify_password(form_data.password, hashed_password):
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRATION)
access_token = create_access_token(data={"sub": form_data.username}, expires_delta=None) #, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/")
def read_public_data():
"""
Description:
Cette route renvoie un message "Hello World!".
Args:
Aucun argument requis.
Returns:
- JSON: Renvoie un JSON contenant un message de salutation.
Raises:
Aucune exception n'est levée.
"""
return {"message": "Hello World!"}
@app.get("/secured")
def read_private_data(current_user: dict = Depends(get_current_user)):
"""
Description:
Cette route renvoie un dictionnaire avec les données de l'utilisateur identifié avec le token.
Args:
- current_user (str, dépendance): Les donnée de l'utilisateur, obtenues grâce au username codé dans le token.
Returns:
- current_user: Dictionnaire avec toutes les données de l'utilisateur authentifié grâce au token.
Raises:
- HTTPException(401, detail="Unauthorized"): Si l'utilisateur n'est pas authentifié, une exception HTTP 401 Unauthorized est levée.
"""
return {**current_user}
'''
@app.on_event("startup")
async def startup_event():
@app.on_event("shutdown")
async def shutdown_event():
'''
|