Spaces:
Runtime error
Runtime error
File size: 2,877 Bytes
6a5b8d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
"""
User management routes
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from core.auth import (
get_current_active_user,
create_access_token,
verify_password,
get_password_hash
)
from core.database import get_db
from models.user import User, UserRole
from schemas.user import (
UserCreate,
UserUpdate,
UserResponse,
TokenResponse
)
router = APIRouter(
prefix="/users",
tags=["users"],
responses={404: {"description": "Not found"}}
)
@router.post("/token", response_model=TokenResponse)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Login user and create access token"""
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
async def read_user_me(current_user: User = Depends(get_current_active_user)):
"""Get current user information"""
return current_user
@router.put("/me", response_model=UserResponse)
async def update_user_me(
user_update: UserUpdate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Update current user information"""
if user_update.password:
current_user.password = get_password_hash(user_update.password)
if user_update.email:
current_user.email = user_update.email
if user_update.vpn_protocol:
current_user.vpn_protocol = user_update.vpn_protocol
db.commit()
return current_user
@router.post("/register", response_model=UserResponse)
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
"""Register a new user"""
# Check if username exists
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Create new user
db_user = User(
username=user.username,
email=user.email,
password=get_password_hash(user.password),
role=UserRole.USER,
vpn_protocol=user.vpn_protocol
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
|