| """
|
| User management routes
|
| """
|
| from fastapi import APIRouter, Depends, HTTPException, status
|
| from fastapi.security import OAuth2PasswordRequestForm
|
| from sqlalchemy.orm import Session
|
| from typing import List
|
|
|
| from core.auth import (
|
| get_current_active_user,
|
| create_access_token,
|
| verify_password,
|
| get_password_hash
|
| )
|
| from core.database import get_db
|
| from models.user import User, UserRole
|
| from schemas.user import (
|
| UserCreate,
|
| UserUpdate,
|
| UserResponse,
|
| TokenResponse
|
| )
|
|
|
| router = APIRouter(
|
| prefix="/users",
|
| tags=["users"],
|
| responses={404: {"description": "Not found"}}
|
| )
|
|
|
| @router.post("/token", response_model=TokenResponse)
|
| async def login(
|
| form_data: OAuth2PasswordRequestForm = Depends(),
|
| db: Session = Depends(get_db)
|
| ):
|
| """Login user and create access token"""
|
| user = db.query(User).filter(User.username == form_data.username).first()
|
| if not user or not verify_password(form_data.password, user.password):
|
| raise HTTPException(
|
| status_code=status.HTTP_401_UNAUTHORIZED,
|
| detail="Incorrect username or password",
|
| headers={"WWW-Authenticate": "Bearer"},
|
| )
|
|
|
| access_token = create_access_token(data={"sub": user.username})
|
| return {"access_token": access_token, "token_type": "bearer"}
|
|
|
| @router.get("/me", response_model=UserResponse)
|
| async def read_user_me(current_user: User = Depends(get_current_active_user)):
|
| """Get current user information"""
|
| return current_user
|
|
|
| @router.put("/me", response_model=UserResponse)
|
| async def update_user_me(
|
| user_update: UserUpdate,
|
| current_user: User = Depends(get_current_active_user),
|
| db: Session = Depends(get_db)
|
| ):
|
| """Update current user information"""
|
| if user_update.password:
|
| current_user.password = get_password_hash(user_update.password)
|
| if user_update.email:
|
| current_user.email = user_update.email
|
| if user_update.vpn_protocol:
|
| current_user.vpn_protocol = user_update.vpn_protocol
|
|
|
| db.commit()
|
| return current_user
|
|
|
| @router.post("/register", response_model=UserResponse)
|
| async def register_user(user: UserCreate, db: Session = Depends(get_db)):
|
| """Register a new user"""
|
|
|
| if db.query(User).filter(User.username == user.username).first():
|
| raise HTTPException(
|
| status_code=status.HTTP_400_BAD_REQUEST,
|
| detail="Username already registered"
|
| )
|
|
|
|
|
| db_user = User(
|
| username=user.username,
|
| email=user.email,
|
| password=get_password_hash(user.password),
|
| role=UserRole.USER,
|
| vpn_protocol=user.vpn_protocol
|
| )
|
|
|
| db.add(db_user)
|
| db.commit()
|
| db.refresh(db_user)
|
|
|
| return db_user
|
|
|