Spaces:
Runtime error
Runtime error
| """ | |
| User management routes | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from sqlalchemy.orm import Session | |
| from typing import List | |
| from core.auth import ( | |
| get_current_active_user, | |
| create_access_token, | |
| verify_password, | |
| get_password_hash | |
| ) | |
| from core.database import get_db | |
| from models.user import User, UserRole | |
| from schemas.user import ( | |
| UserCreate, | |
| UserUpdate, | |
| UserResponse, | |
| TokenResponse | |
| ) | |
# Router object for the user-management endpoints: path prefix "/users",
# OpenAPI tag "users", and a shared 404 response description.
router = APIRouter(
    prefix="/users",
    tags=["users"],
    responses={404: {"description": "Not found"}},
)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `router`.
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Authenticate a user from form credentials and issue an access token.

    Looks up the ``User`` row whose username matches ``form_data.username``
    and checks ``form_data.password`` against the stored ``password`` value
    with ``verify_password``.

    Returns:
        A dict with the keys ``access_token`` (built by
        ``create_access_token`` with the username as ``sub``) and
        ``token_type`` (always ``"bearer"``).

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            no matching user exists or the password check fails.
    """
    account = (
        db.query(User)
        .filter(User.username == form_data.username)
        .first()
    )

    # Success path first; the password is only checked when a user was found.
    if account and verify_password(form_data.password, account.password):
        token = create_access_token(data={"sub": account.username})
        return {"access_token": token, "token_type": "bearer"}

    # Same response for "unknown user" and "wrong password".
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `router`.
async def read_user_me(
    current_user: User = Depends(get_current_active_user),
):
    """Return the user supplied by the ``get_current_active_user`` dependency.

    The object is handed back unchanged; this handler does no lookup or
    filtering of its own.
    """
    return current_user
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `router`.
async def update_user_me(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Apply the supplied changes to the current user and commit them.

    Only truthy fields of ``user_update`` are applied, so a field that is
    ``None`` or an empty string leaves the stored value untouched. A new
    password is stored as the result of ``get_password_hash``; ``email``
    and ``vpn_protocol`` are copied over as-is.

    Returns:
        The same ``current_user`` object, after ``db.commit()``.
    """
    new_password = user_update.password
    if new_password:
        current_user.password = get_password_hash(new_password)

    # These fields are copied verbatim when a truthy value is provided.
    for field_name in ("email", "vpn_protocol"):
        new_value = getattr(user_update, field_name)
        if new_value:
            setattr(current_user, field_name, new_value)

    db.commit()
    return current_user
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `router`.
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a new account with the ``UserRole.USER`` role.

    Rejects the request when a ``User`` row with the same username already
    exists. Otherwise a row is inserted with the password stored as the
    result of ``get_password_hash``, then committed and refreshed.

    Returns:
        The newly created ``User`` instance, refreshed from the database.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # Only the username is checked for duplicates here.
    existing = db.query(User).filter(User.username == user.username).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    # New accounts always get the plain USER role, whatever was submitted.
    account_fields = {
        "username": user.username,
        "email": user.email,
        "password": get_password_hash(user.password),
        "role": UserRole.USER,
        "vpn_protocol": user.vpn_protocol,
    }
    created = User(**account_fields)

    db.add(created)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(created)
    return created