"""User registration and JWT authentication API built on FastAPI and MongoDB."""

from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from bson import ObjectId
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.concurrency import run_in_threadpool
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

app = FastAPI()

# MongoDB configuration
client = MongoClient("mongodb://localhost:27017/")
db = client["user_db"]
users_collection = db["users"]

# Security configuration
# NOTE(review): this is a placeholder signing key committed in source; load a
# real secret from the environment or a secret store before deploying.
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def verify_password(plain_password, hashed_password):
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Token lifetime. When omitted (``None``) the token
            expires after 15 minutes. An explicit ``timedelta(0)`` is honoured
            rather than being replaced by the default.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    to_encode = data.copy()
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def authenticate_user(email: str, password: str):
    """Look up *email* and check *password* against the stored hash.

    Returns:
        The user document on success, otherwise ``False`` (unknown e-mail or
        wrong password).
    """
    user = users_collection.find_one({"email": email})
    if not user:
        return False
    if not verify_password(password, user["hashed_password"]):
        return False
    return user


def _create_user(email: str, password: str) -> bool:
    """Insert a new user document; return ``False`` if the e-mail is taken."""
    # Explicit existence check: DuplicateKeyError is only raised when a unique
    # index exists on ``email``, and the code shown here never creates one.
    # NOTE(review): this check is not atomic; add a unique index on ``email``
    # to close the race between concurrent registrations.
    if users_collection.find_one({"email": email}, {"_id": 1}) is not None:
        return False
    try:
        users_collection.insert_one(
            {"email": email, "hashed_password": get_password_hash(password)}
        )
    except DuplicateKeyError:
        return False
    return True


# Registration route
@app.post("/register")
async def register(email: str, password: str):
    """Register a new user identified by *email*.

    Raises:
        HTTPException: 400 if the e-mail is already registered.
    """
    # Hashing and the PyMongo calls are blocking; run them in the threadpool
    # so they do not stall the event loop.
    created = await run_in_threadpool(_create_user, email, password)
    if not created:
        raise HTTPException(status_code=400, detail="Email already registered")
    return {"message": "User created successfully"}
# Login route
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form credentials for a bearer access token.

    The form's ``username`` field is used as the account e-mail.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 if the e-mail/password pair is not accepted.
    """
    from fastapi.concurrency import run_in_threadpool

    # authenticate_user does a blocking database lookup and password check;
    # keep it off the event loop.
    user = await run_in_threadpool(
        authenticate_user, form_data.username, form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(
        data={"sub": user["email"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}


# Protected route
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
    """Return a confirmation for the user identified by the bearer *token*.

    Raises:
        HTTPException: 401 if the token cannot be decoded or has no ``sub``
            claim; 404 if no user document matches the ``sub`` e-mail.
    """
    from fastapi.concurrency import run_in_threadpool

    # Only the decode step can raise PyJWTError, so the try covers just that.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    email = payload.get("sub")
    if email is None:
        # Same failure class as an undecodable token, so same 401 response.
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The PyMongo lookup is blocking; run it in the threadpool.
    user = await run_in_threadpool(users_collection.find_one, {"email": email})
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return {"message": "You are authenticated", "user": email}