# CPS-API / api/routes/auth.py
# Last commit: "Update api/routes/auth.py" by Ali2206 (8fbc1f3, verified) - 3.58 kB
import logging
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm

from db.mongo import get_user_db
from core.security import (
    create_access_token,
    verify_token,
    get_password_hash,
    verify_password
)
from models.schemas import (
    TokenResponse,
    UserCreate,
    UserInDB,
    UserResponse
)
from core.config import settings
# Router that the endpoints defined in this module are registered on.
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@router.post("/login", response_model=TokenResponse)
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends()
):
    """Authenticate a user by email and password and issue an access token.

    The OAuth2 form's ``username`` field is treated as the user's email and
    is lower-cased before the database lookup.

    Args:
        request: Incoming request; only used to log the client address.
        form_data: OAuth2 password form carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token``, ``token_type`` ("bearer") and
        ``expires_in`` (token lifetime in seconds, taken from
        ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``).

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify.
    """
    # request.client can be None (e.g. when the server cannot determine the
    # peer address), so guard the attribute access instead of crashing
    # before authentication even starts.
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Login attempt from {client_host}")

    user_db = await get_user_db()
    user = await user_db.find_one({"email": form_data.username.lower()})

    # Same error for "unknown email" and "wrong password" on purpose: the
    # response does not reveal which of the two failed.
    if not user or not verify_password(form_data.password, user["password"]):
        logger.warning(f"Failed login attempt for {form_data.username}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        # Role falls back to "user" for documents stored without one.
        data={"sub": user["email"], "role": user.get("role", "user")},
        expires_delta=access_token_expires
    )

    logger.info(f"Successful login for {user['email']}")
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": access_token_expires.total_seconds()
    }
@router.post("/signup", response_model=UserResponse)
async def signup(user_in: UserCreate):
    """Register a new user and return the public view of the account.

    The email is lower-cased for both the duplicate check and the stored
    document, because ``login`` looks users up by the lower-cased email.

    Args:
        user_in: Signup payload. ``email`` and ``password`` are read here;
            the remaining fields are copied into the stored document.

    Returns:
        UserResponse with the inserted document id, email, full name and role.

    Raises:
        HTTPException: 400 if the email is already registered; 500 if the
            database insert fails.
    """
    logger.info(f"Signup attempt for {user_in.email}")
    email = user_in.email.lower()

    user_db = await get_user_db()
    # NOTE(review): check-then-insert is not atomic; presumably a unique
    # index on "email" backs this up - confirm against the DB setup.
    if await user_db.find_one({"email": email}):
        logger.warning(f"Email already registered: {user_in.email}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Build the document from the payload and then override fields.
    # Passing ``password=`` next to ``**user_in.dict()`` would raise
    # "got multiple values for keyword argument 'password'".
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; move to
    # datetime.now(timezone.utc) once aware datetimes are confirmed safe
    # for the rest of the project.
    now = datetime.utcnow()
    user_data = user_in.dict()
    user_data.update(
        email=email,
        password=get_password_hash(user_in.password),
        role="user",
        created_at=now,
        updated_at=now,
    )
    db_user = UserInDB(**user_data)

    # Keep the try body to the one call that talks to the database.
    try:
        result = await user_db.insert_one(db_user.dict())
    except Exception as e:
        logger.error(f"User creation failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="User creation failed"
        ) from e

    logger.info(f"User created: {user_in.email}")
    return UserResponse(
        id=str(result.inserted_id),
        email=db_user.email,
        full_name=db_user.full_name,
        role=db_user.role
    )
@router.get("/me", response_model=UserResponse)
async def read_users_me(
    request: Request,
    current_user: UserInDB = Depends(verify_token)
):
    """Return the stored profile for the caller of this endpoint.

    Args:
        request: Incoming request (not read by this handler).
        current_user: Object produced by the ``verify_token`` dependency;
            its ``sub`` attribute is used as the email to look up.

    Returns:
        UserResponse built from the matching database document, with
        ``full_name`` defaulting to "" and ``role`` to "user" when absent.

    Raises:
        HTTPException: 404 when no document matches the email.
    """
    subject = current_user.sub
    logger.info(f"User profile request for {subject}")

    collection = await get_user_db()
    record = await collection.find_one({"email": subject})

    if record:
        return UserResponse(
            id=str(record["_id"]),
            email=record["email"],
            full_name=record.get("full_name", ""),
            role=record.get("role", "user")
        )

    # The token was accepted but no document exists for its subject.
    logger.error(f"User not found: {subject}")
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )